123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- package sub_coordinator
- import (
- "fmt"
- cmap "github.com/orcaman/concurrent-map/v2"
- "github.com/seaweedfs/seaweedfs/weed/filer_client"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- )
- type TopicConsumerGroups struct {
- // map a consumer group name to a consumer group
- ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
- }
- // SubCoordinator coordinates the instances in the consumer group for one topic.
- // It is responsible for:
- // 1. (Maybe) assigning partitions when a consumer instance is up/down.
- type SubCoordinator struct {
- // map topic name to consumer groups
- TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
- FilerClientAccessor *filer_client.FilerClientAccessor
- }
- func NewSubCoordinator() *SubCoordinator {
- return &SubCoordinator{
- TopicSubscribers: cmap.New[*TopicConsumerGroups](),
- }
- }
- func (c *SubCoordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups {
- topicName := toTopicName(topic)
- tcg, _ := c.TopicSubscribers.Get(topicName)
- if tcg == nil && createIfMissing {
- tcg = &TopicConsumerGroups{
- ConsumerGroups: cmap.New[*ConsumerGroup](),
- }
- if !c.TopicSubscribers.SetIfAbsent(topicName, tcg) {
- tcg, _ = c.TopicSubscribers.Get(topicName)
- }
- }
- return tcg
- }
- func (c *SubCoordinator) RemoveTopic(topic *mq_pb.Topic) {
- topicName := toTopicName(topic)
- c.TopicSubscribers.Remove(topicName)
- }
- func toTopicName(topic *mq_pb.Topic) string {
- topicName := topic.Namespace + "." + topic.Name
- return topicName
- }
- func (c *SubCoordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) (*ConsumerGroup, *ConsumerGroupInstance, error) {
- tcg := c.GetTopicConsumerGroups(initMessage.Topic, true)
- cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
- if cg == nil {
- cg = NewConsumerGroup(initMessage.Topic, initMessage.RebalanceSeconds, c.FilerClientAccessor)
- if cg != nil {
- tcg.ConsumerGroups.SetIfAbsent(initMessage.ConsumerGroup, cg)
- }
- cg, _ = tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
- }
- if cg == nil {
- return nil, nil, fmt.Errorf("fail to create consumer group %s: topic %s not found", initMessage.ConsumerGroup, initMessage.Topic)
- }
- cgi, _ := cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
- if cgi == nil {
- cgi = NewConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.MaxPartitionCount)
- if !cg.ConsumerGroupInstances.SetIfAbsent(initMessage.ConsumerGroupInstanceId, cgi) {
- cgi, _ = cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
- }
- }
- cgi.MaxPartitionCount = initMessage.MaxPartitionCount
- cg.Market.AddConsumerInstance(cgi)
- return cg, cgi, nil
- }
- func (c *SubCoordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) {
- tcg := c.GetTopicConsumerGroups(initMessage.Topic, false)
- if tcg == nil {
- return
- }
- cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
- if cg == nil {
- return
- }
- cg.ConsumerGroupInstances.Remove(initMessage.ConsumerGroupInstanceId)
- cg.Market.RemoveConsumerInstance(ConsumerGroupInstanceId(initMessage.ConsumerGroupInstanceId))
- if cg.ConsumerGroupInstances.Count() == 0 {
- tcg.ConsumerGroups.Remove(initMessage.ConsumerGroup)
- cg.Shutdown()
- }
- if tcg.ConsumerGroups.Count() == 0 {
- c.RemoveTopic(initMessage.Topic)
- }
- }
- func (c *SubCoordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) {
- tcg, _ := c.TopicSubscribers.Get(toTopicName(topic))
- if tcg == nil {
- return
- }
- for _, cg := range tcg.ConsumerGroups.Items() {
- cg.OnPartitionListChange(assignments)
- }
- }
|