subscriber.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package sub_client
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  4. "google.golang.org/grpc"
  5. )
  6. type SubscriberConfiguration struct {
  7. ClientId string
  8. GroupId string
  9. GroupInstanceId string
  10. BootstrapServers []string
  11. GrpcDialOption grpc.DialOption
  12. }
  // ContentConfiguration describes what a TopicSubscriber subscribes to. It is
  // stored in the subscriber's ContentConfig field.
  //
  // The three fields, in declaration order, are:
  //   - Namespace: presumably the namespace that contains the topic.
  //   - Topic: presumably the name of the topic to consume.
  //   - Filter: an opaque filter string; its syntax is defined by whatever
  //     consumes it, which is not visible in this file (TODO confirm).
  type ContentConfiguration struct {
  	Namespace, Topic, Filter string
  }
  // Callback signatures that can be installed on a TopicSubscriber.
  type (
  	// OnEachMessageFunc receives the key and value bytes of one message and
  	// reports a boolean named shouldContinue. How a false result is acted on
  	// is decided by the code that invokes the callback, which is outside
  	// this file.
  	OnEachMessageFunc func(key, value []byte) (shouldContinue bool)

  	// OnCompletionFunc is a callback that takes no arguments and returns
  	// nothing. NOTE(review): by its name it is run when consumption
  	// completes; confirm against the caller.
  	OnCompletionFunc func()
  )
  20. type TopicSubscriber struct {
  21. SubscriberConfig *SubscriberConfiguration
  22. ContentConfig *ContentConfiguration
  23. brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
  24. OnEachMessageFunc OnEachMessageFunc
  25. OnCompletionFunc OnCompletionFunc
  26. }
  27. func NewTopicSubscriber(subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
  28. return &TopicSubscriber{
  29. SubscriberConfig: subscriber,
  30. ContentConfig: content,
  31. }
  32. }
  33. func (sub *TopicSubscriber) Connect(bootstrapBroker string) error {
  34. if err := sub.doLookup(bootstrapBroker); err != nil {
  35. return err
  36. }
  37. return nil
  38. }
  39. func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) {
  40. sub.OnEachMessageFunc = onEachMessageFn
  41. }
  42. func (sub *TopicSubscriber) SetCompletionFunc(onCompeletionFn OnCompletionFunc) {
  43. sub.OnCompletionFunc = onCompeletionFn
  44. }