broker_grpc_topic_partition_control.go 838 B

12345678910111213141516171819202122232425262728
  1. package broker
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. )
  7. func (b *MessageQueueBroker) ClosePublishers(ctx context.Context, request *mq_pb.ClosePublishersRequest) (resp *mq_pb.ClosePublishersResponse, err error) {
  8. resp = &mq_pb.ClosePublishersResponse{}
  9. t := topic.FromPbTopic(request.Topic)
  10. b.localTopicManager.ClosePublishers(t, request.UnixTimeNs)
  11. // wait until all publishers are closed
  12. b.localTopicManager.WaitUntilNoPublishers(t)
  13. return
  14. }
  15. func (b *MessageQueueBroker) CloseSubscribers(ctx context.Context, request *mq_pb.CloseSubscribersRequest) (resp *mq_pb.CloseSubscribersResponse, err error) {
  16. resp = &mq_pb.CloseSubscribersResponse{}
  17. b.localTopicManager.CloseSubscribers(topic.FromPbTopic(request.Topic), request.UnixTimeNs)
  18. return
  19. }