broker_grpc_pub.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "google.golang.org/grpc/peer"
  9. "io"
  10. "math/rand"
  11. "net"
  12. "sync/atomic"
  13. "time"
  14. )
  15. // PUB
  16. // 1. gRPC API to configure a topic
  17. // 1.1 create a topic with existing partition count
  18. // 1.2 assign partitions to brokers
  19. // 2. gRPC API to look up topic partitions
  20. // 3. gRPC API to publish by topic partitions
  21. // SUB
  22. // 1. gRPC API to look up a topic's partitions
  23. // Re-balance topic partitions for publishing
  24. // 1. collect stats from all the brokers
  25. // 2. Rebalance and configure new generation of partitions on brokers
  26. // 3. Tell brokers to close the current generation of publishing.
  27. // Publishers need to look up again and publish to the new generation of partitions.
  28. // Re-balance topic partitions for subscribing
  29. // 1. collect stats from all the brokers
  30. // Subscribers need to listen for new partitions and connect to the brokers.
  31. // A subscription may receive no data; it can still act as a backup.
  32. func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error {
  33. req, err := stream.Recv()
  34. if err != nil {
  35. return err
  36. }
  37. response := &mq_pb.PublishMessageResponse{}
  38. // TODO check whether current broker should be the leader for the topic partition
  39. initMessage := req.GetInit()
  40. if initMessage == nil {
  41. response.Error = fmt.Sprintf("missing init message")
  42. glog.Errorf("missing init message")
  43. return stream.Send(response)
  44. }
  45. // get or generate a local partition
  46. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
  47. localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
  48. if getOrGenErr != nil {
  49. response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
  50. glog.Errorf("topic %v not found: %v", t, getOrGenErr)
  51. return stream.Send(response)
  52. }
  53. // connect to follower brokers
  54. if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil {
  55. response.Error = followerErr.Error()
  56. glog.Errorf("MaybeConnectToFollowers: %v", followerErr)
  57. return stream.Send(response)
  58. }
  59. var receivedSequence, acknowledgedSequence int64
  60. var isClosed bool
  61. // start sending ack to publisher
  62. ackInterval := int64(1)
  63. if initMessage.AckInterval > 0 {
  64. ackInterval = int64(initMessage.AckInterval)
  65. }
  66. go func() {
  67. defer func() {
  68. // println("stop sending ack to publisher", initMessage.PublisherName)
  69. }()
  70. lastAckTime := time.Now()
  71. for !isClosed {
  72. receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
  73. if acknowledgedSequence < receivedSequence && (receivedSequence-acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second) {
  74. acknowledgedSequence = receivedSequence
  75. response := &mq_pb.PublishMessageResponse{
  76. AckSequence: acknowledgedSequence,
  77. }
  78. if err := stream.Send(response); err != nil {
  79. glog.Errorf("Error sending response %v: %v", response, err)
  80. }
  81. // println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
  82. lastAckTime = time.Now()
  83. } else {
  84. time.Sleep(1 * time.Second)
  85. }
  86. }
  87. }()
  88. // process each published messages
  89. clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
  90. localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
  91. defer func() {
  92. // remove the publisher
  93. localTopicPartition.Publishers.RemovePublisher(clientName)
  94. if localTopicPartition.MaybeShutdownLocalPartition() {
  95. b.localTopicManager.RemoveLocalPartition(t, p)
  96. glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
  97. }
  98. }()
  99. // send a hello message
  100. stream.Send(&mq_pb.PublishMessageResponse{})
  101. defer func() {
  102. isClosed = true
  103. }()
  104. // process each published messages
  105. for {
  106. // receive a message
  107. req, err := stream.Recv()
  108. if err != nil {
  109. if err == io.EOF {
  110. break
  111. }
  112. glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
  113. break
  114. }
  115. // Process the received message
  116. dataMessage := req.GetData()
  117. if dataMessage == nil {
  118. continue
  119. }
  120. // The control message should still be sent to the follower
  121. // to avoid timing issue when ack messages.
  122. // send to the local partition
  123. if err = localTopicPartition.Publish(dataMessage); err != nil {
  124. return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err)
  125. }
  126. }
  127. glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
  128. return nil
  129. }
  130. // duplicated from master_grpc_server.go
  131. func findClientAddress(ctx context.Context) string {
  132. // fmt.Printf("FromContext %+v\n", ctx)
  133. pr, ok := peer.FromContext(ctx)
  134. if !ok {
  135. glog.Error("failed to get peer from ctx")
  136. return ""
  137. }
  138. if pr.Addr == net.Addr(nil) {
  139. glog.Error("failed to get peer address")
  140. return ""
  141. }
  142. return pr.Addr.String()
  143. }