filer_sink.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package filersink
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "github.com/chrislusf/seaweedfs/weed/wdclient"
  7. "google.golang.org/grpc"
  8. "github.com/chrislusf/seaweedfs/weed/security"
  9. "github.com/chrislusf/seaweedfs/weed/filer"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/replication/sink"
  13. "github.com/chrislusf/seaweedfs/weed/replication/source"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. type FilerSink struct {
  17. filerSource *source.FilerSource
  18. grpcAddress string
  19. dir string
  20. replication string
  21. collection string
  22. ttlSec int32
  23. diskType string
  24. dataCenter string
  25. grpcDialOption grpc.DialOption
  26. address string
  27. writeChunkByFiler bool
  28. isIncremental bool
  29. }
  30. func init() {
  31. sink.Sinks = append(sink.Sinks, &FilerSink{})
  32. }
  33. func (fs *FilerSink) GetName() string {
  34. return "filer"
  35. }
  36. func (fs *FilerSink) GetSinkToDirectory() string {
  37. return fs.dir
  38. }
  39. func (fs *FilerSink) IsIncremental() bool {
  40. return fs.isIncremental
  41. }
  42. func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
  43. fs.isIncremental = configuration.GetBool(prefix + "is_incremental")
  44. return fs.DoInitialize(
  45. "",
  46. configuration.GetString(prefix+"grpcAddress"),
  47. configuration.GetString(prefix+"directory"),
  48. configuration.GetString(prefix+"replication"),
  49. configuration.GetString(prefix+"collection"),
  50. configuration.GetInt(prefix+"ttlSec"),
  51. configuration.GetString(prefix+"disk"),
  52. security.LoadClientTLS(util.GetViper(), "grpc.client"),
  53. false)
  54. }
  55. func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
  56. fs.filerSource = s
  57. }
  58. func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
  59. replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
  60. fs.address = address
  61. if fs.address == "" {
  62. fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
  63. }
  64. fs.grpcAddress = grpcAddress
  65. fs.dir = dir
  66. fs.replication = replication
  67. fs.collection = collection
  68. fs.ttlSec = int32(ttlSec)
  69. fs.diskType = diskType
  70. fs.grpcDialOption = grpcDialOption
  71. fs.writeChunkByFiler = writeChunkByFiler
  72. return nil
  73. }
  74. func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
  75. dir, name := util.FullPath(key).DirAndName()
  76. glog.V(4).Infof("delete entry: %v", key)
  77. err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, true, true, true, signatures)
  78. if err != nil {
  79. glog.V(0).Infof("delete entry %s: %v", key, err)
  80. return fmt.Errorf("delete entry %s: %v", key, err)
  81. }
  82. return nil
  83. }
  84. func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
  85. return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  86. dir, name := util.FullPath(key).DirAndName()
  87. // look up existing entry
  88. lookupRequest := &filer_pb.LookupDirectoryEntryRequest{
  89. Directory: dir,
  90. Name: name,
  91. }
  92. glog.V(1).Infof("lookup: %v", lookupRequest)
  93. if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil {
  94. if filer.ETag(resp.Entry) == filer.ETag(entry) {
  95. glog.V(3).Infof("already replicated %s", key)
  96. return nil
  97. }
  98. }
  99. replicatedChunks, err := fs.replicateChunks(entry.Chunks, key)
  100. if err != nil {
  101. // only warning here since the source chunk may have been deleted already
  102. glog.Warningf("replicate entry chunks %s: %v", key, err)
  103. }
  104. glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks)
  105. request := &filer_pb.CreateEntryRequest{
  106. Directory: dir,
  107. Entry: &filer_pb.Entry{
  108. Name: name,
  109. IsDirectory: entry.IsDirectory,
  110. Attributes: entry.Attributes,
  111. Chunks: replicatedChunks,
  112. Content: entry.Content,
  113. },
  114. IsFromOtherCluster: true,
  115. Signatures: signatures,
  116. }
  117. glog.V(3).Infof("create: %v", request)
  118. if err := filer_pb.CreateEntry(client, request); err != nil {
  119. glog.V(0).Infof("create entry %s: %v", key, err)
  120. return fmt.Errorf("create entry %s: %v", key, err)
  121. }
  122. return nil
  123. })
  124. }
  125. func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
  126. dir, name := util.FullPath(key).DirAndName()
  127. // read existing entry
  128. var existingEntry *filer_pb.Entry
  129. err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  130. request := &filer_pb.LookupDirectoryEntryRequest{
  131. Directory: dir,
  132. Name: name,
  133. }
  134. glog.V(4).Infof("lookup entry: %v", request)
  135. resp, err := filer_pb.LookupEntry(client, request)
  136. if err != nil {
  137. glog.V(0).Infof("lookup %s: %v", key, err)
  138. return err
  139. }
  140. existingEntry = resp.Entry
  141. return nil
  142. })
  143. if err != nil {
  144. return false, fmt.Errorf("lookup %s: %v", key, err)
  145. }
  146. glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
  147. if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime {
  148. // skip if already changed
  149. // this usually happens when the messages are not ordered
  150. glog.V(2).Infof("late updates %s", key)
  151. } else if filer.ETag(newEntry) == filer.ETag(existingEntry) {
  152. // skip if no change
  153. // this usually happens when retrying the replication
  154. glog.V(3).Infof("already replicated %s", key)
  155. } else {
  156. // find out what changed
  157. deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry)
  158. if err != nil {
  159. return true, fmt.Errorf("replicte %s compare chunks error: %v", key, err)
  160. }
  161. // delete the chunks that are deleted from the source
  162. if deleteIncludeChunks {
  163. // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
  164. existingEntry.Chunks = filer.DoMinusChunks(existingEntry.Chunks, deletedChunks)
  165. }
  166. // replicate the chunks that are new in the source
  167. replicatedChunks, err := fs.replicateChunks(newChunks, key)
  168. if err != nil {
  169. return true, fmt.Errorf("replicte %s chunks error: %v", key, err)
  170. }
  171. existingEntry.Chunks = append(existingEntry.Chunks, replicatedChunks...)
  172. }
  173. // save updated meta data
  174. return true, fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  175. request := &filer_pb.UpdateEntryRequest{
  176. Directory: newParentPath,
  177. Entry: existingEntry,
  178. IsFromOtherCluster: true,
  179. Signatures: signatures,
  180. }
  181. if _, err := client.UpdateEntry(context.Background(), request); err != nil {
  182. return fmt.Errorf("update existingEntry %s: %v", key, err)
  183. }
  184. return nil
  185. })
  186. }
  187. func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
  188. aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks)
  189. if aErr != nil {
  190. return nil, nil, aErr
  191. }
  192. bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks)
  193. if bErr != nil {
  194. return nil, nil, bErr
  195. }
  196. deletedChunks = append(deletedChunks, filer.DoMinusChunks(aData, bData)...)
  197. deletedChunks = append(deletedChunks, filer.DoMinusChunks(aMeta, bMeta)...)
  198. newChunks = append(newChunks, filer.DoMinusChunks(bData, aData)...)
  199. newChunks = append(newChunks, filer.DoMinusChunks(bMeta, aMeta)...)
  200. return
  201. }