123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- package weed_server
- import (
- "context"
- "fmt"
- "strings"
- "sync"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/operation"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "google.golang.org/protobuf/proto"
- )
- func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req *filer_pb.CacheRemoteObjectToLocalClusterRequest) (*filer_pb.CacheRemoteObjectToLocalClusterResponse, error) {
- // load all mappings
- mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
- if err != nil {
- return nil, err
- }
- mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content)
- if err != nil {
- return nil, err
- }
- // find mapping
- var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
- var localMountedDir string
- for k, loc := range mappings.Mappings {
- if strings.HasPrefix(req.Directory, k) {
- localMountedDir, remoteStorageMountedLocation = k, loc
- }
- }
- if localMountedDir == "" {
- return nil, fmt.Errorf("%s is not mounted", req.Directory)
- }
- // find storage configuration
- storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX))
- if err != nil {
- return nil, err
- }
- storageConf := &remote_pb.RemoteConf{}
- if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
- return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
- }
- // find the entry
- entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
- if err == filer_pb.ErrNotFound {
- return nil, err
- }
- resp := &filer_pb.CacheRemoteObjectToLocalClusterResponse{}
- if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
- return resp, nil
- }
- // detect storage option
- so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "")
- if err != nil {
- return resp, err
- }
- assignRequest, altRequest := so.ToAssignRequests(1)
- // find a good chunk size
- chunkSize := int64(5 * 1024 * 1024)
- chunkCount := entry.Remote.RemoteSize/chunkSize + 1
- for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 {
- chunkSize *= 2
- chunkCount = entry.Remote.RemoteSize/chunkSize + 1
- }
- dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
- var chunks []*filer_pb.FileChunk
- var fetchAndWriteErr error
- var wg sync.WaitGroup
- limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
- for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
- localOffset := offset
- wg.Add(1)
- limitedConcurrentExecutor.Execute(func() {
- defer wg.Done()
- size := chunkSize
- if localOffset+chunkSize > entry.Remote.RemoteSize {
- size = entry.Remote.RemoteSize - localOffset
- }
- // assign one volume server
- assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
- if err != nil {
- fetchAndWriteErr = err
- return
- }
- if assignResult.Error != "" {
- fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error)
- return
- }
- fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
- if assignResult.Error != "" {
- fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
- return
- }
- var replicas []*volume_server_pb.FetchAndWriteNeedleRequest_Replica
- for _, r := range assignResult.Replicas {
- replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{
- Url: r.Url,
- PublicUrl: r.PublicUrl,
- GrpcPort: int32(r.GrpcPort),
- })
- }
- // tell filer to tell volume server to download into needles
- assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort)
- var etag string
- err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- resp, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
- VolumeId: uint32(fileId.VolumeId),
- NeedleId: uint64(fileId.Key),
- Cookie: uint32(fileId.Cookie),
- Offset: localOffset,
- Size: size,
- Replicas: replicas,
- Auth: string(assignResult.Auth),
- RemoteConf: storageConf,
- RemoteLocation: &remote_pb.RemoteStorageLocation{
- Name: remoteStorageMountedLocation.Name,
- Bucket: remoteStorageMountedLocation.Bucket,
- Path: string(dest),
- },
- })
- if fetchAndWriteErr != nil {
- return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
- } else {
- etag = resp.ETag
- }
- return nil
- })
- if err != nil && fetchAndWriteErr == nil {
- fetchAndWriteErr = err
- return
- }
- chunks = append(chunks, &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: localOffset,
- Size: uint64(size),
- ModifiedTsNs: time.Now().UnixNano(),
- ETag: etag,
- Fid: &filer_pb.FileId{
- VolumeId: uint32(fileId.VolumeId),
- FileKey: uint64(fileId.Key),
- Cookie: uint32(fileId.Cookie),
- },
- })
- })
- }
- wg.Wait()
- if fetchAndWriteErr != nil {
- return nil, fetchAndWriteErr
- }
- garbage := entry.GetChunks()
- newEntry := entry.ShallowClone()
- newEntry.Chunks = chunks
- newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry)
- newEntry.Remote.LastLocalSyncTsNs = time.Now().UnixNano()
- // this skips meta data log events
- if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
- fs.filer.DeleteUncommittedChunks(chunks)
- return nil, err
- }
- fs.filer.DeleteChunks(entry.FullPath, garbage)
- fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)
- resp.Entry = newEntry.ToProtoEntry()
- return resp, nil
- }
|