node.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package topology
  2. import (
  3. "errors"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/stats"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  9. "math/rand"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. )
  14. type NodeId string
  15. type Node interface {
  16. Id() NodeId
  17. String() string
  18. AvailableSpaceFor(option *VolumeGrowOption) int64
  19. ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
  20. UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages)
  21. UpAdjustMaxVolumeId(vid needle.VolumeId)
  22. GetDiskUsages() *DiskUsages
  23. GetMaxVolumeId() needle.VolumeId
  24. SetParent(Node)
  25. LinkChildNode(node Node)
  26. UnlinkChildNode(nodeId NodeId)
  27. CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64, growThreshold float64)
  28. IsDataNode() bool
  29. IsRack() bool
  30. IsDataCenter() bool
  31. Children() []Node
  32. Parent() Node
  33. GetValue() interface{} //get reference to the topology,dc,rack,datanode
  34. }
  35. type NodeImpl struct {
  36. diskUsages *DiskUsages
  37. id NodeId
  38. parent Node
  39. sync.RWMutex // lock children
  40. children map[NodeId]Node
  41. maxVolumeId needle.VolumeId
  42. //for rack, data center, topology
  43. nodeType string
  44. value interface{}
  45. }
  46. func (n *NodeImpl) GetDiskUsages() *DiskUsages {
  47. return n.diskUsages
  48. }
  49. // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
  50. func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
  51. var totalWeights int64
  52. var errs []string
  53. n.RLock()
  54. candidates := make([]Node, 0, len(n.children))
  55. candidatesWeights := make([]int64, 0, len(n.children))
  56. //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
  57. for _, node := range n.children {
  58. if node.AvailableSpaceFor(option) <= 0 {
  59. continue
  60. }
  61. totalWeights += node.AvailableSpaceFor(option)
  62. candidates = append(candidates, node)
  63. candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option))
  64. }
  65. n.RUnlock()
  66. if len(candidates) < numberOfNodes {
  67. glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
  68. return nil, nil, errors.New("No enough data node found!")
  69. }
  70. //pick nodes randomly by weights, the node picked earlier has higher final weights
  71. sortedCandidates := make([]Node, 0, len(candidates))
  72. for i := 0; i < len(candidates); i++ {
  73. weightsInterval := rand.Int63n(totalWeights)
  74. lastWeights := int64(0)
  75. for k, weights := range candidatesWeights {
  76. if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
  77. sortedCandidates = append(sortedCandidates, candidates[k])
  78. candidatesWeights[k] = 0
  79. totalWeights -= weights
  80. break
  81. }
  82. lastWeights += weights
  83. }
  84. }
  85. restNodes = make([]Node, 0, numberOfNodes-1)
  86. ret := false
  87. n.RLock()
  88. for k, node := range sortedCandidates {
  89. if err := filterFirstNodeFn(node); err == nil {
  90. firstNode = node
  91. if k >= numberOfNodes-1 {
  92. restNodes = sortedCandidates[:numberOfNodes-1]
  93. } else {
  94. restNodes = append(restNodes, sortedCandidates[:k]...)
  95. restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
  96. }
  97. ret = true
  98. break
  99. } else {
  100. errs = append(errs, string(node.Id())+":"+err.Error())
  101. }
  102. }
  103. n.RUnlock()
  104. if !ret {
  105. return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
  106. }
  107. return
  108. }
  109. func (n *NodeImpl) IsDataNode() bool {
  110. return n.nodeType == "DataNode"
  111. }
  112. func (n *NodeImpl) IsRack() bool {
  113. return n.nodeType == "Rack"
  114. }
  115. func (n *NodeImpl) IsDataCenter() bool {
  116. return n.nodeType == "DataCenter"
  117. }
  118. func (n *NodeImpl) String() string {
  119. if n.parent != nil {
  120. return n.parent.String() + ":" + string(n.id)
  121. }
  122. return string(n.id)
  123. }
  124. func (n *NodeImpl) Id() NodeId {
  125. return n.id
  126. }
  127. func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
  128. return n.diskUsages.getOrCreateDisk(diskType)
  129. }
  130. func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
  131. t := n.getOrCreateDisk(option.DiskType)
  132. freeVolumeSlotCount := atomic.LoadInt64(&t.maxVolumeCount) + atomic.LoadInt64(&t.remoteVolumeCount) - atomic.LoadInt64(&t.volumeCount)
  133. ecShardCount := atomic.LoadInt64(&t.ecShardCount)
  134. if ecShardCount > 0 {
  135. freeVolumeSlotCount = freeVolumeSlotCount - ecShardCount/erasure_coding.DataShardsCount - 1
  136. }
  137. return freeVolumeSlotCount
  138. }
  139. func (n *NodeImpl) SetParent(node Node) {
  140. n.parent = node
  141. }
  142. func (n *NodeImpl) Children() (ret []Node) {
  143. n.RLock()
  144. defer n.RUnlock()
  145. for _, c := range n.children {
  146. ret = append(ret, c)
  147. }
  148. return ret
  149. }
  150. func (n *NodeImpl) Parent() Node {
  151. return n.parent
  152. }
  153. func (n *NodeImpl) GetValue() interface{} {
  154. return n.value
  155. }
  156. func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
  157. n.RLock()
  158. defer n.RUnlock()
  159. for _, node := range n.children {
  160. freeSpace := node.AvailableSpaceFor(option)
  161. // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
  162. if freeSpace <= 0 {
  163. continue
  164. }
  165. if r >= freeSpace {
  166. r -= freeSpace
  167. } else {
  168. if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
  169. // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
  170. return node.(*DataNode), nil
  171. }
  172. assignedNode, err = node.ReserveOneVolume(r, option)
  173. if err == nil {
  174. return
  175. }
  176. }
  177. }
  178. return nil, errors.New("No free volume slot found!")
  179. }
  180. func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative
  181. for diskType, diskUsage := range deltaDiskUsages.usages {
  182. existingDisk := n.getOrCreateDisk(diskType)
  183. existingDisk.addDiskUsageCounts(diskUsage)
  184. }
  185. if n.parent != nil {
  186. n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages)
  187. }
  188. }
  189. func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
  190. if n.maxVolumeId < vid {
  191. n.maxVolumeId = vid
  192. if n.parent != nil {
  193. n.parent.UpAdjustMaxVolumeId(vid)
  194. }
  195. }
  196. }
  197. func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
  198. return n.maxVolumeId
  199. }
  200. func (n *NodeImpl) LinkChildNode(node Node) {
  201. n.Lock()
  202. defer n.Unlock()
  203. n.doLinkChildNode(node)
  204. }
  205. func (n *NodeImpl) doLinkChildNode(node Node) {
  206. if n.children[node.Id()] == nil {
  207. n.children[node.Id()] = node
  208. n.UpAdjustDiskUsageDelta(node.GetDiskUsages())
  209. n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
  210. node.SetParent(n)
  211. glog.V(0).Infoln(n, "adds child", node.Id())
  212. }
  213. }
  214. func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
  215. n.Lock()
  216. defer n.Unlock()
  217. node := n.children[nodeId]
  218. if node != nil {
  219. node.SetParent(nil)
  220. delete(n.children, node.Id())
  221. n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative())
  222. glog.V(0).Infoln(n, "removes", node.Id())
  223. }
  224. }
  225. func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64, growThreshold float64) {
  226. if n.IsRack() {
  227. for _, c := range n.Children() {
  228. dn := c.(*DataNode) //can not cast n to DataNode
  229. for _, v := range dn.GetVolumes() {
  230. if v.Size >= volumeSizeLimit {
  231. //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
  232. n.GetTopology().chanFullVolumes <- v
  233. } else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
  234. n.GetTopology().chanCrowdedVolumes <- v
  235. }
  236. copyCount := v.ReplicaPlacement.GetCopyCount()
  237. if copyCount > 1 {
  238. if copyCount > len(n.GetTopology().Lookup(v.Collection, v.Id)) {
  239. stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1)
  240. } else {
  241. stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)
  242. }
  243. }
  244. }
  245. }
  246. } else {
  247. for _, c := range n.Children() {
  248. c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit, growThreshold)
  249. }
  250. }
  251. }
  252. func (n *NodeImpl) GetTopology() *Topology {
  253. var p Node
  254. p = n
  255. for p.Parent() != nil {
  256. p = p.Parent()
  257. }
  258. return p.GetValue().(*Topology)
  259. }