123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- package filersink
- import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
- "math"
- "google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/replication/sink"
- "github.com/chrislusf/seaweedfs/weed/replication/source"
- "github.com/chrislusf/seaweedfs/weed/util"
- )
- type FilerSink struct {
- filerSource *source.FilerSource
- grpcAddress string
- dir string
- replication string
- collection string
- ttlSec int32
- diskType string
- dataCenter string
- grpcDialOption grpc.DialOption
- address string
- writeChunkByFiler bool
- isIncremental bool
- }
- func init() {
- sink.Sinks = append(sink.Sinks, &FilerSink{})
- }
- func (fs *FilerSink) GetName() string {
- return "filer"
- }
- func (fs *FilerSink) GetSinkToDirectory() string {
- return fs.dir
- }
- func (fs *FilerSink) IsIncremental() bool {
- return fs.isIncremental
- }
- func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
- fs.isIncremental = configuration.GetBool(prefix + "is_incremental")
- return fs.DoInitialize(
- "",
- configuration.GetString(prefix+"grpcAddress"),
- configuration.GetString(prefix+"directory"),
- configuration.GetString(prefix+"replication"),
- configuration.GetString(prefix+"collection"),
- configuration.GetInt(prefix+"ttlSec"),
- configuration.GetString(prefix+"disk"),
- security.LoadClientTLS(util.GetViper(), "grpc.client"),
- false)
- }
- func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
- fs.filerSource = s
- }
- func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
- replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
- fs.address = address
- if fs.address == "" {
- fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
- }
- fs.grpcAddress = grpcAddress
- fs.dir = dir
- fs.replication = replication
- fs.collection = collection
- fs.ttlSec = int32(ttlSec)
- fs.diskType = diskType
- fs.grpcDialOption = grpcDialOption
- fs.writeChunkByFiler = writeChunkByFiler
- return nil
- }
- func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
- dir, name := util.FullPath(key).DirAndName()
- glog.V(4).Infof("delete entry: %v", key)
- err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, true, true, true, signatures)
- if err != nil {
- glog.V(0).Infof("delete entry %s: %v", key, err)
- return fmt.Errorf("delete entry %s: %v", key, err)
- }
- return nil
- }
- func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
- return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- dir, name := util.FullPath(key).DirAndName()
- // look up existing entry
- lookupRequest := &filer_pb.LookupDirectoryEntryRequest{
- Directory: dir,
- Name: name,
- }
- glog.V(1).Infof("lookup: %v", lookupRequest)
- if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil {
- if filer.ETag(resp.Entry) == filer.ETag(entry) {
- glog.V(3).Infof("already replicated %s", key)
- return nil
- }
- }
- replicatedChunks, err := fs.replicateChunks(entry.Chunks, key)
- if err != nil {
- // only warning here since the source chunk may have been deleted already
- glog.Warningf("replicate entry chunks %s: %v", key, err)
- }
- glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks)
- request := &filer_pb.CreateEntryRequest{
- Directory: dir,
- Entry: &filer_pb.Entry{
- Name: name,
- IsDirectory: entry.IsDirectory,
- Attributes: entry.Attributes,
- Chunks: replicatedChunks,
- Content: entry.Content,
- RemoteEntry: entry.RemoteEntry,
- },
- IsFromOtherCluster: true,
- Signatures: signatures,
- }
- glog.V(3).Infof("create: %v", request)
- if err := filer_pb.CreateEntry(client, request); err != nil {
- glog.V(0).Infof("create entry %s: %v", key, err)
- return fmt.Errorf("create entry %s: %v", key, err)
- }
- return nil
- })
- }
- func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
- dir, name := util.FullPath(key).DirAndName()
- // read existing entry
- var existingEntry *filer_pb.Entry
- err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- request := &filer_pb.LookupDirectoryEntryRequest{
- Directory: dir,
- Name: name,
- }
- glog.V(4).Infof("lookup entry: %v", request)
- resp, err := filer_pb.LookupEntry(client, request)
- if err != nil {
- glog.V(0).Infof("lookup %s: %v", key, err)
- return err
- }
- existingEntry = resp.Entry
- return nil
- })
- if err != nil {
- return false, fmt.Errorf("lookup %s: %v", key, err)
- }
- glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
- if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime {
- // skip if already changed
- // this usually happens when the messages are not ordered
- glog.V(2).Infof("late updates %s", key)
- } else if filer.ETag(newEntry) == filer.ETag(existingEntry) {
- // skip if no change
- // this usually happens when retrying the replication
- glog.V(3).Infof("already replicated %s", key)
- } else {
- // find out what changed
- deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry)
- if err != nil {
- return true, fmt.Errorf("replicte %s compare chunks error: %v", key, err)
- }
- // delete the chunks that are deleted from the source
- if deleteIncludeChunks {
- // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
- existingEntry.Chunks = filer.DoMinusChunksBySourceFileId(existingEntry.Chunks, deletedChunks)
- }
- // replicate the chunks that are new in the source
- replicatedChunks, err := fs.replicateChunks(newChunks, key)
- if err != nil {
- return true, fmt.Errorf("replicte %s chunks error: %v", key, err)
- }
- existingEntry.Chunks = append(existingEntry.Chunks, replicatedChunks...)
- }
- // save updated meta data
- return true, fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- request := &filer_pb.UpdateEntryRequest{
- Directory: newParentPath,
- Entry: existingEntry,
- IsFromOtherCluster: true,
- Signatures: signatures,
- }
- if _, err := client.UpdateEntry(context.Background(), request); err != nil {
- return fmt.Errorf("update existingEntry %s: %v", key, err)
- }
- return nil
- })
- }
- func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
- aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks, 0, math.MaxInt64)
- if aErr != nil {
- return nil, nil, aErr
- }
- bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks, 0, math.MaxInt64)
- if bErr != nil {
- return nil, nil, bErr
- }
- deletedChunks = append(deletedChunks, filer.DoMinusChunks(aData, bData)...)
- deletedChunks = append(deletedChunks, filer.DoMinusChunks(aMeta, bMeta)...)
- newChunks = append(newChunks, filer.DoMinusChunks(bData, aData)...)
- newChunks = append(newChunks, filer.DoMinusChunks(bMeta, aMeta)...)
- return
- }
|