lookup.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package operation
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "google.golang.org/grpc"
  8. "math/rand"
  9. "strings"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. )
  13. type Location struct {
  14. Url string `json:"url,omitempty"`
  15. PublicUrl string `json:"publicUrl,omitempty"`
  16. GrpcPort int `json:"grpcPort,omitempty"`
  17. }
  18. func (l *Location) ServerAddress() pb.ServerAddress {
  19. return pb.NewServerAddressWithGrpcPort(l.Url, l.GrpcPort)
  20. }
  21. type LookupResult struct {
  22. VolumeOrFileId string `json:"volumeOrFileId,omitempty"`
  23. Locations []Location `json:"locations,omitempty"`
  24. Jwt string `json:"jwt,omitempty"`
  25. Error string `json:"error,omitempty"`
  26. }
  27. func (lr *LookupResult) String() string {
  28. return fmt.Sprintf("VolumeOrFileId:%s, Locations:%v, Error:%s", lr.VolumeOrFileId, lr.Locations, lr.Error)
  29. }
  30. var (
  31. vc VidCache // caching of volume locations, re-check if after 10 minutes
  32. )
  33. func LookupFileId(masterFn GetMasterFn, grpcDialOption grpc.DialOption, fileId string) (fullUrl string, jwt string, err error) {
  34. parts := strings.Split(fileId, ",")
  35. if len(parts) != 2 {
  36. return "", jwt, errors.New("Invalid fileId " + fileId)
  37. }
  38. lookup, lookupError := LookupVolumeId(masterFn, grpcDialOption, parts[0])
  39. if lookupError != nil {
  40. return "", jwt, lookupError
  41. }
  42. if len(lookup.Locations) == 0 {
  43. return "", jwt, errors.New("File Not Found")
  44. }
  45. return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, lookup.Jwt, nil
  46. }
  47. func LookupVolumeId(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid string) (*LookupResult, error) {
  48. results, err := LookupVolumeIds(masterFn, grpcDialOption, []string{vid})
  49. return results[vid], err
  50. }
  51. // LookupVolumeIds find volume locations by cache and actual lookup
  52. func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]*LookupResult, error) {
  53. ret := make(map[string]*LookupResult)
  54. var unknown_vids []string
  55. //check vid cache first
  56. for _, vid := range vids {
  57. locations, cacheErr := vc.Get(vid)
  58. if cacheErr == nil {
  59. ret[vid] = &LookupResult{VolumeOrFileId: vid, Locations: locations}
  60. } else {
  61. unknown_vids = append(unknown_vids, vid)
  62. }
  63. }
  64. //return success if all volume ids are known
  65. if len(unknown_vids) == 0 {
  66. return ret, nil
  67. }
  68. //only query unknown_vids
  69. err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  70. req := &master_pb.LookupVolumeRequest{
  71. VolumeOrFileIds: unknown_vids,
  72. }
  73. resp, grpcErr := masterClient.LookupVolume(context.Background(), req)
  74. if grpcErr != nil {
  75. return grpcErr
  76. }
  77. //set newly checked vids to cache
  78. for _, vidLocations := range resp.VolumeIdLocations {
  79. var locations []Location
  80. for _, loc := range vidLocations.Locations {
  81. locations = append(locations, Location{
  82. Url: loc.Url,
  83. PublicUrl: loc.PublicUrl,
  84. GrpcPort: int(loc.GrpcPort),
  85. })
  86. }
  87. if vidLocations.Error != "" {
  88. vc.Set(vidLocations.VolumeOrFileId, locations, 10*time.Minute)
  89. }
  90. ret[vidLocations.VolumeOrFileId] = &LookupResult{
  91. VolumeOrFileId: vidLocations.VolumeOrFileId,
  92. Locations: locations,
  93. Jwt: vidLocations.Auth,
  94. Error: vidLocations.Error,
  95. }
  96. }
  97. return nil
  98. })
  99. if err != nil {
  100. return nil, err
  101. }
  102. return ret, nil
  103. }