volume_grpc_tier_upload.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package weed_server
  2. import (
  3. "fmt"
  4. "os"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage/backend"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. )
  10. // VolumeTierMoveDatToRemote copy dat file to a remote tier
  11. func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTierMoveDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatToRemoteServer) error {
  12. // find existing volume
  13. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  14. if v == nil {
  15. return fmt.Errorf("volume %d not found", req.VolumeId)
  16. }
  17. // verify the collection
  18. if v.Collection != req.Collection {
  19. return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  20. }
  21. // locate the disk file
  22. diskFile, ok := v.DataBackend.(*backend.DiskFile)
  23. if !ok {
  24. return fmt.Errorf("volume %d is not on local disk", req.VolumeId)
  25. }
  26. // check valid storage backend type
  27. backendStorage, found := backend.BackendStorages[req.DestinationBackendName]
  28. if !found {
  29. var keys []string
  30. for key := range backend.BackendStorages {
  31. keys = append(keys, key)
  32. }
  33. return fmt.Errorf("destination %s not found, suppported: %v", req.DestinationBackendName, keys)
  34. }
  35. // check whether the existing backend storage is the same as requested
  36. // if same, skip
  37. backendType, backendId := backend.BackendNameToTypeId(req.DestinationBackendName)
  38. for _, remoteFile := range v.GetVolumeInfo().GetFiles() {
  39. if remoteFile.BackendType == backendType && remoteFile.BackendId == backendId {
  40. return fmt.Errorf("destination %s already exists", req.DestinationBackendName)
  41. }
  42. }
  43. startTime := time.Now()
  44. fn := func(progressed int64, percentage float32) error {
  45. now := time.Now()
  46. if now.Sub(startTime) < time.Second {
  47. return nil
  48. }
  49. startTime = now
  50. return stream.Send(&volume_server_pb.VolumeTierMoveDatToRemoteResponse{
  51. Processed: progressed,
  52. ProcessedPercentage: percentage,
  53. })
  54. }
  55. // remember the file original source
  56. attributes := make(map[string]string)
  57. attributes["volumeId"] = v.Id.String()
  58. attributes["collection"] = v.Collection
  59. attributes["ext"] = ".dat"
  60. // copy the data file
  61. key, size, err := backendStorage.CopyFile(diskFile.File, attributes, fn)
  62. if err != nil {
  63. return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.Name(), err)
  64. }
  65. // save the remote file to volume tier info
  66. v.GetVolumeInfo().Files = append(v.GetVolumeInfo().GetFiles(), &volume_server_pb.RemoteFile{
  67. BackendType: backendType,
  68. BackendId: backendId,
  69. Key: key,
  70. Offset: 0,
  71. FileSize: uint64(size),
  72. ModifiedTime: uint64(time.Now().Unix()),
  73. Extension: ".dat",
  74. })
  75. if err := v.SaveVolumeInfo(); err != nil {
  76. return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err)
  77. }
  78. if err := v.LoadRemoteFile(); err != nil {
  79. return fmt.Errorf("volume %d fail to load remote file: %v", v.Id, err)
  80. }
  81. if !req.KeepLocalDatFile {
  82. os.Remove(v.FileName() + ".dat")
  83. }
  84. return nil
  85. }