assign_file_id.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/security"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  9. "google.golang.org/grpc"
  10. "sync"
  11. )
  // VolumeAssignRequest holds the parameters of a single file-id assignment
  // request. Every field is copied verbatim into a master_pb.AssignRequest
  // before being sent to the master.
  type VolumeAssignRequest struct {
  	// Count is forwarded as the request's Count (presumably the number of
  	// file ids to reserve — confirm against the master API).
  	Count uint64

  	// Storage attributes of the target volume.
  	Replication, Collection, Ttl, DiskType string

  	// Placement constraints; ToAssignRequests builds a fallback request with
  	// all three cleared.
  	DataCenter, Rack, DataNode string

  	// WritableVolumeCount is forwarded as the request's WritableVolumeCount.
  	WritableVolumeCount uint32
  }
  23. type AssignResult struct {
  24. Fid string `json:"fid,omitempty"`
  25. Url string `json:"url,omitempty"`
  26. PublicUrl string `json:"publicUrl,omitempty"`
  27. GrpcPort int `json:"grpcPort,omitempty"`
  28. Count uint64 `json:"count,omitempty"`
  29. Error string `json:"error,omitempty"`
  30. Auth security.EncodedJwt `json:"auth,omitempty"`
  31. Replicas []Location `json:"replicas,omitempty"`
  32. }
  33. // This is a proxy to the master server, only for assigning volume ids.
  34. // It runs via grpc to the master server in streaming mode.
  35. // The connection to the master would only be re-established when the last connection has error.
  36. type AssignProxy struct {
  37. grpcConnection *grpc.ClientConn
  38. pool chan *singleThreadAssignProxy
  39. }
  40. func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concurrency int) (ap *AssignProxy, err error) {
  41. ap = &AssignProxy{
  42. pool: make(chan *singleThreadAssignProxy, concurrency),
  43. }
  44. ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption)
  45. if err != nil {
  46. return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err)
  47. }
  48. for i := 0; i < concurrency; i++ {
  49. ap.pool <- &singleThreadAssignProxy{}
  50. }
  51. return ap, nil
  52. }
  53. func (ap *AssignProxy) Assign(primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
  54. p := <-ap.pool
  55. defer func() {
  56. ap.pool <- p
  57. }()
  58. return p.doAssign(ap.grpcConnection, primaryRequest, alternativeRequests...)
  59. }
  60. type singleThreadAssignProxy struct {
  61. assignClient master_pb.Seaweed_StreamAssignClient
  62. sync.Mutex
  63. }
  64. func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
  65. ap.Lock()
  66. defer ap.Unlock()
  67. if ap.assignClient == nil {
  68. client := master_pb.NewSeaweedClient(grpcConnection)
  69. ap.assignClient, err = client.StreamAssign(context.Background())
  70. if err != nil {
  71. ap.assignClient = nil
  72. return nil, fmt.Errorf("fail to create stream assign client: %v", err)
  73. }
  74. }
  75. var requests []*VolumeAssignRequest
  76. requests = append(requests, primaryRequest)
  77. requests = append(requests, alternativeRequests...)
  78. ret = &AssignResult{}
  79. for _, request := range requests {
  80. if request == nil {
  81. continue
  82. }
  83. req := &master_pb.AssignRequest{
  84. Count: request.Count,
  85. Replication: request.Replication,
  86. Collection: request.Collection,
  87. Ttl: request.Ttl,
  88. DiskType: request.DiskType,
  89. DataCenter: request.DataCenter,
  90. Rack: request.Rack,
  91. DataNode: request.DataNode,
  92. WritableVolumeCount: request.WritableVolumeCount,
  93. }
  94. if err = ap.assignClient.Send(req); err != nil {
  95. return nil, fmt.Errorf("StreamAssignSend: %v", err)
  96. }
  97. resp, grpcErr := ap.assignClient.Recv()
  98. if grpcErr != nil {
  99. return nil, grpcErr
  100. }
  101. if resp.Error != "" {
  102. return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error)
  103. }
  104. ret.Count = resp.Count
  105. ret.Fid = resp.Fid
  106. ret.Url = resp.Location.Url
  107. ret.PublicUrl = resp.Location.PublicUrl
  108. ret.GrpcPort = int(resp.Location.GrpcPort)
  109. ret.Error = resp.Error
  110. ret.Auth = security.EncodedJwt(resp.Auth)
  111. for _, r := range resp.Replicas {
  112. ret.Replicas = append(ret.Replicas, Location{
  113. Url: r.Url,
  114. PublicUrl: r.PublicUrl,
  115. DataCenter: r.DataCenter,
  116. })
  117. }
  118. if ret.Count <= 0 {
  119. continue
  120. }
  121. break
  122. }
  123. return
  124. }
  125. func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
  126. var requests []*VolumeAssignRequest
  127. requests = append(requests, primaryRequest)
  128. requests = append(requests, alternativeRequests...)
  129. var lastError error
  130. ret := &AssignResult{}
  131. for i, request := range requests {
  132. if request == nil {
  133. continue
  134. }
  135. lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  136. req := &master_pb.AssignRequest{
  137. Count: request.Count,
  138. Replication: request.Replication,
  139. Collection: request.Collection,
  140. Ttl: request.Ttl,
  141. DiskType: request.DiskType,
  142. DataCenter: request.DataCenter,
  143. Rack: request.Rack,
  144. DataNode: request.DataNode,
  145. WritableVolumeCount: request.WritableVolumeCount,
  146. }
  147. resp, grpcErr := masterClient.Assign(context.Background(), req)
  148. if grpcErr != nil {
  149. return grpcErr
  150. }
  151. if resp.Error != "" {
  152. return fmt.Errorf("assignRequest: %v", resp.Error)
  153. }
  154. ret.Count = resp.Count
  155. ret.Fid = resp.Fid
  156. ret.Url = resp.Location.Url
  157. ret.PublicUrl = resp.Location.PublicUrl
  158. ret.GrpcPort = int(resp.Location.GrpcPort)
  159. ret.Error = resp.Error
  160. ret.Auth = security.EncodedJwt(resp.Auth)
  161. for _, r := range resp.Replicas {
  162. ret.Replicas = append(ret.Replicas, Location{
  163. Url: r.Url,
  164. PublicUrl: r.PublicUrl,
  165. DataCenter: r.DataCenter,
  166. })
  167. }
  168. return nil
  169. })
  170. if lastError != nil {
  171. continue
  172. }
  173. if ret.Count <= 0 {
  174. lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
  175. continue
  176. }
  177. break
  178. }
  179. return ret, lastError
  180. }
  181. func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
  182. WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  183. resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
  184. VolumeOrFileIds: []string{fileId},
  185. })
  186. if grpcErr != nil {
  187. return grpcErr
  188. }
  189. if len(resp.VolumeIdLocations) == 0 {
  190. return nil
  191. }
  192. token = security.EncodedJwt(resp.VolumeIdLocations[0].Auth)
  193. return nil
  194. })
  195. return
  196. }
  // StorageOption bundles the storage and placement settings from which
  // assignment requests are derived (see ToAssignRequests).
  type StorageOption struct {
  	// Storage attributes copied into the assignment request.
  	Replication, DiskType, Collection string

  	// Placement constraints; when any of them is set, ToAssignRequests also
  	// produces a fallback request without them.
  	DataCenter, Rack, DataNode string

  	// TtlSeconds is converted to a TTL string by TtlString.
  	TtlSeconds int32

  	// Fsync is not read in this file.
  	Fsync bool

  	// VolumeGrowthCount is sent as the request's WritableVolumeCount.
  	VolumeGrowthCount uint32

  	// SaveInside is not read in this file.
  	SaveInside bool
  }
  209. func (so *StorageOption) TtlString() string {
  210. return needle.SecondsToTTL(so.TtlSeconds)
  211. }
  212. func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) {
  213. ar = &VolumeAssignRequest{
  214. Count: uint64(count),
  215. Replication: so.Replication,
  216. Collection: so.Collection,
  217. Ttl: so.TtlString(),
  218. DiskType: so.DiskType,
  219. DataCenter: so.DataCenter,
  220. Rack: so.Rack,
  221. DataNode: so.DataNode,
  222. WritableVolumeCount: so.VolumeGrowthCount,
  223. }
  224. if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" {
  225. altRequest = &VolumeAssignRequest{
  226. Count: uint64(count),
  227. Replication: so.Replication,
  228. Collection: so.Collection,
  229. Ttl: so.TtlString(),
  230. DiskType: so.DiskType,
  231. DataCenter: "",
  232. Rack: "",
  233. DataNode: "",
  234. WritableVolumeCount: so.VolumeGrowthCount,
  235. }
  236. }
  237. return
  238. }