local_sink.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package localsink
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/filer"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
  7. "github.com/seaweedfs/seaweedfs/weed/replication/sink"
  8. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  9. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  10. "github.com/seaweedfs/seaweedfs/weed/util"
  11. "os"
  12. "path/filepath"
  13. "strings"
  14. )
  15. type LocalSink struct {
  16. Dir string
  17. filerSource *source.FilerSource
  18. isIncremental bool
  19. }
  20. func init() {
  21. sink.Sinks = append(sink.Sinks, &LocalSink{})
  22. }
  23. func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) {
  24. localsink.filerSource = s
  25. }
  26. func (localsink *LocalSink) GetName() string {
  27. return "local"
  28. }
  29. func (localsink *LocalSink) isMultiPartEntry(key string) bool {
  30. return strings.HasSuffix(key, ".part") && strings.Contains(key, "/"+s3_constants.MultipartUploadsFolder+"/")
  31. }
  32. func (localsink *LocalSink) initialize(dir string, isIncremental bool) error {
  33. localsink.Dir = dir
  34. localsink.isIncremental = isIncremental
  35. return nil
  36. }
  37. func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
  38. dir := configuration.GetString(prefix + "directory")
  39. isIncremental := configuration.GetBool(prefix + "is_incremental")
  40. glog.V(4).Infof("sink.local.directory: %v", dir)
  41. return localsink.initialize(dir, isIncremental)
  42. }
  43. func (localsink *LocalSink) GetSinkToDirectory() string {
  44. return localsink.Dir
  45. }
  46. func (localsink *LocalSink) IsIncremental() bool {
  47. return localsink.isIncremental
  48. }
  49. func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
  50. if localsink.isMultiPartEntry(key) {
  51. return nil
  52. }
  53. glog.V(4).Infof("Delete Entry key: %s", key)
  54. if err := os.Remove(key); err != nil {
  55. glog.V(0).Infof("remove entry key %s: %s", key, err)
  56. }
  57. return nil
  58. }
  59. func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
  60. if entry.IsDirectory || localsink.isMultiPartEntry(key) {
  61. return nil
  62. }
  63. glog.V(4).Infof("Create Entry key: %s", key)
  64. totalSize := filer.FileSize(entry)
  65. chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
  66. dir := filepath.Dir(key)
  67. if _, err := os.Stat(dir); os.IsNotExist(err) {
  68. glog.V(4).Infof("Create Directory key: %s", dir)
  69. if err = os.MkdirAll(dir, 0755); err != nil {
  70. return err
  71. }
  72. }
  73. if entry.IsDirectory {
  74. return os.Mkdir(key, os.FileMode(entry.Attributes.FileMode))
  75. }
  76. mode := os.FileMode(entry.Attributes.FileMode)
  77. dstFile, err := os.OpenFile(util.ToShortFileName(key), os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
  78. if err != nil {
  79. return err
  80. }
  81. defer dstFile.Close()
  82. fi, err := dstFile.Stat()
  83. if err != nil {
  84. return err
  85. }
  86. if fi.Mode() != mode {
  87. glog.V(4).Infof("Modify file mode: %o -> %o", fi.Mode(), mode)
  88. if err := dstFile.Chmod(mode); err != nil {
  89. return err
  90. }
  91. }
  92. writeFunc := func(data []byte) error {
  93. _, writeErr := dstFile.Write(data)
  94. return writeErr
  95. }
  96. if len(entry.Content) > 0 {
  97. return writeFunc(entry.Content)
  98. }
  99. if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil {
  100. return err
  101. }
  102. return nil
  103. }
  104. func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
  105. if localsink.isMultiPartEntry(key) {
  106. return true, nil
  107. }
  108. glog.V(4).Infof("Update Entry key: %s", key)
  109. // do delete and create
  110. foundExistingEntry = util.FileExists(key)
  111. err = localsink.CreateEntry(key, newEntry, signatures)
  112. return
  113. }