agent_grpc_sub_session.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package agent
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  7. "google.golang.org/grpc"
  8. "google.golang.org/grpc/credentials/insecure"
  9. "math/rand/v2"
  10. "time"
  11. )
  12. func (a *MessageQueueAgent) StartSubscribeSession(ctx context.Context, req *mq_agent_pb.StartSubscribeSessionRequest) (*mq_agent_pb.StartSubscribeSessionResponse, error) {
  13. sessionId := rand.Int64()
  14. subscriberConfig := &sub_client.SubscriberConfiguration{
  15. ConsumerGroup: req.ConsumerGroup,
  16. ConsumerGroupInstanceId: req.ConsumerGroupInstanceId,
  17. GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
  18. MaxPartitionCount: req.MaxSubscribedPartitions,
  19. SlidingWindowSize: req.SlidingWindowSize,
  20. }
  21. contentConfig := &sub_client.ContentConfiguration{
  22. Topic: topic.FromPbTopic(req.Topic),
  23. Filter: req.Filter,
  24. PartitionOffsets: req.PartitionOffsets,
  25. }
  26. topicSubscriber := sub_client.NewTopicSubscriber(
  27. a.brokersList(),
  28. subscriberConfig,
  29. contentConfig,
  30. make(chan sub_client.KeyedOffset, 1024),
  31. )
  32. a.subscribersLock.Lock()
  33. // remove inactive publishers to avoid memory leak
  34. for k, entry := range a.subscribers {
  35. if entry.lastActiveTsNs == 0 {
  36. // this is an active session
  37. continue
  38. }
  39. if time.Unix(0, entry.lastActiveTsNs).Add(10 * time.Hour).Before(time.Now()) {
  40. delete(a.subscribers, k)
  41. }
  42. }
  43. a.subscribers[SessionId(sessionId)] = &SessionEntry[*sub_client.TopicSubscriber]{
  44. entry: topicSubscriber,
  45. }
  46. a.subscribersLock.Unlock()
  47. return &mq_agent_pb.StartSubscribeSessionResponse{
  48. SessionId: sessionId,
  49. }, nil
  50. }