disk.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package topology
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/storage/types"
  5. "github.com/chrislusf/seaweedfs/weed/util"
  6. "sync"
  7. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. "github.com/chrislusf/seaweedfs/weed/storage"
  11. )
  12. type Disk struct {
  13. NodeImpl
  14. volumes map[needle.VolumeId]storage.VolumeInfo
  15. ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
  16. ecShardsLock sync.RWMutex
  17. }
  18. func NewDisk(diskType string) *Disk {
  19. s := &Disk{}
  20. s.id = NodeId(diskType)
  21. s.nodeType = "Disk"
  22. s.diskUsages = newDiskUsages()
  23. s.volumes = make(map[needle.VolumeId]storage.VolumeInfo, 2)
  24. s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo, 2)
  25. s.NodeImpl.value = s
  26. return s
  27. }
  28. type DiskUsages struct {
  29. sync.RWMutex
  30. usages map[types.DiskType]*DiskUsageCounts
  31. }
  32. func newDiskUsages() *DiskUsages {
  33. return &DiskUsages{
  34. usages: make(map[types.DiskType]*DiskUsageCounts),
  35. }
  36. }
  37. func (d *DiskUsages) negative() *DiskUsages {
  38. d.RLock()
  39. defer d.RUnlock()
  40. t := newDiskUsages()
  41. for diskType, b := range d.usages {
  42. a := t.getOrCreateDisk(diskType)
  43. a.volumeCount = -b.volumeCount
  44. a.remoteVolumeCount = -b.remoteVolumeCount
  45. a.activeVolumeCount = -b.activeVolumeCount
  46. a.ecShardCount = -b.ecShardCount
  47. a.maxVolumeCount = -b.maxVolumeCount
  48. }
  49. return t
  50. }
  51. func (d *DiskUsages) ToDiskInfo() map[string]*master_pb.DiskInfo {
  52. ret := make(map[string]*master_pb.DiskInfo)
  53. for diskType, diskUsageCounts := range d.usages {
  54. m := &master_pb.DiskInfo{
  55. VolumeCount: diskUsageCounts.volumeCount,
  56. MaxVolumeCount: diskUsageCounts.maxVolumeCount,
  57. FreeVolumeCount: diskUsageCounts.maxVolumeCount - diskUsageCounts.volumeCount,
  58. ActiveVolumeCount: diskUsageCounts.activeVolumeCount,
  59. RemoteVolumeCount: diskUsageCounts.remoteVolumeCount,
  60. }
  61. ret[string(diskType)] = m
  62. }
  63. return ret
  64. }
  65. func (d *DiskUsages) FreeSpace() (freeSpace int64) {
  66. d.RLock()
  67. defer d.RUnlock()
  68. for _, diskUsage := range d.usages {
  69. freeSpace += diskUsage.FreeSpace()
  70. }
  71. return
  72. }
  73. func (d *DiskUsages) GetMaxVolumeCount() (maxVolumeCount int64) {
  74. d.RLock()
  75. defer d.RUnlock()
  76. for _, diskUsage := range d.usages {
  77. maxVolumeCount += diskUsage.maxVolumeCount
  78. }
  79. return
  80. }
  81. type DiskUsageCounts struct {
  82. volumeCount int64
  83. remoteVolumeCount int64
  84. activeVolumeCount int64
  85. ecShardCount int64
  86. maxVolumeCount int64
  87. }
  88. func (a *DiskUsageCounts) addDiskUsageCounts(b *DiskUsageCounts) {
  89. a.volumeCount += b.volumeCount
  90. a.remoteVolumeCount += b.remoteVolumeCount
  91. a.activeVolumeCount += b.activeVolumeCount
  92. a.ecShardCount += b.ecShardCount
  93. a.maxVolumeCount += b.maxVolumeCount
  94. }
  95. func (a *DiskUsageCounts) FreeSpace() int64 {
  96. freeVolumeSlotCount := a.maxVolumeCount + a.remoteVolumeCount - a.volumeCount
  97. if a.ecShardCount > 0 {
  98. freeVolumeSlotCount = freeVolumeSlotCount - a.ecShardCount/erasure_coding.DataShardsCount - 1
  99. }
  100. return freeVolumeSlotCount
  101. }
  102. func (a *DiskUsageCounts) minus(b *DiskUsageCounts) *DiskUsageCounts {
  103. return &DiskUsageCounts{
  104. volumeCount: a.volumeCount - b.volumeCount,
  105. remoteVolumeCount: a.remoteVolumeCount - b.remoteVolumeCount,
  106. activeVolumeCount: a.activeVolumeCount - b.activeVolumeCount,
  107. ecShardCount: a.ecShardCount - b.ecShardCount,
  108. maxVolumeCount: a.maxVolumeCount - b.maxVolumeCount,
  109. }
  110. }
  111. func (du *DiskUsages) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
  112. du.Lock()
  113. defer du.Unlock()
  114. t, found := du.usages[diskType]
  115. if found {
  116. return t
  117. }
  118. t = &DiskUsageCounts{}
  119. du.usages[diskType] = t
  120. return t
  121. }
  122. func (d *Disk) String() string {
  123. d.RLock()
  124. defer d.RUnlock()
  125. return fmt.Sprintf("Disk:%s, volumes:%v, ecShards:%v", d.NodeImpl.String(), d.volumes, d.ecShards)
  126. }
  127. func (d *Disk) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  128. d.Lock()
  129. defer d.Unlock()
  130. return d.doAddOrUpdateVolume(v)
  131. }
  132. func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  133. deltaDiskUsages := newDiskUsages()
  134. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
  135. if oldV, ok := d.volumes[v.Id]; !ok {
  136. d.volumes[v.Id] = v
  137. deltaDiskUsage.volumeCount = 1
  138. if v.IsRemote() {
  139. deltaDiskUsage.remoteVolumeCount = 1
  140. }
  141. if !v.ReadOnly {
  142. deltaDiskUsage.activeVolumeCount = 1
  143. }
  144. d.UpAdjustMaxVolumeId(v.Id)
  145. d.UpAdjustDiskUsageDelta(deltaDiskUsages)
  146. isNew = true
  147. } else {
  148. if oldV.IsRemote() != v.IsRemote() {
  149. if v.IsRemote() {
  150. deltaDiskUsage.remoteVolumeCount = 1
  151. }
  152. if oldV.IsRemote() {
  153. deltaDiskUsage.remoteVolumeCount = -1
  154. }
  155. d.UpAdjustDiskUsageDelta(deltaDiskUsages)
  156. }
  157. isChangedRO = d.volumes[v.Id].ReadOnly != v.ReadOnly
  158. d.volumes[v.Id] = v
  159. }
  160. return
  161. }
  162. func (d *Disk) GetVolumes() (ret []storage.VolumeInfo) {
  163. d.RLock()
  164. for _, v := range d.volumes {
  165. ret = append(ret, v)
  166. }
  167. d.RUnlock()
  168. return ret
  169. }
  170. func (d *Disk) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) {
  171. d.RLock()
  172. defer d.RUnlock()
  173. vInfo, ok := d.volumes[id]
  174. if ok {
  175. return vInfo, nil
  176. } else {
  177. return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
  178. }
  179. }
  180. func (d *Disk) GetDataCenter() *DataCenter {
  181. dn := d.Parent()
  182. rack := dn.Parent()
  183. dcNode := rack.Parent()
  184. dcValue := dcNode.GetValue()
  185. return dcValue.(*DataCenter)
  186. }
  187. func (d *Disk) GetRack() *Rack {
  188. return d.Parent().Parent().(*NodeImpl).value.(*Rack)
  189. }
  190. func (d *Disk) GetTopology() *Topology {
  191. p := d.Parent()
  192. for p.Parent() != nil {
  193. p = p.Parent()
  194. }
  195. t := p.(*Topology)
  196. return t
  197. }
  198. func (d *Disk) ToMap() interface{} {
  199. ret := make(map[string]interface{})
  200. diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
  201. ret["Volumes"] = diskUsage.volumeCount
  202. ret["VolumeIds"] = d.GetVolumeIds()
  203. ret["EcShards"] = diskUsage.ecShardCount
  204. ret["Max"] = diskUsage.maxVolumeCount
  205. ret["Free"] = d.FreeSpace()
  206. return ret
  207. }
  208. func (d *Disk) FreeSpace() int64 {
  209. t := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
  210. return t.FreeSpace()
  211. }
  212. func (d *Disk) ToDiskInfo() *master_pb.DiskInfo {
  213. diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
  214. m := &master_pb.DiskInfo{
  215. Type: string(d.Id()),
  216. VolumeCount: diskUsage.volumeCount,
  217. MaxVolumeCount: diskUsage.maxVolumeCount,
  218. FreeVolumeCount: diskUsage.maxVolumeCount - diskUsage.volumeCount,
  219. ActiveVolumeCount: diskUsage.activeVolumeCount,
  220. RemoteVolumeCount: diskUsage.remoteVolumeCount,
  221. }
  222. for _, v := range d.GetVolumes() {
  223. m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())
  224. }
  225. for _, ecv := range d.GetEcShards() {
  226. m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage())
  227. }
  228. return m
  229. }
  230. // GetVolumeIds returns the human readable volume ids limited to count of max 100.
  231. func (d *Disk) GetVolumeIds() string {
  232. d.RLock()
  233. defer d.RUnlock()
  234. ids := make([]int, 0, len(d.volumes))
  235. for k := range d.volumes {
  236. ids = append(ids, int(k))
  237. }
  238. return util.HumanReadableIntsMax(100, ids...)
  239. }