local_partition_subscribers.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package topic
  2. import "sync"
  3. type LocalPartitionSubscribers struct {
  4. Subscribers map[string]*LocalSubscriber
  5. SubscribersLock sync.RWMutex
  6. }
  7. type LocalSubscriber struct {
  8. stopCh chan struct{}
  9. }
  10. func NewLocalSubscriber() *LocalSubscriber {
  11. return &LocalSubscriber{
  12. stopCh: make(chan struct{}, 1),
  13. }
  14. }
  15. func (p *LocalSubscriber) SignalShutdown() {
  16. close(p.stopCh)
  17. }
  18. func NewLocalPartitionSubscribers() *LocalPartitionSubscribers {
  19. return &LocalPartitionSubscribers{
  20. Subscribers: make(map[string]*LocalSubscriber),
  21. }
  22. }
  23. func (p *LocalPartitionSubscribers) AddSubscriber(clientName string, Subscriber *LocalSubscriber) {
  24. p.SubscribersLock.Lock()
  25. defer p.SubscribersLock.Unlock()
  26. p.Subscribers[clientName] = Subscriber
  27. }
  28. func (p *LocalPartitionSubscribers) RemoveSubscriber(clientName string) {
  29. p.SubscribersLock.Lock()
  30. defer p.SubscribersLock.Unlock()
  31. delete(p.Subscribers, clientName)
  32. }
  33. func (p *LocalPartitionSubscribers) SignalShutdown() {
  34. p.SubscribersLock.RLock()
  35. defer p.SubscribersLock.RUnlock()
  36. for _, Subscriber := range p.Subscribers {
  37. Subscriber.SignalShutdown()
  38. }
  39. }
  40. func (p *LocalPartitionSubscribers) Size() int {
  41. p.SubscribersLock.RLock()
  42. defer p.SubscribersLock.RUnlock()
  43. return len(p.Subscribers)
  44. }