partition_consumer_mapping.go

package sub_coordinator

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
)

type PartitionConsumerMapping struct {
	currentMapping *PartitionSlotToConsumerInstanceList
	prevMappings   []*PartitionSlotToConsumerInstanceList
}

func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping {
	newVersion := time.Now().UnixNano()
	return &PartitionConsumerMapping{
		currentMapping: NewPartitionSlotToConsumerInstanceList(ringSize, newVersion),
	}
}

// Balance goal:
// 1. max processing power utilization
// 2. allow one consumer instance to be down unexpectedly
//    without affecting the processing power utilization
func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitions []*topic.Partition, consumerInstanceIds []string) {
	if len(partitions) == 0 || len(consumerInstanceIds) == 0 {
		return
	}
	newVersion := time.Now().UnixNano()
	newMapping := NewPartitionSlotToConsumerInstanceList(partitions[0].RingSize, newVersion)
	// use the most recent mapping for sticky reassignment; indexing
	// pcm.prevMappings[0] unconditionally would panic on the first call,
	// before any previous mapping has been recorded
	var prevMapping *PartitionSlotToConsumerInstanceList
	if len(pcm.prevMappings) > 0 {
		prevMapping = pcm.prevMappings[len(pcm.prevMappings)-1]
	}
	newMapping.PartitionSlots = doBalanceSticky(partitions, consumerInstanceIds, prevMapping)
	if pcm.currentMapping != nil {
		pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping)
	}
	pcm.currentMapping = newMapping
}
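
// exampleMembershipChange is a hypothetical sketch, not part of the original
// file: it shows how a coordinator might re-run the balancer as consumer
// instances join and leave. The partitions and instance ids are made up for
// illustration, and the ring split assumes topic.Partition carries the
// RangeStart/RangeStop/RingSize fields used elsewhere in this package.
func exampleMembershipChange() {
	// two partitions splitting a ring of 1024 slots
	partitions := []*topic.Partition{
		{RingSize: 1024, RangeStart: 0, RangeStop: 512},
		{RingSize: 1024, RangeStart: 512, RangeStop: 1024},
	}
	pcm := NewPartitionConsumerMapping(1024)
	pcm.BalanceToConsumerInstanceIds(partitions, []string{"consumer-a", "consumer-b"})
	// consumer-b went away unexpectedly; rebalance over the survivors.
	// The superseded mapping is snapshotted into pcm.prevMappings.
	pcm.BalanceToConsumerInstanceIds(partitions, []string{"consumer-a"})
	for _, slot := range pcm.currentMapping.PartitionSlots {
		fmt.Printf("[%d, %d) -> %s\n", slot.RangeStart, slot.RangeStop, slot.AssignedInstanceId)
	}
}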

func doBalanceSticky(partitions []*topic.Partition, consumerInstanceIds []string, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
	// collect previous consumer instance ids
	prevConsumerInstanceIds := make(map[string]struct{})
	if prevMapping != nil {
		for _, prevPartitionSlot := range prevMapping.PartitionSlots {
			if prevPartitionSlot.AssignedInstanceId != "" {
				prevConsumerInstanceIds[prevPartitionSlot.AssignedInstanceId] = struct{}{}
			}
		}
	}

	// collect current consumer instance ids
	currConsumerInstanceIds := make(map[string]struct{})
	for _, consumerInstanceId := range consumerInstanceIds {
		currConsumerInstanceIds[consumerInstanceId] = struct{}{}
	}

	// check deleted consumer instances
	deletedConsumerInstanceIds := make(map[string]struct{})
	for consumerInstanceId := range prevConsumerInstanceIds {
		if _, ok := currConsumerInstanceIds[consumerInstanceId]; !ok {
			deletedConsumerInstanceIds[consumerInstanceId] = struct{}{}
		}
	}

	// convert partition slots from list to a map, keyed by slot range
	prevPartitionSlotMap := make(map[string]*PartitionSlotToConsumerInstance)
	if prevMapping != nil {
		for _, partitionSlot := range prevMapping.PartitionSlots {
			key := fmt.Sprintf("%d-%d", partitionSlot.RangeStart, partitionSlot.RangeStop)
			prevPartitionSlotMap[key] = partitionSlot
		}
	}

	// carry over assignments from the old mapping, skipping the deleted consumer instances
	newPartitionSlots := ToPartitionSlots(partitions)
	for _, newPartitionSlot := range newPartitionSlots {
		key := fmt.Sprintf("%d-%d", newPartitionSlot.RangeStart, newPartitionSlot.RangeStop)
		if prevPartitionSlot, ok := prevPartitionSlotMap[key]; ok {
			if _, ok := deletedConsumerInstanceIds[prevPartitionSlot.AssignedInstanceId]; !ok {
				newPartitionSlot.AssignedInstanceId = prevPartitionSlot.AssignedInstanceId
			}
		}
	}

	// count how many partitions are currently assigned to each consumer instance
	consumerInstancePartitionCount := make(map[string]int)
	for _, newPartitionSlot := range newPartitionSlots {
		if newPartitionSlot.AssignedInstanceId != "" {
			consumerInstancePartitionCount[newPartitionSlot.AssignedInstanceId]++
		}
	}

	// target load: average number of partitions per consumer instance
	averageConsumerInstanceLoad := float32(len(partitions)) / float32(len(consumerInstanceIds))
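
	// Worked numbers for illustration (not from the original): with 4
	// partitions over 3 consumer instances the average load is ~1.33, so an
	// instance holding one slot is still underloaded while an instance
	// holding two slots is not and will be skipped by the loop below.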

	// assign unassigned partition slots to consumer instances that are underloaded
	consumerInstanceIdsIndex := 0
	for _, newPartitionSlot := range newPartitionSlots {
		if newPartitionSlot.AssignedInstanceId != "" {
			continue
		}
		// walk the instance list at most one full round per slot, so the
		// search terminates even if every instance is already at the average
		for avoidDeadLoop := len(consumerInstanceIds); avoidDeadLoop > 0; avoidDeadLoop-- {
			consumerInstanceId := consumerInstanceIds[consumerInstanceIdsIndex]
			consumerInstanceIdsIndex = (consumerInstanceIdsIndex + 1) % len(consumerInstanceIds)
			if float32(consumerInstancePartitionCount[consumerInstanceId]) < averageConsumerInstanceLoad {
				newPartitionSlot.AssignedInstanceId = consumerInstanceId
				consumerInstancePartitionCount[consumerInstanceId]++
				break
			}
		}
	}
	return newPartitionSlots
}
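
// exampleStickyRebalance is a hypothetical sketch, not part of the original
// file: it exercises doBalanceSticky directly to show the sticky property.
// When "consumer-b" is replaced by "consumer-c", "consumer-a" keeps its slot
// and only the orphaned slot moves. The partition values are made up.
func exampleStickyRebalance() {
	partitions := []*topic.Partition{
		{RingSize: 1024, RangeStart: 0, RangeStop: 512},
		{RingSize: 1024, RangeStart: 512, RangeStop: 1024},
	}
	// first round: no previous mapping, slots are spread round-robin
	first := NewPartitionSlotToConsumerInstanceList(1024, time.Now().UnixNano())
	first.PartitionSlots = doBalanceSticky(partitions, []string{"consumer-a", "consumer-b"}, nil)
	// second round: consumer-b is gone and consumer-c joined; expect
	// [0, 512) -> consumer-a (retained) and [512, 1024) -> consumer-c
	second := doBalanceSticky(partitions, []string{"consumer-a", "consumer-c"}, first)
	for _, slot := range second {
		fmt.Printf("[%d, %d) -> %s\n", slot.RangeStart, slot.RangeStop, slot.AssignedInstanceId)
	}
}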