master_grpc_server_volume.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/raft"
  6. "reflect"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/security"
  13. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  14. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  16. "github.com/seaweedfs/seaweedfs/weed/topology"
  17. )
  18. func (ms *MasterServer) ProcessGrowRequest() {
  19. go func() {
  20. filter := sync.Map{}
  21. for {
  22. req, ok := <-ms.vgCh
  23. if !ok {
  24. break
  25. }
  26. if !ms.Topo.IsLeader() {
  27. //discard buffered requests
  28. time.Sleep(time.Second * 1)
  29. continue
  30. }
  31. // filter out identical requests being processed
  32. found := false
  33. filter.Range(func(k, v interface{}) bool {
  34. if reflect.DeepEqual(k, req) {
  35. found = true
  36. }
  37. return !found
  38. })
  39. option := req.Option
  40. vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
  41. // not atomic but it's okay
  42. if !found && vl.ShouldGrowVolumes(option) {
  43. filter.Store(req, nil)
  44. // we have lock called inside vg
  45. go func() {
  46. glog.V(1).Infoln("starting automatic volume grow")
  47. start := time.Now()
  48. newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
  49. glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start))
  50. if err == nil {
  51. for _, newVidLocation := range newVidLocations {
  52. ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation})
  53. }
  54. }
  55. vl.DoneGrowRequest()
  56. if req.ErrCh != nil {
  57. req.ErrCh <- err
  58. close(req.ErrCh)
  59. }
  60. filter.Delete(req)
  61. }()
  62. } else {
  63. glog.V(4).Infoln("discard volume grow request")
  64. }
  65. }
  66. }()
  67. }
  68. func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupVolumeRequest) (*master_pb.LookupVolumeResponse, error) {
  69. resp := &master_pb.LookupVolumeResponse{}
  70. volumeLocations := ms.lookupVolumeId(req.VolumeOrFileIds, req.Collection)
  71. for _, result := range volumeLocations {
  72. var locations []*master_pb.Location
  73. for _, loc := range result.Locations {
  74. locations = append(locations, &master_pb.Location{
  75. Url: loc.Url,
  76. PublicUrl: loc.PublicUrl,
  77. DataCenter: loc.DataCenter,
  78. })
  79. }
  80. var auth string
  81. if strings.Contains(result.VolumeOrFileId, ",") { // this is a file id
  82. auth = string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, result.VolumeOrFileId))
  83. }
  84. resp.VolumeIdLocations = append(resp.VolumeIdLocations, &master_pb.LookupVolumeResponse_VolumeIdLocation{
  85. VolumeOrFileId: result.VolumeOrFileId,
  86. Locations: locations,
  87. Error: result.Error,
  88. Auth: auth,
  89. })
  90. }
  91. return resp, nil
  92. }
  93. func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest) (*master_pb.AssignResponse, error) {
  94. if !ms.Topo.IsLeader() {
  95. return nil, raft.NotLeaderError
  96. }
  97. if req.Count == 0 {
  98. req.Count = 1
  99. }
  100. if req.Replication == "" {
  101. req.Replication = ms.option.DefaultReplicaPlacement
  102. }
  103. replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
  104. if err != nil {
  105. return nil, err
  106. }
  107. ttl, err := needle.ReadTTL(req.Ttl)
  108. if err != nil {
  109. return nil, err
  110. }
  111. diskType := types.ToDiskType(req.DiskType)
  112. option := &topology.VolumeGrowOption{
  113. Collection: req.Collection,
  114. ReplicaPlacement: replicaPlacement,
  115. Ttl: ttl,
  116. DiskType: diskType,
  117. Preallocate: ms.preallocateSize,
  118. DataCenter: req.DataCenter,
  119. Rack: req.Rack,
  120. DataNode: req.DataNode,
  121. MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
  122. }
  123. vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
  124. if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) {
  125. if ms.Topo.AvailableSpaceFor(option) <= 0 {
  126. return nil, fmt.Errorf("no free volumes left for " + option.String())
  127. }
  128. vl.AddGrowRequest()
  129. ms.vgCh <- &topology.VolumeGrowRequest{
  130. Option: option,
  131. Count: int(req.WritableVolumeCount),
  132. }
  133. }
  134. var (
  135. lastErr error
  136. maxTimeout = time.Second * 10
  137. startTime = time.Now()
  138. )
  139. for time.Now().Sub(startTime) < maxTimeout {
  140. fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option)
  141. if err == nil {
  142. dn := dnList.Head()
  143. var replicas []*master_pb.Location
  144. for _, r := range dnList.Rest() {
  145. replicas = append(replicas, &master_pb.Location{
  146. Url: r.Url(),
  147. PublicUrl: r.PublicUrl,
  148. GrpcPort: uint32(r.GrpcPort),
  149. DataCenter: r.GetDataCenterId(),
  150. })
  151. }
  152. return &master_pb.AssignResponse{
  153. Fid: fid,
  154. Location: &master_pb.Location{
  155. Url: dn.Url(),
  156. PublicUrl: dn.PublicUrl,
  157. GrpcPort: uint32(dn.GrpcPort),
  158. DataCenter: dn.GetDataCenterId(),
  159. },
  160. Count: count,
  161. Auth: string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)),
  162. Replicas: replicas,
  163. }, nil
  164. }
  165. //glog.V(4).Infoln("waiting for volume growing...")
  166. lastErr = err
  167. time.Sleep(200 * time.Millisecond)
  168. }
  169. return nil, lastErr
  170. }
  171. func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.StatisticsRequest) (*master_pb.StatisticsResponse, error) {
  172. if !ms.Topo.IsLeader() {
  173. return nil, raft.NotLeaderError
  174. }
  175. if req.Replication == "" {
  176. req.Replication = ms.option.DefaultReplicaPlacement
  177. }
  178. replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
  179. if err != nil {
  180. return nil, err
  181. }
  182. ttl, err := needle.ReadTTL(req.Ttl)
  183. if err != nil {
  184. return nil, err
  185. }
  186. volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
  187. stats := volumeLayout.Stats()
  188. totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
  189. resp := &master_pb.StatisticsResponse{
  190. TotalSize: uint64(totalSize),
  191. UsedSize: stats.UsedSize,
  192. FileCount: stats.FileCount,
  193. }
  194. return resp, nil
  195. }
  196. func (ms *MasterServer) VolumeList(ctx context.Context, req *master_pb.VolumeListRequest) (*master_pb.VolumeListResponse, error) {
  197. if !ms.Topo.IsLeader() {
  198. return nil, raft.NotLeaderError
  199. }
  200. resp := &master_pb.VolumeListResponse{
  201. TopologyInfo: ms.Topo.ToTopologyInfo(),
  202. VolumeSizeLimitMb: uint64(ms.option.VolumeSizeLimitMB),
  203. }
  204. return resp, nil
  205. }
  206. func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.LookupEcVolumeRequest) (*master_pb.LookupEcVolumeResponse, error) {
  207. if !ms.Topo.IsLeader() {
  208. return nil, raft.NotLeaderError
  209. }
  210. resp := &master_pb.LookupEcVolumeResponse{}
  211. ecLocations, found := ms.Topo.LookupEcShards(needle.VolumeId(req.VolumeId))
  212. if !found {
  213. return resp, fmt.Errorf("ec volume %d not found", req.VolumeId)
  214. }
  215. resp.VolumeId = req.VolumeId
  216. for shardId, shardLocations := range ecLocations.Locations {
  217. var locations []*master_pb.Location
  218. for _, dn := range shardLocations {
  219. locations = append(locations, &master_pb.Location{
  220. Url: string(dn.Id()),
  221. PublicUrl: dn.PublicUrl,
  222. DataCenter: dn.GetDataCenterId(),
  223. })
  224. }
  225. resp.ShardIdLocations = append(resp.ShardIdLocations, &master_pb.LookupEcVolumeResponse_EcShardIdLocation{
  226. ShardId: uint32(shardId),
  227. Locations: locations,
  228. })
  229. }
  230. return resp, nil
  231. }
  232. func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumVolumeRequest) (*master_pb.VacuumVolumeResponse, error) {
  233. if !ms.Topo.IsLeader() {
  234. return nil, raft.NotLeaderError
  235. }
  236. resp := &master_pb.VacuumVolumeResponse{}
  237. ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), req.VolumeId, req.Collection, ms.preallocateSize)
  238. return resp, nil
  239. }
  240. func (ms *MasterServer) VolumeMarkReadonly(ctx context.Context, req *master_pb.VolumeMarkReadonlyRequest) (*master_pb.VolumeMarkReadonlyResponse, error) {
  241. if !ms.Topo.IsLeader() {
  242. return nil, raft.NotLeaderError
  243. }
  244. resp := &master_pb.VolumeMarkReadonlyResponse{}
  245. replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(req.ReplicaPlacement))
  246. vl := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, needle.LoadTTLFromUint32(req.Ttl), types.ToDiskType(req.DiskType))
  247. dataNodes := ms.Topo.Lookup(req.Collection, needle.VolumeId(req.VolumeId))
  248. for _, dn := range dataNodes {
  249. if dn.Ip == req.Ip && dn.Port == int(req.Port) {
  250. if req.IsReadonly {
  251. vl.SetVolumeUnavailable(dn, needle.VolumeId(req.VolumeId))
  252. } else {
  253. vl.SetVolumeAvailable(dn, needle.VolumeId(req.VolumeId), false)
  254. }
  255. }
  256. }
  257. return resp, nil
  258. }