package operation import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "google.golang.org/grpc" "sync" ) type VolumeAssignRequest struct { Count uint64 Replication string Collection string Ttl string DiskType string DataCenter string Rack string DataNode string WritableVolumeCount uint32 } type AssignResult struct { Fid string `json:"fid,omitempty"` Url string `json:"url,omitempty"` PublicUrl string `json:"publicUrl,omitempty"` GrpcPort int `json:"grpcPort,omitempty"` Count uint64 `json:"count,omitempty"` Error string `json:"error,omitempty"` Auth security.EncodedJwt `json:"auth,omitempty"` Replicas []Location `json:"replicas,omitempty"` } // This is a proxy to the master server, only for assigning volume ids. // It runs via grpc to the master server in streaming mode. // The connection to the master would only be re-established when the last connection has error. type AssignProxy struct { grpcConnection *grpc.ClientConn pool chan *singleThreadAssignProxy } func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concurrency int) (ap *AssignProxy, err error) { ap = &AssignProxy{ pool: make(chan *singleThreadAssignProxy, concurrency), } ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn(context.Background()).ToGrpcAddress(), true, grpcDialOption) if err != nil { return nil, fmt.Errorf("fail to dial %s: %v", masterFn(context.Background()).ToGrpcAddress(), err) } for i := 0; i < concurrency; i++ { ap.pool <- &singleThreadAssignProxy{} } return ap, nil } func (ap *AssignProxy) Assign(primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) { p := <-ap.pool defer func() { ap.pool <- p }() return p.doAssign(ap.grpcConnection, primaryRequest, alternativeRequests...) } type singleThreadAssignProxy struct { assignClient master_pb.Seaweed_StreamAssignClient sync.Mutex } func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) { ap.Lock() defer ap.Unlock() if ap.assignClient == nil { client := master_pb.NewSeaweedClient(grpcConnection) ap.assignClient, err = client.StreamAssign(context.Background()) if err != nil { ap.assignClient = nil return nil, fmt.Errorf("fail to create stream assign client: %v", err) } } var requests []*VolumeAssignRequest requests = append(requests, primaryRequest) requests = append(requests, alternativeRequests...) ret = &AssignResult{} for _, request := range requests { if request == nil { continue } req := &master_pb.AssignRequest{ Count: request.Count, Replication: request.Replication, Collection: request.Collection, Ttl: request.Ttl, DiskType: request.DiskType, DataCenter: request.DataCenter, Rack: request.Rack, DataNode: request.DataNode, WritableVolumeCount: request.WritableVolumeCount, } if err = ap.assignClient.Send(req); err != nil { return nil, fmt.Errorf("StreamAssignSend: %v", err) } resp, grpcErr := ap.assignClient.Recv() if grpcErr != nil { return nil, grpcErr } if resp.Error != "" { return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error) } ret.Count = resp.Count ret.Fid = resp.Fid ret.Url = resp.Location.Url ret.PublicUrl = resp.Location.PublicUrl ret.GrpcPort = int(resp.Location.GrpcPort) ret.Error = resp.Error ret.Auth = security.EncodedJwt(resp.Auth) for _, r := range resp.Replicas { ret.Replicas = append(ret.Replicas, Location{ Url: r.Url, PublicUrl: r.PublicUrl, DataCenter: r.DataCenter, }) } if ret.Count <= 0 { continue } break } return } func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { var requests []*VolumeAssignRequest requests = append(requests, primaryRequest) requests = append(requests, alternativeRequests...) var lastError error ret := &AssignResult{} for i, request := range requests { if request == nil { continue } lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: request.Count, Replication: request.Replication, Collection: request.Collection, Ttl: request.Ttl, DiskType: request.DiskType, DataCenter: request.DataCenter, Rack: request.Rack, DataNode: request.DataNode, WritableVolumeCount: request.WritableVolumeCount, } resp, grpcErr := masterClient.Assign(context.Background(), req) if grpcErr != nil { return grpcErr } if resp.Error != "" { return fmt.Errorf("assignRequest: %v", resp.Error) } ret.Count = resp.Count ret.Fid = resp.Fid ret.Url = resp.Location.Url ret.PublicUrl = resp.Location.PublicUrl ret.GrpcPort = int(resp.Location.GrpcPort) ret.Error = resp.Error ret.Auth = security.EncodedJwt(resp.Auth) for _, r := range resp.Replicas { ret.Replicas = append(ret.Replicas, Location{ Url: r.Url, PublicUrl: r.PublicUrl, DataCenter: r.DataCenter, }) } return nil }) if lastError != nil { stats.FilerHandlerCounter.WithLabelValues(stats.ErrorChunkAssign).Inc() continue } if ret.Count <= 0 { lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error) continue } break } return ret, lastError } func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) { WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ VolumeOrFileIds: []string{fileId}, }) if grpcErr != nil { return grpcErr } if len(resp.VolumeIdLocations) == 0 { return nil } token = security.EncodedJwt(resp.VolumeIdLocations[0].Auth) return nil }) return } type StorageOption struct { Replication string DiskType string Collection string DataCenter string Rack string DataNode string TtlSeconds int32 VolumeGrowthCount uint32 MaxFileNameLength uint32 Fsync bool SaveInside bool } func (so *StorageOption) TtlString() string { return needle.SecondsToTTL(so.TtlSeconds) } func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) { ar = &VolumeAssignRequest{ Count: uint64(count), Replication: so.Replication, Collection: so.Collection, Ttl: so.TtlString(), DiskType: so.DiskType, DataCenter: so.DataCenter, Rack: so.Rack, DataNode: so.DataNode, WritableVolumeCount: so.VolumeGrowthCount, } if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" { altRequest = &VolumeAssignRequest{ Count: uint64(count), Replication: so.Replication, Collection: so.Collection, Ttl: so.TtlString(), DiskType: so.DiskType, DataCenter: "", Rack: "", DataNode: "", WritableVolumeCount: so.VolumeGrowthCount, } } return }