publisher.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package pub_client
  2. import (
  3. "fmt"
  4. "github.com/rdleal/intervalst/interval"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. "google.golang.org/grpc"
  8. "google.golang.org/grpc/credentials/insecure"
  9. "sync"
  10. "time"
  11. )
  12. type PublisherConfiguration struct {
  13. CreateTopic bool
  14. CreateTopicPartitionCount int32
  15. }
  16. type PublishClient struct {
  17. mq_pb.SeaweedMessaging_PublishClient
  18. Broker string
  19. Err error
  20. }
  21. type TopicPublisher struct {
  22. namespace string
  23. topic string
  24. partition2Broker *interval.SearchTree[*PublishClient, int32]
  25. grpcDialOption grpc.DialOption
  26. sync.Mutex // protects grpc
  27. config *PublisherConfiguration
  28. }
  29. func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration) *TopicPublisher {
  30. return &TopicPublisher{
  31. namespace: namespace,
  32. topic: topic,
  33. partition2Broker: interval.NewSearchTree[*PublishClient](func(a, b int32) int {
  34. return int(a - b)
  35. }),
  36. grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
  37. config: config,
  38. }
  39. }
  40. func (p *TopicPublisher) Connect(bootstrapBrokers []string) (err error) {
  41. if len(bootstrapBrokers) == 0 {
  42. return nil
  43. }
  44. for _, b := range bootstrapBrokers {
  45. err = p.doLookupAndConnect(b)
  46. if err == nil {
  47. return nil
  48. }
  49. fmt.Printf("failed to connect to %s: %v\n\n", b, err)
  50. }
  51. return err
  52. }
  53. func (p *TopicPublisher) Shutdown() error {
  54. if clients, found := p.partition2Broker.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
  55. for _, client := range clients {
  56. client.CloseSend()
  57. }
  58. }
  59. time.Sleep(1100 * time.Millisecond)
  60. return nil
  61. }