topic_manager.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package broker
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/filer2"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
  10. )
  11. type TopicPartition struct {
  12. Namespace string
  13. Topic string
  14. Partition int32
  15. }
  16. const (
  17. TopicPartitionFmt = "%s/%s_%02d"
  18. )
  19. func (tp *TopicPartition) String() string {
  20. return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
  21. }
  22. type TopicControl struct {
  23. sync.Mutex
  24. cond *sync.Cond
  25. subscriberCount int
  26. publisherCount int
  27. logBuffer *log_buffer.LogBuffer
  28. }
  29. type TopicManager struct {
  30. sync.Mutex
  31. topicControls map[TopicPartition]*TopicControl
  32. broker *MessageBroker
  33. }
  34. func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
  35. return &TopicManager{
  36. topicControls: make(map[TopicPartition]*TopicControl),
  37. broker: messageBroker,
  38. }
  39. }
  40. func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
  41. flushFn := func(startTime, stopTime time.Time, buf []byte) {
  42. if topicConfig.IsTransient {
  43. // return
  44. }
  45. // fmt.Printf("flushing with topic config %+v\n", topicConfig)
  46. targetFile := fmt.Sprintf(
  47. "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
  48. filer2.TopicsDir, tp.Namespace, tp.Topic,
  49. startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
  50. tp.Partition,
  51. )
  52. if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
  53. glog.V(0).Infof("log write failed %s: %v", targetFile, err)
  54. }
  55. }
  56. logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
  57. tl.cond.Broadcast()
  58. })
  59. return logBuffer
  60. }
  61. func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
  62. tm.Lock()
  63. defer tm.Unlock()
  64. tc, found := tm.topicControls[partition]
  65. if !found {
  66. tc = &TopicControl{}
  67. tc.cond = sync.NewCond(&tc.Mutex)
  68. tm.topicControls[partition] = tc
  69. tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
  70. }
  71. if isPublisher {
  72. tc.publisherCount++
  73. } else {
  74. tc.subscriberCount++
  75. }
  76. return tc
  77. }
  78. func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {
  79. tm.Lock()
  80. defer tm.Unlock()
  81. lock, found := tm.topicControls[partition]
  82. if !found {
  83. return
  84. }
  85. if isPublisher {
  86. lock.publisherCount--
  87. } else {
  88. lock.subscriberCount--
  89. }
  90. if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
  91. delete(tm.topicControls, partition)
  92. lock.logBuffer.Shutdown()
  93. }
  94. }
  95. func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
  96. tm.Lock()
  97. defer tm.Unlock()
  98. for k := range tm.topicControls {
  99. tps = append(tps, k)
  100. }
  101. return
  102. }