local_topic.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package topic
  2. import "sync"
  3. type LocalTopic struct {
  4. Topic
  5. Partitions []*LocalPartition
  6. }
  7. func NewLocalTopic(topic Topic) *LocalTopic {
  8. return &LocalTopic{
  9. Topic: topic,
  10. Partitions: make([]*LocalPartition, 0),
  11. }
  12. }
  13. func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition {
  14. for _, localPartition := range localTopic.Partitions {
  15. if localPartition.Partition.Equals(partition) {
  16. return localPartition
  17. }
  18. }
  19. return nil
  20. }
  21. func (localTopic *LocalTopic) removePartition(partition Partition) bool {
  22. foundPartitionIndex := -1
  23. for i, localPartition := range localTopic.Partitions {
  24. if localPartition.Partition.Equals(partition) {
  25. foundPartitionIndex = i
  26. localPartition.Shutdown()
  27. break
  28. }
  29. }
  30. if foundPartitionIndex == -1 {
  31. return false
  32. }
  33. localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...)
  34. return true
  35. }
  36. func (localTopic *LocalTopic) closePartitionPublishers(unixTsNs int64) bool {
  37. var wg sync.WaitGroup
  38. for _, localPartition := range localTopic.Partitions {
  39. if localPartition.UnixTimeNs != unixTsNs {
  40. continue
  41. }
  42. wg.Add(1)
  43. go func(localPartition *LocalPartition) {
  44. defer wg.Done()
  45. localPartition.closePublishers()
  46. }(localPartition)
  47. }
  48. wg.Wait()
  49. return true
  50. }
  51. func (localTopic *LocalTopic) closePartitionSubscribers(unixTsNs int64) bool {
  52. var wg sync.WaitGroup
  53. for _, localPartition := range localTopic.Partitions {
  54. if localPartition.UnixTimeNs != unixTsNs {
  55. continue
  56. }
  57. wg.Add(1)
  58. go func(localPartition *LocalPartition) {
  59. defer wg.Done()
  60. localPartition.closeSubscribers()
  61. }(localPartition)
  62. }
  63. wg.Wait()
  64. return true
  65. }
  66. func (localTopic *LocalTopic) WaitUntilNoPublishers() {
  67. for {
  68. var wg sync.WaitGroup
  69. for _, localPartition := range localTopic.Partitions {
  70. wg.Add(1)
  71. go func(localPartition *LocalPartition) {
  72. defer wg.Done()
  73. localPartition.WaitUntilNoPublishers()
  74. }(localPartition)
  75. }
  76. wg.Wait()
  77. if len(localTopic.Partitions) == 0 {
  78. return
  79. }
  80. }
  81. }