topic_manager.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package broker
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
  10. )
  11. type TopicPartition struct {
  12. Namespace string
  13. Topic string
  14. Partition int32
  15. }
  16. const (
  17. TopicPartitionFmt = "%s/%s_%02d"
  18. )
  19. func (tp *TopicPartition) String() string {
  20. return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
  21. }
  22. type TopicControl struct {
  23. sync.Mutex
  24. cond *sync.Cond
  25. subscriberCount int
  26. publisherCount int
  27. logBuffer *log_buffer.LogBuffer
  28. }
  29. type TopicManager struct {
  30. sync.Mutex
  31. topicControls map[TopicPartition]*TopicControl
  32. broker *MessageBroker
  33. }
  34. func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
  35. return &TopicManager{
  36. topicControls: make(map[TopicPartition]*TopicControl),
  37. broker: messageBroker,
  38. }
  39. }
  40. func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
  41. flushFn := func(startTime, stopTime time.Time, buf []byte) {
  42. if topicConfig.IsTransient {
  43. // return
  44. }
  45. // fmt.Printf("flushing with topic config %+v\n", topicConfig)
  46. startTime, stopTime = startTime.UTC(), stopTime.UTC()
  47. targetFile := fmt.Sprintf(
  48. "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
  49. filer.TopicsDir, tp.Namespace, tp.Topic,
  50. startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
  51. tp.Partition,
  52. )
  53. if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
  54. glog.V(0).Infof("log write failed %s: %v", targetFile, err)
  55. }
  56. }
  57. logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
  58. tl.cond.Broadcast()
  59. })
  60. return logBuffer
  61. }
  62. func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
  63. tm.Lock()
  64. defer tm.Unlock()
  65. tc, found := tm.topicControls[partition]
  66. if !found {
  67. tc = &TopicControl{}
  68. tc.cond = sync.NewCond(&tc.Mutex)
  69. tm.topicControls[partition] = tc
  70. tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
  71. }
  72. if isPublisher {
  73. tc.publisherCount++
  74. } else {
  75. tc.subscriberCount++
  76. }
  77. return tc
  78. }
  79. func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {
  80. tm.Lock()
  81. defer tm.Unlock()
  82. lock, found := tm.topicControls[partition]
  83. if !found {
  84. return
  85. }
  86. if isPublisher {
  87. lock.publisherCount--
  88. } else {
  89. lock.subscriberCount--
  90. }
  91. if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
  92. delete(tm.topicControls, partition)
  93. lock.logBuffer.Shutdown()
  94. }
  95. }
  96. func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
  97. tm.Lock()
  98. defer tm.Unlock()
  99. for k := range tm.topicControls {
  100. tps = append(tps, k)
  101. }
  102. return
  103. }