assign_file_id.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package operation
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/valyala/fasthttp"
  7. "google.golang.org/grpc"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/security"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. type VolumeAssignRequest struct {
  14. Count uint64
  15. Replication string
  16. Collection string
  17. Ttl string
  18. DataCenter string
  19. Rack string
  20. DataNode string
  21. WritableVolumeCount uint32
  22. }
  23. type AssignResult struct {
  24. Fid string `json:"fid,omitempty"`
  25. Url string `json:"url,omitempty"`
  26. PublicUrl string `json:"publicUrl,omitempty"`
  27. Count uint64 `json:"count,omitempty"`
  28. Error string `json:"error,omitempty"`
  29. Auth security.EncodedJwt `json:"auth,omitempty"`
  30. }
  31. func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
  32. var requests []*VolumeAssignRequest
  33. requests = append(requests, primaryRequest)
  34. requests = append(requests, alternativeRequests...)
  35. var lastError error
  36. ret := &AssignResult{}
  37. for i, request := range requests {
  38. if request == nil {
  39. continue
  40. }
  41. lastError = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
  42. req := &master_pb.AssignRequest{
  43. Count: primaryRequest.Count,
  44. Replication: primaryRequest.Replication,
  45. Collection: primaryRequest.Collection,
  46. Ttl: primaryRequest.Ttl,
  47. DataCenter: primaryRequest.DataCenter,
  48. Rack: primaryRequest.Rack,
  49. DataNode: primaryRequest.DataNode,
  50. WritableVolumeCount: primaryRequest.WritableVolumeCount,
  51. }
  52. resp, grpcErr := masterClient.Assign(context.Background(), req)
  53. if grpcErr != nil {
  54. return grpcErr
  55. }
  56. ret.Count = resp.Count
  57. ret.Fid = resp.Fid
  58. ret.Url = resp.Url
  59. ret.PublicUrl = resp.PublicUrl
  60. ret.Error = resp.Error
  61. ret.Auth = security.EncodedJwt(resp.Auth)
  62. return nil
  63. })
  64. if lastError != nil {
  65. continue
  66. }
  67. if ret.Count <= 0 {
  68. lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
  69. continue
  70. }
  71. }
  72. return ret, lastError
  73. }
  74. func LookupJwt(master string, fileId string) security.EncodedJwt {
  75. tokenStr := ""
  76. lookupUrl := fmt.Sprintf("http://%s/dir/lookup?fileId=%s", master, fileId)
  77. err := util.Head(lookupUrl, func(header *fasthttp.ResponseHeader) {
  78. bearer := header.Peek("Authorization")
  79. if len(bearer) > 7 && string(bytes.ToUpper(bearer[0:6])) == "BEARER" {
  80. tokenStr = string(bearer[7:])
  81. }
  82. })
  83. if err != nil {
  84. glog.V(0).Infof("failed to lookup jwt %s: %v", lookupUrl, err)
  85. }
  86. return security.EncodedJwt(tokenStr)
  87. }