1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- package sub_client
- import (
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc"
- )
- type SubscriberConfiguration struct {
- ClientId string
- GroupId string
- GroupInstanceId string
- BootstrapServers []string
- GrpcDialOption grpc.DialOption
- }
// ContentConfiguration names what a TopicSubscriber should read: a namespace,
// a topic, and a filter string. It is passed to NewTopicSubscriber.
type ContentConfiguration struct {
	Namespace string
	Topic     string
	// Filter is kept as a raw string.
	// NOTE(review): its syntax is not defined in this file section.
	Filter string
}
// OnEachMessageFunc is the callback type for a single message, given its key
// and value bytes. The boolean result is named shouldContinue.
// NOTE(review): presumably returning false stops further consumption; confirm
// against the code that invokes the callback.
type OnEachMessageFunc func(key, value []byte) (shouldContinue bool)
// OnCompletionFunc is the callback type for the completion hook stored on a
// TopicSubscriber. It takes no arguments and returns nothing.
type OnCompletionFunc func()
- type TopicSubscriber struct {
- SubscriberConfig *SubscriberConfiguration
- ContentConfig *ContentConfiguration
- brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
- OnEachMessageFunc OnEachMessageFunc
- OnCompletionFunc OnCompletionFunc
- }
- func NewTopicSubscriber(subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
- return &TopicSubscriber{
- SubscriberConfig: subscriber,
- ContentConfig: content,
- }
- }
- func (sub *TopicSubscriber) Connect(bootstrapBroker string) error {
- if err := sub.doLookup(bootstrapBroker); err != nil {
- return err
- }
- return nil
- }
- func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) {
- sub.OnEachMessageFunc = onEachMessageFn
- }
- func (sub *TopicSubscriber) SetCompletionFunc(onCompeletionFn OnCompletionFunc) {
- sub.OnCompletionFunc = onCompeletionFn
- }
|