assign_file_id.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  7. "google.golang.org/grpc"
  8. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  9. "github.com/chrislusf/seaweedfs/weed/security"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  // VolumeAssignRequest holds the parameters of a single assign attempt.
  //
  // Assign copies every field, unchanged, into the master_pb.AssignRequest it
  // sends to the master server.
  type VolumeAssignRequest struct {
  	// Count is forwarded as AssignRequest.Count.
  	// NOTE(review): presumably the number of file ids to reserve — confirm
  	// against the master_pb definition.
  	Count uint64

  	// Replication, Collection, Ttl and DiskType are passed through to the
  	// master as plain strings; their accepted formats are defined elsewhere.
  	Replication string
  	Collection  string
  	Ttl         string
  	DiskType    string

  	// DataCenter, Rack and DataNode are forwarded as-is. StorageOption's
  	// ToAssignRequests builds a second request with DataCenter and Rack
  	// cleared, which suggests they act as placement constraints.
  	DataCenter string
  	Rack       string
  	DataNode   string

  	// WritableVolumeCount is forwarded as AssignRequest.WritableVolumeCount.
  	WritableVolumeCount uint32
  }
  23. type AssignResult struct {
  24. Fid string `json:"fid,omitempty"`
  25. Url string `json:"url,omitempty"`
  26. PublicUrl string `json:"publicUrl,omitempty"`
  27. Count uint64 `json:"count,omitempty"`
  28. Error string `json:"error,omitempty"`
  29. Auth security.EncodedJwt `json:"auth,omitempty"`
  30. }
  31. func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
  32. var requests []*VolumeAssignRequest
  33. requests = append(requests, primaryRequest)
  34. requests = append(requests, alternativeRequests...)
  35. var lastError error
  36. ret := &AssignResult{}
  37. for i, request := range requests {
  38. if request == nil {
  39. continue
  40. }
  41. lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  42. req := &master_pb.AssignRequest{
  43. Count: request.Count,
  44. Replication: request.Replication,
  45. Collection: request.Collection,
  46. Ttl: request.Ttl,
  47. DiskType: request.DiskType,
  48. DataCenter: request.DataCenter,
  49. Rack: request.Rack,
  50. DataNode: request.DataNode,
  51. WritableVolumeCount: request.WritableVolumeCount,
  52. }
  53. resp, grpcErr := masterClient.Assign(context.Background(), req)
  54. if grpcErr != nil {
  55. return grpcErr
  56. }
  57. ret.Count = resp.Count
  58. ret.Fid = resp.Fid
  59. ret.Url = resp.Url
  60. ret.PublicUrl = resp.PublicUrl
  61. ret.Error = resp.Error
  62. ret.Auth = security.EncodedJwt(resp.Auth)
  63. if resp.Error != "" {
  64. return fmt.Errorf("assignRequest: %v", resp.Error)
  65. }
  66. return nil
  67. })
  68. if lastError != nil {
  69. continue
  70. }
  71. if ret.Count <= 0 {
  72. lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
  73. continue
  74. }
  75. break
  76. }
  77. return ret, lastError
  78. }
  79. func LookupJwt(master string, fileId string) security.EncodedJwt {
  80. tokenStr := ""
  81. if h, e := util.Head(fmt.Sprintf("http://%s/dir/lookup?fileId=%s", master, fileId)); e == nil {
  82. bearer := h.Get("Authorization")
  83. if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" {
  84. tokenStr = bearer[7:]
  85. }
  86. }
  87. return security.EncodedJwt(tokenStr)
  88. }
  // StorageOption groups storage settings that ToAssignRequests converts into
  // VolumeAssignRequest values.
  type StorageOption struct {
  	// Replication, DiskType and Collection are copied into every request
  	// built by ToAssignRequests.
  	Replication string
  	DiskType    string
  	Collection  string

  	// DataCenter and Rack are copied into the primary request. When either is
  	// set, ToAssignRequests also builds an alternative request with both
  	// cleared.
  	DataCenter string
  	Rack       string

  	// TtlSeconds is rendered as a string by TtlString.
  	TtlSeconds int32

  	// Fsync is not read by ToAssignRequests or TtlString.
  	// NOTE(review): presumably consumed by upload code elsewhere — confirm.
  	Fsync bool

  	// VolumeGrowthCount becomes VolumeAssignRequest.WritableVolumeCount.
  	VolumeGrowthCount uint32
  }
  99. func (so *StorageOption) TtlString() string {
  100. return needle.SecondsToTTL(so.TtlSeconds)
  101. }
  102. func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) {
  103. ar = &VolumeAssignRequest{
  104. Count: uint64(count),
  105. Replication: so.Replication,
  106. Collection: so.Collection,
  107. Ttl: so.TtlString(),
  108. DiskType: so.DiskType,
  109. DataCenter: so.DataCenter,
  110. Rack: so.Rack,
  111. WritableVolumeCount: so.VolumeGrowthCount,
  112. }
  113. if so.DataCenter != "" || so.Rack != "" {
  114. altRequest = &VolumeAssignRequest{
  115. Count: uint64(count),
  116. Replication: so.Replication,
  117. Collection: so.Collection,
  118. Ttl: so.TtlString(),
  119. DiskType: so.DiskType,
  120. DataCenter: "",
  121. Rack: "",
  122. WritableVolumeCount: so.VolumeGrowthCount,
  123. }
  124. }
  125. return
  126. }