node.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. package topology
  2. import (
  3. "errors"
  4. "math/rand/v2"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/stats"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  12. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  13. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  14. )
  // NodeId is the string key under which a node is stored in its parent's
  // children map.
  type NodeId string
  16. type Node interface {
  17. Id() NodeId
  18. String() string
  19. AvailableSpaceFor(option *VolumeGrowOption) int64
  20. ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
  21. UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts)
  22. UpAdjustMaxVolumeId(vid needle.VolumeId)
  23. GetDiskUsages() *DiskUsages
  24. GetMaxVolumeId() needle.VolumeId
  25. SetParent(Node)
  26. LinkChildNode(node Node)
  27. UnlinkChildNode(nodeId NodeId)
  28. CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64, growThreshold float64)
  29. IsDataNode() bool
  30. IsRack() bool
  31. IsDataCenter() bool
  32. IsLocked() bool
  33. Children() []Node
  34. Parent() Node
  35. GetValue() interface{} //get reference to the topology,dc,rack,datanode
  36. }
  37. type NodeImpl struct {
  38. diskUsages *DiskUsages
  39. id NodeId
  40. parent Node
  41. sync.RWMutex // lock children
  42. children map[NodeId]Node
  43. maxVolumeId needle.VolumeId
  44. //for rack, data center, topology
  45. nodeType string
  46. value interface{}
  47. }
  // GetDiskUsages returns the node's diskUsages pointer as stored; no copy is
  // made and no lock is taken.
  func (n *NodeImpl) GetDiskUsages() *DiskUsages {
  	return n.diskUsages
  }
  51. // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
  52. func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
  53. var totalWeights int64
  54. var errs []string
  55. n.RLock()
  56. candidates := make([]Node, 0, len(n.children))
  57. candidatesWeights := make([]int64, 0, len(n.children))
  58. //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
  59. for _, node := range n.children {
  60. if node.AvailableSpaceFor(option) <= 0 {
  61. continue
  62. }
  63. totalWeights += node.AvailableSpaceFor(option)
  64. candidates = append(candidates, node)
  65. candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option))
  66. }
  67. n.RUnlock()
  68. if len(candidates) < numberOfNodes {
  69. glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
  70. return nil, nil, errors.New("Not enough data nodes found!")
  71. }
  72. //pick nodes randomly by weights, the node picked earlier has higher final weights
  73. sortedCandidates := make([]Node, 0, len(candidates))
  74. for i := 0; i < len(candidates); i++ {
  75. weightsInterval := rand.Int64N(totalWeights)
  76. lastWeights := int64(0)
  77. for k, weights := range candidatesWeights {
  78. if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
  79. sortedCandidates = append(sortedCandidates, candidates[k])
  80. candidatesWeights[k] = 0
  81. totalWeights -= weights
  82. break
  83. }
  84. lastWeights += weights
  85. }
  86. }
  87. restNodes = make([]Node, 0, numberOfNodes-1)
  88. ret := false
  89. n.RLock()
  90. for k, node := range sortedCandidates {
  91. if err := filterFirstNodeFn(node); err == nil {
  92. firstNode = node
  93. if k >= numberOfNodes-1 {
  94. restNodes = sortedCandidates[:numberOfNodes-1]
  95. } else {
  96. restNodes = append(restNodes, sortedCandidates[:k]...)
  97. restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
  98. }
  99. ret = true
  100. break
  101. } else {
  102. errs = append(errs, string(node.Id())+":"+err.Error())
  103. }
  104. }
  105. n.RUnlock()
  106. if !ret {
  107. return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
  108. }
  109. return
  110. }
  // IsDataNode reports whether the node's nodeType is "DataNode".
  func (n *NodeImpl) IsDataNode() bool {
  	return n.nodeType == "DataNode"
  }
  // IsRack reports whether the node's nodeType is "Rack".
  func (n *NodeImpl) IsRack() bool {
  	return n.nodeType == "Rack"
  }
  // IsDataCenter reports whether the node's nodeType is "DataCenter".
  func (n *NodeImpl) IsDataCenter() bool {
  	return n.nodeType == "DataCenter"
  }
  120. func (n *NodeImpl) IsLocked() (isTryLock bool) {
  121. if isTryLock = n.TryRLock(); isTryLock {
  122. n.RUnlock()
  123. }
  124. return !isTryLock
  125. }
  126. func (n *NodeImpl) String() string {
  127. if n.parent != nil {
  128. return n.parent.String() + ":" + string(n.id)
  129. }
  130. return string(n.id)
  131. }
  // Id returns the node's id.
  func (n *NodeImpl) Id() NodeId {
  	return n.id
  }
  // getOrCreateDisk returns the usage counters for diskType by delegating to
  // n.diskUsages.getOrCreateDisk.
  // NOTE(review): presumably the entry is created on first use, as the name
  // suggests; confirm against DiskUsages.
  func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
  	return n.diskUsages.getOrCreateDisk(diskType)
  }
  138. func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
  139. t := n.getOrCreateDisk(option.DiskType)
  140. freeVolumeSlotCount := atomic.LoadInt64(&t.maxVolumeCount) + atomic.LoadInt64(&t.remoteVolumeCount) - atomic.LoadInt64(&t.volumeCount)
  141. ecShardCount := atomic.LoadInt64(&t.ecShardCount)
  142. if ecShardCount > 0 {
  143. freeVolumeSlotCount = freeVolumeSlotCount - ecShardCount/erasure_coding.DataShardsCount - 1
  144. }
  145. return freeVolumeSlotCount
  146. }
  // SetParent records node as this node's parent. It only assigns the field:
  // no lock is taken and the parent's children map is not touched.
  func (n *NodeImpl) SetParent(node Node) {
  	n.parent = node
  }
  150. func (n *NodeImpl) Children() (ret []Node) {
  151. n.RLock()
  152. defer n.RUnlock()
  153. for _, c := range n.children {
  154. ret = append(ret, c)
  155. }
  156. return ret
  157. }
  // Parent returns the node's parent as last set by SetParent; no lock is
  // taken.
  func (n *NodeImpl) Parent() Node {
  	return n.parent
  }
  // GetValue returns the node's value field. GetTopology in this file expects
  // the root node's value to be a *Topology.
  func (n *NodeImpl) GetValue() interface{} {
  	return n.value
  }
  164. func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
  165. n.RLock()
  166. defer n.RUnlock()
  167. for _, node := range n.children {
  168. freeSpace := node.AvailableSpaceFor(option)
  169. // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
  170. if freeSpace <= 0 {
  171. continue
  172. }
  173. if r >= freeSpace {
  174. r -= freeSpace
  175. } else {
  176. if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
  177. // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
  178. dn := node.(*DataNode)
  179. if dn.IsTerminating {
  180. continue
  181. }
  182. return dn, nil
  183. }
  184. assignedNode, err = node.ReserveOneVolume(r, option)
  185. if err == nil {
  186. return
  187. }
  188. }
  189. }
  190. return nil, errors.New("No free volume slot found!")
  191. }
  192. func (n *NodeImpl) UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts) { //can be negative
  193. existingDisk := n.getOrCreateDisk(diskType)
  194. existingDisk.addDiskUsageCounts(diskUsage)
  195. if n.parent != nil {
  196. n.parent.UpAdjustDiskUsageDelta(diskType, diskUsage)
  197. }
  198. }
  199. func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
  200. if n.maxVolumeId < vid {
  201. n.maxVolumeId = vid
  202. if n.parent != nil {
  203. n.parent.UpAdjustMaxVolumeId(vid)
  204. }
  205. }
  206. }
  // GetMaxVolumeId returns the largest volume id recorded on this node by
  // UpAdjustMaxVolumeId; no lock is taken.
  func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
  	return n.maxVolumeId
  }
  // LinkChildNode adds node as a child of n. It holds the write lock for the
  // duration and delegates the actual work to doLinkChildNode.
  func (n *NodeImpl) LinkChildNode(node Node) {
  	n.Lock()
  	defer n.Unlock()
  	n.doLinkChildNode(node)
  }
  215. func (n *NodeImpl) doLinkChildNode(node Node) {
  216. if n.children[node.Id()] == nil {
  217. n.children[node.Id()] = node
  218. for dt, du := range node.GetDiskUsages().usages {
  219. n.UpAdjustDiskUsageDelta(dt, du)
  220. }
  221. n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
  222. node.SetParent(n)
  223. glog.V(0).Infoln(n, "adds child", node.Id())
  224. }
  225. }
  226. func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
  227. n.Lock()
  228. defer n.Unlock()
  229. node := n.children[nodeId]
  230. if node != nil {
  231. node.SetParent(nil)
  232. delete(n.children, node.Id())
  233. for dt, du := range node.GetDiskUsages().negative().usages {
  234. n.UpAdjustDiskUsageDelta(dt, du)
  235. }
  236. glog.V(0).Infoln(n, "removes", node.Id())
  237. }
  238. }
  239. func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHoldUnixTime int64, volumeSizeLimit uint64, growThreshold float64) {
  240. if n.IsRack() {
  241. for _, c := range n.Children() {
  242. dn := c.(*DataNode) //can not cast n to DataNode
  243. for _, v := range dn.GetVolumes() {
  244. topo := n.GetTopology()
  245. diskType := types.ToDiskType(v.DiskType)
  246. vl := topo.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
  247. if v.Size >= volumeSizeLimit {
  248. vl.accessLock.RLock()
  249. vacuumTime, ok := vl.vacuumedVolumes[v.Id]
  250. vl.accessLock.RUnlock()
  251. // If a volume has been vacuumed in the past 20 seconds, we do not check whether it has reached full capacity.
  252. // After 20s(grpc timeout), theoretically all the heartbeats of the volume server have reached the master,
  253. // the volume size should be correct, not the size before the vacuum.
  254. if !ok || time.Now().Add(-20*time.Second).After(vacuumTime) {
  255. //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
  256. topo.chanFullVolumes <- v
  257. }
  258. } else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
  259. topo.chanCrowdedVolumes <- v
  260. }
  261. copyCount := v.ReplicaPlacement.GetCopyCount()
  262. if copyCount > 1 {
  263. if copyCount > len(topo.Lookup(v.Collection, v.Id)) {
  264. stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1)
  265. } else {
  266. stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)
  267. }
  268. }
  269. }
  270. }
  271. } else {
  272. for _, c := range n.Children() {
  273. c.CollectDeadNodeAndFullVolumes(freshThreshHoldUnixTime, volumeSizeLimit, growThreshold)
  274. }
  275. }
  276. }
  277. func (n *NodeImpl) GetTopology() *Topology {
  278. var p Node
  279. p = n
  280. for p.Parent() != nil {
  281. p = p.Parent()
  282. }
  283. return p.GetValue().(*Topology)
  284. }