node.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package topology
  2. import (
  3. "errors"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  6. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  7. "github.com/chrislusf/seaweedfs/weed/storage/types"
  8. "math/rand"
  9. "strings"
  10. "sync"
  11. )
  // NodeId is the string identifier of a node. NodeImpl uses it as the key of
  // its children map.
  type NodeId string
  13. type Node interface {
  14. Id() NodeId
  15. String() string
  16. AvailableSpaceFor(option *VolumeGrowOption) int64
  17. ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
  18. UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages)
  19. UpAdjustMaxVolumeId(vid needle.VolumeId)
  20. GetDiskUsages() *DiskUsages
  21. GetMaxVolumeId() needle.VolumeId
  22. SetParent(Node)
  23. LinkChildNode(node Node)
  24. UnlinkChildNode(nodeId NodeId)
  25. CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
  26. IsDataNode() bool
  27. IsRack() bool
  28. IsDataCenter() bool
  29. Children() []Node
  30. Parent() Node
  31. GetValue() interface{} //get reference to the topology,dc,rack,datanode
  32. }
  33. type NodeImpl struct {
  34. diskUsages *DiskUsages
  35. id NodeId
  36. parent Node
  37. sync.RWMutex // lock children
  38. children map[NodeId]Node
  39. maxVolumeId needle.VolumeId
  40. //for rack, data center, topology
  41. nodeType string
  42. value interface{}
  43. }
  44. func (n *NodeImpl) GetDiskUsages() *DiskUsages {
  45. return n.diskUsages
  46. }
  47. // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
  48. func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
  49. var totalWeights int64
  50. var errs []string
  51. n.RLock()
  52. candidates := make([]Node, 0, len(n.children))
  53. candidatesWeights := make([]int64, 0, len(n.children))
  54. //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
  55. for _, node := range n.children {
  56. if node.AvailableSpaceFor(option) <= 0 {
  57. continue
  58. }
  59. totalWeights += node.AvailableSpaceFor(option)
  60. candidates = append(candidates, node)
  61. candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option))
  62. }
  63. n.RUnlock()
  64. if len(candidates) < numberOfNodes {
  65. glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
  66. return nil, nil, errors.New("No enough data node found!")
  67. }
  68. //pick nodes randomly by weights, the node picked earlier has higher final weights
  69. sortedCandidates := make([]Node, 0, len(candidates))
  70. for i := 0; i < len(candidates); i++ {
  71. weightsInterval := rand.Int63n(totalWeights)
  72. lastWeights := int64(0)
  73. for k, weights := range candidatesWeights {
  74. if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
  75. sortedCandidates = append(sortedCandidates, candidates[k])
  76. candidatesWeights[k] = 0
  77. totalWeights -= weights
  78. break
  79. }
  80. lastWeights += weights
  81. }
  82. }
  83. restNodes = make([]Node, 0, numberOfNodes-1)
  84. ret := false
  85. n.RLock()
  86. for k, node := range sortedCandidates {
  87. if err := filterFirstNodeFn(node); err == nil {
  88. firstNode = node
  89. if k >= numberOfNodes-1 {
  90. restNodes = sortedCandidates[:numberOfNodes-1]
  91. } else {
  92. restNodes = append(restNodes, sortedCandidates[:k]...)
  93. restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
  94. }
  95. ret = true
  96. break
  97. } else {
  98. errs = append(errs, string(node.Id())+":"+err.Error())
  99. }
  100. }
  101. n.RUnlock()
  102. if !ret {
  103. return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
  104. }
  105. return
  106. }
  107. func (n *NodeImpl) IsDataNode() bool {
  108. return n.nodeType == "DataNode"
  109. }
  110. func (n *NodeImpl) IsRack() bool {
  111. return n.nodeType == "Rack"
  112. }
  113. func (n *NodeImpl) IsDataCenter() bool {
  114. return n.nodeType == "DataCenter"
  115. }
  116. func (n *NodeImpl) String() string {
  117. if n.parent != nil {
  118. return n.parent.String() + ":" + string(n.id)
  119. }
  120. return string(n.id)
  121. }
  122. func (n *NodeImpl) Id() NodeId {
  123. return n.id
  124. }
  125. func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
  126. return n.diskUsages.getOrCreateDisk(diskType)
  127. }
  128. func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
  129. t := n.getOrCreateDisk(option.DiskType)
  130. freeVolumeSlotCount := t.maxVolumeCount + t.remoteVolumeCount - t.volumeCount
  131. if t.ecShardCount > 0 {
  132. freeVolumeSlotCount = freeVolumeSlotCount - t.ecShardCount/erasure_coding.DataShardsCount - 1
  133. }
  134. return freeVolumeSlotCount
  135. }
  136. func (n *NodeImpl) SetParent(node Node) {
  137. n.parent = node
  138. }
  139. func (n *NodeImpl) Children() (ret []Node) {
  140. n.RLock()
  141. defer n.RUnlock()
  142. for _, c := range n.children {
  143. ret = append(ret, c)
  144. }
  145. return ret
  146. }
  147. func (n *NodeImpl) Parent() Node {
  148. return n.parent
  149. }
  150. func (n *NodeImpl) GetValue() interface{} {
  151. return n.value
  152. }
  153. func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
  154. n.RLock()
  155. defer n.RUnlock()
  156. for _, node := range n.children {
  157. freeSpace := node.AvailableSpaceFor(option)
  158. // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
  159. if freeSpace <= 0 {
  160. continue
  161. }
  162. if r >= freeSpace {
  163. r -= freeSpace
  164. } else {
  165. if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
  166. // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
  167. return node.(*DataNode), nil
  168. }
  169. assignedNode, err = node.ReserveOneVolume(r, option)
  170. if err == nil {
  171. return
  172. }
  173. }
  174. }
  175. return nil, errors.New("No free volume slot found!")
  176. }
  177. func (n *NodeImpl) UpAdjustDiskUsageDelta(deltaDiskUsages *DiskUsages) { //can be negative
  178. for diskType, diskUsage := range deltaDiskUsages.usages {
  179. existingDisk := n.getOrCreateDisk(diskType)
  180. existingDisk.addDiskUsageCounts(diskUsage)
  181. }
  182. if n.parent != nil {
  183. n.parent.UpAdjustDiskUsageDelta(deltaDiskUsages)
  184. }
  185. }
  186. func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
  187. if n.maxVolumeId < vid {
  188. n.maxVolumeId = vid
  189. if n.parent != nil {
  190. n.parent.UpAdjustMaxVolumeId(vid)
  191. }
  192. }
  193. }
  194. func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
  195. return n.maxVolumeId
  196. }
  197. func (n *NodeImpl) LinkChildNode(node Node) {
  198. n.Lock()
  199. defer n.Unlock()
  200. n.doLinkChildNode(node)
  201. }
  202. func (n *NodeImpl) doLinkChildNode(node Node) {
  203. if n.children[node.Id()] == nil {
  204. n.children[node.Id()] = node
  205. n.UpAdjustDiskUsageDelta(node.GetDiskUsages())
  206. n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
  207. node.SetParent(n)
  208. glog.V(0).Infoln(n, "adds child", node.Id())
  209. }
  210. }
  211. func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
  212. n.Lock()
  213. defer n.Unlock()
  214. node := n.children[nodeId]
  215. if node != nil {
  216. node.SetParent(nil)
  217. delete(n.children, node.Id())
  218. n.UpAdjustDiskUsageDelta(node.GetDiskUsages().negative())
  219. glog.V(0).Infoln(n, "removes", node.Id())
  220. }
  221. }
  222. func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
  223. if n.IsRack() {
  224. for _, c := range n.Children() {
  225. dn := c.(*DataNode) //can not cast n to DataNode
  226. for _, v := range dn.GetVolumes() {
  227. if uint64(v.Size) >= volumeSizeLimit {
  228. //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
  229. n.GetTopology().chanFullVolumes <- v
  230. }
  231. }
  232. }
  233. } else {
  234. for _, c := range n.Children() {
  235. c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
  236. }
  237. }
  238. }
  239. func (n *NodeImpl) GetTopology() *Topology {
  240. var p Node
  241. p = n
  242. for p.Parent() != nil {
  243. p = p.Parent()
  244. }
  245. return p.GetValue().(*Topology)
  246. }