subscriber.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package sub_client
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  4. "google.golang.org/grpc"
  5. "time"
  6. )
  7. type SubscriberConfiguration struct {
  8. ClientId string
  9. GroupId string
  10. GroupInstanceId string
  11. GroupMinimumPeers int32
  12. GroupMaximumPeers int32
  13. BootstrapServers []string
  14. GrpcDialOption grpc.DialOption
  15. }
  16. type ContentConfiguration struct {
  17. Namespace string
  18. Topic string
  19. Filter string
  20. StartTime time.Time
  21. }
  22. type OnEachMessageFunc func(key, value []byte) (shouldContinue bool)
  23. type OnCompletionFunc func()
  24. type TopicSubscriber struct {
  25. SubscriberConfig *SubscriberConfiguration
  26. ContentConfig *ContentConfiguration
  27. brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
  28. OnEachMessageFunc OnEachMessageFunc
  29. OnCompletionFunc OnCompletionFunc
  30. bootstrapBrokers []string
  31. waitForMoreMessage bool
  32. alreadyProcessedTsNs int64
  33. }
  34. func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
  35. return &TopicSubscriber{
  36. SubscriberConfig: subscriber,
  37. ContentConfig: content,
  38. bootstrapBrokers: bootstrapBrokers,
  39. waitForMoreMessage: true,
  40. alreadyProcessedTsNs: content.StartTime.UnixNano(),
  41. }
  42. }
  43. func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) {
  44. sub.OnEachMessageFunc = onEachMessageFn
  45. }
  46. func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) {
  47. sub.OnCompletionFunc = onCompletionFn
  48. }