kafka_queue.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package kafka
  2. import (
  3. "github.com/Shopify/sarama"
  4. "github.com/chrislusf/seaweedfs/weed/util/log"
  5. "github.com/chrislusf/seaweedfs/weed/notification"
  6. "github.com/chrislusf/seaweedfs/weed/util"
  7. "github.com/golang/protobuf/proto"
  8. )
  9. func init() {
  10. notification.MessageQueues = append(notification.MessageQueues, &KafkaQueue{})
  11. }
  12. type KafkaQueue struct {
  13. topic string
  14. producer sarama.AsyncProducer
  15. }
  16. func (k *KafkaQueue) GetName() string {
  17. return "kafka"
  18. }
  19. func (k *KafkaQueue) Initialize(configuration util.Configuration, prefix string) (err error) {
  20. log.Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
  21. log.Infof("filer.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
  22. return k.initialize(
  23. configuration.GetStringSlice(prefix+"hosts"),
  24. configuration.GetString(prefix+"topic"),
  25. )
  26. }
  27. func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) {
  28. config := sarama.NewConfig()
  29. config.Producer.RequiredAcks = sarama.WaitForLocal
  30. config.Producer.Partitioner = sarama.NewHashPartitioner
  31. config.Producer.Return.Successes = true
  32. config.Producer.Return.Errors = true
  33. k.producer, err = sarama.NewAsyncProducer(hosts, config)
  34. if err != nil {
  35. return err
  36. }
  37. k.topic = topic
  38. go k.handleSuccess()
  39. go k.handleError()
  40. return nil
  41. }
  42. func (k *KafkaQueue) SendMessage(key string, message proto.Message) (err error) {
  43. bytes, err := proto.Marshal(message)
  44. if err != nil {
  45. return
  46. }
  47. msg := &sarama.ProducerMessage{
  48. Topic: k.topic,
  49. Key: sarama.StringEncoder(key),
  50. Value: sarama.ByteEncoder(bytes),
  51. }
  52. k.producer.Input() <- msg
  53. return nil
  54. }
  55. func (k *KafkaQueue) handleSuccess() {
  56. for {
  57. pm := <-k.producer.Successes()
  58. if pm != nil {
  59. log.Tracef("producer message success, partition:%d offset:%d key:%v", pm.Partition, pm.Offset, pm.Key)
  60. }
  61. }
  62. }
  63. func (k *KafkaQueue) handleError() {
  64. for {
  65. err := <-k.producer.Errors()
  66. if err != nil {
  67. log.Errorf("producer message error, partition:%d offset:%d key:%v value:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic)
  68. }
  69. }
  70. }