broker_grpc_pub.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "google.golang.org/grpc/peer"
  9. "io"
  10. "math/rand"
  11. "net"
  12. "sync/atomic"
  13. "time"
  14. )
  15. // PUB
  16. // 1. gRPC API to configure a topic
  17. // 1.1 create a topic with existing partition count
  18. // 1.2 assign partitions to brokers
  19. // 2. gRPC API to lookup topic partitions
  20. // 3. gRPC API to publish by topic partitions
  21. // SUB
  22. // 1. gRPC API to lookup a topic partitions
  23. // Re-balance topic partitions for publishing
  24. // 1. collect stats from all the brokers
  25. // 2. Rebalance and configure new generation of partitions on brokers
  26. // 3. Tell brokers to close the current generation of publishing.
  27. // Publishers need to look up again and publish to the new generation of partitions.
  28. // Re-balance topic partitions for subscribing
  29. // 1. collect stats from all the brokers
  30. // Subscribers need to listen for new partitions and connect to the brokers.
  31. // Each subscription may not get data. It can act as a backup.
  32. func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error {
  33. // 1. write to the volume server
  34. // 2. find the topic metadata owning filer
  35. // 3. write to the filer
  36. req, err := stream.Recv()
  37. if err != nil {
  38. return err
  39. }
  40. response := &mq_pb.PublishMessageResponse{}
  41. // TODO check whether current broker should be the leader for the topic partition
  42. initMessage := req.GetInit()
  43. if initMessage == nil {
  44. response.Error = fmt.Sprintf("missing init message")
  45. glog.Errorf("missing init message")
  46. return stream.Send(response)
  47. }
  48. // get or generate a local partition
  49. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
  50. localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
  51. if getOrGenErr != nil {
  52. response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
  53. glog.Errorf("topic %v not found: %v", t, getOrGenErr)
  54. return stream.Send(response)
  55. }
  56. // connect to follower brokers
  57. if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil {
  58. response.Error = followerErr.Error()
  59. glog.Errorf("MaybeConnectToFollowers: %v", followerErr)
  60. return stream.Send(response)
  61. }
  62. var receivedSequence, acknowledgedSequence int64
  63. var isClosed bool
  64. // start sending ack to publisher
  65. ackInterval := int64(1)
  66. if initMessage.AckInterval > 0 {
  67. ackInterval = int64(initMessage.AckInterval)
  68. }
  69. go func() {
  70. defer func() {
  71. // println("stop sending ack to publisher", initMessage.PublisherName)
  72. }()
  73. lastAckTime := time.Now()
  74. for !isClosed {
  75. receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
  76. if acknowledgedSequence < receivedSequence && (receivedSequence-acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second) {
  77. acknowledgedSequence = receivedSequence
  78. response := &mq_pb.PublishMessageResponse{
  79. AckSequence: acknowledgedSequence,
  80. }
  81. if err := stream.Send(response); err != nil {
  82. glog.Errorf("Error sending response %v: %v", response, err)
  83. }
  84. // println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
  85. lastAckTime = time.Now()
  86. } else {
  87. time.Sleep(1 * time.Second)
  88. }
  89. }
  90. }()
  91. // process each published messages
  92. clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
  93. localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
  94. defer func() {
  95. // remove the publisher
  96. localTopicPartition.Publishers.RemovePublisher(clientName)
  97. if localTopicPartition.MaybeShutdownLocalPartition() {
  98. b.localTopicManager.RemoveLocalPartition(t, p)
  99. glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
  100. }
  101. }()
  102. // send a hello message
  103. stream.Send(&mq_pb.PublishMessageResponse{})
  104. defer func() {
  105. isClosed = true
  106. }()
  107. // process each published messages
  108. for {
  109. // receive a message
  110. req, err := stream.Recv()
  111. if err != nil {
  112. if err == io.EOF {
  113. break
  114. }
  115. glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
  116. break
  117. }
  118. // Process the received message
  119. dataMessage := req.GetData()
  120. if dataMessage == nil {
  121. continue
  122. }
  123. // The control message should still be sent to the follower
  124. // to avoid timing issue when ack messages.
  125. // send to the local partition
  126. if err = localTopicPartition.Publish(dataMessage); err != nil {
  127. return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err)
  128. }
  129. }
  130. glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
  131. return nil
  132. }
  133. // duplicated from master_grpc_server.go
  134. func findClientAddress(ctx context.Context) string {
  135. // fmt.Printf("FromContext %+v\n", ctx)
  136. pr, ok := peer.FromContext(ctx)
  137. if !ok {
  138. glog.Error("failed to get peer from ctx")
  139. return ""
  140. }
  141. if pr.Addr == net.Addr(nil) {
  142. glog.Error("failed to get peer address")
  143. return ""
  144. }
  145. return pr.Addr.String()
  146. }