partition_consumer_mapping.go

package sub_coordinator

import (
	"fmt"
	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
	"time"
)
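
// PartitionConsumerMapping tracks the current assignment of partition slots to
// consumer group instances, together with a bounded history of previous mappings.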
type PartitionConsumerMapping struct {
	currentMapping *PartitionSlotToConsumerInstanceList
	prevMappings   []*PartitionSlotToConsumerInstanceList
}
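
// BalanceToConsumerInstances recomputes the partition-to-consumer-instance
// assignment from the given partition slots and the current set of consumer
// instances, then archives the previous mapping.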
// Balance goals:
//  1. maximize processing power utilization
//  2. allow one consumer instance to go down unexpectedly
//     without affecting the processing power utilization
func (pcm *PartitionConsumerMapping) BalanceToConsumerInstances(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstances []*ConsumerGroupInstance) {
	if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstances) == 0 {
		return
	}
	newMapping := NewPartitionSlotToConsumerInstanceList(partitionSlotToBrokerList.RingSize, time.Now())
	var prevMapping *PartitionSlotToConsumerInstanceList
	if len(pcm.prevMappings) > 0 {
		prevMapping = pcm.prevMappings[len(pcm.prevMappings)-1]
	} else {
		prevMapping = nil
	}
	newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstances, prevMapping)
	if pcm.currentMapping != nil {
		pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping)
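		// keep only the 10 most recent previous mappings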
		if len(pcm.prevMappings) > 10 {
			pcm.prevMappings = pcm.prevMappings[1:]
		}
	}
	pcm.currentMapping = newMapping
}
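
// doBalanceSticky keeps each partition on its previously assigned consumer
// instance when that instance is still present, then distributes the remaining
// partitions round-robin to instances whose load is below the average.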
func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstances []*ConsumerGroupInstance, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
	// collect previous consumer instance ids
	prevConsumerInstanceIds := make(map[ConsumerGroupInstanceId]struct{})
	if prevMapping != nil {
		for _, prevPartitionSlot := range prevMapping.PartitionSlots {
			if prevPartitionSlot.AssignedInstanceId != "" {
				prevConsumerInstanceIds[prevPartitionSlot.AssignedInstanceId] = struct{}{}
			}
		}
	}
	// collect current consumer instance ids
	currConsumerInstanceIds := make(map[ConsumerGroupInstanceId]struct{})
	for _, consumerInstance := range consumerInstances {
		currConsumerInstanceIds[consumerInstance.InstanceId] = struct{}{}
	}
	// check deleted consumer instances
	deletedConsumerInstanceIds := make(map[ConsumerGroupInstanceId]struct{})
	for consumerInstanceId := range prevConsumerInstanceIds {
		if _, ok := currConsumerInstanceIds[consumerInstanceId]; !ok {
			deletedConsumerInstanceIds[consumerInstanceId] = struct{}{}
		}
	}
	// convert partition slots from list to a map
	prevPartitionSlotMap := make(map[string]*PartitionSlotToConsumerInstance)
	if prevMapping != nil {
		for _, partitionSlot := range prevMapping.PartitionSlots {
			key := fmt.Sprintf("%d-%d", partitionSlot.RangeStart, partitionSlot.RangeStop)
			prevPartitionSlotMap[key] = partitionSlot
		}
	}
	// make a copy of the old mapping, skipping the deleted consumer instances
	newPartitionSlots := make([]*PartitionSlotToConsumerInstance, 0, len(partitions))
	for _, partition := range partitions {
		newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{
			RangeStart:     partition.RangeStart,
			RangeStop:      partition.RangeStop,
			UnixTimeNs:     partition.UnixTimeNs,
			Broker:         partition.AssignedBroker,
			FollowerBroker: partition.FollowerBroker,
		})
	}
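	// carry over the previous assignment for each unchanged partition range,
	// unless its consumer instance has been deleted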
	for _, newPartitionSlot := range newPartitionSlots {
		key := fmt.Sprintf("%d-%d", newPartitionSlot.RangeStart, newPartitionSlot.RangeStop)
		if prevPartitionSlot, ok := prevPartitionSlotMap[key]; ok {
			if _, ok := deletedConsumerInstanceIds[prevPartitionSlot.AssignedInstanceId]; !ok {
				newPartitionSlot.AssignedInstanceId = prevPartitionSlot.AssignedInstanceId
			}
		}
	}
	// count the number of partitions currently assigned to each consumer instance
	consumerInstancePartitionCount := make(map[ConsumerGroupInstanceId]int)
	for _, newPartitionSlot := range newPartitionSlots {
		if newPartitionSlot.AssignedInstanceId != "" {
			consumerInstancePartitionCount[newPartitionSlot.AssignedInstanceId]++
		}
	}
	// average number of partitions assigned to each consumer instance
	averageConsumerInstanceLoad := float32(len(partitions)) / float32(len(consumerInstances))
	// assign unassigned partition slots to consumer instances that are underloaded
	consumerInstanceIdsIndex := 0
	for _, newPartitionSlot := range newPartitionSlots {
		if newPartitionSlot.AssignedInstanceId == "" {
			for avoidDeadLoop := len(consumerInstances); avoidDeadLoop > 0; avoidDeadLoop-- {
				consumerInstance := consumerInstances[consumerInstanceIdsIndex]
				if float32(consumerInstancePartitionCount[consumerInstance.InstanceId]) < averageConsumerInstanceLoad {
					newPartitionSlot.AssignedInstanceId = consumerInstance.InstanceId
					consumerInstancePartitionCount[consumerInstance.InstanceId]++
					consumerInstanceIdsIndex++
					if consumerInstanceIdsIndex >= len(consumerInstances) {
						consumerInstanceIdsIndex = 0
					}
					break
				} else {
					consumerInstanceIdsIndex++
					if consumerInstanceIdsIndex >= len(consumerInstances) {
						consumerInstanceIdsIndex = 0
					}
				}
			}
		}
	}
	return newPartitionSlots
}