publisher.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package pub_client
  2. import (
  3. "github.com/rdleal/intervalst/interval"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "google.golang.org/grpc"
  7. "google.golang.org/grpc/credentials/insecure"
  8. "sync"
  9. "time"
  10. )
  11. type PublisherConfiguration struct {
  12. }
  13. type PublishClient struct {
  14. mq_pb.SeaweedMessaging_PublishClient
  15. Broker string
  16. Err error
  17. }
  18. type TopicPublisher struct {
  19. namespace string
  20. topic string
  21. partition2Broker *interval.SearchTree[*PublishClient, int32]
  22. grpcDialOption grpc.DialOption
  23. sync.Mutex // protects grpc
  24. }
  25. func NewTopicPublisher(namespace, topic string) *TopicPublisher {
  26. return &TopicPublisher{
  27. namespace: namespace,
  28. topic: topic,
  29. partition2Broker: interval.NewSearchTree[*PublishClient](func(a, b int32) int {
  30. return int(a - b)
  31. }),
  32. grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
  33. }
  34. }
  35. func (p *TopicPublisher) Connect(bootstrapBroker string) error {
  36. if err := p.doLookup(bootstrapBroker); err != nil {
  37. return err
  38. }
  39. return nil
  40. }
  41. func (p *TopicPublisher) Shutdown() error {
  42. if clients, found := p.partition2Broker.AllIntersections(0, balancer.MaxPartitionCount); found {
  43. for _, client := range clients {
  44. client.CloseSend()
  45. }
  46. }
  47. time.Sleep(1100 * time.Millisecond)
  48. return nil
  49. }