broker_grpc_pub.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "google.golang.org/grpc/peer"
  9. "math/rand"
  10. "net"
  11. "sync/atomic"
  12. "time"
  13. )
  14. // PUB
  15. // 1. gRPC API to configure a topic
  16. // 1.1 create a topic with existing partition count
  17. // 1.2 assign partitions to brokers
  18. // 2. gRPC API to lookup topic partitions
  19. // 3. gRPC API to publish by topic partitions
  20. // SUB
  21. // 1. gRPC API to lookup a topic partitions
  22. // Re-balance topic partitions for publishing
  23. // 1. collect stats from all the brokers
  24. // 2. Rebalance and configure new generation of partitions on brokers
  25. // 3. Tell brokers to close current generation of publishing.
  26. // Publishers need to look up again and publish to the new generation of partitions.
  27. // Re-balance topic partitions for subscribing
  28. // 1. collect stats from all the brokers
  29. // Subscribers need to listen for new partitions and connect to the brokers.
  30. // Each subscription may not get data. It can act as a backup.
  31. func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
  32. // 1. write to the volume server
  33. // 2. find the topic metadata owning filer
  34. // 3. write to the filer
  35. var localTopicPartition *topic.LocalPartition
  36. req, err := stream.Recv()
  37. if err != nil {
  38. return err
  39. }
  40. response := &mq_pb.PublishResponse{}
  41. // TODO check whether current broker should be the leader for the topic partition
  42. ackInterval := 1
  43. initMessage := req.GetInit()
  44. if initMessage != nil {
  45. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
  46. localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
  47. if localTopicPartition == nil {
  48. response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
  49. glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
  50. return stream.Send(response)
  51. }
  52. ackInterval = int(initMessage.AckInterval)
  53. stream.Send(response)
  54. } else {
  55. response.Error = fmt.Sprintf("missing init message")
  56. glog.Errorf("missing init message")
  57. return stream.Send(response)
  58. }
  59. clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
  60. localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
  61. ackCounter := 0
  62. var ackSequence int64
  63. var isStopping int32
  64. respChan := make(chan *mq_pb.PublishResponse, 128)
  65. defer func() {
  66. atomic.StoreInt32(&isStopping, 1)
  67. close(respChan)
  68. localTopicPartition.Publishers.RemovePublisher(clientName)
  69. }()
  70. go func() {
  71. ticker := time.NewTicker(1 * time.Second)
  72. for {
  73. select {
  74. case resp := <-respChan:
  75. if resp != nil {
  76. if err := stream.Send(resp); err != nil {
  77. glog.Errorf("Error sending response %v: %v", resp, err)
  78. }
  79. } else {
  80. return
  81. }
  82. case <-ticker.C:
  83. if atomic.LoadInt32(&isStopping) == 0 {
  84. response := &mq_pb.PublishResponse{
  85. AckSequence: ackSequence,
  86. }
  87. respChan <- response
  88. } else {
  89. return
  90. }
  91. case <-localTopicPartition.StopPublishersCh:
  92. respChan <- &mq_pb.PublishResponse{
  93. AckSequence: ackSequence,
  94. ShouldClose: true,
  95. }
  96. }
  97. }
  98. }()
  99. // process each published messages
  100. for {
  101. // receive a message
  102. req, err := stream.Recv()
  103. if err != nil {
  104. return err
  105. }
  106. // Process the received message
  107. if dataMessage := req.GetData(); dataMessage != nil {
  108. localTopicPartition.Publish(dataMessage)
  109. }
  110. ackCounter++
  111. ackSequence++
  112. if ackCounter >= ackInterval {
  113. ackCounter = 0
  114. // send back the ack
  115. response := &mq_pb.PublishResponse{
  116. AckSequence: ackSequence,
  117. }
  118. respChan <- response
  119. }
  120. }
  121. glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)
  122. return nil
  123. }
  124. // duplicated from master_grpc_server.go
  125. func findClientAddress(ctx context.Context) string {
  126. // fmt.Printf("FromContext %+v\n", ctx)
  127. pr, ok := peer.FromContext(ctx)
  128. if !ok {
  129. glog.Error("failed to get peer from ctx")
  130. return ""
  131. }
  132. if pr.Addr == net.Addr(nil) {
  133. glog.Error("failed to get peer address")
  134. return ""
  135. }
  136. return pr.Addr.String()
  137. }