balancer.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package pub_balancer
  2. import (
  3. cmap "github.com/orcaman/concurrent-map/v2"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. )
  7. const (
  8. MaxPartitionCount = 8 * 9 * 5 * 7 //2520
  9. LockBrokerBalancer = "broker_balancer"
  10. )
  11. // Balancer collects stats from all brokers.
  12. //
  13. // When publishers wants to create topics, it picks brokers to assign the topic partitions.
  14. // When consumers wants to subscribe topics, it tells which brokers are serving the topic partitions.
  15. //
  16. // When a partition needs to be split or merged, or a partition needs to be moved to another broker,
  17. // the balancer will let the broker tell the consumer instance to stop processing the partition.
  18. // The existing consumer instance will flush the internal state, and then stop processing.
  19. // Then the balancer will tell the brokers to start sending new messages in the new/moved partition to the consumer instances.
  20. //
  21. // Failover to standby consumer instances:
  22. //
  23. // A consumer group can have min and max number of consumer instances.
  24. // For consumer instances joined after the max number, they will be in standby mode.
  25. //
  26. // When a consumer instance is down, the broker will notice this and inform the balancer.
  27. // The balancer will then tell the broker to send the partition to another standby consumer instance.
  28. type Balancer struct {
  29. Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address
  30. // Collected from all brokers when they connect to the broker leader
  31. TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
  32. OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
  33. OnAddBroker func(broker string, brokerStats *BrokerStats)
  34. OnRemoveBroker func(broker string, brokerStats *BrokerStats)
  35. }
  36. func NewBalancer() *Balancer {
  37. return &Balancer{
  38. Brokers: cmap.New[*BrokerStats](),
  39. TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](),
  40. }
  41. }
  42. func (balancer *Balancer) AddBroker(broker string) (brokerStats *BrokerStats) {
  43. var found bool
  44. brokerStats, found = balancer.Brokers.Get(broker)
  45. if !found {
  46. brokerStats = NewBrokerStats()
  47. if !balancer.Brokers.SetIfAbsent(broker, brokerStats) {
  48. brokerStats, _ = balancer.Brokers.Get(broker)
  49. }
  50. }
  51. balancer.onPubAddBroker(broker, brokerStats)
  52. balancer.OnAddBroker(broker, brokerStats)
  53. return brokerStats
  54. }
  55. func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) {
  56. balancer.Brokers.Remove(broker)
  57. // update TopicToBrokers
  58. for _, topic := range stats.Topics {
  59. partitionSlotToBrokerList, found := balancer.TopicToBrokers.Get(topic.String())
  60. if !found {
  61. continue
  62. }
  63. pickedBroker := pickBrokers(balancer.Brokers, 1)
  64. if len(pickedBroker) == 0 {
  65. partitionSlotToBrokerList.RemoveBroker(broker)
  66. } else {
  67. partitionSlotToBrokerList.ReplaceBroker(broker, pickedBroker[0])
  68. }
  69. }
  70. balancer.onPubRemoveBroker(broker, stats)
  71. balancer.OnRemoveBroker(broker, stats)
  72. }
  73. func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) {
  74. brokerStats.UpdateStats(receivedStats)
  75. // update TopicToBrokers
  76. for _, topicPartitionStats := range receivedStats.Stats {
  77. topicKey := topic.FromPbTopic(topicPartitionStats.Topic).String()
  78. partition := topicPartitionStats.Partition
  79. partitionSlotToBrokerList, found := balancer.TopicToBrokers.Get(topicKey)
  80. if !found {
  81. partitionSlotToBrokerList = NewPartitionSlotToBrokerList(MaxPartitionCount)
  82. if !balancer.TopicToBrokers.SetIfAbsent(topicKey, partitionSlotToBrokerList) {
  83. partitionSlotToBrokerList, _ = balancer.TopicToBrokers.Get(topicKey)
  84. }
  85. }
  86. partitionSlotToBrokerList.AddBroker(partition, broker)
  87. }
  88. }
  89. // OnPubAddBroker is called when a broker is added for a publisher coordinator
  90. func (balancer *Balancer) onPubAddBroker(broker string, brokerStats *BrokerStats) {
  91. }
  92. // OnPubRemoveBroker is called when a broker is removed for a publisher coordinator
  93. func (balancer *Balancer) onPubRemoveBroker(broker string, brokerStats *BrokerStats) {
  94. }