123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- package command
- import (
- "context"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
- "os"
- "strings"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
- "github.com/seaweedfs/seaweedfs/weed/remote_storage"
- "github.com/seaweedfs/seaweedfs/weed/replication/source"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "google.golang.org/protobuf/proto"
- )
- func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {
- // read filer remote storage mount mappings
- _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir)
- if detectErr != nil {
- return fmt.Errorf("read mount info: %v", detectErr)
- }
- eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource)
- if err != nil {
- return err
- }
- processor := NewMetadataProcessor(eachEntryFunc, 128)
- var lastLogTsNs = time.Now().UnixNano()
- processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
- if resp.EventNotification.NewEntry != nil {
- if *option.storageClass == "" {
- if _, ok := resp.EventNotification.NewEntry.Extended[s3_constants.AmzStorageClass]; ok {
- delete(resp.EventNotification.NewEntry.Extended, s3_constants.AmzStorageClass)
- }
- } else {
- resp.EventNotification.NewEntry.Extended[s3_constants.AmzStorageClass] = []byte(*option.storageClass)
- }
- }
- processor.AddSyncJob(resp)
- return nil
- }, 3*time.Second, func(counter int64, lastTsNs int64) error {
- if processor.processedTsWatermark == 0 {
- return nil
- }
- // use processor.processedTsWatermark instead of the lastTsNs from the most recent job
- now := time.Now().UnixNano()
- glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9))
- lastLogTsNs = now
- return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, processor.processedTsWatermark)
- })
- lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
- option.clientEpoch++
- metadataFollowOption := &pb.MetadataFollowOption{
- ClientName: "filer.remote.sync",
- ClientId: option.clientId,
- ClientEpoch: option.clientEpoch,
- SelfSignature: 0,
- PathPrefix: mountedDir,
- AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote},
- DirectoriesToWatch: nil,
- StartTsNs: lastOffsetTs.UnixNano(),
- StopTsNs: 0,
- EventErrorType: pb.TrivialOnError,
- }
- return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
- }
- func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
- client, err := remote_storage.GetRemoteStorage(remoteStorage)
- if err != nil {
- return nil, err
- }
- handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
- message := resp.EventNotification
- if message.NewEntry == nil {
- return nil
- }
- if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
- mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
- if readErr != nil {
- return fmt.Errorf("unmarshal mappings: %v", readErr)
- }
- if remoteLoc, found := mappings.Mappings[mountedDir]; found {
- if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
- glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
- }
- } else {
- glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
- os.Exit(0)
- }
- }
- if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX {
- conf := &remote_pb.RemoteConf{}
- if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
- return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
- }
- remoteStorage = conf
- if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil {
- client = newClient
- } else {
- return err
- }
- }
- return nil
- }
- eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
- message := resp.EventNotification
- if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
- return handleEtcRemoteChanges(resp)
- }
- if filer_pb.IsEmpty(resp) {
- return nil
- }
- if filer_pb.IsCreate(resp) {
- if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
- return nil
- }
- if !filer.HasData(message.NewEntry) {
- return nil
- }
- glog.V(2).Infof("create: %+v", resp)
- if !shouldSendToRemote(message.NewEntry) {
- glog.V(2).Infof("skipping creating: %+v", resp)
- return nil
- }
- dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
- if message.NewEntry.IsDirectory {
- glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
- return client.WriteDirectory(dest, message.NewEntry)
- }
- glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
- remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
- if writeErr != nil {
- return writeErr
- }
- return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
- }
- if filer_pb.IsDelete(resp) {
- glog.V(2).Infof("delete: %+v", resp)
- dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
- if message.OldEntry.IsDirectory {
- glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
- return client.RemoveDirectory(dest)
- }
- glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
- return client.DeleteFile(dest)
- }
- if message.OldEntry != nil && message.NewEntry != nil {
- oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
- dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
- if !shouldSendToRemote(message.NewEntry) {
- glog.V(2).Infof("skipping updating: %+v", resp)
- return nil
- }
- if message.NewEntry.IsDirectory {
- return client.WriteDirectory(dest, message.NewEntry)
- }
- if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
- if filer.IsSameData(message.OldEntry, message.NewEntry) {
- glog.V(2).Infof("update meta: %+v", resp)
- return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry)
- }
- }
- glog.V(2).Infof("update: %+v", resp)
- glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
- if err := client.DeleteFile(oldDest); err != nil {
- if isMultipartUploadFile(resp.Directory, message.OldEntry.Name) {
- return nil
- }
- }
- remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
- if writeErr != nil {
- return writeErr
- }
- return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
- }
- return nil
- }
- return eachEntryFunc, nil
- }
- func retriedWriteFile(client remote_storage.RemoteStorageClient, filerSource *source.FilerSource, newEntry *filer_pb.Entry, dest *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
- var writeErr error
- err = util.Retry("writeFile", func() error {
- reader := filer.NewFileReader(filerSource, newEntry)
- glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
- remoteEntry, writeErr = client.WriteFile(dest, newEntry, reader)
- if writeErr != nil {
- return writeErr
- }
- return nil
- })
- if err != nil {
- glog.Errorf("write to %s: %v", dest, err)
- }
- return
- }
- func collectLastSyncOffset(filerClient filer_pb.FilerClient, grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, mountedDir string, timeAgo time.Duration) time.Time {
- // 1. specified by timeAgo
- // 2. last offset timestamp for this directory
- // 3. directory creation time
- var lastOffsetTs time.Time
- if timeAgo == 0 {
- mountedDirEntry, err := filer_pb.GetEntry(filerClient, util.FullPath(mountedDir))
- if err != nil {
- glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
- return time.Now()
- }
- lastOffsetTsNs, err := remote_storage.GetSyncOffset(grpcDialOption, filerAddress, mountedDir)
- if mountedDirEntry != nil {
- if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
- lastOffsetTs = time.Unix(0, lastOffsetTsNs)
- glog.V(0).Infof("resume from %v", lastOffsetTs)
- } else {
- lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
- }
- } else {
- lastOffsetTs = time.Now()
- }
- } else {
- lastOffsetTs = time.Now().Add(-timeAgo)
- }
- return lastOffsetTs
- }
- func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
- source := string(sourcePath[len(mountDir):])
- dest := util.FullPath(remoteMountLocation.Path).Child(source)
- return &remote_pb.RemoteStorageLocation{
- Name: remoteMountLocation.Name,
- Bucket: remoteMountLocation.Bucket,
- Path: string(dest),
- }
- }
- func shouldSendToRemote(entry *filer_pb.Entry) bool {
- if entry.RemoteEntry == nil {
- return true
- }
- if entry.RemoteEntry.RemoteMtime < entry.Attributes.Mtime {
- return true
- }
- return false
- }
- func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
- remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano()
- entry.RemoteEntry = remoteEntry
- return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
- Directory: dir,
- Entry: entry,
- })
- return err
- })
- }
- func isMultipartUploadFile(dir string, name string) bool {
- return strings.HasPrefix(dir, "/buckets/") &&
- strings.Contains(dir, "/"+s3_constants.MultipartUploadsFolder+"/") &&
- strings.HasSuffix(name, ".part")
- }
|