node.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. package topology
  2. import (
  3. "errors"
  4. "math/rand"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. "github.com/chrislusf/seaweedfs/weed/util/log"
  9. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. )
  // NodeId is the identifier of a node; NodeImpl keys its children map by it.
  type NodeId string
  13. type Node interface {
  14. Id() NodeId
  15. String() string
  16. FreeSpace() int64
  17. ReserveOneVolume(r int64) (*DataNode, error)
  18. UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
  19. UpAdjustVolumeCountDelta(volumeCountDelta int64)
  20. UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
  21. UpAdjustEcShardCountDelta(ecShardCountDelta int64)
  22. UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
  23. UpAdjustMaxVolumeId(vid needle.VolumeId)
  24. GetVolumeCount() int64
  25. GetEcShardCount() int64
  26. GetActiveVolumeCount() int64
  27. GetRemoteVolumeCount() int64
  28. GetMaxVolumeCount() int64
  29. GetMaxVolumeId() needle.VolumeId
  30. SetParent(Node)
  31. LinkChildNode(node Node)
  32. UnlinkChildNode(nodeId NodeId)
  33. CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
  34. IsDataNode() bool
  35. IsRack() bool
  36. IsDataCenter() bool
  37. Children() []Node
  38. Parent() Node
  39. GetValue() interface{} //get reference to the topology,dc,rack,datanode
  40. }
  41. type NodeImpl struct {
  42. volumeCount int64
  43. remoteVolumeCount int64
  44. activeVolumeCount int64
  45. ecShardCount int64
  46. maxVolumeCount int64
  47. id NodeId
  48. parent Node
  49. sync.RWMutex // lock children
  50. children map[NodeId]Node
  51. maxVolumeId needle.VolumeId
  52. //for rack, data center, topology
  53. nodeType string
  54. value interface{}
  55. }
  56. // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
  57. func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
  58. var totalWeights int64
  59. var errs []string
  60. n.RLock()
  61. candidates := make([]Node, 0, len(n.children))
  62. candidatesWeights := make([]int64, 0, len(n.children))
  63. //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
  64. for _, node := range n.children {
  65. if node.FreeSpace() <= 0 {
  66. continue
  67. }
  68. totalWeights += node.FreeSpace()
  69. candidates = append(candidates, node)
  70. candidatesWeights = append(candidatesWeights, node.FreeSpace())
  71. }
  72. n.RUnlock()
  73. if len(candidates) < numberOfNodes {
  74. log.Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
  75. return nil, nil, errors.New("No enough data node found!")
  76. }
  77. //pick nodes randomly by weights, the node picked earlier has higher final weights
  78. sortedCandidates := make([]Node, 0, len(candidates))
  79. for i := 0; i < len(candidates); i++ {
  80. weightsInterval := rand.Int63n(totalWeights)
  81. lastWeights := int64(0)
  82. for k, weights := range candidatesWeights {
  83. if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
  84. sortedCandidates = append(sortedCandidates, candidates[k])
  85. candidatesWeights[k] = 0
  86. totalWeights -= weights
  87. break
  88. }
  89. lastWeights += weights
  90. }
  91. }
  92. restNodes = make([]Node, 0, numberOfNodes-1)
  93. ret := false
  94. n.RLock()
  95. for k, node := range sortedCandidates {
  96. if err := filterFirstNodeFn(node); err == nil {
  97. firstNode = node
  98. if k >= numberOfNodes-1 {
  99. restNodes = sortedCandidates[:numberOfNodes-1]
  100. } else {
  101. restNodes = append(restNodes, sortedCandidates[:k]...)
  102. restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
  103. }
  104. ret = true
  105. break
  106. } else {
  107. errs = append(errs, string(node.Id())+":"+err.Error())
  108. }
  109. }
  110. n.RUnlock()
  111. if !ret {
  112. return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
  113. }
  114. return
  115. }
  116. func (n *NodeImpl) IsDataNode() bool {
  117. return n.nodeType == "DataNode"
  118. }
  119. func (n *NodeImpl) IsRack() bool {
  120. return n.nodeType == "Rack"
  121. }
  122. func (n *NodeImpl) IsDataCenter() bool {
  123. return n.nodeType == "DataCenter"
  124. }
  125. func (n *NodeImpl) String() string {
  126. if n.parent != nil {
  127. return n.parent.String() + ":" + string(n.id)
  128. }
  129. return string(n.id)
  130. }
  131. func (n *NodeImpl) Id() NodeId {
  132. return n.id
  133. }
  134. func (n *NodeImpl) FreeSpace() int64 {
  135. freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount
  136. if n.ecShardCount > 0 {
  137. freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
  138. }
  139. return freeVolumeSlotCount
  140. }
  141. func (n *NodeImpl) SetParent(node Node) {
  142. n.parent = node
  143. }
  144. func (n *NodeImpl) Children() (ret []Node) {
  145. n.RLock()
  146. defer n.RUnlock()
  147. for _, c := range n.children {
  148. ret = append(ret, c)
  149. }
  150. return ret
  151. }
  152. func (n *NodeImpl) Parent() Node {
  153. return n.parent
  154. }
  155. func (n *NodeImpl) GetValue() interface{} {
  156. return n.value
  157. }
  158. func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) {
  159. n.RLock()
  160. defer n.RUnlock()
  161. for _, node := range n.children {
  162. freeSpace := node.FreeSpace()
  163. // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
  164. if freeSpace <= 0 {
  165. continue
  166. }
  167. if r >= freeSpace {
  168. r -= freeSpace
  169. } else {
  170. if node.IsDataNode() && node.FreeSpace() > 0 {
  171. // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
  172. return node.(*DataNode), nil
  173. }
  174. assignedNode, err = node.ReserveOneVolume(r)
  175. if err == nil {
  176. return
  177. }
  178. }
  179. }
  180. return nil, errors.New("No free volume slot found!")
  181. }
  182. func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
  183. if maxVolumeCountDelta == 0 {
  184. return
  185. }
  186. atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
  187. if n.parent != nil {
  188. n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
  189. }
  190. }
  191. func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
  192. if volumeCountDelta == 0 {
  193. return
  194. }
  195. atomic.AddInt64(&n.volumeCount, volumeCountDelta)
  196. if n.parent != nil {
  197. n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
  198. }
  199. }
  200. func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative
  201. if remoteVolumeCountDelta == 0 {
  202. return
  203. }
  204. atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta)
  205. if n.parent != nil {
  206. n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
  207. }
  208. }
  209. func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
  210. if ecShardCountDelta == 0 {
  211. return
  212. }
  213. atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
  214. if n.parent != nil {
  215. n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta)
  216. }
  217. }
  218. func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
  219. if activeVolumeCountDelta == 0 {
  220. return
  221. }
  222. atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
  223. if n.parent != nil {
  224. n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
  225. }
  226. }
  227. func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
  228. if n.maxVolumeId < vid {
  229. n.maxVolumeId = vid
  230. if n.parent != nil {
  231. n.parent.UpAdjustMaxVolumeId(vid)
  232. }
  233. }
  234. }
  235. func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
  236. return n.maxVolumeId
  237. }
  238. func (n *NodeImpl) GetVolumeCount() int64 {
  239. return n.volumeCount
  240. }
  241. func (n *NodeImpl) GetEcShardCount() int64 {
  242. return n.ecShardCount
  243. }
  244. func (n *NodeImpl) GetRemoteVolumeCount() int64 {
  245. return n.remoteVolumeCount
  246. }
  247. func (n *NodeImpl) GetActiveVolumeCount() int64 {
  248. return n.activeVolumeCount
  249. }
  250. func (n *NodeImpl) GetMaxVolumeCount() int64 {
  251. return n.maxVolumeCount
  252. }
  253. func (n *NodeImpl) LinkChildNode(node Node) {
  254. n.Lock()
  255. defer n.Unlock()
  256. if n.children[node.Id()] == nil {
  257. n.children[node.Id()] = node
  258. n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
  259. n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
  260. n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
  261. n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount())
  262. n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
  263. n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
  264. node.SetParent(n)
  265. log.Infoln(n, "adds child", node.Id())
  266. }
  267. }
  268. func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
  269. n.Lock()
  270. defer n.Unlock()
  271. node := n.children[nodeId]
  272. if node != nil {
  273. node.SetParent(nil)
  274. delete(n.children, node.Id())
  275. n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
  276. n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount())
  277. n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
  278. n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
  279. n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
  280. log.Infoln(n, "removes", node.Id())
  281. }
  282. }
  283. func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
  284. if n.IsRack() {
  285. for _, c := range n.Children() {
  286. dn := c.(*DataNode) //can not cast n to DataNode
  287. for _, v := range dn.GetVolumes() {
  288. if uint64(v.Size) >= volumeSizeLimit {
  289. //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
  290. n.GetTopology().chanFullVolumes <- v
  291. }
  292. }
  293. }
  294. } else {
  295. for _, c := range n.Children() {
  296. c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
  297. }
  298. }
  299. }
  300. func (n *NodeImpl) GetTopology() *Topology {
  301. var p Node
  302. p = n
  303. for p.Parent() != nil {
  304. p = p.Parent()
  305. }
  306. return p.GetValue().(*Topology)
  307. }