allocate.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package pub_balancer
  2. import (
  3. cmap "github.com/orcaman/concurrent-map/v2"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  7. "math/rand"
  8. "time"
  9. )
  10. func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment) {
  11. // divide the ring into partitions
  12. now := time.Now().UnixNano()
  13. rangeSize := MaxPartitionCount / partitionCount
  14. for i := int32(0); i < partitionCount; i++ {
  15. assignment := &mq_pb.BrokerPartitionAssignment{
  16. Partition: &schema_pb.Partition{
  17. RingSize: MaxPartitionCount,
  18. RangeStart: int32(i * rangeSize),
  19. RangeStop: int32((i + 1) * rangeSize),
  20. UnixTimeNs: now,
  21. },
  22. }
  23. if i == partitionCount-1 {
  24. assignment.Partition.RangeStop = MaxPartitionCount
  25. }
  26. assignments = append(assignments, assignment)
  27. }
  28. EnsureAssignmentsToActiveBrokers(brokers, 1, assignments)
  29. glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments)
  30. return
  31. }
  32. // randomly pick n brokers, which may contain duplicates
  33. // TODO pick brokers based on the broker stats
  34. func pickBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats], count int32) []string {
  35. candidates := make([]string, 0, brokers.Count())
  36. for brokerStatsItem := range brokers.IterBuffered() {
  37. candidates = append(candidates, brokerStatsItem.Key)
  38. }
  39. pickedBrokers := make([]string, 0, count)
  40. for i := int32(0); i < count; i++ {
  41. p := rand.Intn(len(candidates))
  42. pickedBrokers = append(pickedBrokers, candidates[p])
  43. }
  44. return pickedBrokers
  45. }
  46. // reservoir sampling select N brokers from the active brokers, with exclusion of the excluded broker
  47. func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, excludedBroker string) []string {
  48. pickedBrokers := make([]string, 0, count)
  49. for i, broker := range brokers {
  50. if broker == excludedBroker {
  51. continue
  52. }
  53. if len(pickedBrokers) < count {
  54. pickedBrokers = append(pickedBrokers, broker)
  55. } else {
  56. j := rand.Intn(i + 1)
  57. if j < count {
  58. pickedBrokers[j] = broker
  59. }
  60. }
  61. }
  62. // shuffle the picked brokers
  63. count = len(pickedBrokers)
  64. for i := 0; i < count; i++ {
  65. j := rand.Intn(count)
  66. pickedBrokers[i], pickedBrokers[j] = pickedBrokers[j], pickedBrokers[i]
  67. }
  68. return pickedBrokers
  69. }
  70. // EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers
  71. func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) {
  72. glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v", activeBrokers.Count(), followerCount, assignments)
  73. candidates := make([]string, 0, activeBrokers.Count())
  74. for brokerStatsItem := range activeBrokers.IterBuffered() {
  75. candidates = append(candidates, brokerStatsItem.Key)
  76. }
  77. for _, assignment := range assignments {
  78. // count how many brokers are needed
  79. count := 0
  80. if assignment.LeaderBroker == "" {
  81. count++
  82. } else if _, found := activeBrokers.Get(assignment.LeaderBroker); !found {
  83. assignment.LeaderBroker = ""
  84. count++
  85. }
  86. if assignment.FollowerBroker == "" {
  87. count++
  88. } else if _, found := activeBrokers.Get(assignment.FollowerBroker); !found {
  89. assignment.FollowerBroker = ""
  90. count++
  91. }
  92. if count > 0 {
  93. pickedBrokers := pickBrokersExcluded(candidates, count, assignment.LeaderBroker, assignment.FollowerBroker)
  94. i := 0
  95. if assignment.LeaderBroker == "" {
  96. if i < len(pickedBrokers) {
  97. assignment.LeaderBroker = pickedBrokers[i]
  98. i++
  99. hasChanges = true
  100. }
  101. }
  102. if assignment.FollowerBroker == "" {
  103. if i < len(pickedBrokers) {
  104. assignment.FollowerBroker = pickedBrokers[i]
  105. i++
  106. hasChanges = true
  107. }
  108. }
  109. }
  110. }
  111. glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v hasChanges: %v", activeBrokers.Count(), followerCount, assignments, hasChanges)
  112. return
  113. }