filer_sink.go

package filersink

import (
	"context"
	"fmt"

	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/filer2"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/replication/sink"
	"github.com/chrislusf/seaweedfs/weed/replication/source"
	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/util"
)

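// FilerSink replicates file changes from a source filer into a directory on a
// destination filer. It registers itself as the "filer" replication sink at init time.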
type FilerSink struct {
	filerSource    *source.FilerSource
	grpcAddress    string
	dir            string
	replication    string
	collection     string
	ttlSec         int32
	dataCenter     string
	grpcDialOption grpc.DialOption
}

func init() {
	sink.Sinks = append(sink.Sinks, &FilerSink{})
}

func (fs *FilerSink) GetName() string {
	return "filer"
}

func (fs *FilerSink) GetSinkToDirectory() string {
	return fs.dir
}

func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
	return fs.initialize(
		configuration.GetString(prefix+"grpcAddress"),
		configuration.GetString(prefix+"directory"),
		configuration.GetString(prefix+"replication"),
		configuration.GetString(prefix+"collection"),
		configuration.GetInt(prefix+"ttlSec"),
	)
}

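// The configuration keys above are read relative to the sink's prefix. A minimal
// sketch of the matching configuration section, assuming the usual replication.toml
// layout with a "sink.filer." prefix (the section name and values below are
// illustrative, not taken from this file):
//
//	[sink.filer]
//	grpcAddress = "localhost:18888"
//	directory = "/backup"
//	replication = ""
//	collection = ""
//	ttlSec = 0
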
func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
	fs.filerSource = s
}

func (fs *FilerSink) initialize(grpcAddress string, dir string,
	replication string, collection string, ttlSec int) (err error) {
	fs.grpcAddress = grpcAddress
	fs.dir = dir
	fs.replication = replication
	fs.collection = collection
	fs.ttlSec = int32(ttlSec)
	fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
	return nil
}

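// DeleteEntry removes the entry for key on the destination filer. When
// deleteIncludeChunks is true, the filer is asked to delete the underlying
// chunk data as well.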
func (fs *FilerSink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error {
	return fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {

		dir, name := filer2.FullPath(key).DirAndName()

		request := &filer_pb.DeleteEntryRequest{
			Directory:    dir,
			Name:         name,
			IsDeleteData: deleteIncludeChunks,
		}

		glog.V(1).Infof("delete entry: %v", request)
		_, err := client.DeleteEntry(ctx, request)
		if err != nil {
			glog.V(0).Infof("delete entry %s: %v", key, err)
			return fmt.Errorf("delete entry %s: %v", key, err)
		}

		return nil
	})
}

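// CreateEntry replicates a new entry to the destination filer. If an entry with
// an identical chunk ETag already exists the call is a no-op; otherwise the
// source chunks are replicated first and then the metadata is created.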
func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error {
	return fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {

		dir, name := filer2.FullPath(key).DirAndName()

		// look up existing entry
		lookupRequest := &filer_pb.LookupDirectoryEntryRequest{
			Directory: dir,
			Name:      name,
		}
		glog.V(1).Infof("lookup: %v", lookupRequest)
		if resp, err := client.LookupDirectoryEntry(ctx, lookupRequest); err == nil {
			if filer2.ETag(resp.Entry.Chunks) == filer2.ETag(entry.Chunks) {
				glog.V(0).Infof("already replicated %s", key)
				return nil
			}
		}

		replicatedChunks, err := fs.replicateChunks(ctx, entry.Chunks)
		if err != nil {
			glog.V(0).Infof("replicate entry chunks %s: %v", key, err)
			return fmt.Errorf("replicate entry chunks %s: %v", key, err)
		}

		glog.V(0).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks)

		request := &filer_pb.CreateEntryRequest{
			Directory: dir,
			Entry: &filer_pb.Entry{
				Name:        name,
				IsDirectory: entry.IsDirectory,
				Attributes:  entry.Attributes,
				Chunks:      replicatedChunks,
			},
		}

		glog.V(1).Infof("create: %v", request)
		if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
			glog.V(0).Infof("create entry %s: %v", key, err)
			return fmt.Errorf("create entry %s: %v", key, err)
		}

		return nil
	})
}

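// UpdateEntry applies a source-side update on the destination filer. It skips
// out-of-order (stale) updates and already-replicated content, optionally removes
// chunks that were deleted at the source, replicates newly added chunks, and then
// saves the merged metadata under newParentPath.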
func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {

	dir, name := filer2.FullPath(key).DirAndName()

	// read existing entry
	var existingEntry *filer_pb.Entry
	err = fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {

		request := &filer_pb.LookupDirectoryEntryRequest{
			Directory: dir,
			Name:      name,
		}

		glog.V(4).Infof("lookup entry: %v", request)
		resp, err := client.LookupDirectoryEntry(ctx, request)
		if err != nil {
			glog.V(0).Infof("lookup %s: %v", key, err)
			return err
		}

		existingEntry = resp.Entry

		return nil
	})

	if err != nil {
		return false, fmt.Errorf("lookup %s: %v", key, err)
	}

	glog.V(0).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)

	if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime {
		// skip if already changed
		// this usually happens when the messages are not ordered
		glog.V(0).Infof("late updates %s", key)
	} else if filer2.ETag(newEntry.Chunks) == filer2.ETag(existingEntry.Chunks) {
		// skip if no change
		// this usually happens when retrying the replication
		glog.V(0).Infof("already replicated %s", key)
	} else {
		// find out what changed
		deletedChunks, newChunks := compareChunks(oldEntry, newEntry)

		// delete the chunks that are deleted from the source
		if deleteIncludeChunks {
			// remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
			existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks)
		}

		// replicate the chunks that are new in the source
		replicatedChunks, err := fs.replicateChunks(ctx, newChunks)
		if err != nil {
			return true, fmt.Errorf("replicate %s chunks error: %v", key, err)
		}
		existingEntry.Chunks = append(existingEntry.Chunks, replicatedChunks...)
	}

	// save updated meta data
	return true, fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {

		request := &filer_pb.UpdateEntryRequest{
			Directory: newParentPath,
			Entry:     existingEntry,
		}

		if _, err := client.UpdateEntry(ctx, request); err != nil {
			return fmt.Errorf("update existingEntry %s: %v", key, err)
		}

		return nil
	})
}

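// compareChunks diffs the two entries: deletedChunks are chunks present only in
// oldEntry, and newChunks are chunks present only in newEntry.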
func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) {
	deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks)
	newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks)
	return
}