market.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. package sub_coordinator
  2. import (
  3. "errors"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  6. "sync"
  7. "time"
  8. )
/*
Market is a data structure that keeps track of the state of the consumer group instances and the partitions.
When rebalancing, the market tries to balance the partition load among the consumer group instances.
In each balancing pass, the market will:
* If a consumer group instance has more partitions than the average, unassign some of its partitions.
* If a consumer group instance has fewer partitions than the average, assign some partitions to it.
Trigger a rebalance when:
* A new consumer group instance is added.
* Some partitions are unassigned from a consumer group instance.
If multiple rebalance requests are received, after a certain period the market only processes the latest request.
However, if the number of unassigned partitions increases to exactly the total number of partitions,
and the total number of partitions is less than or equal to the sum of the max partition counts of all consumer group instances,
the market will process the request immediately.
This ensures a partition can be migrated to another consumer group instance as soon as possible.
Emit these adjustments to the subscriber coordinator:
* Assign a partition to a consumer group instance.
* Unassign a partition from a consumer group instance.
Because the adjustments are sent to the subscriber coordinator, the market keeps track of the inflight adjustments.
The subscriber coordinator sends a response back to the market when an adjustment has been processed.
If an adjustment is older than a certain time (inflightAdjustmentTTL), it is considered expired.
Otherwise, the adjustment is considered inflight, so it is used when calculating the load.
Later features:
* Handle a consumer group instance that is not keeping up with the load.
Since a coordinator, and thus the market, may be restarted or moved to another node, the market should be able to recover its state from the subscriber coordinator.
The subscriber coordinator should be able to send the current state of the consumer group instances and the partitions to the market.
*/
  35. type PartitionSlot struct {
  36. Partition topic.Partition
  37. AssignedTo *ConsumerGroupInstance // Track the consumer assigned to this partition slot
  38. }
  39. type Adjustment struct {
  40. isAssign bool
  41. partition topic.Partition
  42. consumer ConsumerGroupInstanceId
  43. ts time.Time
  44. }
  45. type Market struct {
  46. mu sync.Mutex
  47. partitions map[topic.Partition]*PartitionSlot
  48. consumerInstances map[ConsumerGroupInstanceId]*ConsumerGroupInstance
  49. AdjustmentChan chan *Adjustment
  50. inflightAdjustments []*Adjustment
  51. inflightAdjustmentTTL time.Duration
  52. lastBalancedTime time.Time
  53. stopChan chan struct{}
  54. balanceRequestChan chan struct{}
  55. hasBalanceRequest bool
  56. }
  57. func NewMarket(partitions []topic.Partition, inflightAdjustmentTTL time.Duration) *Market {
  58. partitionMap := make(map[topic.Partition]*PartitionSlot)
  59. for _, partition := range partitions {
  60. partitionMap[partition] = &PartitionSlot{
  61. Partition: partition,
  62. }
  63. }
  64. m := &Market{
  65. partitions: partitionMap,
  66. consumerInstances: make(map[ConsumerGroupInstanceId]*ConsumerGroupInstance),
  67. AdjustmentChan: make(chan *Adjustment, 100),
  68. inflightAdjustmentTTL: inflightAdjustmentTTL,
  69. stopChan: make(chan struct{}),
  70. balanceRequestChan: make(chan struct{}),
  71. }
  72. m.lastBalancedTime = time.Now()
  73. go m.loopBalanceLoad()
  74. return m
  75. }
  76. func (m *Market) ShutdownMarket() {
  77. close(m.stopChan)
  78. close(m.AdjustmentChan)
  79. }
  80. func (m *Market) AddConsumerInstance(consumer *ConsumerGroupInstance) error {
  81. m.mu.Lock()
  82. defer m.mu.Unlock()
  83. if _, exists := m.consumerInstances[consumer.InstanceId]; exists {
  84. return errors.New("consumer instance already exists")
  85. }
  86. m.consumerInstances[consumer.InstanceId] = consumer
  87. m.balanceRequestChan <- struct{}{}
  88. return nil
  89. }
  90. func (m *Market) RemoveConsumerInstance(consumerId ConsumerGroupInstanceId) error {
  91. m.mu.Lock()
  92. defer m.mu.Unlock()
  93. consumer, exists := m.consumerInstances[consumerId]
  94. if !exists {
  95. return nil
  96. }
  97. delete(m.consumerInstances, consumerId)
  98. for _, partition := range consumer.AssignedPartitions {
  99. if partitionSlot, exists := m.partitions[partition]; exists {
  100. partitionSlot.AssignedTo = nil
  101. }
  102. }
  103. m.balanceRequestChan <- struct{}{}
  104. return nil
  105. }
  106. func (m *Market) assignPartitionToConsumer(partition *PartitionSlot) {
  107. var bestConsumer *ConsumerGroupInstance
  108. var minLoad = int(^uint(0) >> 1) // Max int value
  109. inflightConsumerAdjustments := make(map[ConsumerGroupInstanceId]int)
  110. for _, adjustment := range m.inflightAdjustments {
  111. if adjustment.isAssign {
  112. inflightConsumerAdjustments[adjustment.consumer]++
  113. } else {
  114. inflightConsumerAdjustments[adjustment.consumer]--
  115. }
  116. }
  117. for _, consumer := range m.consumerInstances {
  118. consumerLoad := len(consumer.AssignedPartitions)
  119. if inflightAdjustments, exists := inflightConsumerAdjustments[consumer.InstanceId]; exists {
  120. consumerLoad += inflightAdjustments
  121. }
  122. // fmt.Printf("Consumer %+v has load %d, max %d, min %d\n", consumer.InstanceId, consumerLoad, consumer.MaxPartitionCount, minLoad)
  123. if consumerLoad < int(consumer.MaxPartitionCount) {
  124. if consumerLoad < minLoad {
  125. bestConsumer = consumer
  126. minLoad = consumerLoad
  127. // fmt.Printf("picked: Consumer %+v has load %d, max %d, min %d\n", consumer.InstanceId, consumerLoad, consumer.MaxPartitionCount, minLoad)
  128. }
  129. }
  130. }
  131. if bestConsumer != nil {
  132. // change consumer assigned partitions later when the adjustment is confirmed
  133. adjustment := &Adjustment{
  134. isAssign: true,
  135. partition: partition.Partition,
  136. consumer: bestConsumer.InstanceId,
  137. ts: time.Now(),
  138. }
  139. m.AdjustmentChan <- adjustment
  140. m.inflightAdjustments = append(m.inflightAdjustments, adjustment)
  141. m.lastBalancedTime = adjustment.ts
  142. }
  143. }
  144. func (m *Market) loopBalanceLoad() {
  145. ticker := time.NewTicker(500 * time.Millisecond)
  146. defer ticker.Stop()
  147. for {
  148. select {
  149. case <-ticker.C:
  150. if m.hasBalanceRequest {
  151. m.hasBalanceRequest = false
  152. inflightAdjustments := make([]*Adjustment, 0, len(m.inflightAdjustments))
  153. for _, adjustment := range m.inflightAdjustments {
  154. if adjustment.ts.Add(m.inflightAdjustmentTTL).After(time.Now()) {
  155. inflightAdjustments = append(inflightAdjustments, adjustment)
  156. }
  157. }
  158. m.inflightAdjustments = inflightAdjustments
  159. m.doBalanceLoad()
  160. // println("Balance load completed.")
  161. m.Status()
  162. }
  163. case <-m.balanceRequestChan:
  164. m.hasBalanceRequest = true
  165. case <-m.stopChan:
  166. return
  167. }
  168. }
  169. }
  170. // doBalanceLoad will balance the load of the partitions among the consumer group instances.
  171. // It will try to unassign partitions from the consumer group instances that have more partitions than the average.
  172. // It will try to assign partitions to the consumer group instances that have less partitions than the average.
  173. func (m *Market) doBalanceLoad() {
  174. if len(m.consumerInstances) == 0 {
  175. return
  176. }
  177. // find the average load for all consumers
  178. averageLoad := m.findAverageLoad()
  179. // find the consumers with the higher load than average
  180. if m.adjustBusyConsumers(averageLoad) {
  181. return
  182. }
  183. // find partitions with no consumer assigned
  184. m.adjustUnassignedPartitions()
  185. }
  186. func (m *Market) findAverageLoad() (averageLoad float32) {
  187. var totalLoad int
  188. for _, consumer := range m.consumerInstances {
  189. totalLoad += len(consumer.AssignedPartitions)
  190. }
  191. for _, adjustment := range m.inflightAdjustments {
  192. if adjustment.isAssign {
  193. totalLoad++
  194. } else {
  195. totalLoad--
  196. }
  197. }
  198. averageLoad = float32(totalLoad) / float32(len(m.consumerInstances))
  199. return
  200. }
  201. func (m *Market) adjustBusyConsumers(averageLoad float32) (hasAdjustments bool) {
  202. inflightConsumerAdjustments := make(map[ConsumerGroupInstanceId]int)
  203. for _, adjustment := range m.inflightAdjustments {
  204. if adjustment.isAssign {
  205. inflightConsumerAdjustments[adjustment.consumer]++
  206. } else {
  207. inflightConsumerAdjustments[adjustment.consumer]--
  208. }
  209. }
  210. for _, consumer := range m.consumerInstances {
  211. consumerLoad := len(consumer.AssignedPartitions)
  212. if inflightAdjustment, exists := inflightConsumerAdjustments[consumer.InstanceId]; exists {
  213. consumerLoad += inflightAdjustment
  214. }
  215. delta := int(float32(consumerLoad) - averageLoad)
  216. if delta <= 0 {
  217. continue
  218. }
  219. adjustTime := time.Now()
  220. for i := 0; i < delta; i++ {
  221. adjustment := &Adjustment{
  222. isAssign: false,
  223. partition: consumer.AssignedPartitions[i],
  224. consumer: consumer.InstanceId,
  225. ts: adjustTime,
  226. }
  227. m.AdjustmentChan <- adjustment
  228. m.inflightAdjustments = append(m.inflightAdjustments, adjustment)
  229. m.lastBalancedTime = adjustment.ts
  230. }
  231. hasAdjustments = true
  232. }
  233. return
  234. }
  235. func (m *Market) adjustUnassignedPartitions() {
  236. inflightPartitionAdjustments := make(map[topic.Partition]bool)
  237. for _, adjustment := range m.inflightAdjustments {
  238. inflightPartitionAdjustments[adjustment.partition] = true
  239. }
  240. for _, partitionSlot := range m.partitions {
  241. if partitionSlot.AssignedTo == nil {
  242. if _, exists := inflightPartitionAdjustments[partitionSlot.Partition]; exists {
  243. continue
  244. }
  245. // fmt.Printf("Assigning partition %+v to consumer\n", partitionSlot.Partition)
  246. m.assignPartitionToConsumer(partitionSlot)
  247. }
  248. }
  249. }
  250. func (m *Market) ConfirmAdjustment(adjustment *Adjustment) {
  251. if adjustment.isAssign {
  252. m.confirmAssignPartition(adjustment.partition, adjustment.consumer)
  253. } else {
  254. m.unassignPartitionSlot(adjustment.partition)
  255. }
  256. glog.V(1).Infof("ConfirmAdjustment %+v", adjustment)
  257. m.Status()
  258. }
  259. func (m *Market) unassignPartitionSlot(partition topic.Partition) {
  260. m.mu.Lock()
  261. defer m.mu.Unlock()
  262. partitionSlot, exists := m.partitions[partition]
  263. if !exists {
  264. glog.V(0).Infof("partition %+v slot is not tracked", partition)
  265. return
  266. }
  267. if partitionSlot.AssignedTo == nil {
  268. glog.V(0).Infof("partition %+v slot is not assigned to any consumer", partition)
  269. return
  270. }
  271. consumer := partitionSlot.AssignedTo
  272. for i, p := range consumer.AssignedPartitions {
  273. if p == partition {
  274. consumer.AssignedPartitions = append(consumer.AssignedPartitions[:i], consumer.AssignedPartitions[i+1:]...)
  275. partitionSlot.AssignedTo = nil
  276. m.balanceRequestChan <- struct{}{}
  277. return
  278. }
  279. }
  280. glog.V(0).Infof("partition %+v slot not found in assigned consumer", partition)
  281. }
  282. func (m *Market) confirmAssignPartition(partition topic.Partition, consumerInstanceId ConsumerGroupInstanceId) {
  283. m.mu.Lock()
  284. defer m.mu.Unlock()
  285. partitionSlot, exists := m.partitions[partition]
  286. if !exists {
  287. glog.V(0).Infof("partition %+v slot is not tracked", partition)
  288. return
  289. }
  290. if partitionSlot.AssignedTo != nil {
  291. glog.V(0).Infof("partition %+v slot is already assigned to %+v", partition, partitionSlot.AssignedTo.InstanceId)
  292. return
  293. }
  294. consumerInstance, exists := m.consumerInstances[consumerInstanceId]
  295. if !exists {
  296. glog.V(0).Infof("consumer %+v is not tracked", consumerInstanceId)
  297. return
  298. }
  299. partitionSlot.AssignedTo = consumerInstance
  300. consumerInstance.AssignedPartitions = append(consumerInstance.AssignedPartitions, partition)
  301. }
  302. func (m *Market) Status() {
  303. m.mu.Lock()
  304. defer m.mu.Unlock()
  305. glog.V(1).Infof("Market has %d partitions and %d consumer instances", len(m.partitions), len(m.consumerInstances))
  306. for partition, slot := range m.partitions {
  307. if slot.AssignedTo == nil {
  308. glog.V(1).Infof("Partition %+v is not assigned to any consumer", partition)
  309. } else {
  310. glog.V(1).Infof("Partition %+v is assigned to consumer %+v", partition, slot.AssignedTo.InstanceId)
  311. }
  312. }
  313. for _, consumer := range m.consumerInstances {
  314. glog.V(1).Infof("Consumer %+v has %d partitions", consumer.InstanceId, len(consumer.AssignedPartitions))
  315. }
  316. }