partition_consumer_mapping.go

package sub_coordinator

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
)

// PartitionConsumerMapping tracks the current assignment of partition slots
// to consumer instances, plus a bounded history of previous assignments.
type PartitionConsumerMapping struct {
	currentMapping *PartitionSlotToConsumerInstanceList
	prevMappings   []*PartitionSlotToConsumerInstanceList
}

func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping {
	newVersion := time.Now().UnixNano()
	return &PartitionConsumerMapping{
		currentMapping: NewPartitionSlotToConsumerInstanceList(ringSize, newVersion),
	}
}

// Balance goals:
//  1. maximize processing power utilization
//  2. allow one consumer instance to go down unexpectedly
//     without affecting the overall processing power utilization
func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, consumerInstanceIds []string) {
	if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstanceIds) == 0 {
		return
	}
	newVersion := time.Now().UnixNano()
	newMapping := NewPartitionSlotToConsumerInstanceList(partitionSlotToBrokerList.RingSize, newVersion)
	var prevMapping *PartitionSlotToConsumerInstanceList
	if len(pcm.prevMappings) > 0 {
		prevMapping = pcm.prevMappings[len(pcm.prevMappings)-1]
	}
	newMapping.PartitionSlots = doBalanceSticky(partitionSlotToBrokerList.PartitionSlots, consumerInstanceIds, prevMapping)
	if pcm.currentMapping != nil {
		pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping)
		// keep at most the 10 most recent mappings
		if len(pcm.prevMappings) > 10 {
			pcm.prevMappings = pcm.prevMappings[1:]
		}
	}
	pcm.currentMapping = newMapping
}
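
// Usage sketch (illustrative only; the ring size and consumer ids here are
// hypothetical, and the broker list would normally come from the pub_balancer):
//
//	mapping := NewPartitionConsumerMapping(1024)
//	mapping.BalanceToConsumerInstanceIds(partitionSlotToBrokerList, []string{"consumer-1", "consumer-2"})
//	// mapping.currentMapping now holds the sticky assignment; the prior
//	// mapping, if any, is retained in mapping.prevMappings.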

// doBalanceSticky assigns partition slots to consumer instances, keeping an
// existing assignment whenever its consumer instance is still present.
func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerInstanceIds []string, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
	// collect previous consumer instance ids
	prevConsumerInstanceIds := make(map[string]struct{})
	if prevMapping != nil {
		for _, prevPartitionSlot := range prevMapping.PartitionSlots {
			if prevPartitionSlot.AssignedInstanceId != "" {
				prevConsumerInstanceIds[prevPartitionSlot.AssignedInstanceId] = struct{}{}
			}
		}
	}

	// collect current consumer instance ids
	currConsumerInstanceIds := make(map[string]struct{})
	for _, consumerInstanceId := range consumerInstanceIds {
		currConsumerInstanceIds[consumerInstanceId] = struct{}{}
	}

	// find consumer instances that appeared in the previous mapping but are now gone
	deletedConsumerInstanceIds := make(map[string]struct{})
	for consumerInstanceId := range prevConsumerInstanceIds {
		if _, ok := currConsumerInstanceIds[consumerInstanceId]; !ok {
			deletedConsumerInstanceIds[consumerInstanceId] = struct{}{}
		}
	}

	// index the previous partition slots by their partition range
	prevPartitionSlotMap := make(map[string]*PartitionSlotToConsumerInstance)
	if prevMapping != nil {
		for _, partitionSlot := range prevMapping.PartitionSlots {
			key := fmt.Sprintf("%d-%d", partitionSlot.RangeStart, partitionSlot.RangeStop)
			prevPartitionSlotMap[key] = partitionSlot
		}
	}
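
	// For example (illustrative ranges): a previous slot covering [0, 1024)
	// is keyed as "0-1024", so a new slot covering the same range can find
	// its prior assignment in a single lookup.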

	// build the new slots from the current partitions, then carry over each
	// previous assignment unless its consumer instance has been deleted
	newPartitionSlots := make([]*PartitionSlotToConsumerInstance, 0, len(partitions))
	for _, partition := range partitions {
		newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{
			RangeStart: partition.RangeStart,
			RangeStop:  partition.RangeStop,
			UnixTimeNs: partition.UnixTimeNs,
			Broker:     partition.AssignedBroker,
		})
	}
	for _, newPartitionSlot := range newPartitionSlots {
		key := fmt.Sprintf("%d-%d", newPartitionSlot.RangeStart, newPartitionSlot.RangeStop)
		if prevPartitionSlot, ok := prevPartitionSlotMap[key]; ok {
			if _, ok := deletedConsumerInstanceIds[prevPartitionSlot.AssignedInstanceId]; !ok {
				newPartitionSlot.AssignedInstanceId = prevPartitionSlot.AssignedInstanceId
			}
		}
	}
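
	// For example (illustrative): with a previous mapping {[0,512)→c1, [512,1024)→c2}
	// and c2 now gone, the first slot keeps c1 while the second slot stays
	// unassigned here and is filled by the load-balancing pass below.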

	// count the number of partitions currently assigned to each consumer instance
	consumerInstancePartitionCount := make(map[string]int)
	for _, newPartitionSlot := range newPartitionSlots {
		if newPartitionSlot.AssignedInstanceId != "" {
			consumerInstancePartitionCount[newPartitionSlot.AssignedInstanceId]++
		}
	}

	// average number of partitions assigned to each consumer instance
	averageConsumerInstanceLoad := float32(len(partitions)) / float32(len(consumerInstanceIds))
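	// For example (illustrative): 6 partitions over 4 consumers gives an
	// average load of 1.5, so an instance already holding 2 partitions
	// (>= 1.5) is skipped, while instances holding 0 or 1 can still receive one.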

	// assign unassigned partition slots to underloaded consumer instances,
	// scanning the consumer list round-robin
	consumerInstanceIdsIndex := 0
	for _, newPartitionSlot := range newPartitionSlots {
		if newPartitionSlot.AssignedInstanceId == "" {
			// bound the scan to one full pass over the consumer list; while any
			// slot is unassigned the total assigned count is below len(partitions),
			// so at least one instance is under the average load and the scan
			// always finds a target
			for avoidDeadLoop := len(consumerInstanceIds); avoidDeadLoop > 0; avoidDeadLoop-- {
				consumerInstanceId := consumerInstanceIds[consumerInstanceIdsIndex]
				assigned := float32(consumerInstancePartitionCount[consumerInstanceId]) < averageConsumerInstanceLoad
				if assigned {
					newPartitionSlot.AssignedInstanceId = consumerInstanceId
					consumerInstancePartitionCount[consumerInstanceId]++
				}
				// advance the round-robin cursor, wrapping at the end of the list
				consumerInstanceIdsIndex = (consumerInstanceIdsIndex + 1) % len(consumerInstanceIds)
				if assigned {
					break
				}
			}
		}
	}
	return newPartitionSlots
}