assign_file_id.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  7. "google.golang.org/grpc"
  8. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  9. "github.com/chrislusf/seaweedfs/weed/security"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. type VolumeAssignRequest struct {
  13. Count uint64
  14. Replication string
  15. Collection string
  16. Ttl string
  17. DataCenter string
  18. Rack string
  19. DataNode string
  20. WritableVolumeCount uint32
  21. }
  22. type AssignResult struct {
  23. Fid string `json:"fid,omitempty"`
  24. Url string `json:"url,omitempty"`
  25. PublicUrl string `json:"publicUrl,omitempty"`
  26. Count uint64 `json:"count,omitempty"`
  27. Error string `json:"error,omitempty"`
  28. Auth security.EncodedJwt `json:"auth,omitempty"`
  29. }
  30. func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
  31. var requests []*VolumeAssignRequest
  32. requests = append(requests, primaryRequest)
  33. requests = append(requests, alternativeRequests...)
  34. var lastError error
  35. ret := &AssignResult{}
  36. for i, request := range requests {
  37. if request == nil {
  38. continue
  39. }
  40. lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  41. req := &master_pb.AssignRequest{
  42. Count: primaryRequest.Count,
  43. Replication: primaryRequest.Replication,
  44. Collection: primaryRequest.Collection,
  45. Ttl: primaryRequest.Ttl,
  46. DataCenter: primaryRequest.DataCenter,
  47. Rack: primaryRequest.Rack,
  48. DataNode: primaryRequest.DataNode,
  49. WritableVolumeCount: primaryRequest.WritableVolumeCount,
  50. }
  51. resp, grpcErr := masterClient.Assign(context.Background(), req)
  52. if grpcErr != nil {
  53. return grpcErr
  54. }
  55. ret.Count = resp.Count
  56. ret.Fid = resp.Fid
  57. ret.Url = resp.Url
  58. ret.PublicUrl = resp.PublicUrl
  59. ret.Error = resp.Error
  60. ret.Auth = security.EncodedJwt(resp.Auth)
  61. return nil
  62. })
  63. if lastError != nil {
  64. continue
  65. }
  66. if ret.Count <= 0 {
  67. lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
  68. continue
  69. }
  70. }
  71. return ret, lastError
  72. }
  73. func LookupJwt(master string, fileId string) security.EncodedJwt {
  74. tokenStr := ""
  75. if h, e := util.Head(fmt.Sprintf("http://%s/dir/lookup?fileId=%s", master, fileId)); e == nil {
  76. bearer := h.Get("Authorization")
  77. if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" {
  78. tokenStr = bearer[7:]
  79. }
  80. }
  81. return security.EncodedJwt(tokenStr)
  82. }
  83. type StorageOption struct {
  84. Replication string
  85. Collection string
  86. DataCenter string
  87. Rack string
  88. TtlSeconds int32
  89. Fsync bool
  90. }
  91. func (so *StorageOption) TtlString() string {
  92. return needle.SecondsToTTL(so.TtlSeconds)
  93. }
  94. func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) {
  95. ar = &VolumeAssignRequest{
  96. Count: uint64(count),
  97. Replication: so.Replication,
  98. Collection: so.Collection,
  99. Ttl: so.TtlString(),
  100. DataCenter: so.DataCenter,
  101. Rack: so.Rack,
  102. }
  103. if so.DataCenter != "" || so.Rack != "" {
  104. altRequest = &VolumeAssignRequest{
  105. Count: uint64(count),
  106. Replication: so.Replication,
  107. Collection: so.Collection,
  108. Ttl: so.TtlString(),
  109. DataCenter: "",
  110. Rack: "",
  111. }
  112. }
  113. return
  114. }