volume_grpc_tier_download.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package weed_server
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/backend"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  8. )
  9. // VolumeTierMoveDatFromRemote copy dat file from a remote tier to local volume server
  10. func (vs *VolumeServer) VolumeTierMoveDatFromRemote(req *volume_server_pb.VolumeTierMoveDatFromRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatFromRemoteServer) error {
  11. // find existing volume
  12. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  13. if v == nil {
  14. return fmt.Errorf("volume %d not found", req.VolumeId)
  15. }
  16. // verify the collection
  17. if v.Collection != req.Collection {
  18. return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  19. }
  20. // locate the disk file
  21. storageName, storageKey := v.RemoteStorageNameKey()
  22. if storageName == "" || storageKey == "" {
  23. return fmt.Errorf("volume %d is already on local disk", req.VolumeId)
  24. }
  25. // check whether the local .dat already exists
  26. _, ok := v.DataBackend.(*backend.DiskFile)
  27. if ok {
  28. return fmt.Errorf("volume %d is already on local disk", req.VolumeId)
  29. }
  30. // check valid storage backend type
  31. backendStorage, found := backend.BackendStorages[storageName]
  32. if !found {
  33. var keys []string
  34. for key := range backend.BackendStorages {
  35. keys = append(keys, key)
  36. }
  37. return fmt.Errorf("remote storage %s not found from supported: %v", storageName, keys)
  38. }
  39. startTime := time.Now()
  40. fn := func(progressed int64, percentage float32) error {
  41. now := time.Now()
  42. if now.Sub(startTime) < time.Second {
  43. return nil
  44. }
  45. startTime = now
  46. return stream.Send(&volume_server_pb.VolumeTierMoveDatFromRemoteResponse{
  47. Processed: progressed,
  48. ProcessedPercentage: percentage,
  49. })
  50. }
  51. // copy the data file
  52. _, err := backendStorage.DownloadFile(v.FileName(".dat"), storageKey, fn)
  53. if err != nil {
  54. return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName(".dat"), err)
  55. }
  56. if req.KeepRemoteDatFile {
  57. return nil
  58. }
  59. // remove remote file
  60. if err := backendStorage.DeleteFile(storageKey); err != nil {
  61. return fmt.Errorf("volume %d failed to delete remote file %s: %v", v.Id, storageKey, err)
  62. }
  63. // forget remote file
  64. v.GetVolumeInfo().Files = v.GetVolumeInfo().Files[1:]
  65. if err := v.SaveVolumeInfo(); err != nil {
  66. return fmt.Errorf("volume %d failed to save remote file info: %v", v.Id, err)
  67. }
  68. v.DataBackend.Close()
  69. v.DataBackend = nil
  70. return nil
  71. }