123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- package pub_balancer
- import (
- cmap "github.com/orcaman/concurrent-map/v2"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "math/rand"
- "modernc.org/mathutil"
- "sort"
- )
- func (balancer *Balancer) RepairTopics() []BalanceAction {
- action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
- return []BalanceAction{action}
- }
// TopicPartitionInfo records where a topic partition was observed while
// scanning broker stats.
type TopicPartitionInfo struct {
	// Broker is the key of the broker whose stats reported the partition.
	Broker string
}
- // RepairMissingTopicPartitions check the stats of all brokers,
- // and repair the missing topic partitions on the brokers.
- func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats]) (actions []BalanceAction) {
- // find all topic partitions
- topicToTopicPartitions := make(map[topic.Topic]map[topic.Partition]*TopicPartitionInfo)
- for brokerStatsItem := range brokers.IterBuffered() {
- broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
- for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
- topicPartitionStat := topicPartitionStatsItem.Val
- topicPartitionToInfo, found := topicToTopicPartitions[topicPartitionStat.Topic]
- if !found {
- topicPartitionToInfo = make(map[topic.Partition]*TopicPartitionInfo)
- topicToTopicPartitions[topicPartitionStat.Topic] = topicPartitionToInfo
- }
- tpi, found := topicPartitionToInfo[topicPartitionStat.Partition]
- if !found {
- tpi = &TopicPartitionInfo{}
- topicPartitionToInfo[topicPartitionStat.Partition] = tpi
- }
- tpi.Broker = broker
- }
- }
- // collect all brokers as candidates
- candidates := make([]string, 0, brokers.Count())
- for brokerStatsItem := range brokers.IterBuffered() {
- candidates = append(candidates, brokerStatsItem.Key)
- }
- // find the missing topic partitions
- for t, topicPartitionToInfo := range topicToTopicPartitions {
- missingPartitions := EachTopicRepairMissingTopicPartitions(t, topicPartitionToInfo)
- for _, partition := range missingPartitions {
- actions = append(actions, BalanceActionCreate{
- TopicPartition: topic.TopicPartition{
- Topic: t,
- Partition: partition,
- },
- TargetBroker: candidates[rand.Intn(len(candidates))],
- })
- }
- }
- return actions
- }
- func EachTopicRepairMissingTopicPartitions(t topic.Topic, info map[topic.Partition]*TopicPartitionInfo) (missingPartitions []topic.Partition) {
- // find the missing topic partitions
- var partitions []topic.Partition
- for partition := range info {
- partitions = append(partitions, partition)
- }
- return findMissingPartitions(partitions, MaxPartitionCount)
- }
- // findMissingPartitions find the missing partitions
- func findMissingPartitions(partitions []topic.Partition, ringSize int32) (missingPartitions []topic.Partition) {
- // sort the partitions by range start
- sort.Slice(partitions, func(i, j int) bool {
- return partitions[i].RangeStart < partitions[j].RangeStart
- })
- // calculate the average partition size
- var covered int32
- for _, partition := range partitions {
- covered += partition.RangeStop - partition.RangeStart
- }
- averagePartitionSize := covered / int32(len(partitions))
- // find the missing partitions
- var coveredWatermark int32
- i := 0
- for i < len(partitions) {
- partition := partitions[i]
- if partition.RangeStart > coveredWatermark {
- upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, partition.RangeStart)
- missingPartitions = append(missingPartitions, topic.Partition{
- RangeStart: coveredWatermark,
- RangeStop: upperBound,
- RingSize: ringSize,
- })
- coveredWatermark = upperBound
- if coveredWatermark == partition.RangeStop {
- i++
- }
- } else {
- coveredWatermark = partition.RangeStop
- i++
- }
- }
- for coveredWatermark < ringSize {
- upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, ringSize)
- missingPartitions = append(missingPartitions, topic.Partition{
- RangeStart: coveredWatermark,
- RangeStop: upperBound,
- RingSize: ringSize,
- })
- coveredWatermark = upperBound
- }
- return missingPartitions
- }
|