partition_consumer_mapping.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package sub_coordinator
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  5. "time"
  6. )
  7. type PartitionConsumerMapping struct {
  8. currentMapping *PartitionSlotToConsumerInstanceList
  9. prevMappings []*PartitionSlotToConsumerInstanceList
  10. }
  11. func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping {
  12. newVersion := time.Now().UnixNano()
  13. return &PartitionConsumerMapping{
  14. currentMapping: NewPartitionSlotToConsumerInstanceList(ringSize, newVersion),
  15. }
  16. }
  17. // Balance goal:
  18. // 1. max processing power utilization
  19. // 2. allow one consumer instance to be down unexpectedly
  20. // without affecting the processing power utilization
  21. func (pcm *PartitionConsumerMapping) BalanceToConsumerInstances(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstances []*ConsumerGroupInstance) {
  22. if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstances) == 0 {
  23. return
  24. }
  25. newVersion := time.Now().UnixNano()
  26. newMapping := NewPartitionSlotToConsumerInstanceList(partitionSlotToBrokerList.RingSize, newVersion)
  27. var prevMapping *PartitionSlotToConsumerInstanceList
  28. if len(pcm.prevMappings) > 0 {
  29. prevMapping = pcm.prevMappings[len(pcm.prevMappings)-1]
  30. } else {
  31. prevMapping = nil
  32. }
  33. newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstances, prevMapping)
  34. if pcm.currentMapping != nil {
  35. pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping)
  36. if len(pcm.prevMappings) > 10 {
  37. pcm.prevMappings = pcm.prevMappings[1:]
  38. }
  39. }
  40. pcm.currentMapping = newMapping
  41. }
  42. func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstances []*ConsumerGroupInstance, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
  43. // collect previous consumer instance ids
  44. prevConsumerInstanceIds := make(map[ConsumerGroupInstanceId]struct{})
  45. if prevMapping != nil {
  46. for _, prevPartitionSlot := range prevMapping.PartitionSlots {
  47. if prevPartitionSlot.AssignedInstanceId != "" {
  48. prevConsumerInstanceIds[prevPartitionSlot.AssignedInstanceId] = struct{}{}
  49. }
  50. }
  51. }
  52. // collect current consumer instance ids
  53. currConsumerInstanceIds := make(map[ConsumerGroupInstanceId]struct{})
  54. for _, consumerInstance := range consumerInstances {
  55. currConsumerInstanceIds[consumerInstance.InstanceId] = struct{}{}
  56. }
  57. // check deleted consumer instances
  58. deletedConsumerInstanceIds := make(map[ConsumerGroupInstanceId]struct{})
  59. for consumerInstanceId := range prevConsumerInstanceIds {
  60. if _, ok := currConsumerInstanceIds[consumerInstanceId]; !ok {
  61. deletedConsumerInstanceIds[consumerInstanceId] = struct{}{}
  62. }
  63. }
  64. // convert partition slots from list to a map
  65. prevPartitionSlotMap := make(map[string]*PartitionSlotToConsumerInstance)
  66. if prevMapping != nil {
  67. for _, partitionSlot := range prevMapping.PartitionSlots {
  68. key := fmt.Sprintf("%d-%d", partitionSlot.RangeStart, partitionSlot.RangeStop)
  69. prevPartitionSlotMap[key] = partitionSlot
  70. }
  71. }
  72. // make a copy of old mapping, skipping the deleted consumer instances
  73. newPartitionSlots := make([]*PartitionSlotToConsumerInstance, 0, len(partitions))
  74. for _, partition := range partitions {
  75. newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{
  76. RangeStart: partition.RangeStart,
  77. RangeStop: partition.RangeStop,
  78. UnixTimeNs: partition.UnixTimeNs,
  79. Broker: partition.AssignedBroker,
  80. FollowerBroker: partition.FollowerBroker,
  81. })
  82. }
  83. for _, newPartitionSlot := range newPartitionSlots {
  84. key := fmt.Sprintf("%d-%d", newPartitionSlot.RangeStart, newPartitionSlot.RangeStop)
  85. if prevPartitionSlot, ok := prevPartitionSlotMap[key]; ok {
  86. if _, ok := deletedConsumerInstanceIds[prevPartitionSlot.AssignedInstanceId]; !ok {
  87. newPartitionSlot.AssignedInstanceId = prevPartitionSlot.AssignedInstanceId
  88. }
  89. }
  90. }
  91. // for all consumer instances, count the average number of partitions
  92. // that are assigned to them
  93. consumerInstancePartitionCount := make(map[ConsumerGroupInstanceId]int)
  94. for _, newPartitionSlot := range newPartitionSlots {
  95. if newPartitionSlot.AssignedInstanceId != "" {
  96. consumerInstancePartitionCount[newPartitionSlot.AssignedInstanceId]++
  97. }
  98. }
  99. // average number of partitions that are assigned to each consumer instance
  100. averageConsumerInstanceLoad := float32(len(partitions)) / float32(len(consumerInstances))
  101. // assign unassigned partition slots to consumer instances that is underloaded
  102. consumerInstanceIdsIndex := 0
  103. for _, newPartitionSlot := range newPartitionSlots {
  104. if newPartitionSlot.AssignedInstanceId == "" {
  105. for avoidDeadLoop := len(consumerInstances); avoidDeadLoop > 0; avoidDeadLoop-- {
  106. consumerInstance := consumerInstances[consumerInstanceIdsIndex]
  107. if float32(consumerInstancePartitionCount[consumerInstance.InstanceId]) < averageConsumerInstanceLoad {
  108. newPartitionSlot.AssignedInstanceId = consumerInstance.InstanceId
  109. consumerInstancePartitionCount[consumerInstance.InstanceId]++
  110. consumerInstanceIdsIndex++
  111. if consumerInstanceIdsIndex >= len(consumerInstances) {
  112. consumerInstanceIdsIndex = 0
  113. }
  114. break
  115. } else {
  116. consumerInstanceIdsIndex++
  117. if consumerInstanceIdsIndex >= len(consumerInstances) {
  118. consumerInstanceIdsIndex = 0
  119. }
  120. }
  121. }
  122. }
  123. }
  124. return newPartitionSlots
  125. }