cluster.go 6.4 KB


  1. package cluster
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "math"
  6. "sync"
  7. "time"
  8. )
  9. const (
  10. MasterType = "master"
  11. FilerType = "filer"
  12. BrokerType = "broker"
  13. )
  14. type ClusterNode struct {
  15. Address pb.ServerAddress
  16. Version string
  17. counter int
  18. createdTs time.Time
  19. }
  20. type Leaders struct {
  21. leaders [3]pb.ServerAddress
  22. }
  23. type Cluster struct {
  24. filers map[pb.ServerAddress]*ClusterNode
  25. filersLock sync.RWMutex
  26. filerLeaders *Leaders
  27. brokers map[pb.ServerAddress]*ClusterNode
  28. brokersLock sync.RWMutex
  29. }
  30. func NewCluster() *Cluster {
  31. return &Cluster{
  32. filers: make(map[pb.ServerAddress]*ClusterNode),
  33. filerLeaders: &Leaders{},
  34. brokers: make(map[pb.ServerAddress]*ClusterNode),
  35. }
  36. }
  37. func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  38. switch nodeType {
  39. case FilerType:
  40. cluster.filersLock.Lock()
  41. defer cluster.filersLock.Unlock()
  42. if existingNode, found := cluster.filers[address]; found {
  43. existingNode.counter++
  44. return nil
  45. }
  46. cluster.filers[address] = &ClusterNode{
  47. Address: address,
  48. Version: version,
  49. counter: 1,
  50. createdTs: time.Now(),
  51. }
  52. return cluster.ensureFilerLeaders(true, nodeType, address)
  53. case BrokerType:
  54. cluster.brokersLock.Lock()
  55. defer cluster.brokersLock.Unlock()
  56. if existingNode, found := cluster.brokers[address]; found {
  57. existingNode.counter++
  58. return nil
  59. }
  60. cluster.brokers[address] = &ClusterNode{
  61. Address: address,
  62. Version: version,
  63. counter: 1,
  64. createdTs: time.Now(),
  65. }
  66. return []*master_pb.KeepConnectedResponse{
  67. {
  68. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  69. NodeType: nodeType,
  70. Address: string(address),
  71. IsAdd: true,
  72. },
  73. },
  74. }
  75. case MasterType:
  76. }
  77. return nil
  78. }
  79. func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  80. switch nodeType {
  81. case FilerType:
  82. cluster.filersLock.Lock()
  83. defer cluster.filersLock.Unlock()
  84. if existingNode, found := cluster.filers[address]; !found {
  85. return nil
  86. } else {
  87. existingNode.counter--
  88. if existingNode.counter <= 0 {
  89. delete(cluster.filers, address)
  90. return cluster.ensureFilerLeaders(false, nodeType, address)
  91. }
  92. }
  93. case BrokerType:
  94. cluster.brokersLock.Lock()
  95. defer cluster.brokersLock.Unlock()
  96. if existingNode, found := cluster.brokers[address]; !found {
  97. return nil
  98. } else {
  99. existingNode.counter--
  100. if existingNode.counter <= 0 {
  101. delete(cluster.brokers, address)
  102. return []*master_pb.KeepConnectedResponse{
  103. {
  104. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  105. NodeType: nodeType,
  106. Address: string(address),
  107. IsAdd: false,
  108. },
  109. },
  110. }
  111. }
  112. }
  113. case MasterType:
  114. }
  115. return nil
  116. }
  117. func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
  118. switch nodeType {
  119. case FilerType:
  120. cluster.filersLock.RLock()
  121. defer cluster.filersLock.RUnlock()
  122. for _, node := range cluster.filers {
  123. nodes = append(nodes, node)
  124. }
  125. case BrokerType:
  126. cluster.brokersLock.RLock()
  127. defer cluster.brokersLock.RUnlock()
  128. for _, node := range cluster.brokers {
  129. nodes = append(nodes, node)
  130. }
  131. case MasterType:
  132. }
  133. return
  134. }
  135. func (cluster *Cluster) IsOneLeader(address pb.ServerAddress) bool {
  136. return cluster.filerLeaders.isOneLeader(address)
  137. }
  138. func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  139. if isAdd {
  140. if cluster.filerLeaders.addLeaderIfVacant(address) {
  141. // has added the address as one leader
  142. result = append(result, &master_pb.KeepConnectedResponse{
  143. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  144. NodeType: nodeType,
  145. Address: string(address),
  146. IsLeader: true,
  147. IsAdd: true,
  148. },
  149. })
  150. } else {
  151. result = append(result, &master_pb.KeepConnectedResponse{
  152. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  153. NodeType: nodeType,
  154. Address: string(address),
  155. IsLeader: false,
  156. IsAdd: true,
  157. },
  158. })
  159. }
  160. } else {
  161. if cluster.filerLeaders.removeLeaderIfExists(address) {
  162. result = append(result, &master_pb.KeepConnectedResponse{
  163. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  164. NodeType: nodeType,
  165. Address: string(address),
  166. IsLeader: true,
  167. IsAdd: false,
  168. },
  169. })
  170. // pick the freshest one, since it is less likely to go away
  171. var shortestDuration int64 = math.MaxInt64
  172. now := time.Now()
  173. var candidateAddress pb.ServerAddress
  174. for _, node := range cluster.filers {
  175. if cluster.filerLeaders.isOneLeader(node.Address) {
  176. continue
  177. }
  178. duration := now.Sub(node.createdTs).Nanoseconds()
  179. if duration < shortestDuration {
  180. shortestDuration = duration
  181. candidateAddress = node.Address
  182. }
  183. }
  184. if candidateAddress != "" {
  185. cluster.filerLeaders.addLeaderIfVacant(candidateAddress)
  186. // added a new leader
  187. result = append(result, &master_pb.KeepConnectedResponse{
  188. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  189. NodeType: nodeType,
  190. Address: string(candidateAddress),
  191. IsLeader: true,
  192. IsAdd: true,
  193. },
  194. })
  195. }
  196. } else {
  197. result = append(result, &master_pb.KeepConnectedResponse{
  198. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  199. NodeType: nodeType,
  200. Address: string(address),
  201. IsLeader: false,
  202. IsAdd: false,
  203. },
  204. })
  205. }
  206. }
  207. return
  208. }
  209. func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
  210. if leaders.isOneLeader(address) {
  211. return
  212. }
  213. for i := 0; i < len(leaders.leaders); i++ {
  214. if leaders.leaders[i] == "" {
  215. leaders.leaders[i] = address
  216. hasChanged = true
  217. return
  218. }
  219. }
  220. return
  221. }
  222. func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
  223. if !leaders.isOneLeader(address) {
  224. return
  225. }
  226. for i := 0; i < len(leaders.leaders); i++ {
  227. if leaders.leaders[i] == address {
  228. leaders.leaders[i] = ""
  229. hasChanged = true
  230. return
  231. }
  232. }
  233. return
  234. }
  235. func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
  236. for i := 0; i < len(leaders.leaders); i++ {
  237. if leaders.leaders[i] == address {
  238. return true
  239. }
  240. }
  241. return false
  242. }
  243. func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
  244. for i := 0; i < len(leaders.leaders); i++ {
  245. if leaders.leaders[i] != "" {
  246. addresses = append(addresses, leaders.leaders[i])
  247. }
  248. }
  249. return
  250. }