cluster.go 9.4 KB


  1. package cluster
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  5. "math"
  6. "sync"
  7. "time"
  8. )
  9. const (
  10. MasterType = "master"
  11. VolumeServerType = "volumeServer"
  12. FilerType = "filer"
  13. BrokerType = "broker"
  14. )
  15. type FilerGroupName string
  16. type DataCenter string
  17. type Rack string
  18. type Leaders struct {
  19. leaders [3]pb.ServerAddress
  20. }
  21. type ClusterNode struct {
  22. Address pb.ServerAddress
  23. Version string
  24. counter int
  25. CreatedTs time.Time
  26. DataCenter DataCenter
  27. Rack Rack
  28. }
  29. type GroupMembers struct {
  30. members map[pb.ServerAddress]*ClusterNode
  31. leaders *Leaders
  32. }
  33. type ClusterNodeGroups struct {
  34. groupMembers map[FilerGroupName]*GroupMembers
  35. sync.RWMutex
  36. }
  37. type Cluster struct {
  38. filerGroups *ClusterNodeGroups
  39. brokerGroups *ClusterNodeGroups
  40. }
  41. func newClusterNodeGroups() *ClusterNodeGroups {
  42. return &ClusterNodeGroups{
  43. groupMembers: map[FilerGroupName]*GroupMembers{},
  44. }
  45. }
  46. func (g *ClusterNodeGroups) getGroupMembers(filerGroup FilerGroupName, createIfNotFound bool) *GroupMembers {
  47. members, found := g.groupMembers[filerGroup]
  48. if !found && createIfNotFound {
  49. members = &GroupMembers{
  50. members: make(map[pb.ServerAddress]*ClusterNode),
  51. leaders: &Leaders{},
  52. }
  53. g.groupMembers[filerGroup] = members
  54. }
  55. return members
  56. }
  57. func (m *GroupMembers) addMember(dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) *ClusterNode {
  58. if existingNode, found := m.members[address]; found {
  59. existingNode.counter++
  60. return nil
  61. }
  62. t := &ClusterNode{
  63. Address: address,
  64. Version: version,
  65. counter: 1,
  66. CreatedTs: time.Now(),
  67. DataCenter: dataCenter,
  68. Rack: rack,
  69. }
  70. m.members[address] = t
  71. return t
  72. }
  73. func (m *GroupMembers) removeMember(address pb.ServerAddress) bool {
  74. if existingNode, found := m.members[address]; !found {
  75. return false
  76. } else {
  77. existingNode.counter--
  78. if existingNode.counter <= 0 {
  79. delete(m.members, address)
  80. return true
  81. }
  82. }
  83. return false
  84. }
  85. func (g *ClusterNodeGroups) AddClusterNode(filerGroup FilerGroupName, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  86. g.Lock()
  87. defer g.Unlock()
  88. m := g.getGroupMembers(filerGroup, true)
  89. if t := m.addMember(dataCenter, rack, address, version); t != nil {
  90. return ensureGroupLeaders(m, true, filerGroup, nodeType, address)
  91. }
  92. return nil
  93. }
  94. func (g *ClusterNodeGroups) RemoveClusterNode(filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  95. g.Lock()
  96. defer g.Unlock()
  97. m := g.getGroupMembers(filerGroup, false)
  98. if m == nil {
  99. return nil
  100. }
  101. if m.removeMember(address) {
  102. return ensureGroupLeaders(m, false, filerGroup, nodeType, address)
  103. }
  104. return nil
  105. }
  106. func (g *ClusterNodeGroups) ListClusterNode(filerGroup FilerGroupName) (nodes []*ClusterNode) {
  107. g.Lock()
  108. defer g.Unlock()
  109. m := g.getGroupMembers(filerGroup, false)
  110. if m == nil {
  111. return nil
  112. }
  113. for _, node := range m.members {
  114. nodes = append(nodes, node)
  115. }
  116. return
  117. }
  118. func (g *ClusterNodeGroups) IsOneLeader(filerGroup FilerGroupName, address pb.ServerAddress) bool {
  119. g.Lock()
  120. defer g.Unlock()
  121. m := g.getGroupMembers(filerGroup, false)
  122. if m == nil {
  123. return false
  124. }
  125. return m.leaders.isOneLeader(address)
  126. }
  127. func (g *ClusterNodeGroups) ListClusterNodeLeaders(filerGroup FilerGroupName) (nodes []pb.ServerAddress) {
  128. g.Lock()
  129. defer g.Unlock()
  130. m := g.getGroupMembers(filerGroup, false)
  131. if m == nil {
  132. return nil
  133. }
  134. return m.leaders.GetLeaders()
  135. }
  136. func NewCluster() *Cluster {
  137. return &Cluster{
  138. filerGroups: newClusterNodeGroups(),
  139. brokerGroups: newClusterNodeGroups(),
  140. }
  141. }
  142. func (cluster *Cluster) getGroupMembers(filerGroup FilerGroupName, nodeType string, createIfNotFound bool) *GroupMembers {
  143. switch nodeType {
  144. case FilerType:
  145. return cluster.filerGroups.getGroupMembers(filerGroup, createIfNotFound)
  146. case BrokerType:
  147. return cluster.brokerGroups.getGroupMembers(filerGroup, createIfNotFound)
  148. }
  149. return nil
  150. }
  151. func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  152. filerGroup := FilerGroupName(ns)
  153. switch nodeType {
  154. case FilerType:
  155. return cluster.filerGroups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version)
  156. case BrokerType:
  157. return cluster.brokerGroups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version)
  158. case MasterType:
  159. return []*master_pb.KeepConnectedResponse{
  160. {
  161. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  162. NodeType: nodeType,
  163. Address: string(address),
  164. IsAdd: true,
  165. },
  166. },
  167. }
  168. }
  169. return nil
  170. }
  171. func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  172. filerGroup := FilerGroupName(ns)
  173. switch nodeType {
  174. case FilerType:
  175. return cluster.filerGroups.RemoveClusterNode(filerGroup, nodeType, address)
  176. case BrokerType:
  177. return cluster.brokerGroups.RemoveClusterNode(filerGroup, nodeType, address)
  178. case MasterType:
  179. return []*master_pb.KeepConnectedResponse{
  180. {
  181. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  182. NodeType: nodeType,
  183. Address: string(address),
  184. IsAdd: false,
  185. },
  186. },
  187. }
  188. }
  189. return nil
  190. }
  191. func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType string) (nodes []*ClusterNode) {
  192. switch nodeType {
  193. case FilerType:
  194. return cluster.filerGroups.ListClusterNode(filerGroup)
  195. case BrokerType:
  196. return cluster.brokerGroups.ListClusterNode(filerGroup)
  197. case MasterType:
  198. }
  199. return
  200. }
  201. func (cluster *Cluster) ListClusterNodeLeaders(filerGroup FilerGroupName, nodeType string) (nodes []pb.ServerAddress) {
  202. switch nodeType {
  203. case FilerType:
  204. return cluster.filerGroups.ListClusterNodeLeaders(filerGroup)
  205. case BrokerType:
  206. return cluster.brokerGroups.ListClusterNodeLeaders(filerGroup)
  207. case MasterType:
  208. }
  209. return
  210. }
  211. func (cluster *Cluster) IsOneLeader(filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) bool {
  212. switch nodeType {
  213. case FilerType:
  214. return cluster.filerGroups.IsOneLeader(filerGroup, address)
  215. case BrokerType:
  216. return cluster.brokerGroups.IsOneLeader(filerGroup, address)
  217. case MasterType:
  218. }
  219. return false
  220. }
  221. func ensureGroupLeaders(m *GroupMembers, isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  222. if isAdd {
  223. if m.leaders.addLeaderIfVacant(address) {
  224. // has added the address as one leader
  225. result = append(result, &master_pb.KeepConnectedResponse{
  226. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  227. FilerGroup: string(filerGroup),
  228. NodeType: nodeType,
  229. Address: string(address),
  230. IsLeader: true,
  231. IsAdd: true,
  232. },
  233. })
  234. } else {
  235. result = append(result, &master_pb.KeepConnectedResponse{
  236. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  237. FilerGroup: string(filerGroup),
  238. NodeType: nodeType,
  239. Address: string(address),
  240. IsLeader: false,
  241. IsAdd: true,
  242. },
  243. })
  244. }
  245. } else {
  246. if m.leaders.removeLeaderIfExists(address) {
  247. result = append(result, &master_pb.KeepConnectedResponse{
  248. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  249. FilerGroup: string(filerGroup),
  250. NodeType: nodeType,
  251. Address: string(address),
  252. IsLeader: true,
  253. IsAdd: false,
  254. },
  255. })
  256. // pick the freshest one, since it is less likely to go away
  257. var shortestDuration int64 = math.MaxInt64
  258. now := time.Now()
  259. var candidateAddress pb.ServerAddress
  260. for _, node := range m.members {
  261. if m.leaders.isOneLeader(node.Address) {
  262. continue
  263. }
  264. duration := now.Sub(node.CreatedTs).Nanoseconds()
  265. if duration < shortestDuration {
  266. shortestDuration = duration
  267. candidateAddress = node.Address
  268. }
  269. }
  270. if candidateAddress != "" {
  271. m.leaders.addLeaderIfVacant(candidateAddress)
  272. // added a new leader
  273. result = append(result, &master_pb.KeepConnectedResponse{
  274. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  275. NodeType: nodeType,
  276. Address: string(candidateAddress),
  277. IsLeader: true,
  278. IsAdd: true,
  279. },
  280. })
  281. }
  282. } else {
  283. result = append(result, &master_pb.KeepConnectedResponse{
  284. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  285. FilerGroup: string(filerGroup),
  286. NodeType: nodeType,
  287. Address: string(address),
  288. IsLeader: false,
  289. IsAdd: false,
  290. },
  291. })
  292. }
  293. }
  294. return
  295. }
  296. func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
  297. if leaders.isOneLeader(address) {
  298. return
  299. }
  300. for i := 0; i < len(leaders.leaders); i++ {
  301. if leaders.leaders[i] == "" {
  302. leaders.leaders[i] = address
  303. hasChanged = true
  304. return
  305. }
  306. }
  307. return
  308. }
  309. func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
  310. if !leaders.isOneLeader(address) {
  311. return
  312. }
  313. for i := 0; i < len(leaders.leaders); i++ {
  314. if leaders.leaders[i] == address {
  315. leaders.leaders[i] = ""
  316. hasChanged = true
  317. return
  318. }
  319. }
  320. return
  321. }
  322. func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
  323. for i := 0; i < len(leaders.leaders); i++ {
  324. if leaders.leaders[i] == address {
  325. return true
  326. }
  327. }
  328. return false
  329. }
  330. func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
  331. for i := 0; i < len(leaders.leaders); i++ {
  332. if leaders.leaders[i] != "" {
  333. addresses = append(addresses, leaders.leaders[i])
  334. }
  335. }
  336. return
  337. }