agent_server.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. package agent
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  7. "google.golang.org/grpc"
  8. "sync"
  9. )
// SessionId is the integer key type used by MessageQueueAgent to index its
// publisher and subscriber session maps.
type SessionId int64
// SessionEntry pairs a session payload of any type T with an int64 activity
// marker. MessageQueueAgent stores publishers and subscribers wrapped in it.
type SessionEntry[T any] struct {
	// entry is the wrapped session value.
	entry T
	// lastActiveTsNs is, judging by its name, the time of the session's last
	// activity as a nanosecond timestamp.
	// NOTE(review): nothing in view sets or reads it; confirm against callers.
	lastActiveTsNs int64
}
// MessageQueueAgentOptions carries the configuration handed to
// NewMessageQueueAgent.
type MessageQueueAgentOptions struct {
	// SeedBrokers is a list of broker server addresses.
	// NOTE(review): presumably the initial brokers to contact; the constructor
	// in this file stores the options but does not read this field — confirm.
	SeedBrokers []pb.ServerAddress
}
// MessageQueueAgent embeds the generated
// mq_agent_pb.UnimplementedSeaweedMessagingAgentServer and holds the agent's
// state: its options, the known broker addresses, the gRPC dial option, and
// the publisher and subscriber maps keyed by SessionId.
type MessageQueueAgent struct {
	mq_agent_pb.UnimplementedSeaweedMessagingAgentServer
	// option is the configuration passed to NewMessageQueueAgent.
	option *MessageQueueAgentOptions
	// brokers is the list of broker addresses; brokersList renders it as strings.
	brokers []pb.ServerAddress
	// grpcDialOption is the dial option passed to NewMessageQueueAgent.
	// NOTE(review): presumably used when connecting to brokers — confirm.
	grpcDialOption grpc.DialOption
	// publishers maps a session id to its topic publisher entry.
	publishers map[SessionId]*SessionEntry[*pub_client.TopicPublisher]
	// publishersLock presumably guards publishers; no use is visible here —
	// confirm against the methods that touch the map.
	publishersLock sync.RWMutex
	// subscribers maps a session id to its topic subscriber entry.
	subscribers map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]
	// subscribersLock presumably guards subscribers; no use is visible here —
	// confirm against the methods that touch the map.
	subscribersLock sync.RWMutex
}
  28. func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent {
  29. // check masters to list all brokers
  30. return &MessageQueueAgent{
  31. option: option,
  32. brokers: []pb.ServerAddress{},
  33. grpcDialOption: grpcDialOption,
  34. publishers: make(map[SessionId]*SessionEntry[*pub_client.TopicPublisher]),
  35. subscribers: make(map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]),
  36. }
  37. }
  38. func (a *MessageQueueAgent) brokersList() []string {
  39. var brokers []string
  40. for _, broker := range a.brokers {
  41. brokers = append(brokers, broker.String())
  42. }
  43. return brokers
  44. }