volume_grpc_remote.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/operation"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  8. "github.com/chrislusf/seaweedfs/weed/security"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. "github.com/chrislusf/seaweedfs/weed/storage/types"
  11. "sync"
  12. "time"
  13. )
  14. func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) {
  15. resp = &volume_server_pb.FetchAndWriteNeedleResponse{}
  16. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  17. if v == nil {
  18. return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
  19. }
  20. remoteConf := req.RemoteConf
  21. client, getClientErr := remote_storage.GetRemoteStorage(remoteConf)
  22. if getClientErr != nil {
  23. return nil, fmt.Errorf("get remote client: %v", getClientErr)
  24. }
  25. remoteStorageLocation := req.RemoteLocation
  26. data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size)
  27. if ReadRemoteErr != nil {
  28. return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr)
  29. }
  30. var wg sync.WaitGroup
  31. wg.Add(1)
  32. go func() {
  33. defer wg.Done()
  34. n := new(needle.Needle)
  35. n.Id = types.NeedleId(req.NeedleId)
  36. n.Cookie = types.Cookie(req.Cookie)
  37. n.Data, n.DataSize = data, uint32(len(data))
  38. // copied from *Needle.prepareWriteBuffer()
  39. n.Size = 4 + types.Size(n.DataSize) + 1
  40. n.Checksum = needle.NewCRC(n.Data)
  41. n.LastModified = uint64(time.Now().Unix())
  42. n.SetHasLastModifiedDate()
  43. if _, localWriteErr := vs.store.WriteVolumeNeedle(v.Id, n, true, false); localWriteErr != nil {
  44. if err == nil {
  45. err = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, err)
  46. }
  47. }
  48. }()
  49. if len(req.Replicas) > 0 {
  50. fileId := needle.NewFileId(v.Id, req.NeedleId, req.Cookie)
  51. for _, replica := range req.Replicas {
  52. wg.Add(1)
  53. go func(targetVolumeServer string) {
  54. defer wg.Done()
  55. uploadOption := &operation.UploadOption{
  56. UploadUrl: fmt.Sprintf("http://%s/%s?type=replicate", targetVolumeServer, fileId.String()),
  57. Filename: "",
  58. Cipher: false,
  59. IsInputCompressed: false,
  60. MimeType: "",
  61. PairMap: nil,
  62. Jwt: security.EncodedJwt(req.Auth),
  63. }
  64. if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil {
  65. if err == nil {
  66. err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err)
  67. }
  68. }
  69. }(replica.Url)
  70. }
  71. }
  72. wg.Wait()
  73. return resp, err
  74. }