disk.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. package topology
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  11. "github.com/seaweedfs/seaweedfs/weed/storage"
  12. )
  13. type Disk struct {
  14. NodeImpl
  15. volumes map[needle.VolumeId]storage.VolumeInfo
  16. ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
  17. ecShardsLock sync.RWMutex
  18. }
  19. func NewDisk(diskType string) *Disk {
  20. s := &Disk{}
  21. s.id = NodeId(diskType)
  22. s.nodeType = "Disk"
  23. s.diskUsages = newDiskUsages()
  24. s.volumes = make(map[needle.VolumeId]storage.VolumeInfo, 2)
  25. s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo, 2)
  26. s.NodeImpl.value = s
  27. return s
  28. }
  29. type DiskUsages struct {
  30. sync.RWMutex
  31. usages map[types.DiskType]*DiskUsageCounts
  32. }
  33. func newDiskUsages() *DiskUsages {
  34. return &DiskUsages{
  35. usages: make(map[types.DiskType]*DiskUsageCounts),
  36. }
  37. }
  38. func (d *DiskUsages) negative() *DiskUsages {
  39. d.RLock()
  40. defer d.RUnlock()
  41. t := newDiskUsages()
  42. for diskType, b := range d.usages {
  43. a := t.getOrCreateDisk(diskType)
  44. a.volumeCount = -b.volumeCount
  45. a.remoteVolumeCount = -b.remoteVolumeCount
  46. a.activeVolumeCount = -b.activeVolumeCount
  47. a.ecShardCount = -b.ecShardCount
  48. a.maxVolumeCount = -b.maxVolumeCount
  49. }
  50. return t
  51. }
  52. func (d *DiskUsages) ToDiskInfo() map[string]*master_pb.DiskInfo {
  53. ret := make(map[string]*master_pb.DiskInfo)
  54. for diskType, diskUsageCounts := range d.usages {
  55. m := &master_pb.DiskInfo{
  56. VolumeCount: diskUsageCounts.volumeCount,
  57. MaxVolumeCount: diskUsageCounts.maxVolumeCount,
  58. FreeVolumeCount: diskUsageCounts.maxVolumeCount - (diskUsageCounts.volumeCount - diskUsageCounts.remoteVolumeCount) - (diskUsageCounts.ecShardCount+1)/erasure_coding.DataShardsCount,
  59. ActiveVolumeCount: diskUsageCounts.activeVolumeCount,
  60. RemoteVolumeCount: diskUsageCounts.remoteVolumeCount,
  61. }
  62. ret[string(diskType)] = m
  63. }
  64. return ret
  65. }
  66. func (d *DiskUsages) FreeSpace() (freeSpace int64) {
  67. d.RLock()
  68. defer d.RUnlock()
  69. for _, diskUsage := range d.usages {
  70. freeSpace += diskUsage.FreeSpace()
  71. }
  72. return
  73. }
  74. func (d *DiskUsages) GetMaxVolumeCount() (maxVolumeCount int64) {
  75. d.RLock()
  76. defer d.RUnlock()
  77. for _, diskUsage := range d.usages {
  78. maxVolumeCount += diskUsage.maxVolumeCount
  79. }
  80. return
  81. }
  82. type DiskUsageCounts struct {
  83. volumeCount int64
  84. remoteVolumeCount int64
  85. activeVolumeCount int64
  86. ecShardCount int64
  87. maxVolumeCount int64
  88. }
  89. func (a *DiskUsageCounts) addDiskUsageCounts(b *DiskUsageCounts) {
  90. atomic.AddInt64(&a.volumeCount, b.volumeCount)
  91. atomic.AddInt64(&a.remoteVolumeCount, b.remoteVolumeCount)
  92. atomic.AddInt64(&a.activeVolumeCount, b.activeVolumeCount)
  93. atomic.AddInt64(&a.ecShardCount, b.ecShardCount)
  94. atomic.AddInt64(&a.maxVolumeCount, b.maxVolumeCount)
  95. }
  96. func (a *DiskUsageCounts) FreeSpace() int64 {
  97. freeVolumeSlotCount := a.maxVolumeCount + a.remoteVolumeCount - a.volumeCount
  98. if a.ecShardCount > 0 {
  99. freeVolumeSlotCount = freeVolumeSlotCount - a.ecShardCount/erasure_coding.DataShardsCount - 1
  100. }
  101. return freeVolumeSlotCount
  102. }
  103. func (a *DiskUsageCounts) minus(b *DiskUsageCounts) *DiskUsageCounts {
  104. return &DiskUsageCounts{
  105. volumeCount: a.volumeCount - b.volumeCount,
  106. remoteVolumeCount: a.remoteVolumeCount - b.remoteVolumeCount,
  107. activeVolumeCount: a.activeVolumeCount - b.activeVolumeCount,
  108. ecShardCount: a.ecShardCount - b.ecShardCount,
  109. maxVolumeCount: a.maxVolumeCount - b.maxVolumeCount,
  110. }
  111. }
  112. func (du *DiskUsages) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
  113. du.Lock()
  114. defer du.Unlock()
  115. t, found := du.usages[diskType]
  116. if found {
  117. return t
  118. }
  119. t = &DiskUsageCounts{}
  120. du.usages[diskType] = t
  121. return t
  122. }
  123. func (d *Disk) String() string {
  124. d.RLock()
  125. defer d.RUnlock()
  126. return fmt.Sprintf("Disk:%s, volumes:%v, ecShards:%v", d.NodeImpl.String(), d.volumes, d.ecShards)
  127. }
  128. func (d *Disk) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) {
  129. d.Lock()
  130. defer d.Unlock()
  131. return d.doAddOrUpdateVolume(v)
  132. }
  133. func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) {
  134. deltaDiskUsage := &DiskUsageCounts{}
  135. if oldV, ok := d.volumes[v.Id]; !ok {
  136. d.volumes[v.Id] = v
  137. deltaDiskUsage.volumeCount = 1
  138. if v.IsRemote() {
  139. deltaDiskUsage.remoteVolumeCount = 1
  140. }
  141. if !v.ReadOnly {
  142. deltaDiskUsage.activeVolumeCount = 1
  143. }
  144. d.UpAdjustMaxVolumeId(v.Id)
  145. d.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage)
  146. isNew = true
  147. } else {
  148. if oldV.IsRemote() != v.IsRemote() {
  149. if v.IsRemote() {
  150. deltaDiskUsage.remoteVolumeCount = 1
  151. }
  152. if oldV.IsRemote() {
  153. deltaDiskUsage.remoteVolumeCount = -1
  154. }
  155. d.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage)
  156. }
  157. isChanged = d.volumes[v.Id].ReadOnly != v.ReadOnly
  158. d.volumes[v.Id] = v
  159. }
  160. return
  161. }
  162. func (d *Disk) GetVolumes() (ret []storage.VolumeInfo) {
  163. d.RLock()
  164. for _, v := range d.volumes {
  165. ret = append(ret, v)
  166. }
  167. d.RUnlock()
  168. return ret
  169. }
  170. func (d *Disk) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) {
  171. d.RLock()
  172. defer d.RUnlock()
  173. vInfo, ok := d.volumes[id]
  174. if ok {
  175. return vInfo, nil
  176. } else {
  177. return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
  178. }
  179. }
  180. func (d *Disk) DeleteVolumeById(id needle.VolumeId) {
  181. d.Lock()
  182. defer d.Unlock()
  183. delete(d.volumes, id)
  184. }
  185. func (d *Disk) GetDataCenter() *DataCenter {
  186. dn := d.Parent()
  187. rack := dn.Parent()
  188. dcNode := rack.Parent()
  189. dcValue := dcNode.GetValue()
  190. return dcValue.(*DataCenter)
  191. }
  192. func (d *Disk) GetRack() *Rack {
  193. return d.Parent().Parent().(*NodeImpl).value.(*Rack)
  194. }
  195. func (d *Disk) GetTopology() *Topology {
  196. p := d.Parent()
  197. for p.Parent() != nil {
  198. p = p.Parent()
  199. }
  200. t := p.(*Topology)
  201. return t
  202. }
  203. func (d *Disk) ToMap() interface{} {
  204. ret := make(map[string]interface{})
  205. diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
  206. ret["Volumes"] = diskUsage.volumeCount
  207. ret["VolumeIds"] = d.GetVolumeIds()
  208. ret["EcShards"] = diskUsage.ecShardCount
  209. ret["Max"] = diskUsage.maxVolumeCount
  210. ret["Free"] = d.FreeSpace()
  211. return ret
  212. }
  213. func (d *Disk) FreeSpace() int64 {
  214. t := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
  215. return t.FreeSpace()
  216. }
  217. func (d *Disk) ToDiskInfo() *master_pb.DiskInfo {
  218. diskUsage := d.diskUsages.getOrCreateDisk(types.ToDiskType(string(d.Id())))
  219. m := &master_pb.DiskInfo{
  220. Type: string(d.Id()),
  221. VolumeCount: diskUsage.volumeCount,
  222. MaxVolumeCount: diskUsage.maxVolumeCount,
  223. FreeVolumeCount: diskUsage.maxVolumeCount - (diskUsage.volumeCount - diskUsage.remoteVolumeCount) - (diskUsage.ecShardCount+1)/erasure_coding.DataShardsCount,
  224. ActiveVolumeCount: diskUsage.activeVolumeCount,
  225. RemoteVolumeCount: diskUsage.remoteVolumeCount,
  226. }
  227. for _, v := range d.GetVolumes() {
  228. m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())
  229. }
  230. for _, ecv := range d.GetEcShards() {
  231. m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage())
  232. }
  233. return m
  234. }
  235. // GetVolumeIds returns the human readable volume ids limited to count of max 100.
  236. func (d *Disk) GetVolumeIds() string {
  237. d.RLock()
  238. defer d.RUnlock()
  239. ids := make([]int, 0, len(d.volumes))
  240. for k := range d.volumes {
  241. ids = append(ids, int(k))
  242. }
  243. return util.HumanReadableIntsMax(100, ids...)
  244. }