123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- package sub_coordinator
- import (
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
- "time"
- )
- type PartitionConsumerMapping struct {
- currentMapping *PartitionSlotToConsumerInstanceList
- prevMappings []*PartitionSlotToConsumerInstanceList
- }
- func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping {
- newVersion := time.Now().UnixNano()
- return &PartitionConsumerMapping{
- currentMapping: NewPartitionSlotToConsumerInstanceList(ringSize, newVersion),
- }
- }
- // Balance goal:
- // 1. max processing power utilization
- // 2. allow one consumer instance to be down unexpectedly
- // without affecting the processing power utilization
- func (pcm *PartitionConsumerMapping) BalanceToConsumerInstances(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstances []*ConsumerGroupInstance) {
- if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstances) == 0 {
- return
- }
- newVersion := time.Now().UnixNano()
- newMapping := NewPartitionSlotToConsumerInstanceList(partitionSlotToBrokerList.RingSize, newVersion)
- var prevMapping *PartitionSlotToConsumerInstanceList
- if len(pcm.prevMappings) > 0 {
- prevMapping = pcm.prevMappings[len(pcm.prevMappings)-1]
- } else {
- prevMapping = nil
- }
- newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstances, prevMapping)
- if pcm.currentMapping != nil {
- pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping)
- if len(pcm.prevMappings) > 10 {
- pcm.prevMappings = pcm.prevMappings[1:]
- }
- }
- pcm.currentMapping = newMapping
- }
- func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstances []*ConsumerGroupInstance, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
- // collect previous consumer instance ids
- prevConsumerInstanceIds := make(map[ConsumerGroupInstanceId]struct{})
- if prevMapping != nil {
- for _, prevPartitionSlot := range prevMapping.PartitionSlots {
- if prevPartitionSlot.AssignedInstanceId != "" {
- prevConsumerInstanceIds[prevPartitionSlot.AssignedInstanceId] = struct{}{}
- }
- }
- }
- // collect current consumer instance ids
- currConsumerInstanceIds := make(map[ConsumerGroupInstanceId]struct{})
- for _, consumerInstance := range consumerInstances {
- currConsumerInstanceIds[consumerInstance.InstanceId] = struct{}{}
- }
- // check deleted consumer instances
- deletedConsumerInstanceIds := make(map[ConsumerGroupInstanceId]struct{})
- for consumerInstanceId := range prevConsumerInstanceIds {
- if _, ok := currConsumerInstanceIds[consumerInstanceId]; !ok {
- deletedConsumerInstanceIds[consumerInstanceId] = struct{}{}
- }
- }
- // convert partition slots from list to a map
- prevPartitionSlotMap := make(map[string]*PartitionSlotToConsumerInstance)
- if prevMapping != nil {
- for _, partitionSlot := range prevMapping.PartitionSlots {
- key := fmt.Sprintf("%d-%d", partitionSlot.RangeStart, partitionSlot.RangeStop)
- prevPartitionSlotMap[key] = partitionSlot
- }
- }
- // make a copy of old mapping, skipping the deleted consumer instances
- newPartitionSlots := make([]*PartitionSlotToConsumerInstance, 0, len(partitions))
- for _, partition := range partitions {
- newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
- UnixTimeNs: partition.UnixTimeNs,
- Broker: partition.AssignedBroker,
- FollowerBroker: partition.FollowerBroker,
- })
- }
- for _, newPartitionSlot := range newPartitionSlots {
- key := fmt.Sprintf("%d-%d", newPartitionSlot.RangeStart, newPartitionSlot.RangeStop)
- if prevPartitionSlot, ok := prevPartitionSlotMap[key]; ok {
- if _, ok := deletedConsumerInstanceIds[prevPartitionSlot.AssignedInstanceId]; !ok {
- newPartitionSlot.AssignedInstanceId = prevPartitionSlot.AssignedInstanceId
- }
- }
- }
- // for all consumer instances, count the average number of partitions
- // that are assigned to them
- consumerInstancePartitionCount := make(map[ConsumerGroupInstanceId]int)
- for _, newPartitionSlot := range newPartitionSlots {
- if newPartitionSlot.AssignedInstanceId != "" {
- consumerInstancePartitionCount[newPartitionSlot.AssignedInstanceId]++
- }
- }
- // average number of partitions that are assigned to each consumer instance
- averageConsumerInstanceLoad := float32(len(partitions)) / float32(len(consumerInstances))
- // assign unassigned partition slots to consumer instances that is underloaded
- consumerInstanceIdsIndex := 0
- for _, newPartitionSlot := range newPartitionSlots {
- if newPartitionSlot.AssignedInstanceId == "" {
- for avoidDeadLoop := len(consumerInstances); avoidDeadLoop > 0; avoidDeadLoop-- {
- consumerInstance := consumerInstances[consumerInstanceIdsIndex]
- if float32(consumerInstancePartitionCount[consumerInstance.InstanceId]) < averageConsumerInstanceLoad {
- newPartitionSlot.AssignedInstanceId = consumerInstance.InstanceId
- consumerInstancePartitionCount[consumerInstance.InstanceId]++
- consumerInstanceIdsIndex++
- if consumerInstanceIdsIndex >= len(consumerInstances) {
- consumerInstanceIdsIndex = 0
- }
- break
- } else {
- consumerInstanceIdsIndex++
- if consumerInstanceIdsIndex >= len(consumerInstances) {
- consumerInstanceIdsIndex = 0
- }
- }
- }
- }
- }
- return newPartitionSlots
- }
|