package sub_coordinator import ( "errors" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "sync" "time" ) /* Market is a data structure that keeps track of the state of the consumer group instances and the partitions. When rebalancing, the market will try to balance the load of the partitions among the consumer group instances. For each loop, the market will: * If a consumer group instance has more partitions than the average, it will unassign some partitions. * If a consumer group instance has less partitions than the average, it will assign some partitions. Trigger rebalance when: * A new consumer group instance is added * Some partitions are unassigned from a consumer group instance. If multiple reblance requests are received, after a certain period, the market will only process the latest request. However, if the number of unassigned partition is increased to exactly the total number of partitions, and total partitions are less than or equal to the sum of the max partition count of all consumer group instances, the market will process the request immediately. This is to ensure a partition can be migrated to another consumer group instance as soon as possible. Emit these adjustments to the subscriber coordinator: * Assign a partition to a consumer group instance * Unassign a partition from a consumer group instance Because the adjustment is sent to the subscriber coordinator, the market will keep track of the inflight adjustments. The subscriber coordinator will send back the response to the market when the adjustment is processed. If the adjustment is older than a certain time(inflightAdjustmentTTL), it would be considered expired. Otherwise, the adjustment is considered inflight, so it would be used when calculating the load. Later features: * A consumer group instance is not keeping up with the load. Since a coordinator, and thus the market, may be restarted or moved to another node, the market should be able to recover the state from the subscriber coordinator. The subscriber coordinator should be able to send the current state of the consumer group instances and the partitions to the market. */ type PartitionSlot struct { Partition topic.Partition AssignedTo *ConsumerGroupInstance // Track the consumer assigned to this partition slot } type Adjustment struct { isAssign bool partition topic.Partition consumer ConsumerGroupInstanceId ts time.Time } type Market struct { mu sync.Mutex partitions map[topic.Partition]*PartitionSlot consumerInstances map[ConsumerGroupInstanceId]*ConsumerGroupInstance AdjustmentChan chan *Adjustment inflightAdjustments []*Adjustment inflightAdjustmentTTL time.Duration lastBalancedTime time.Time stopChan chan struct{} balanceRequestChan chan struct{} hasBalanceRequest bool } func NewMarket(partitions []topic.Partition, inflightAdjustmentTTL time.Duration) *Market { partitionMap := make(map[topic.Partition]*PartitionSlot) for _, partition := range partitions { partitionMap[partition] = &PartitionSlot{ Partition: partition, } } m := &Market{ partitions: partitionMap, consumerInstances: make(map[ConsumerGroupInstanceId]*ConsumerGroupInstance), AdjustmentChan: make(chan *Adjustment, 100), inflightAdjustmentTTL: inflightAdjustmentTTL, stopChan: make(chan struct{}), balanceRequestChan: make(chan struct{}), } m.lastBalancedTime = time.Now() go m.loopBalanceLoad() return m } func (m *Market) ShutdownMarket() { close(m.stopChan) close(m.AdjustmentChan) } func (m *Market) AddConsumerInstance(consumer *ConsumerGroupInstance) error { m.mu.Lock() defer m.mu.Unlock() if _, exists := m.consumerInstances[consumer.InstanceId]; exists { return errors.New("consumer instance already exists") } m.consumerInstances[consumer.InstanceId] = consumer m.balanceRequestChan <- struct{}{} return nil } func (m *Market) RemoveConsumerInstance(consumerId ConsumerGroupInstanceId) error { m.mu.Lock() defer m.mu.Unlock() consumer, exists := m.consumerInstances[consumerId] if !exists { return nil } delete(m.consumerInstances, consumerId) for _, partition := range consumer.AssignedPartitions { if partitionSlot, exists := m.partitions[partition]; exists { partitionSlot.AssignedTo = nil } } m.balanceRequestChan <- struct{}{} return nil } func (m *Market) assignPartitionToConsumer(partition *PartitionSlot) { var bestConsumer *ConsumerGroupInstance var minLoad = int(^uint(0) >> 1) // Max int value inflightConsumerAdjustments := make(map[ConsumerGroupInstanceId]int) for _, adjustment := range m.inflightAdjustments { if adjustment.isAssign { inflightConsumerAdjustments[adjustment.consumer]++ } else { inflightConsumerAdjustments[adjustment.consumer]-- } } for _, consumer := range m.consumerInstances { consumerLoad := len(consumer.AssignedPartitions) if inflightAdjustments, exists := inflightConsumerAdjustments[consumer.InstanceId]; exists { consumerLoad += inflightAdjustments } // fmt.Printf("Consumer %+v has load %d, max %d, min %d\n", consumer.InstanceId, consumerLoad, consumer.MaxPartitionCount, minLoad) if consumerLoad < int(consumer.MaxPartitionCount) { if consumerLoad < minLoad { bestConsumer = consumer minLoad = consumerLoad // fmt.Printf("picked: Consumer %+v has load %d, max %d, min %d\n", consumer.InstanceId, consumerLoad, consumer.MaxPartitionCount, minLoad) } } } if bestConsumer != nil { // change consumer assigned partitions later when the adjustment is confirmed adjustment := &Adjustment{ isAssign: true, partition: partition.Partition, consumer: bestConsumer.InstanceId, ts: time.Now(), } m.AdjustmentChan <- adjustment m.inflightAdjustments = append(m.inflightAdjustments, adjustment) m.lastBalancedTime = adjustment.ts } } func (m *Market) loopBalanceLoad() { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() for { select { case <-ticker.C: if m.hasBalanceRequest { m.hasBalanceRequest = false inflightAdjustments := make([]*Adjustment, 0, len(m.inflightAdjustments)) for _, adjustment := range m.inflightAdjustments { if adjustment.ts.Add(m.inflightAdjustmentTTL).After(time.Now()) { inflightAdjustments = append(inflightAdjustments, adjustment) } } m.inflightAdjustments = inflightAdjustments m.doBalanceLoad() // println("Balance load completed.") m.Status() } case <-m.balanceRequestChan: m.hasBalanceRequest = true case <-m.stopChan: return } } } // doBalanceLoad will balance the load of the partitions among the consumer group instances. // It will try to unassign partitions from the consumer group instances that have more partitions than the average. // It will try to assign partitions to the consumer group instances that have less partitions than the average. func (m *Market) doBalanceLoad() { if len(m.consumerInstances) == 0 { return } // find the average load for all consumers averageLoad := m.findAverageLoad() // find the consumers with the higher load than average if m.adjustBusyConsumers(averageLoad) { return } // find partitions with no consumer assigned m.adjustUnassignedPartitions() } func (m *Market) findAverageLoad() (averageLoad float32) { var totalLoad int for _, consumer := range m.consumerInstances { totalLoad += len(consumer.AssignedPartitions) } for _, adjustment := range m.inflightAdjustments { if adjustment.isAssign { totalLoad++ } else { totalLoad-- } } averageLoad = float32(totalLoad) / float32(len(m.consumerInstances)) return } func (m *Market) adjustBusyConsumers(averageLoad float32) (hasAdjustments bool) { inflightConsumerAdjustments := make(map[ConsumerGroupInstanceId]int) for _, adjustment := range m.inflightAdjustments { if adjustment.isAssign { inflightConsumerAdjustments[adjustment.consumer]++ } else { inflightConsumerAdjustments[adjustment.consumer]-- } } for _, consumer := range m.consumerInstances { consumerLoad := len(consumer.AssignedPartitions) if inflightAdjustment, exists := inflightConsumerAdjustments[consumer.InstanceId]; exists { consumerLoad += inflightAdjustment } delta := int(float32(consumerLoad) - averageLoad) if delta <= 0 { continue } adjustTime := time.Now() for i := 0; i < delta; i++ { adjustment := &Adjustment{ isAssign: false, partition: consumer.AssignedPartitions[i], consumer: consumer.InstanceId, ts: adjustTime, } m.AdjustmentChan <- adjustment m.inflightAdjustments = append(m.inflightAdjustments, adjustment) m.lastBalancedTime = adjustment.ts } hasAdjustments = true } return } func (m *Market) adjustUnassignedPartitions() { inflightPartitionAdjustments := make(map[topic.Partition]bool) for _, adjustment := range m.inflightAdjustments { inflightPartitionAdjustments[adjustment.partition] = true } for _, partitionSlot := range m.partitions { if partitionSlot.AssignedTo == nil { if _, exists := inflightPartitionAdjustments[partitionSlot.Partition]; exists { continue } // fmt.Printf("Assigning partition %+v to consumer\n", partitionSlot.Partition) m.assignPartitionToConsumer(partitionSlot) } } } func (m *Market) ConfirmAdjustment(adjustment *Adjustment) { if adjustment.isAssign { m.confirmAssignPartition(adjustment.partition, adjustment.consumer) } else { m.unassignPartitionSlot(adjustment.partition) } glog.V(1).Infof("ConfirmAdjustment %+v", adjustment) m.Status() } func (m *Market) unassignPartitionSlot(partition topic.Partition) { m.mu.Lock() defer m.mu.Unlock() partitionSlot, exists := m.partitions[partition] if !exists { glog.V(0).Infof("partition %+v slot is not tracked", partition) return } if partitionSlot.AssignedTo == nil { glog.V(0).Infof("partition %+v slot is not assigned to any consumer", partition) return } consumer := partitionSlot.AssignedTo for i, p := range consumer.AssignedPartitions { if p == partition { consumer.AssignedPartitions = append(consumer.AssignedPartitions[:i], consumer.AssignedPartitions[i+1:]...) partitionSlot.AssignedTo = nil m.balanceRequestChan <- struct{}{} return } } glog.V(0).Infof("partition %+v slot not found in assigned consumer", partition) } func (m *Market) confirmAssignPartition(partition topic.Partition, consumerInstanceId ConsumerGroupInstanceId) { m.mu.Lock() defer m.mu.Unlock() partitionSlot, exists := m.partitions[partition] if !exists { glog.V(0).Infof("partition %+v slot is not tracked", partition) return } if partitionSlot.AssignedTo != nil { glog.V(0).Infof("partition %+v slot is already assigned to %+v", partition, partitionSlot.AssignedTo.InstanceId) return } consumerInstance, exists := m.consumerInstances[consumerInstanceId] if !exists { glog.V(0).Infof("consumer %+v is not tracked", consumerInstanceId) return } partitionSlot.AssignedTo = consumerInstance consumerInstance.AssignedPartitions = append(consumerInstance.AssignedPartitions, partition) } func (m *Market) Status() { m.mu.Lock() defer m.mu.Unlock() glog.V(1).Infof("Market has %d partitions and %d consumer instances", len(m.partitions), len(m.consumerInstances)) for partition, slot := range m.partitions { if slot.AssignedTo == nil { glog.V(1).Infof("Partition %+v is not assigned to any consumer", partition) } else { glog.V(1).Infof("Partition %+v is assigned to consumer %+v", partition, slot.AssignedTo.InstanceId) } } for _, consumer := range m.consumerInstances { glog.V(1).Infof("Consumer %+v has %d partitions", consumer.InstanceId, len(consumer.AssignedPartitions)) } }