notification_kafka.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package sub
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "os"
  6. "sync"
  7. "time"
  8. "github.com/Shopify/sarama"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "google.golang.org/protobuf/proto"
  13. )
  14. func init() {
  15. NotificationInputs = append(NotificationInputs, &KafkaInput{})
  16. }
  17. type KafkaInput struct {
  18. topic string
  19. consumer sarama.Consumer
  20. messageChan chan *sarama.ConsumerMessage
  21. }
  22. func (k *KafkaInput) GetName() string {
  23. return "kafka"
  24. }
  25. func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error {
  26. glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
  27. glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
  28. return k.initialize(
  29. configuration.GetStringSlice(prefix+"hosts"),
  30. configuration.GetString(prefix+"topic"),
  31. configuration.GetString(prefix+"offsetFile"),
  32. configuration.GetInt(prefix+"offsetSaveIntervalSeconds"),
  33. )
  34. }
  35. func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, offsetSaveIntervalSeconds int) (err error) {
  36. config := sarama.NewConfig()
  37. config.Consumer.Return.Errors = true
  38. k.consumer, err = sarama.NewConsumer(hosts, config)
  39. if err != nil {
  40. panic(err)
  41. } else {
  42. glog.V(0).Infof("connected to %v", hosts)
  43. }
  44. k.topic = topic
  45. k.messageChan = make(chan *sarama.ConsumerMessage, 1)
  46. partitions, err := k.consumer.Partitions(topic)
  47. if err != nil {
  48. panic(err)
  49. }
  50. progress := loadProgress(offsetFile)
  51. if progress == nil || progress.Topic != topic {
  52. progress = &KafkaProgress{
  53. Topic: topic,
  54. PartitionOffsets: make(map[int32]int64),
  55. }
  56. }
  57. progress.lastSaveTime = time.Now()
  58. progress.offsetFile = offsetFile
  59. progress.offsetSaveIntervalSeconds = offsetSaveIntervalSeconds
  60. for _, partition := range partitions {
  61. offset, found := progress.PartitionOffsets[partition]
  62. if !found {
  63. offset = sarama.OffsetOldest
  64. } else {
  65. offset += 1
  66. }
  67. partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, offset)
  68. if err != nil {
  69. panic(err)
  70. }
  71. go func() {
  72. for {
  73. select {
  74. case err := <-partitionConsumer.Errors():
  75. fmt.Println(err)
  76. case msg := <-partitionConsumer.Messages():
  77. k.messageChan <- msg
  78. if err := progress.setOffset(msg.Partition, msg.Offset); err != nil {
  79. glog.Warningf("set kafka offset: %v", err)
  80. }
  81. }
  82. }
  83. }()
  84. }
  85. return nil
  86. }
  87. func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
  88. msg := <-k.messageChan
  89. key = string(msg.Key)
  90. message = &filer_pb.EventNotification{}
  91. err = proto.Unmarshal(msg.Value, message)
  92. return
  93. }
  94. type KafkaProgress struct {
  95. Topic string `json:"topic"`
  96. PartitionOffsets map[int32]int64 `json:"partitionOffsets"`
  97. offsetFile string
  98. lastSaveTime time.Time
  99. offsetSaveIntervalSeconds int
  100. sync.Mutex
  101. }
  102. func loadProgress(offsetFile string) *KafkaProgress {
  103. progress := &KafkaProgress{}
  104. data, err := os.ReadFile(offsetFile)
  105. if err != nil {
  106. glog.Warningf("failed to read kafka progress file: %s", offsetFile)
  107. return nil
  108. }
  109. err = json.Unmarshal(data, progress)
  110. if err != nil {
  111. glog.Warningf("failed to read kafka progress message: %s", string(data))
  112. return nil
  113. }
  114. return progress
  115. }
  116. func (progress *KafkaProgress) saveProgress() error {
  117. data, err := json.Marshal(progress)
  118. if err != nil {
  119. return fmt.Errorf("failed to marshal progress: %v", err)
  120. }
  121. err = util.WriteFile(progress.offsetFile, data, 0640)
  122. if err != nil {
  123. return fmt.Errorf("failed to save progress to %s: %v", progress.offsetFile, err)
  124. }
  125. progress.lastSaveTime = time.Now()
  126. return nil
  127. }
  128. func (progress *KafkaProgress) setOffset(partition int32, offset int64) error {
  129. progress.Lock()
  130. defer progress.Unlock()
  131. progress.PartitionOffsets[partition] = offset
  132. if int(time.Now().Sub(progress.lastSaveTime).Seconds()) > progress.offsetSaveIntervalSeconds {
  133. return progress.saveProgress()
  134. }
  135. return nil
  136. }