assign_file_id.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  7. "google.golang.org/grpc"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/security"
  10. )
// VolumeAssignRequest holds the parameters of one file-id assignment attempt.
// Assign copies every field into the master_pb.AssignRequest field of the
// same name before sending it to the master.
type VolumeAssignRequest struct {
	Count       uint64 // StorageOption.ToAssignRequests sets this from its count argument
	Replication string
	Collection  string
	Ttl         string // StorageOption.ToAssignRequests sets this from StorageOption.TtlString()
	DiskType    string
	// DataCenter, Rack and DataNode are the three fields that
	// StorageOption.ToAssignRequests clears in its alternative request.
	// NOTE(review): presumably placement constraints honored by the master — confirm.
	DataCenter          string
	Rack                string
	DataNode            string
	WritableVolumeCount uint32 // StorageOption.ToAssignRequests sets this from StorageOption.VolumeGrowthCount
}
// AssignResult is the value returned by Assign, filled in from the master's
// assign response. Every field is tagged omitempty, so zero values are left
// out when the result is encoded as JSON.
type AssignResult struct {
	Fid       string `json:"fid,omitempty"`
	Url       string `json:"url,omitempty"`       // copied from the response's Location.Url
	PublicUrl string `json:"publicUrl,omitempty"` // copied from the response's Location.PublicUrl
	GrpcPort  int    `json:"grpcPort,omitempty"`  // copied from the response's Location.GrpcPort, converted to int
	Count     uint64 `json:"count,omitempty"`
	// Error mirrors the response's Error field. Assign only stores it after
	// checking that the response's Error is empty.
	Error    string              `json:"error,omitempty"`
	Auth     security.EncodedJwt `json:"auth,omitempty"`
	Replicas []Location          `json:"replicas,omitempty"` // one entry per replica listed in the response
}
  32. func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
  33. var requests []*VolumeAssignRequest
  34. requests = append(requests, primaryRequest)
  35. requests = append(requests, alternativeRequests...)
  36. var lastError error
  37. ret := &AssignResult{}
  38. for i, request := range requests {
  39. if request == nil {
  40. continue
  41. }
  42. lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  43. req := &master_pb.AssignRequest{
  44. Count: request.Count,
  45. Replication: request.Replication,
  46. Collection: request.Collection,
  47. Ttl: request.Ttl,
  48. DiskType: request.DiskType,
  49. DataCenter: request.DataCenter,
  50. Rack: request.Rack,
  51. DataNode: request.DataNode,
  52. WritableVolumeCount: request.WritableVolumeCount,
  53. }
  54. resp, grpcErr := masterClient.Assign(context.Background(), req)
  55. if grpcErr != nil {
  56. return grpcErr
  57. }
  58. if resp.Error != "" {
  59. return fmt.Errorf("assignRequest: %v", resp.Error)
  60. }
  61. ret.Count = resp.Count
  62. ret.Fid = resp.Fid
  63. ret.Url = resp.Location.Url
  64. ret.PublicUrl = resp.Location.PublicUrl
  65. ret.GrpcPort = int(resp.Location.GrpcPort)
  66. ret.Error = resp.Error
  67. ret.Auth = security.EncodedJwt(resp.Auth)
  68. for _, r := range resp.Replicas {
  69. ret.Replicas = append(ret.Replicas, Location{
  70. Url: r.Url,
  71. PublicUrl: r.PublicUrl,
  72. DataCenter: r.DataCenter,
  73. })
  74. }
  75. return nil
  76. })
  77. if lastError != nil {
  78. continue
  79. }
  80. if ret.Count <= 0 {
  81. lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
  82. continue
  83. }
  84. break
  85. }
  86. return ret, lastError
  87. }
  88. func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
  89. WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  90. resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
  91. VolumeOrFileIds: []string{fileId},
  92. })
  93. if grpcErr != nil {
  94. return grpcErr
  95. }
  96. if len(resp.VolumeIdLocations) == 0 {
  97. return nil
  98. }
  99. token = security.EncodedJwt(resp.VolumeIdLocations[0].Auth)
  100. return nil
  101. })
  102. return
  103. }
// StorageOption bundles the settings from which ToAssignRequests builds
// VolumeAssignRequest values.
type StorageOption struct {
	Replication string
	DiskType    string
	Collection  string
	// When any of DataCenter, Rack or DataNode is non-empty,
	// ToAssignRequests also produces an alternative request with all three
	// cleared.
	DataCenter        string
	Rack              string
	DataNode          string
	TtlSeconds        int32  // converted to a string by TtlString
	Fsync             bool   // not copied into the assign requests built by ToAssignRequests
	VolumeGrowthCount uint32 // becomes VolumeAssignRequest.WritableVolumeCount
}
// TtlString returns so.TtlSeconds converted by needle.SecondsToTTL. The
// result is the value ToAssignRequests stores in VolumeAssignRequest.Ttl.
func (so *StorageOption) TtlString() string {
	return needle.SecondsToTTL(so.TtlSeconds)
}
  118. func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) {
  119. ar = &VolumeAssignRequest{
  120. Count: uint64(count),
  121. Replication: so.Replication,
  122. Collection: so.Collection,
  123. Ttl: so.TtlString(),
  124. DiskType: so.DiskType,
  125. DataCenter: so.DataCenter,
  126. Rack: so.Rack,
  127. DataNode: so.DataNode,
  128. WritableVolumeCount: so.VolumeGrowthCount,
  129. }
  130. if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" {
  131. altRequest = &VolumeAssignRequest{
  132. Count: uint64(count),
  133. Replication: so.Replication,
  134. Collection: so.Collection,
  135. Ttl: so.TtlString(),
  136. DiskType: so.DiskType,
  137. DataCenter: "",
  138. Rack: "",
  139. DataNode: "",
  140. WritableVolumeCount: so.VolumeGrowthCount,
  141. }
  142. }
  143. return
  144. }