node.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package topology
  2. import (
  3. "errors"
  4. "math/rand"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. )
// NodeId is the string identifier of a node. NodeImpl uses it as the key of
// its children map.
type NodeId string
// Node is the behaviour shared by the elements of the topology tree
// (topology, data center, rack, data node). NodeImpl supplies the common
// implementation of most of these methods.
type Node interface {
	// Identity.
	Id() NodeId
	String() string
	// Capacity query and slot reservation.
	FreeSpace() int64
	ReserveOneVolume(r int64) (*DataNode, error)
	// UpAdjust* apply a change to this node; NodeImpl also forwards the same
	// change to its parent, if any. The deltas may be negative.
	UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
	UpAdjustVolumeCountDelta(volumeCountDelta int64)
	UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64)
	UpAdjustEcShardCountDelta(ecShardCountDelta int64)
	UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
	UpAdjustMaxVolumeId(vid needle.VolumeId)
	// Accessors for the values maintained by the UpAdjust* methods.
	GetVolumeCount() int64
	GetEcShardCount() int64
	GetActiveVolumeCount() int64
	GetRemoteVolumeCount() int64
	GetMaxVolumeCount() int64
	GetMaxVolumeId() needle.VolumeId
	// Tree wiring and traversal.
	SetParent(Node)
	LinkChildNode(node Node)
	UnlinkChildNode(nodeId NodeId)
	CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
	// Node kind predicates.
	IsDataNode() bool
	IsRack() bool
	IsDataCenter() bool
	Children() []Node
	Parent() Node
	GetValue() interface{} // the concrete topology, data center, rack or data node behind this node
}
// NodeImpl holds the state shared by the node kinds of the topology tree:
// aggregated counters, the parent link and the set of children.
type NodeImpl struct {
	// Counters below are modified with atomic.AddInt64 by the UpAdjust*
	// methods, which also forward every change to the parent.
	volumeCount       int64
	remoteVolumeCount int64
	activeVolumeCount int64
	ecShardCount      int64
	maxVolumeCount    int64
	id                NodeId
	parent            Node // nil for a node that has not been linked (or was unlinked)
	sync.RWMutex             // guards children
	children          map[NodeId]Node
	maxVolumeId       needle.VolumeId // only ever raised, see UpAdjustMaxVolumeId
	// nodeType is compared against "DataNode", "Rack" and "DataCenter" by the
	// Is* predicates; value is what GetValue returns (GetTopology expects the
	// root's value to be a *Topology).
	nodeType string
	value    interface{}
}
  56. // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
  57. func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
  58. candidates := make([]Node, 0, len(n.children))
  59. var errs []string
  60. n.RLock()
  61. for _, node := range n.children {
  62. if err := filterFirstNodeFn(node); err == nil {
  63. candidates = append(candidates, node)
  64. } else {
  65. errs = append(errs, string(node.Id())+":"+err.Error())
  66. }
  67. }
  68. n.RUnlock()
  69. if len(candidates) == 0 {
  70. return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
  71. }
  72. firstNode = candidates[rand.Intn(len(candidates))]
  73. glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
  74. restNodes = make([]Node, numberOfNodes-1)
  75. candidates = candidates[:0]
  76. n.RLock()
  77. for _, node := range n.children {
  78. if node.Id() == firstNode.Id() {
  79. continue
  80. }
  81. if node.FreeSpace() <= 0 {
  82. continue
  83. }
  84. glog.V(2).Infoln("select rest node candidate:", node.Id())
  85. candidates = append(candidates, node)
  86. }
  87. n.RUnlock()
  88. glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
  89. ret := len(restNodes) == 0
  90. for k, node := range candidates {
  91. if k < len(restNodes) {
  92. restNodes[k] = node
  93. if k == len(restNodes)-1 {
  94. ret = true
  95. }
  96. } else {
  97. r := rand.Intn(k + 1)
  98. if r < len(restNodes) {
  99. restNodes[r] = node
  100. }
  101. }
  102. }
  103. if !ret {
  104. glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
  105. err = errors.New("No enough data node found!")
  106. }
  107. return
  108. }
// IsDataNode reports whether this node's nodeType is "DataNode".
func (n *NodeImpl) IsDataNode() bool {
	return n.nodeType == "DataNode"
}
// IsRack reports whether this node's nodeType is "Rack".
func (n *NodeImpl) IsRack() bool {
	return n.nodeType == "Rack"
}
// IsDataCenter reports whether this node's nodeType is "DataCenter".
func (n *NodeImpl) IsDataCenter() bool {
	return n.nodeType == "DataCenter"
}
  118. func (n *NodeImpl) String() string {
  119. if n.parent != nil {
  120. return n.parent.String() + ":" + string(n.id)
  121. }
  122. return string(n.id)
  123. }
// Id returns the node's identifier.
func (n *NodeImpl) Id() NodeId {
	return n.id
}
  127. func (n *NodeImpl) FreeSpace() int64 {
  128. freeVolumeSlotCount := n.maxVolumeCount + n.remoteVolumeCount - n.volumeCount
  129. if n.ecShardCount > 0 {
  130. freeVolumeSlotCount = freeVolumeSlotCount - n.ecShardCount/erasure_coding.DataShardsCount - 1
  131. }
  132. return freeVolumeSlotCount
  133. }
// SetParent replaces the node's parent link; LinkChildNode and
// UnlinkChildNode call it with the new parent and with nil respectively.
func (n *NodeImpl) SetParent(node Node) {
	n.parent = node
}
  137. func (n *NodeImpl) Children() (ret []Node) {
  138. n.RLock()
  139. defer n.RUnlock()
  140. for _, c := range n.children {
  141. ret = append(ret, c)
  142. }
  143. return ret
  144. }
// Parent returns the node's parent, or nil when none is set.
func (n *NodeImpl) Parent() Node {
	return n.parent
}
// GetValue returns the value stored in the node's value field; GetTopology
// expects the root node's value to be a *Topology.
func (n *NodeImpl) GetValue() interface{} {
	return n.value
}
  151. func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) {
  152. n.RLock()
  153. defer n.RUnlock()
  154. for _, node := range n.children {
  155. freeSpace := node.FreeSpace()
  156. // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
  157. if freeSpace <= 0 {
  158. continue
  159. }
  160. if r >= freeSpace {
  161. r -= freeSpace
  162. } else {
  163. if node.IsDataNode() && node.FreeSpace() > 0 {
  164. // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
  165. return node.(*DataNode), nil
  166. }
  167. assignedNode, err = node.ReserveOneVolume(r)
  168. if err == nil {
  169. return
  170. }
  171. }
  172. }
  173. return nil, errors.New("No free volume slot found!")
  174. }
  175. func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
  176. atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
  177. if n.parent != nil {
  178. n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
  179. }
  180. }
  181. func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
  182. atomic.AddInt64(&n.volumeCount, volumeCountDelta)
  183. if n.parent != nil {
  184. n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
  185. }
  186. }
  187. func (n *NodeImpl) UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta int64) { //can be negative
  188. atomic.AddInt64(&n.remoteVolumeCount, remoteVolumeCountDelta)
  189. if n.parent != nil {
  190. n.parent.UpAdjustRemoteVolumeCountDelta(remoteVolumeCountDelta)
  191. }
  192. }
  193. func (n *NodeImpl) UpAdjustEcShardCountDelta(ecShardCountDelta int64) { //can be negative
  194. atomic.AddInt64(&n.ecShardCount, ecShardCountDelta)
  195. if n.parent != nil {
  196. n.parent.UpAdjustEcShardCountDelta(ecShardCountDelta)
  197. }
  198. }
  199. func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
  200. atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
  201. if n.parent != nil {
  202. n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
  203. }
  204. }
  205. func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) { //can be negative
  206. if n.maxVolumeId < vid {
  207. n.maxVolumeId = vid
  208. if n.parent != nil {
  209. n.parent.UpAdjustMaxVolumeId(vid)
  210. }
  211. }
  212. }
// GetMaxVolumeId returns the largest volume id recorded on this node through
// UpAdjustMaxVolumeId.
func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
	return n.maxVolumeId
}
  216. func (n *NodeImpl) GetVolumeCount() int64 {
  217. return n.volumeCount
  218. }
  219. func (n *NodeImpl) GetEcShardCount() int64 {
  220. return n.ecShardCount
  221. }
  222. func (n *NodeImpl) GetRemoteVolumeCount() int64 {
  223. return n.remoteVolumeCount
  224. }
  225. func (n *NodeImpl) GetActiveVolumeCount() int64 {
  226. return n.activeVolumeCount
  227. }
  228. func (n *NodeImpl) GetMaxVolumeCount() int64 {
  229. return n.maxVolumeCount
  230. }
  231. func (n *NodeImpl) LinkChildNode(node Node) {
  232. n.Lock()
  233. defer n.Unlock()
  234. if n.children[node.Id()] == nil {
  235. n.children[node.Id()] = node
  236. n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
  237. n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
  238. n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
  239. n.UpAdjustRemoteVolumeCountDelta(node.GetRemoteVolumeCount())
  240. n.UpAdjustEcShardCountDelta(node.GetEcShardCount())
  241. n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
  242. node.SetParent(n)
  243. glog.V(0).Infoln(n, "adds child", node.Id())
  244. }
  245. }
  246. func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
  247. n.Lock()
  248. defer n.Unlock()
  249. node := n.children[nodeId]
  250. if node != nil {
  251. node.SetParent(nil)
  252. delete(n.children, node.Id())
  253. n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
  254. n.UpAdjustRemoteVolumeCountDelta(-node.GetRemoteVolumeCount())
  255. n.UpAdjustEcShardCountDelta(-node.GetEcShardCount())
  256. n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
  257. n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
  258. glog.V(0).Infoln(n, "removes", node.Id())
  259. }
  260. }
  261. func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
  262. if n.IsRack() {
  263. for _, c := range n.Children() {
  264. dn := c.(*DataNode) //can not cast n to DataNode
  265. for _, v := range dn.GetVolumes() {
  266. if uint64(v.Size) >= volumeSizeLimit {
  267. //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
  268. n.GetTopology().chanFullVolumes <- v
  269. }
  270. }
  271. }
  272. } else {
  273. for _, c := range n.Children() {
  274. c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
  275. }
  276. }
  277. }
  278. func (n *NodeImpl) GetTopology() *Topology {
  279. var p Node
  280. p = n
  281. for p.Parent() != nil {
  282. p = p.Parent()
  283. }
  284. return p.GetValue().(*Topology)
  285. }