123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- package command
- import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/remote_storage"
- "github.com/chrislusf/seaweedfs/weed/replication/source"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
- "google.golang.org/grpc"
- "strings"
- "time"
- )
- type RemoteSyncOptions struct {
- filerAddress *string
- grpcDialOption grpc.DialOption
- readChunkFromFiler *bool
- debug *bool
- timeAgo *time.Duration
- dir *string
- }
// RemoteSyncKeyPrefix is the key prefix under which filer.remote.sync saves
// its processed-event offset for each synchronized directory.
const RemoteSyncKeyPrefix = "remote.sync."
- var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
- func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- return fn(client)
- })
- }
- func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
- return location.Url
- }
- var (
- remoteSyncOptions RemoteSyncOptions
- )
- func init() {
- cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
- remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
- remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
- remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
- remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
- remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
- }
- var cmdFilerRemoteSynchronize = &Command{
- UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
- Short: "resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage",
- Long: `resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage
- filer.remote.sync listens on filer update events.
- If any mounted remote file is updated, it will fetch the updated content,
- and write to the remote storage.
- `,
- }
- func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
- util.LoadConfiguration("security", false)
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
- remoteSyncOptions.grpcDialOption = grpcDialOption
- // read filer remote storage mount mappings
- mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
- if readErr != nil {
- fmt.Printf("read mount mapping: %v", readErr)
- return false
- }
- filerSource := &source.FilerSource{}
- filerSource.DoInitialize(
- *remoteSyncOptions.filerAddress,
- pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
- "/", // does not matter
- *remoteSyncOptions.readChunkFromFiler,
- )
- var found bool
- for dir, remoteStorageMountLocation := range mappings.Mappings {
- if *remoteSyncOptions.dir == dir {
- found = true
- storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
- if readErr != nil {
- fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
- continue
- }
- fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
- if err := util.Retry("filer.remote.sync "+dir, func() error {
- return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
- }); err != nil {
- fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
- }
- break
- }
- }
- if !found {
- fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
- return false
- }
- return true
- }
- func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
- dirHash := util.HashStringToLong(mountedDir)
- // 1. specified by timeAgo
- // 2. last offset timestamp for this directory
- // 3. directory creation time
- var lastOffsetTs time.Time
- if *option.timeAgo == 0 {
- mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
- if err != nil {
- return fmt.Errorf("lookup %s: %v", mountedDir, err)
- }
- lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
- if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
- lastOffsetTs = time.Unix(0, lastOffsetTsNs)
- glog.V(0).Infof("resume from %v", lastOffsetTs)
- } else {
- lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
- }
- } else {
- lastOffsetTs = time.Now().Add(-*option.timeAgo)
- }
- client, err := remote_storage.GetRemoteStorage(remoteStorage)
- if err != nil {
- return err
- }
- eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
- message := resp.EventNotification
- if message.OldEntry == nil && message.NewEntry == nil {
- return nil
- }
- if message.OldEntry == nil && message.NewEntry != nil {
- if len(message.NewEntry.Chunks) == 0 {
- return nil
- }
- fmt.Printf("create: %+v\n", resp)
- if !shouldSendToRemote(message.NewEntry) {
- fmt.Printf("skipping creating: %+v\n", resp)
- return nil
- }
- dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
- reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
- remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
- if writeErr != nil {
- return writeErr
- }
- return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
- }
- if message.OldEntry != nil && message.NewEntry == nil {
- fmt.Printf("delete: %+v\n", resp)
- dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
- return client.DeleteFile(dest)
- }
- if message.OldEntry != nil && message.NewEntry != nil {
- oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
- dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
- if !shouldSendToRemote(message.NewEntry) {
- fmt.Printf("skipping updating: %+v\n", resp)
- return nil
- }
- if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
- if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) {
- fmt.Printf("update meta: %+v\n", resp)
- return client.UpdateFileMetadata(dest, message.NewEntry)
- }
- }
- fmt.Printf("update: %+v\n", resp)
- if err := client.DeleteFile(oldDest); err != nil {
- return err
- }
- reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
- remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
- if writeErr != nil {
- return writeErr
- }
- return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
- }
- return nil
- }
- processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
- lastTime := time.Unix(0, lastTsNs)
- glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
- return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
- })
- return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
- "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
- }
- func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
- var dest string
- source := string(sourcePath[len(mountDir):])
- if strings.HasSuffix(remoteMountLocation.Path, "/") {
- dest = remoteMountLocation.Path + source[1:]
- } else {
- dest = remoteMountLocation.Path + source
- }
- return &filer_pb.RemoteStorageLocation{
- Name: remoteMountLocation.Name,
- Bucket: remoteMountLocation.Bucket,
- Path: dest,
- }
- }
- func isSameChunks(a, b []*filer_pb.FileChunk) bool {
- if len(a) != len(b) {
- return false
- }
- for i := 0; i < len(a); i++ {
- x, y := a[i], b[i]
- if !proto.Equal(x, y) {
- return false
- }
- }
- return true
- }
- func shouldSendToRemote(entry *filer_pb.Entry) bool {
- if entry.RemoteEntry == nil {
- return true
- }
- if entry.RemoteEntry.Size != int64(filer.FileSize(entry)) {
- return true
- }
- if entry.RemoteEntry.LastModifiedAt < entry.Attributes.Mtime {
- return true
- }
- return false
- }
- func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
- entry.RemoteEntry = remoteEntry
- return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
- Directory: dir,
- Entry: entry,
- })
- return err
- })
- }
|