fetch_write.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package filersink
  2. import (
  3. "context"
  4. "fmt"
  5. "google.golang.org/grpc"
  6. "strings"
  7. "sync"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) {
  15. if len(sourceChunks) == 0 {
  16. return
  17. }
  18. var wg sync.WaitGroup
  19. for _, sourceChunk := range sourceChunks {
  20. wg.Add(1)
  21. go func(chunk *filer_pb.FileChunk) {
  22. defer wg.Done()
  23. replicatedChunk, e := fs.replicateOneChunk(ctx, chunk)
  24. if e != nil {
  25. err = e
  26. }
  27. replicatedChunks = append(replicatedChunks, replicatedChunk)
  28. }(sourceChunk)
  29. }
  30. wg.Wait()
  31. return
  32. }
  33. func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) {
  34. fileId, err := fs.fetchAndWrite(ctx, sourceChunk)
  35. if err != nil {
  36. return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err)
  37. }
  38. return &filer_pb.FileChunk{
  39. FileId: fileId,
  40. Offset: sourceChunk.Offset,
  41. Size: sourceChunk.Size,
  42. Mtime: sourceChunk.Mtime,
  43. ETag: sourceChunk.ETag,
  44. SourceFileId: sourceChunk.GetFileIdString(),
  45. }, nil
  46. }
  47. func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.FileChunk) (fileId string, err error) {
  48. filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.GetFileIdString())
  49. if err != nil {
  50. return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err)
  51. }
  52. defer readCloser.Close()
  53. var host string
  54. var auth security.EncodedJwt
  55. if err := fs.withFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  56. request := &filer_pb.AssignVolumeRequest{
  57. Count: 1,
  58. Replication: fs.replication,
  59. Collection: fs.collection,
  60. TtlSec: fs.ttlSec,
  61. DataCenter: fs.dataCenter,
  62. }
  63. resp, err := client.AssignVolume(ctx, request)
  64. if err != nil {
  65. glog.V(0).Infof("assign volume failure %v: %v", request, err)
  66. return err
  67. }
  68. fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
  69. return nil
  70. }); err != nil {
  71. return "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
  72. }
  73. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  74. glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
  75. uploadResult, err := operation.Upload(fileUrl, filename, readCloser,
  76. "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth)
  77. if err != nil {
  78. glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
  79. return "", fmt.Errorf("upload data: %v", err)
  80. }
  81. if uploadResult.Error != "" {
  82. glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
  83. return "", fmt.Errorf("upload result: %v", uploadResult.Error)
  84. }
  85. return
  86. }
  87. func (fs *FilerSink) withFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
  88. return util.WithCachedGrpcClient(ctx, func(ctx context.Context, grpcConnection *grpc.ClientConn) error {
  89. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  90. return fn(ctx, client)
  91. }, fs.grpcAddress, fs.grpcDialOption)
  92. }
  93. func volumeId(fileId string) string {
  94. lastCommaIndex := strings.LastIndex(fileId, ",")
  95. if lastCommaIndex > 0 {
  96. return fileId[:lastCommaIndex]
  97. }
  98. return fileId
  99. }