cluster.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package cluster
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  5. "sync"
  6. "time"
  7. )
  8. const (
  9. MasterType = "master"
  10. VolumeServerType = "volumeServer"
  11. FilerType = "filer"
  12. BrokerType = "broker"
  13. )
  14. type FilerGroupName string
  15. type DataCenter string
  16. type Rack string
  17. type ClusterNode struct {
  18. Address pb.ServerAddress
  19. Version string
  20. counter int
  21. CreatedTs time.Time
  22. DataCenter DataCenter
  23. Rack Rack
  24. }
  25. type ClusterNodeGroups struct {
  26. groupMembers map[FilerGroupName]*GroupMembers
  27. sync.RWMutex
  28. }
  29. type Cluster struct {
  30. filerGroups *ClusterNodeGroups
  31. brokerGroups *ClusterNodeGroups
  32. }
  33. func newClusterNodeGroups() *ClusterNodeGroups {
  34. return &ClusterNodeGroups{
  35. groupMembers: map[FilerGroupName]*GroupMembers{},
  36. }
  37. }
  38. func (g *ClusterNodeGroups) getGroupMembers(filerGroup FilerGroupName, createIfNotFound bool) *GroupMembers {
  39. members, found := g.groupMembers[filerGroup]
  40. if !found && createIfNotFound {
  41. members = newGroupMembers()
  42. g.groupMembers[filerGroup] = members
  43. }
  44. return members
  45. }
  46. func (g *ClusterNodeGroups) AddClusterNode(filerGroup FilerGroupName, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  47. g.Lock()
  48. defer g.Unlock()
  49. m := g.getGroupMembers(filerGroup, true)
  50. if t := m.addMember(dataCenter, rack, address, version); t != nil {
  51. return buildClusterNodeUpdateMessage(true, filerGroup, nodeType, address)
  52. }
  53. return nil
  54. }
  55. func (g *ClusterNodeGroups) RemoveClusterNode(filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  56. g.Lock()
  57. defer g.Unlock()
  58. m := g.getGroupMembers(filerGroup, false)
  59. if m == nil {
  60. return nil
  61. }
  62. if m.removeMember(address) {
  63. return buildClusterNodeUpdateMessage(false, filerGroup, nodeType, address)
  64. }
  65. return nil
  66. }
  67. func (g *ClusterNodeGroups) ListClusterNode(filerGroup FilerGroupName) (nodes []*ClusterNode) {
  68. g.Lock()
  69. defer g.Unlock()
  70. m := g.getGroupMembers(filerGroup, false)
  71. if m == nil {
  72. return nil
  73. }
  74. for _, node := range m.members {
  75. nodes = append(nodes, node)
  76. }
  77. return
  78. }
  79. func NewCluster() *Cluster {
  80. return &Cluster{
  81. filerGroups: newClusterNodeGroups(),
  82. brokerGroups: newClusterNodeGroups(),
  83. }
  84. }
  85. func (cluster *Cluster) getGroupMembers(filerGroup FilerGroupName, nodeType string, createIfNotFound bool) *GroupMembers {
  86. switch nodeType {
  87. case FilerType:
  88. return cluster.filerGroups.getGroupMembers(filerGroup, createIfNotFound)
  89. case BrokerType:
  90. return cluster.brokerGroups.getGroupMembers(filerGroup, createIfNotFound)
  91. }
  92. return nil
  93. }
  94. func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
  95. filerGroup := FilerGroupName(ns)
  96. switch nodeType {
  97. case FilerType:
  98. return cluster.filerGroups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version)
  99. case BrokerType:
  100. return cluster.brokerGroups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version)
  101. case MasterType:
  102. return buildClusterNodeUpdateMessage(true, filerGroup, nodeType, address)
  103. }
  104. return nil
  105. }
  106. func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
  107. filerGroup := FilerGroupName(ns)
  108. switch nodeType {
  109. case FilerType:
  110. return cluster.filerGroups.RemoveClusterNode(filerGroup, nodeType, address)
  111. case BrokerType:
  112. return cluster.brokerGroups.RemoveClusterNode(filerGroup, nodeType, address)
  113. case MasterType:
  114. return buildClusterNodeUpdateMessage(false, filerGroup, nodeType, address)
  115. }
  116. return nil
  117. }
  118. func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType string) (nodes []*ClusterNode) {
  119. switch nodeType {
  120. case FilerType:
  121. return cluster.filerGroups.ListClusterNode(filerGroup)
  122. case BrokerType:
  123. return cluster.brokerGroups.ListClusterNode(filerGroup)
  124. case MasterType:
  125. }
  126. return
  127. }
  128. func buildClusterNodeUpdateMessage(isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
  129. result = append(result, &master_pb.KeepConnectedResponse{
  130. ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
  131. FilerGroup: string(filerGroup),
  132. NodeType: nodeType,
  133. Address: string(address),
  134. IsAdd: isAdd,
  135. },
  136. })
  137. return
  138. }