publisher.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package msgclient
  2. import (
  3. "context"
  4. "github.com/OneOfOne/xxhash"
  5. "google.golang.org/grpc"
  6. "github.com/chrislusf/seaweedfs/weed/messaging/broker"
  7. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  8. )
  9. type Publisher struct {
  10. publishClients []messaging_pb.SeaweedMessaging_PublishClient
  11. topicConfiguration *messaging_pb.TopicConfiguration
  12. messageCount uint64
  13. publisherId string
  14. }
  15. func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
  16. // read topic configuration
  17. topicConfiguration := &messaging_pb.TopicConfiguration{
  18. PartitionCount: 4,
  19. }
  20. publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
  21. for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
  22. tp := broker.TopicPartition{
  23. Namespace: namespace,
  24. Topic: topic,
  25. Partition: int32(i),
  26. }
  27. grpcClientConn, err := mc.findBroker(tp)
  28. if err != nil {
  29. return nil, err
  30. }
  31. client, err := setupPublisherClient(grpcClientConn, tp)
  32. if err != nil {
  33. return nil, err
  34. }
  35. publishClients[i] = client
  36. }
  37. return &Publisher{
  38. publishClients: publishClients,
  39. topicConfiguration: topicConfiguration,
  40. }, nil
  41. }
  42. func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) {
  43. stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
  44. if err != nil {
  45. return nil, err
  46. }
  47. // send init message
  48. err = stream.Send(&messaging_pb.PublishRequest{
  49. Init: &messaging_pb.PublishRequest_InitMessage{
  50. Namespace: tp.Namespace,
  51. Topic: tp.Topic,
  52. Partition: tp.Partition,
  53. },
  54. })
  55. if err != nil {
  56. return nil, err
  57. }
  58. // process init response
  59. initResponse, err := stream.Recv()
  60. if err != nil {
  61. return nil, err
  62. }
  63. if initResponse.Redirect != nil {
  64. // TODO follow redirection
  65. }
  66. if initResponse.Config != nil {
  67. }
  68. // setup looks for control messages
  69. doneChan := make(chan error, 1)
  70. go func() {
  71. for {
  72. in, err := stream.Recv()
  73. if err != nil {
  74. doneChan <- err
  75. return
  76. }
  77. if in.Redirect != nil {
  78. }
  79. if in.Config != nil {
  80. }
  81. }
  82. }()
  83. return stream, nil
  84. }
  85. func (p *Publisher) Publish(m *messaging_pb.Message) error {
  86. hashValue := p.messageCount
  87. p.messageCount++
  88. if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash {
  89. if m.Key != nil {
  90. hashValue = xxhash.Checksum64(m.Key)
  91. }
  92. } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash {
  93. hashValue = xxhash.Checksum64(m.Key)
  94. } else {
  95. // round robin
  96. }
  97. idx := int(hashValue) % len(p.publishClients)
  98. if idx < 0 {
  99. idx += len(p.publishClients)
  100. }
  101. return p.publishClients[idx].Send(&messaging_pb.PublishRequest{
  102. Data: m,
  103. })
  104. }