123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- package topic
- import "sync"
- type LocalTopic struct {
- Topic
- Partitions []*LocalPartition
- partitionLock sync.RWMutex
- }
- func NewLocalTopic(topic Topic) *LocalTopic {
- return &LocalTopic{
- Topic: topic,
- Partitions: make([]*LocalPartition, 0),
- }
- }
- func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition {
- localTopic.partitionLock.RLock()
- defer localTopic.partitionLock.RUnlock()
- for _, localPartition := range localTopic.Partitions {
- if localPartition.Partition.Equals(partition) {
- return localPartition
- }
- }
- return nil
- }
- func (localTopic *LocalTopic) removePartition(partition Partition) bool {
- localTopic.partitionLock.Lock()
- defer localTopic.partitionLock.Unlock()
- foundPartitionIndex := -1
- for i, localPartition := range localTopic.Partitions {
- if localPartition.Partition.Equals(partition) {
- foundPartitionIndex = i
- localPartition.Shutdown()
- break
- }
- }
- if foundPartitionIndex == -1 {
- return false
- }
- localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...)
- return true
- }
- func (localTopic *LocalTopic) addPartition(localPartition *LocalPartition) {
- localTopic.partitionLock.Lock()
- defer localTopic.partitionLock.Unlock()
- for _, partition := range localTopic.Partitions {
- if localPartition.Partition.Equals(partition.Partition) {
- return
- }
- }
- localTopic.Partitions = append(localTopic.Partitions, localPartition)
- }
- func (localTopic *LocalTopic) closePartitionPublishers(unixTsNs int64) bool {
- var wg sync.WaitGroup
- for _, localPartition := range localTopic.Partitions {
- if localPartition.UnixTimeNs != unixTsNs {
- continue
- }
- wg.Add(1)
- go func(localPartition *LocalPartition) {
- defer wg.Done()
- localPartition.closePublishers()
- }(localPartition)
- }
- wg.Wait()
- return true
- }
- func (localTopic *LocalTopic) closePartitionSubscribers(unixTsNs int64) bool {
- var wg sync.WaitGroup
- for _, localPartition := range localTopic.Partitions {
- if localPartition.UnixTimeNs != unixTsNs {
- continue
- }
- wg.Add(1)
- go func(localPartition *LocalPartition) {
- defer wg.Done()
- localPartition.closeSubscribers()
- }(localPartition)
- }
- wg.Wait()
- return true
- }
- func (localTopic *LocalTopic) WaitUntilNoPublishers() {
- for {
- var wg sync.WaitGroup
- for _, localPartition := range localTopic.Partitions {
- wg.Add(1)
- go func(localPartition *LocalPartition) {
- defer wg.Done()
- localPartition.WaitUntilNoPublishers()
- }(localPartition)
- }
- wg.Wait()
- if len(localTopic.Partitions) == 0 {
- return
- }
- }
- }
|