subscribe.go
  1. package sub_client
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "sync"
  7. "time"
  8. )
// ProcessorState tracks one running per-partition processor.
// Closing stopCh signals that processor's goroutine to stop.
type ProcessorState struct {
	stopCh chan struct{}
}
// Subscribe subscribes to a topic's specified partitions.
// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
// It launches the per-partition processors in the background and then blocks,
// keeping the connection to the sub coordinator alive.
func (sub *TopicSubscriber) Subscribe() error {
	// consume partition (un)assignment messages concurrently
	go sub.startProcessors()
	// loop forever
	sub.doKeepConnectedToSubCoordinator()
	return nil
}
// startProcessors listens on brokerPartitionAssignmentChan and runs one
// processor goroutine per assigned partition, bounded by
// SubscriberConfig.MaxPartitionCount concurrent processors.
func (sub *TopicSubscriber) startProcessors() {
	// listen to the messages from the sub coordinator
	// start one processor per partition
	var wg sync.WaitGroup
	// counting semaphore: caps the number of concurrently running processors
	semaphore := make(chan struct{}, sub.SubscriberConfig.MaxPartitionCount)
	for message := range sub.brokerPartitionAssignmentChan {
		if assigned := message.GetAssignment(); assigned != nil {
			// reserve a WaitGroup slot and a semaphore slot before starting;
			// the processor goroutine releases both in its defer
			wg.Add(1)
			semaphore <- struct{}{}
			topicPartition := topic.FromPbPartition(assigned.PartitionAssignment.Partition)
			// wait until no covering partition is still in progress
			sub.waitUntilNoOverlappingPartitionInFlight(topicPartition)
			// start a processor; register its stop channel so a later
			// un-assignment message can signal it to stop
			stopChan := make(chan struct{})
			sub.activeProcessorsLock.Lock()
			sub.activeProcessors[topicPartition] = &ProcessorState{
				stopCh: stopChan,
			}
			sub.activeProcessorsLock.Unlock()
			go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) {
				defer func() {
					// deregister this processor, release the semaphore
					// slot, and mark the WaitGroup entry done
					sub.activeProcessorsLock.Lock()
					delete(sub.activeProcessors, topicPartition)
					sub.activeProcessorsLock.Unlock()
					<-semaphore
					wg.Done()
				}()
				glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
				// acknowledge the assignment to the coordinator before consuming
				sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
					Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignment{
						AckAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage{
							Partition: assigned.Partition,
						},
					},
				}
				// consume this partition until it completes, errors, or is stopped
				err := sub.onEachPartition(assigned, stopChan)
				if err != nil {
					glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
				} else {
					glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
				}
				// acknowledge the un-assignment so the coordinator can reassign
				sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
					Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignment{
						AckUnAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage{
							Partition: assigned.Partition,
						},
					},
				}
			}(assigned.PartitionAssignment, topicPartition)
		}
		if unAssignment := message.GetUnAssignment(); unAssignment != nil {
			// signal the matching processor to stop and drop it from the registry
			topicPartition := topic.FromPbPartition(unAssignment.Partition)
			sub.activeProcessorsLock.Lock()
			if processor, found := sub.activeProcessors[topicPartition]; found {
				close(processor.stopCh)
				delete(sub.activeProcessors, topicPartition)
			}
			sub.activeProcessorsLock.Unlock()
		}
	}
	// assignment channel closed: wait for all in-flight processors to finish
	wg.Wait()
}
  82. func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartition topic.Partition) {
  83. foundOverlapping := true
  84. for foundOverlapping {
  85. sub.activeProcessorsLock.Lock()
  86. foundOverlapping = false
  87. var overlappedPartition topic.Partition
  88. for partition, _ := range sub.activeProcessors {
  89. if partition.Overlaps(topicPartition) {
  90. if partition.Equals(topicPartition) {
  91. continue
  92. }
  93. foundOverlapping = true
  94. overlappedPartition = partition
  95. break
  96. }
  97. }
  98. sub.activeProcessorsLock.Unlock()
  99. if foundOverlapping {
  100. glog.V(0).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition)
  101. time.Sleep(1 * time.Second)
  102. }
  103. }
  104. }