local_sink.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package localsink
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/filer"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
  7. "github.com/chrislusf/seaweedfs/weed/replication/sink"
  8. "github.com/chrislusf/seaweedfs/weed/replication/source"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "io/ioutil"
  11. "os"
  12. "path/filepath"
  13. "strings"
  14. )
  15. type LocalSink struct {
  16. Dir string
  17. filerSource *source.FilerSource
  18. }
  19. func init() {
  20. sink.Sinks = append(sink.Sinks, &LocalSink{})
  21. }
  22. func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) {
  23. localsink.filerSource = s
  24. }
  25. func (localsink *LocalSink) GetName() string {
  26. return "local"
  27. }
  28. func (localsink *LocalSink) isMultiPartEntry(key string) bool {
  29. return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
  30. }
  31. func (localsink *LocalSink) initialize(dir string) error {
  32. localsink.Dir = dir
  33. return nil
  34. }
  35. func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
  36. dir := configuration.GetString(prefix + "directory")
  37. glog.V(4).Infof("sink.local.directory: %v", dir)
  38. return localsink.initialize(dir)
  39. }
  40. func (localsink *LocalSink) GetSinkToDirectory() string {
  41. return localsink.Dir
  42. }
  43. func (localsink *LocalSink) IsIncremental() bool {
  44. return true
  45. }
  46. func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
  47. if localsink.isMultiPartEntry(key) {
  48. return nil
  49. }
  50. glog.V(4).Infof("Delete Entry key: %s", key)
  51. if err := os.Remove(key); err != nil {
  52. glog.V(0).Infof("remove entry key %s: %s", key, err)
  53. }
  54. return nil
  55. }
  56. func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
  57. if entry.IsDirectory || localsink.isMultiPartEntry(key) {
  58. return nil
  59. }
  60. glog.V(4).Infof("Create Entry key: %s", key)
  61. totalSize := filer.FileSize(entry)
  62. chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
  63. dir := filepath.Dir(key)
  64. if _, err := os.Stat(dir); os.IsNotExist(err) {
  65. glog.V(4).Infof("Create Direcotry key: %s", dir)
  66. if err = os.MkdirAll(dir, 0755); err != nil {
  67. return err
  68. }
  69. }
  70. writeFunc := func(data []byte) error {
  71. writeErr := ioutil.WriteFile(key, data, 0755)
  72. return writeErr
  73. }
  74. if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil {
  75. return err
  76. }
  77. return nil
  78. }
  79. func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
  80. if localsink.isMultiPartEntry(key) {
  81. return true, nil
  82. }
  83. glog.V(4).Infof("Update Entry key: %s", key)
  84. // do delete and create
  85. return false, nil
  86. }