12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152 |
- package agent
- import (
- "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
- "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
- "google.golang.org/grpc"
- "sync"
- )
- type SessionId int64
- type SessionEntry[T any] struct {
- entry T
- lastActiveTsNs int64
- }
- type MessageQueueAgentOptions struct {
- SeedBrokers []pb.ServerAddress
- }
- type MessageQueueAgent struct {
- mq_agent_pb.UnimplementedSeaweedMessagingAgentServer
- option *MessageQueueAgentOptions
- brokers []pb.ServerAddress
- grpcDialOption grpc.DialOption
- publishers map[SessionId]*SessionEntry[*pub_client.TopicPublisher]
- publishersLock sync.RWMutex
- subscribers map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]
- subscribersLock sync.RWMutex
- }
- func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent {
- // check masters to list all brokers
- return &MessageQueueAgent{
- option: option,
- brokers: []pb.ServerAddress{},
- grpcDialOption: grpcDialOption,
- publishers: make(map[SessionId]*SessionEntry[*pub_client.TopicPublisher]),
- subscribers: make(map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]),
- }
- }
- func (a *MessageQueueAgent) brokersList() []string {
- var brokers []string
- for _, broker := range a.brokers {
- brokers = append(brokers, broker.String())
- }
- return brokers
- }
|