12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- package weed_server
- import (
- "fmt"
- "time"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
- )
- // VolumeTierMoveDatFromRemote copy dat file from a remote tier to local volume server
- func (vs *VolumeServer) VolumeTierMoveDatFromRemote(req *volume_server_pb.VolumeTierMoveDatFromRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatFromRemoteServer) error {
- // find existing volume
- v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
- if v == nil {
- return fmt.Errorf("volume %d not found", req.VolumeId)
- }
- // verify the collection
- if v.Collection != req.Collection {
- return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
- }
- // locate the disk file
- storageName, storageKey := v.RemoteStorageNameKey()
- if storageName == "" || storageKey == "" {
- return fmt.Errorf("volume %d is already on local disk", req.VolumeId)
- }
- // check whether the local .dat already exists
- _, ok := v.DataBackend.(*backend.DiskFile)
- if ok {
- return fmt.Errorf("volume %d is already on local disk", req.VolumeId)
- }
- // check valid storage backend type
- backendStorage, found := backend.BackendStorages[storageName]
- if !found {
- var keys []string
- for key := range backend.BackendStorages {
- keys = append(keys, key)
- }
- return fmt.Errorf("remote storage %s not found from suppported: %v", storageName, keys)
- }
- startTime := time.Now()
- fn := func(progressed int64, percentage float32) error {
- now := time.Now()
- if now.Sub(startTime) < time.Second {
- return nil
- }
- startTime = now
- return stream.Send(&volume_server_pb.VolumeTierMoveDatFromRemoteResponse{
- Processed: progressed,
- ProcessedPercentage: percentage,
- })
- }
- // copy the data file
- _, err := backendStorage.DownloadFile(v.FileName()+".dat", storageKey, fn)
- if err != nil {
- return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName()+".dat", err)
- }
- if req.KeepRemoteDatFile {
- return nil
- }
- // remove remote file
- if err := backendStorage.DeleteFile(storageKey); err != nil {
- return fmt.Errorf("volume %d fail to delete remote file %s: %v", v.Id, storageKey, err)
- }
- // forget remote file
- v.GetVolumeInfo().Files = v.GetVolumeInfo().Files[1:]
- if err := v.SaveVolumeInfo(); err != nil {
- return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err)
- }
- v.DataBackend.Close()
- v.DataBackend = nil
- return nil
- }
|