filer_sink.go

package filersink

import (
	"context"
	"fmt"
	"math"

	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
	"github.com/seaweedfs/seaweedfs/weed/replication/source"
	"github.com/seaweedfs/seaweedfs/weed/security"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

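// FilerSink replicates entries from a source filer into a destination filer.
// It is registered as a replication sink and configured either through
// DoInitialize or the configuration-driven Initialize.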
type FilerSink struct {
	filerSource       *source.FilerSource
	grpcAddress       string
	dir               string
	replication       string
	collection        string
	ttlSec            int32
	diskType          string
	dataCenter        string
	grpcDialOption    grpc.DialOption
	address           string
	writeChunkByFiler bool
	isIncremental     bool
	executor          *util.LimitedConcurrentExecutor
}

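// init registers the filer sink in the global list of available replication sinks.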
func init() {
	sink.Sinks = append(sink.Sinks, &FilerSink{})
}

func (fs *FilerSink) GetName() string {
	return "filer"
}

func (fs *FilerSink) GetSinkToDirectory() string {
	return fs.dir
}

func (fs *FilerSink) IsIncremental() bool {
	return fs.isIncremental
}

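// Initialize reads the sink settings under the given configuration prefix and
// delegates to DoInitialize, loading the gRPC client TLS options from "grpc.client".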
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
	fs.isIncremental = configuration.GetBool(prefix + "is_incremental")
	fs.dataCenter = configuration.GetString(prefix + "dataCenter")
	return fs.DoInitialize(
		"",
		configuration.GetString(prefix+"grpcAddress"),
		configuration.GetString(prefix+"directory"),
		configuration.GetString(prefix+"replication"),
		configuration.GetString(prefix+"collection"),
		configuration.GetInt(prefix+"ttlSec"),
		configuration.GetString(prefix+"disk"),
		security.LoadClientTLS(util.GetViper(), "grpc.client"),
		false)
}

func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
	fs.filerSource = s
}

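// DoInitialize applies the sink settings directly. When address is empty it is
// derived from grpcAddress, and a 32-way limited concurrent executor is created.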
func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
	replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
	fs.address = address
	if fs.address == "" {
		fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
	}
	fs.grpcAddress = grpcAddress
	fs.dir = dir
	fs.replication = replication
	fs.collection = collection
	fs.ttlSec = int32(ttlSec)
	fs.diskType = diskType
	fs.grpcDialOption = grpcDialOption
	fs.writeChunkByFiler = writeChunkByFiler
	fs.executor = util.NewLimitedConcurrentExecutor(32)
	return nil
}

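// DeleteEntry removes the entry identified by key from the destination filer,
// optionally deleting its chunks as well.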
func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
	dir, name := util.FullPath(key).DirAndName()

	glog.V(4).Infof("delete entry: %v", key)
	err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, true, true, true, signatures)
	if err != nil {
		glog.V(0).Infof("delete entry %s: %v", key, err)
		return fmt.Errorf("delete entry %s: %v", key, err)
	}
	return nil
}

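// CreateEntry replicates a single entry to the destination filer. If an entry with
// the same ETag already exists it is skipped; otherwise the source chunks are
// replicated and a new entry is created with IsFromOtherCluster set.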
func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
	return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {

		dir, name := util.FullPath(key).DirAndName()

		// look up existing entry
		lookupRequest := &filer_pb.LookupDirectoryEntryRequest{
			Directory: dir,
			Name:      name,
		}
		glog.V(1).Infof("lookup: %v", lookupRequest)
		if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil {
			if filer.ETag(resp.Entry) == filer.ETag(entry) {
				glog.V(3).Infof("already replicated %s", key)
				return nil
			}
		}

		replicatedChunks, err := fs.replicateChunks(entry.GetChunks(), key)
		if err != nil {
			// only warning here since the source chunk may have been deleted already
			glog.Warningf("replicate entry chunks %s: %v", key, err)
		}

		glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.GetChunks(), replicatedChunks)

		request := &filer_pb.CreateEntryRequest{
			Directory: dir,
			Entry: &filer_pb.Entry{
				Name:        name,
				IsDirectory: entry.IsDirectory,
				Attributes:  entry.Attributes,
				Extended:    entry.Extended,
				Chunks:      replicatedChunks,
				Content:     entry.Content,
				RemoteEntry: entry.RemoteEntry,
			},
			IsFromOtherCluster: true,
			Signatures:         signatures,
		}

		glog.V(3).Infof("create: %v", request)
		if err := filer_pb.CreateEntry(client, request); err != nil {
			glog.V(0).Infof("create entry %s: %v", key, err)
			return fmt.Errorf("create entry %s: %v", key, err)
		}

		return nil
	})
}

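// UpdateEntry applies an update from the source filer to the existing destination
// entry. Late (out-of-order) updates are detected via mtime and skipped; otherwise
// the chunk lists are diffed, deleted chunks are dropped, new chunks are replicated,
// and the merged entry is saved back to the destination filer.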
func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {

	dir, name := util.FullPath(key).DirAndName()

	// read existing entry
	var existingEntry *filer_pb.Entry
	err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {

		request := &filer_pb.LookupDirectoryEntryRequest{
			Directory: dir,
			Name:      name,
		}

		glog.V(4).Infof("lookup entry: %v", request)
		resp, err := filer_pb.LookupEntry(client, request)
		if err != nil {
			glog.V(0).Infof("lookup %s: %v", key, err)
			return err
		}

		existingEntry = resp.Entry

		return nil
	})

	if err != nil {
		return false, fmt.Errorf("lookup %s: %v", key, err)
	}

	glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)

	if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime {
		// skip if already changed
		// this usually happens when the messages are not ordered
		glog.V(2).Infof("late updates %s", key)
	} else {
		// find out what changed
		deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry)
		if err != nil {
			return true, fmt.Errorf("replicate %s compare chunks error: %v", key, err)
		}

		// delete the chunks that are deleted from the source
		if deleteIncludeChunks {
			// remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
			existingEntry.Chunks = filer.DoMinusChunksBySourceFileId(existingEntry.GetChunks(), deletedChunks)
		}

		// replicate the chunks that are new in the source
		replicatedChunks, err := fs.replicateChunks(newChunks, key)
		if err != nil {
			return true, fmt.Errorf("replicate %s chunks error: %v", key, err)
		}
		existingEntry.Chunks = append(existingEntry.GetChunks(), replicatedChunks...)
		existingEntry.Attributes = newEntry.Attributes
		existingEntry.Extended = newEntry.Extended
		existingEntry.HardLinkId = newEntry.HardLinkId
		existingEntry.HardLinkCounter = newEntry.HardLinkCounter
		existingEntry.Content = newEntry.Content
		existingEntry.RemoteEntry = newEntry.RemoteEntry
	}

	// save updated meta data
	return true, fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {

		request := &filer_pb.UpdateEntryRequest{
			Directory:          newParentPath,
			Entry:              existingEntry,
			IsFromOtherCluster: true,
			Signatures:         signatures,
		}

		if _, err := client.UpdateEntry(context.Background(), request); err != nil {
			return fmt.Errorf("update existingEntry %s: %v", key, err)
		}

		return nil
	})
}

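// compareChunks resolves the chunk manifests of the old and new entries and returns
// the chunks removed from the old entry and the chunks newly added in the new entry,
// for both data chunks and manifest chunks.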
func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
	aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.GetChunks(), 0, math.MaxInt64)
	if aErr != nil {
		return nil, nil, aErr
	}
	bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.GetChunks(), 0, math.MaxInt64)
	if bErr != nil {
		return nil, nil, bErr
	}
	deletedChunks = append(deletedChunks, filer.DoMinusChunks(aData, bData)...)
	deletedChunks = append(deletedChunks, filer.DoMinusChunks(aMeta, bMeta)...)
	newChunks = append(newChunks, filer.DoMinusChunks(bData, aData)...)
	newChunks = append(newChunks, filer.DoMinusChunks(bMeta, aMeta)...)
	return
}