filer_sink.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package filersink
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  7. "math"
  8. "google.golang.org/grpc"
  9. "github.com/seaweedfs/seaweedfs/weed/security"
  10. "github.com/seaweedfs/seaweedfs/weed/filer"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/replication/sink"
  14. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. )
// FilerSink replicates entries and file chunks to a destination SeaweedFS filer.
// It is registered as a replication sink via init() and configured either from
// replication configuration (Initialize) or programmatically (DoInitialize).
type FilerSink struct {
	filerSource       *source.FilerSource // source filer used to read chunk data when replicating
	grpcAddress       string              // gRPC address of the destination filer
	dir               string              // destination directory on the remote filer
	replication       string              // replication setting applied to uploaded chunks
	collection        string              // collection for uploaded chunks
	ttlSec            int32               // TTL in seconds for uploaded chunks
	diskType          string              // disk type hint for volume assignment
	dataCenter        string              // preferred data center for volume assignment
	grpcDialOption    grpc.DialOption     // TLS/credentials for gRPC connections
	address           string              // HTTP server address; derived from grpcAddress when empty
	writeChunkByFiler bool                // if true, chunk writes are proxied through the filer
	isIncremental     bool                // if true, sink only receives incremental updates
	executor          *util.LimitedConcurrentExecutor // bounds concurrent chunk replication
	signature         int32               // random signature identifying this sink's writes
}
// init registers FilerSink in the global sink registry so the replication
// framework can discover it by name ("filer").
func init() {
	sink.Sinks = append(sink.Sinks, &FilerSink{})
}
// GetName returns the sink type identifier used in replication configuration.
func (fs *FilerSink) GetName() string {
	return "filer"
}
// GetSinkToDirectory returns the destination directory on the remote filer.
func (fs *FilerSink) GetSinkToDirectory() string {
	return fs.dir
}
// IsIncremental reports whether this sink receives only incremental updates.
func (fs *FilerSink) IsIncremental() bool {
	return fs.isIncremental
}
  45. func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
  46. fs.isIncremental = configuration.GetBool(prefix + "is_incremental")
  47. fs.dataCenter = configuration.GetString(prefix + "dataCenter")
  48. fs.signature = util.RandomInt32()
  49. return fs.DoInitialize(
  50. "",
  51. configuration.GetString(prefix+"grpcAddress"),
  52. configuration.GetString(prefix+"directory"),
  53. configuration.GetString(prefix+"replication"),
  54. configuration.GetString(prefix+"collection"),
  55. configuration.GetInt(prefix+"ttlSec"),
  56. configuration.GetString(prefix+"disk"),
  57. security.LoadClientTLS(util.GetViper(), "grpc.client"),
  58. false)
  59. }
// SetSourceFiler records the source filer so replicated chunk data can be
// read from it during chunk replication.
func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
	fs.filerSource = s
}
  63. func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
  64. replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
  65. fs.address = address
  66. if fs.address == "" {
  67. fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
  68. }
  69. fs.grpcAddress = grpcAddress
  70. fs.dir = dir
  71. fs.replication = replication
  72. fs.collection = collection
  73. fs.ttlSec = int32(ttlSec)
  74. fs.diskType = diskType
  75. fs.grpcDialOption = grpcDialOption
  76. fs.writeChunkByFiler = writeChunkByFiler
  77. fs.executor = util.NewLimitedConcurrentExecutor(32)
  78. return nil
  79. }
  80. func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
  81. dir, name := util.FullPath(key).DirAndName()
  82. glog.V(4).Infof("delete entry: %v", key)
  83. err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, true, true, true, signatures)
  84. if err != nil {
  85. glog.V(0).Infof("delete entry %s: %v", key, err)
  86. return fmt.Errorf("delete entry %s: %v", key, err)
  87. }
  88. return nil
  89. }
  90. func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
  91. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  92. dir, name := util.FullPath(key).DirAndName()
  93. // look up existing entry
  94. lookupRequest := &filer_pb.LookupDirectoryEntryRequest{
  95. Directory: dir,
  96. Name: name,
  97. }
  98. // glog.V(1).Infof("lookup: %v", lookupRequest)
  99. if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil {
  100. if filer.ETag(resp.Entry) == filer.ETag(entry) {
  101. glog.V(3).Infof("already replicated %s", key)
  102. return nil
  103. }
  104. if resp.Entry.Attributes != nil && resp.Entry.Attributes.Mtime >= entry.Attributes.Mtime {
  105. glog.V(3).Infof("skip overwriting %s", key)
  106. return nil
  107. }
  108. }
  109. replicatedChunks, err := fs.replicateChunks(entry.GetChunks(), key)
  110. if err != nil {
  111. // only warning here since the source chunk may have been deleted already
  112. glog.Warningf("replicate entry chunks %s: %v", key, err)
  113. return nil
  114. }
  115. // glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.GetChunks(), replicatedChunks)
  116. request := &filer_pb.CreateEntryRequest{
  117. Directory: dir,
  118. Entry: &filer_pb.Entry{
  119. Name: name,
  120. IsDirectory: entry.IsDirectory,
  121. Attributes: entry.Attributes,
  122. Extended: entry.Extended,
  123. Chunks: replicatedChunks,
  124. Content: entry.Content,
  125. RemoteEntry: entry.RemoteEntry,
  126. },
  127. IsFromOtherCluster: true,
  128. Signatures: signatures,
  129. }
  130. glog.V(3).Infof("create: %v", request)
  131. if err := filer_pb.CreateEntry(client, request); err != nil {
  132. glog.V(0).Infof("create entry %s: %v", key, err)
  133. return fmt.Errorf("create entry %s: %v", key, err)
  134. }
  135. return nil
  136. })
  137. }
  138. func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
  139. dir, name := util.FullPath(key).DirAndName()
  140. // read existing entry
  141. var existingEntry *filer_pb.Entry
  142. err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  143. request := &filer_pb.LookupDirectoryEntryRequest{
  144. Directory: dir,
  145. Name: name,
  146. }
  147. glog.V(4).Infof("lookup entry: %v", request)
  148. resp, err := filer_pb.LookupEntry(client, request)
  149. if err != nil {
  150. glog.V(0).Infof("lookup %s: %v", key, err)
  151. return err
  152. }
  153. existingEntry = resp.Entry
  154. return nil
  155. })
  156. if err != nil {
  157. return false, fmt.Errorf("lookup %s: %v", key, err)
  158. }
  159. glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
  160. if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime {
  161. // skip if already changed
  162. // this usually happens when the messages are not ordered
  163. glog.V(2).Infof("late updates %s", key)
  164. } else {
  165. // find out what changed
  166. deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry)
  167. if err != nil {
  168. return true, fmt.Errorf("replicate %s compare chunks error: %v", key, err)
  169. }
  170. // delete the chunks that are deleted from the source
  171. if deleteIncludeChunks {
  172. // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
  173. existingEntry.Chunks = filer.DoMinusChunksBySourceFileId(existingEntry.GetChunks(), deletedChunks)
  174. }
  175. // replicate the chunks that are new in the source
  176. replicatedChunks, err := fs.replicateChunks(newChunks, key)
  177. if err != nil {
  178. glog.Warningf("replicate entry chunks %s: %v", key, err)
  179. return true, nil
  180. }
  181. existingEntry.Chunks = append(existingEntry.GetChunks(), replicatedChunks...)
  182. existingEntry.Attributes = newEntry.Attributes
  183. existingEntry.Extended = newEntry.Extended
  184. existingEntry.HardLinkId = newEntry.HardLinkId
  185. existingEntry.HardLinkCounter = newEntry.HardLinkCounter
  186. existingEntry.Content = newEntry.Content
  187. existingEntry.RemoteEntry = newEntry.RemoteEntry
  188. }
  189. // save updated meta data
  190. return true, fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  191. request := &filer_pb.UpdateEntryRequest{
  192. Directory: newParentPath,
  193. Entry: existingEntry,
  194. IsFromOtherCluster: true,
  195. Signatures: signatures,
  196. }
  197. if _, err := client.UpdateEntry(context.Background(), request); err != nil {
  198. return fmt.Errorf("update existingEntry %s: %v", key, err)
  199. }
  200. return nil
  201. })
  202. }
  203. func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
  204. aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.GetChunks(), 0, math.MaxInt64)
  205. if aErr != nil {
  206. return nil, nil, aErr
  207. }
  208. bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.GetChunks(), 0, math.MaxInt64)
  209. if bErr != nil {
  210. return nil, nil, bErr
  211. }
  212. deletedChunks = append(deletedChunks, filer.DoMinusChunks(aData, bData)...)
  213. deletedChunks = append(deletedChunks, filer.DoMinusChunks(aMeta, bMeta)...)
  214. newChunks = append(newChunks, filer.DoMinusChunks(bData, aData)...)
  215. newChunks = append(newChunks, filer.DoMinusChunks(bMeta, aMeta)...)
  216. return
  217. }