123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- package pub_client
- import (
- "github.com/rdleal/intervalst/interval"
- "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
- "sync"
- "time"
- )
- type PublisherConfiguration struct {
- }
- type PublishClient struct {
- mq_pb.SeaweedMessaging_PublishClient
- Broker string
- Err error
- }
- type TopicPublisher struct {
- namespace string
- topic string
- partition2Broker *interval.SearchTree[*PublishClient, int32]
- grpcDialOption grpc.DialOption
- sync.Mutex // protects grpc
- }
- func NewTopicPublisher(namespace, topic string) *TopicPublisher {
- return &TopicPublisher{
- namespace: namespace,
- topic: topic,
- partition2Broker: interval.NewSearchTree[*PublishClient](func(a, b int32) int {
- return int(a - b)
- }),
- grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
- }
- }
- func (p *TopicPublisher) Connect(bootstrapBroker string) error {
- if err := p.doLookup(bootstrapBroker); err != nil {
- return err
- }
- return nil
- }
- func (p *TopicPublisher) Shutdown() error {
- if clients, found := p.partition2Broker.AllIntersections(0, balancer.MaxPartitionCount); found {
- for _, client := range clients {
- client.CloseSend()
- }
- }
- time.Sleep(1100 * time.Millisecond)
- return nil
- }
|