scheduler.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
  9. "google.golang.org/grpc/codes"
  10. "google.golang.org/grpc/status"
  11. "log"
  12. "sort"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
  17. type EachPartitionError struct {
  18. *mq_pb.BrokerPartitionAssignment
  19. Err error
  20. generation int
  21. }
  22. type EachPartitionPublishJob struct {
  23. *mq_pb.BrokerPartitionAssignment
  24. stopChan chan bool
  25. wg sync.WaitGroup
  26. generation int
  27. inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
  28. }
  29. func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
  30. if err := p.doConfigureTopic(); err != nil {
  31. return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
  32. }
  33. log.Printf("start scheduler thread for topic %s", p.config.Topic)
  34. generation := 0
  35. var errChan chan EachPartitionError
  36. for {
  37. glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
  38. if assignments, err := p.doLookupTopicPartitions(); err == nil {
  39. generation++
  40. glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
  41. if errChan == nil {
  42. errChan = make(chan EachPartitionError, len(assignments))
  43. }
  44. p.onEachAssignments(generation, assignments, errChan)
  45. } else {
  46. glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
  47. time.Sleep(5 * time.Second)
  48. continue
  49. }
  50. if generation == 1 {
  51. wg.Done()
  52. }
  53. // wait for any error to happen. If so, consume all remaining errors, and retry
  54. for {
  55. select {
  56. case eachErr := <-errChan:
  57. glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
  58. if eachErr.generation < generation {
  59. continue
  60. }
  61. break
  62. }
  63. }
  64. }
  65. }
  66. func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
  67. // TODO assuming this is not re-configured so the partitions are fixed.
  68. sort.Slice(assignments, func(i, j int) bool {
  69. return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
  70. })
  71. var jobs []*EachPartitionPublishJob
  72. hasExistingJob := len(p.jobs) == len(assignments)
  73. for i, assignment := range assignments {
  74. if assignment.LeaderBroker == "" {
  75. continue
  76. }
  77. if hasExistingJob {
  78. var existingJob *EachPartitionPublishJob
  79. existingJob = p.jobs[i]
  80. if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
  81. existingJob.generation = generation
  82. jobs = append(jobs, existingJob)
  83. continue
  84. } else {
  85. if existingJob.LeaderBroker != "" {
  86. close(existingJob.stopChan)
  87. existingJob.LeaderBroker = ""
  88. existingJob.wg.Wait()
  89. }
  90. }
  91. }
  92. // start a go routine to publish to this partition
  93. job := &EachPartitionPublishJob{
  94. BrokerPartitionAssignment: assignment,
  95. stopChan: make(chan bool, 1),
  96. generation: generation,
  97. inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
  98. }
  99. job.wg.Add(1)
  100. go func(job *EachPartitionPublishJob) {
  101. defer job.wg.Done()
  102. if err := p.doPublishToPartition(job); err != nil {
  103. errChan <- EachPartitionError{assignment, err, generation}
  104. }
  105. }(job)
  106. jobs = append(jobs, job)
  107. // TODO assuming this is not re-configured so the partitions are fixed.
  108. // better just re-use the existing job
  109. p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
  110. }
  111. p.jobs = jobs
  112. }
  113. func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
  114. log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
  115. grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption)
  116. if err != nil {
  117. return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
  118. }
  119. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  120. stream, err := brokerClient.PublishMessage(context.Background())
  121. if err != nil {
  122. return fmt.Errorf("create publish client: %v", err)
  123. }
  124. publishClient := &PublishClient{
  125. SeaweedMessaging_PublishMessageClient: stream,
  126. Broker: job.LeaderBroker,
  127. }
  128. if err = publishClient.Send(&mq_pb.PublishMessageRequest{
  129. Message: &mq_pb.PublishMessageRequest_Init{
  130. Init: &mq_pb.PublishMessageRequest_InitMessage{
  131. Topic: p.config.Topic.ToPbTopic(),
  132. Partition: job.Partition,
  133. AckInterval: 128,
  134. FollowerBroker: job.FollowerBroker,
  135. PublisherName: p.config.PublisherName,
  136. },
  137. },
  138. }); err != nil {
  139. return fmt.Errorf("send init message: %v", err)
  140. }
  141. // process the hello message
  142. resp, err := stream.Recv()
  143. if err != nil {
  144. return fmt.Errorf("recv init response: %v", err)
  145. }
  146. if resp.Error != "" {
  147. return fmt.Errorf("init response error: %v", resp.Error)
  148. }
  149. var publishedTsNs int64
  150. hasMoreData := int32(1)
  151. var wg sync.WaitGroup
  152. wg.Add(1)
  153. go func() {
  154. defer wg.Done()
  155. for {
  156. ackResp, err := publishClient.Recv()
  157. if err != nil {
  158. e, _ := status.FromError(err)
  159. if e.Code() == codes.Unknown && e.Message() == "EOF" {
  160. log.Printf("publish to %s EOF", publishClient.Broker)
  161. return
  162. }
  163. publishClient.Err = err
  164. log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
  165. return
  166. }
  167. if ackResp.Error != "" {
  168. publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
  169. log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
  170. return
  171. }
  172. if ackResp.AckSequence > 0 {
  173. log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
  174. }
  175. if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
  176. return
  177. }
  178. }
  179. }()
  180. publishCounter := 0
  181. for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
  182. if data.Ctrl != nil && data.Ctrl.IsClose {
  183. // need to set this before sending to brokers, to avoid timing issue
  184. atomic.StoreInt32(&hasMoreData, 0)
  185. }
  186. if err := publishClient.Send(&mq_pb.PublishMessageRequest{
  187. Message: &mq_pb.PublishMessageRequest_Data{
  188. Data: data,
  189. },
  190. }); err != nil {
  191. return fmt.Errorf("send publish data: %v", err)
  192. }
  193. publishCounter++
  194. atomic.StoreInt64(&publishedTsNs, data.TsNs)
  195. }
  196. if publishCounter > 0 {
  197. wg.Wait()
  198. } else {
  199. // CloseSend would cancel the context on the server side
  200. if err := publishClient.CloseSend(); err != nil {
  201. return fmt.Errorf("close send: %v", err)
  202. }
  203. }
  204. log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
  205. return nil
  206. }
  207. func (p *TopicPublisher) doConfigureTopic() (err error) {
  208. if len(p.config.Brokers) == 0 {
  209. return fmt.Errorf("no bootstrap brokers")
  210. }
  211. var lastErr error
  212. for _, brokerAddress := range p.config.Brokers {
  213. err = pb.WithBrokerGrpcClient(false,
  214. brokerAddress,
  215. p.grpcDialOption,
  216. func(client mq_pb.SeaweedMessagingClient) error {
  217. _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
  218. Topic: p.config.Topic.ToPbTopic(),
  219. PartitionCount: p.config.PartitionCount,
  220. RecordType: p.config.RecordType, // TODO schema upgrade
  221. })
  222. return err
  223. })
  224. if err == nil {
  225. lastErr = nil
  226. return nil
  227. } else {
  228. lastErr = fmt.Errorf("%s: %v", brokerAddress, err)
  229. }
  230. }
  231. if lastErr != nil {
  232. return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err)
  233. }
  234. return nil
  235. }
  236. func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  237. if len(p.config.Brokers) == 0 {
  238. return nil, fmt.Errorf("no bootstrap brokers")
  239. }
  240. var lastErr error
  241. for _, brokerAddress := range p.config.Brokers {
  242. err := pb.WithBrokerGrpcClient(false,
  243. brokerAddress,
  244. p.grpcDialOption,
  245. func(client mq_pb.SeaweedMessagingClient) error {
  246. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  247. &mq_pb.LookupTopicBrokersRequest{
  248. Topic: p.config.Topic.ToPbTopic(),
  249. })
  250. glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
  251. if err != nil {
  252. return err
  253. }
  254. if len(lookupResp.BrokerPartitionAssignments) == 0 {
  255. return fmt.Errorf("no broker partition assignments")
  256. }
  257. assignments = lookupResp.BrokerPartitionAssignments
  258. return nil
  259. })
  260. if err == nil {
  261. return assignments, nil
  262. } else {
  263. lastErr = err
  264. }
  265. }
  266. return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr)
  267. }