volume_grpc_remote.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/operation"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/remote_storage"
  8. "github.com/seaweedfs/seaweedfs/weed/security"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  11. "sync"
  12. "time"
  13. )
  14. func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) {
  15. resp = &volume_server_pb.FetchAndWriteNeedleResponse{}
  16. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  17. if v == nil {
  18. return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
  19. }
  20. remoteConf := req.RemoteConf
  21. client, getClientErr := remote_storage.GetRemoteStorage(remoteConf)
  22. if getClientErr != nil {
  23. return nil, fmt.Errorf("get remote client: %v", getClientErr)
  24. }
  25. remoteStorageLocation := req.RemoteLocation
  26. data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size)
  27. if ReadRemoteErr != nil {
  28. return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr)
  29. }
  30. var wg sync.WaitGroup
  31. wg.Add(1)
  32. go func() {
  33. defer wg.Done()
  34. n := new(needle.Needle)
  35. n.Id = types.NeedleId(req.NeedleId)
  36. n.Cookie = types.Cookie(req.Cookie)
  37. n.Data, n.DataSize = data, uint32(len(data))
  38. // copied from *Needle.prepareWriteBuffer()
  39. n.Size = 4 + types.Size(n.DataSize) + 1
  40. n.Checksum = needle.NewCRC(n.Data)
  41. n.LastModified = uint64(time.Now().Unix())
  42. n.SetHasLastModifiedDate()
  43. if _, localWriteErr := vs.store.WriteVolumeNeedle(v.Id, n, true, false); localWriteErr != nil {
  44. if err == nil {
  45. err = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, localWriteErr)
  46. }
  47. } else {
  48. resp.ETag = n.Etag()
  49. }
  50. }()
  51. if len(req.Replicas) > 0 {
  52. fileId := needle.NewFileId(v.Id, req.NeedleId, req.Cookie)
  53. for _, replica := range req.Replicas {
  54. wg.Add(1)
  55. go func(targetVolumeServer string) {
  56. defer wg.Done()
  57. uploadOption := &operation.UploadOption{
  58. UploadUrl: fmt.Sprintf("http://%s/%s?type=replicate", targetVolumeServer, fileId.String()),
  59. Filename: "",
  60. Cipher: false,
  61. IsInputCompressed: false,
  62. MimeType: "",
  63. PairMap: nil,
  64. Jwt: security.EncodedJwt(req.Auth),
  65. }
  66. uploader, uploaderErr := operation.NewUploader()
  67. if uploaderErr != nil && err == nil {
  68. err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, uploaderErr)
  69. return
  70. }
  71. if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil {
  72. err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr)
  73. }
  74. }(replica.Url)
  75. }
  76. }
  77. wg.Wait()
  78. return resp, err
  79. }