123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- package cluster
- import (
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "math"
- "sync"
- "time"
- )
- const (
- MasterType = "master"
- FilerType = "filer"
- BrokerType = "broker"
- )
- type ClusterNode struct {
- Address pb.ServerAddress
- Version string
- counter int
- createdTs time.Time
- }
- type Leaders struct {
- leaders [3]pb.ServerAddress
- }
- type Cluster struct {
- filers map[pb.ServerAddress]*ClusterNode
- filersLock sync.RWMutex
- filerLeaders *Leaders
- brokers map[pb.ServerAddress]*ClusterNode
- brokersLock sync.RWMutex
- }
- func NewCluster() *Cluster {
- return &Cluster{
- filers: make(map[pb.ServerAddress]*ClusterNode),
- filerLeaders: &Leaders{},
- brokers: make(map[pb.ServerAddress]*ClusterNode),
- }
- }
- func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
- switch nodeType {
- case FilerType:
- cluster.filersLock.Lock()
- defer cluster.filersLock.Unlock()
- if existingNode, found := cluster.filers[address]; found {
- existingNode.counter++
- return nil
- }
- cluster.filers[address] = &ClusterNode{
- Address: address,
- Version: version,
- counter: 1,
- createdTs: time.Now(),
- }
- return cluster.ensureFilerLeaders(true, nodeType, address)
- case BrokerType:
- cluster.brokersLock.Lock()
- defer cluster.brokersLock.Unlock()
- if existingNode, found := cluster.brokers[address]; found {
- existingNode.counter++
- return nil
- }
- cluster.brokers[address] = &ClusterNode{
- Address: address,
- Version: version,
- counter: 1,
- createdTs: time.Now(),
- }
- return []*master_pb.KeepConnectedResponse{
- {
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- NodeType: nodeType,
- Address: string(address),
- IsAdd: true,
- },
- },
- }
- case MasterType:
- }
- return nil
- }
- func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
- switch nodeType {
- case FilerType:
- cluster.filersLock.Lock()
- defer cluster.filersLock.Unlock()
- if existingNode, found := cluster.filers[address]; !found {
- return nil
- } else {
- existingNode.counter--
- if existingNode.counter <= 0 {
- delete(cluster.filers, address)
- return cluster.ensureFilerLeaders(false, nodeType, address)
- }
- }
- case BrokerType:
- cluster.brokersLock.Lock()
- defer cluster.brokersLock.Unlock()
- if existingNode, found := cluster.brokers[address]; !found {
- return nil
- } else {
- existingNode.counter--
- if existingNode.counter <= 0 {
- delete(cluster.brokers, address)
- return []*master_pb.KeepConnectedResponse{
- {
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- NodeType: nodeType,
- Address: string(address),
- IsAdd: false,
- },
- },
- }
- }
- }
- case MasterType:
- }
- return nil
- }
- func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
- switch nodeType {
- case FilerType:
- cluster.filersLock.RLock()
- defer cluster.filersLock.RUnlock()
- for _, node := range cluster.filers {
- nodes = append(nodes, node)
- }
- case BrokerType:
- cluster.brokersLock.RLock()
- defer cluster.brokersLock.RUnlock()
- for _, node := range cluster.brokers {
- nodes = append(nodes, node)
- }
- case MasterType:
- }
- return
- }
- func (cluster *Cluster) IsOneLeader(address pb.ServerAddress) bool {
- return cluster.filerLeaders.isOneLeader(address)
- }
- func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
- if isAdd {
- if cluster.filerLeaders.addLeaderIfVacant(address) {
- // has added the address as one leader
- result = append(result, &master_pb.KeepConnectedResponse{
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- NodeType: nodeType,
- Address: string(address),
- IsLeader: true,
- IsAdd: true,
- },
- })
- } else {
- result = append(result, &master_pb.KeepConnectedResponse{
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- NodeType: nodeType,
- Address: string(address),
- IsLeader: false,
- IsAdd: true,
- },
- })
- }
- } else {
- if cluster.filerLeaders.removeLeaderIfExists(address) {
- result = append(result, &master_pb.KeepConnectedResponse{
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- NodeType: nodeType,
- Address: string(address),
- IsLeader: true,
- IsAdd: false,
- },
- })
- // pick the freshest one, since it is less likely to go away
- var shortestDuration int64 = math.MaxInt64
- now := time.Now()
- var candidateAddress pb.ServerAddress
- for _, node := range cluster.filers {
- if cluster.filerLeaders.isOneLeader(node.Address) {
- continue
- }
- duration := now.Sub(node.createdTs).Nanoseconds()
- if duration < shortestDuration {
- shortestDuration = duration
- candidateAddress = node.Address
- }
- }
- if candidateAddress != "" {
- cluster.filerLeaders.addLeaderIfVacant(candidateAddress)
- // added a new leader
- result = append(result, &master_pb.KeepConnectedResponse{
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- NodeType: nodeType,
- Address: string(candidateAddress),
- IsLeader: true,
- IsAdd: true,
- },
- })
- }
- } else {
- result = append(result, &master_pb.KeepConnectedResponse{
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- NodeType: nodeType,
- Address: string(address),
- IsLeader: false,
- IsAdd: false,
- },
- })
- }
- }
- return
- }
- func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
- if leaders.isOneLeader(address) {
- return
- }
- for i := 0; i < len(leaders.leaders); i++ {
- if leaders.leaders[i] == "" {
- leaders.leaders[i] = address
- hasChanged = true
- return
- }
- }
- return
- }
- func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
- if !leaders.isOneLeader(address) {
- return
- }
- for i := 0; i < len(leaders.leaders); i++ {
- if leaders.leaders[i] == address {
- leaders.leaders[i] = ""
- hasChanged = true
- return
- }
- }
- return
- }
- func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
- for i := 0; i < len(leaders.leaders); i++ {
- if leaders.leaders[i] == address {
- return true
- }
- }
- return false
- }
- func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
- for i := 0; i < len(leaders.leaders); i++ {
- if leaders.leaders[i] != "" {
- addresses = append(addresses, leaders.leaders[i])
- }
- }
- return
- }
|