assign_file_id.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/security"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  9. "google.golang.org/grpc"
  10. "sync"
  11. )
  12. type VolumeAssignRequest struct {
  13. Count uint64
  14. Replication string
  15. Collection string
  16. Ttl string
  17. DiskType string
  18. DataCenter string
  19. Rack string
  20. DataNode string
  21. WritableVolumeCount uint32
  22. }
  23. type AssignResult struct {
  24. Fid string `json:"fid,omitempty"`
  25. Url string `json:"url,omitempty"`
  26. PublicUrl string `json:"publicUrl,omitempty"`
  27. GrpcPort int `json:"grpcPort,omitempty"`
  28. Count uint64 `json:"count,omitempty"`
  29. Error string `json:"error,omitempty"`
  30. Auth security.EncodedJwt `json:"auth,omitempty"`
  31. Replicas []Location `json:"replicas,omitempty"`
  32. }
  33. // This is a proxy to the master server, only for assigning volume ids.
  34. // It runs via grpc to the master server in streaming mode.
  35. // The connection to the master would only be re-established when the last connection has error.
  36. type AssignProxy struct {
  37. grpcConnection *grpc.ClientConn
  38. pool chan *singleThreadAssignProxy
  39. }
  40. func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concurrency int) (ap *AssignProxy, err error) {
  41. ap = &AssignProxy{
  42. pool: make(chan *singleThreadAssignProxy, concurrency),
  43. }
  44. ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption)
  45. if err != nil {
  46. return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err)
  47. }
  48. for i := 0; i < concurrency; i++ {
  49. ap.pool <- &singleThreadAssignProxy{}
  50. }
  51. return ap, nil
  52. }
  53. func (ap *AssignProxy) Assign(primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
  54. p := <-ap.pool
  55. defer func() {
  56. ap.pool <- p
  57. }()
  58. return p.doAssign(ap.grpcConnection, primaryRequest, alternativeRequests...)
  59. }
  60. type singleThreadAssignProxy struct {
  61. assignClient master_pb.Seaweed_StreamAssignClient
  62. sync.Mutex
  63. }
  64. func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
  65. ap.Lock()
  66. defer ap.Unlock()
  67. if ap.assignClient == nil {
  68. client := master_pb.NewSeaweedClient(grpcConnection)
  69. ap.assignClient, err = client.StreamAssign(context.Background())
  70. if err != nil {
  71. ap.assignClient = nil
  72. return nil, fmt.Errorf("fail to create stream assign client: %v", err)
  73. }
  74. }
  75. var requests []*VolumeAssignRequest
  76. requests = append(requests, primaryRequest)
  77. requests = append(requests, alternativeRequests...)
  78. ret = &AssignResult{}
  79. for _, request := range requests {
  80. if request == nil {
  81. continue
  82. }
  83. req := &master_pb.AssignRequest{
  84. Count: request.Count,
  85. Replication: request.Replication,
  86. Collection: request.Collection,
  87. Ttl: request.Ttl,
  88. DiskType: request.DiskType,
  89. DataCenter: request.DataCenter,
  90. Rack: request.Rack,
  91. DataNode: request.DataNode,
  92. WritableVolumeCount: request.WritableVolumeCount,
  93. }
  94. if err = ap.assignClient.Send(req); err != nil {
  95. return nil, fmt.Errorf("StreamAssignSend: %v", err)
  96. }
  97. resp, grpcErr := ap.assignClient.Recv()
  98. if grpcErr != nil {
  99. return nil, grpcErr
  100. }
  101. if resp.Error != "" {
  102. return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error)
  103. }
  104. ret.Count = resp.Count
  105. ret.Fid = resp.Fid
  106. ret.Url = resp.Location.Url
  107. ret.PublicUrl = resp.Location.PublicUrl
  108. ret.GrpcPort = int(resp.Location.GrpcPort)
  109. ret.Error = resp.Error
  110. ret.Auth = security.EncodedJwt(resp.Auth)
  111. for _, r := range resp.Replicas {
  112. ret.Replicas = append(ret.Replicas, Location{
  113. Url: r.Url,
  114. PublicUrl: r.PublicUrl,
  115. DataCenter: r.DataCenter,
  116. })
  117. }
  118. if ret.Count <= 0 {
  119. continue
  120. }
  121. break
  122. }
  123. return
  124. }
  125. func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
  126. var requests []*VolumeAssignRequest
  127. requests = append(requests, primaryRequest)
  128. requests = append(requests, alternativeRequests...)
  129. var lastError error
  130. ret := &AssignResult{}
  131. for i, request := range requests {
  132. if request == nil {
  133. continue
  134. }
  135. lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  136. req := &master_pb.AssignRequest{
  137. Count: request.Count,
  138. Replication: request.Replication,
  139. Collection: request.Collection,
  140. Ttl: request.Ttl,
  141. DiskType: request.DiskType,
  142. DataCenter: request.DataCenter,
  143. Rack: request.Rack,
  144. DataNode: request.DataNode,
  145. WritableVolumeCount: request.WritableVolumeCount,
  146. }
  147. resp, grpcErr := masterClient.Assign(context.Background(), req)
  148. if grpcErr != nil {
  149. return grpcErr
  150. }
  151. if resp.Error != "" {
  152. return fmt.Errorf("assignRequest: %v", resp.Error)
  153. }
  154. ret.Count = resp.Count
  155. ret.Fid = resp.Fid
  156. ret.Url = resp.Location.Url
  157. ret.PublicUrl = resp.Location.PublicUrl
  158. ret.GrpcPort = int(resp.Location.GrpcPort)
  159. ret.Error = resp.Error
  160. ret.Auth = security.EncodedJwt(resp.Auth)
  161. for _, r := range resp.Replicas {
  162. ret.Replicas = append(ret.Replicas, Location{
  163. Url: r.Url,
  164. PublicUrl: r.PublicUrl,
  165. DataCenter: r.DataCenter,
  166. })
  167. }
  168. return nil
  169. })
  170. if lastError != nil {
  171. continue
  172. }
  173. if ret.Count <= 0 {
  174. lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
  175. continue
  176. }
  177. break
  178. }
  179. return ret, lastError
  180. }
  181. func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
  182. WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  183. resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
  184. VolumeOrFileIds: []string{fileId},
  185. })
  186. if grpcErr != nil {
  187. return grpcErr
  188. }
  189. if len(resp.VolumeIdLocations) == 0 {
  190. return nil
  191. }
  192. token = security.EncodedJwt(resp.VolumeIdLocations[0].Auth)
  193. return nil
  194. })
  195. return
  196. }
  197. type StorageOption struct {
  198. Replication string
  199. DiskType string
  200. Collection string
  201. DataCenter string
  202. Rack string
  203. DataNode string
  204. TtlSeconds int32
  205. VolumeGrowthCount uint32
  206. MaxFileNameLength uint32
  207. Fsync bool
  208. SaveInside bool
  209. }
  210. func (so *StorageOption) TtlString() string {
  211. return needle.SecondsToTTL(so.TtlSeconds)
  212. }
  213. func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) {
  214. ar = &VolumeAssignRequest{
  215. Count: uint64(count),
  216. Replication: so.Replication,
  217. Collection: so.Collection,
  218. Ttl: so.TtlString(),
  219. DiskType: so.DiskType,
  220. DataCenter: so.DataCenter,
  221. Rack: so.Rack,
  222. DataNode: so.DataNode,
  223. WritableVolumeCount: so.VolumeGrowthCount,
  224. }
  225. if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" {
  226. altRequest = &VolumeAssignRequest{
  227. Count: uint64(count),
  228. Replication: so.Replication,
  229. Collection: so.Collection,
  230. Ttl: so.TtlString(),
  231. DiskType: so.DiskType,
  232. DataCenter: "",
  233. Rack: "",
  234. DataNode: "",
  235. WritableVolumeCount: so.VolumeGrowthCount,
  236. }
  237. }
  238. return
  239. }