filer_source.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package source
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "strings"
  8. "google.golang.org/grpc"
  9. "github.com/chrislusf/seaweedfs/weed/security"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. type ReplicationSource interface {
  15. ReadPart(part string) io.ReadCloser
  16. }
  17. type FilerSource struct {
  18. grpcAddress string
  19. grpcDialOption grpc.DialOption
  20. Dir string
  21. }
  22. func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error {
  23. return fs.initialize(
  24. configuration.GetString(prefix+"grpcAddress"),
  25. configuration.GetString(prefix+"directory"),
  26. )
  27. }
  28. func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) {
  29. fs.grpcAddress = grpcAddress
  30. fs.Dir = dir
  31. fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  32. return nil
  33. }
  34. func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrl string, err error) {
  35. vid2Locations := make(map[string]*filer_pb.Locations)
  36. vid := volumeId(part)
  37. err = fs.withFilerClient(ctx, fs.grpcDialOption, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  38. glog.V(4).Infof("read lookup volume id locations: %v", vid)
  39. resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
  40. VolumeIds: []string{vid},
  41. })
  42. if err != nil {
  43. return err
  44. }
  45. vid2Locations = resp.LocationsMap
  46. return nil
  47. })
  48. if err != nil {
  49. glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err)
  50. return "", fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
  51. }
  52. locations := vid2Locations[vid]
  53. if locations == nil || len(locations.Locations) == 0 {
  54. glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err)
  55. return "", fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
  56. }
  57. fileUrl = fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part)
  58. return
  59. }
  60. func (fs *FilerSource) ReadPart(ctx context.Context, part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) {
  61. fileUrl, err := fs.LookupFileId(ctx, part)
  62. if err != nil {
  63. return "", nil, nil, err
  64. }
  65. filename, header, readCloser, err = util.DownloadFile(fileUrl)
  66. return filename, header, readCloser, err
  67. }
  68. func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
  69. return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
  70. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  71. return fn(ctx2, client)
  72. }, fs.grpcAddress, fs.grpcDialOption)
  73. }
  74. func volumeId(fileId string) string {
  75. lastCommaIndex := strings.LastIndex(fileId, ",")
  76. if lastCommaIndex > 0 {
  77. return fileId[:lastCommaIndex]
  78. }
  79. return fileId
  80. }