notification_gocdk_pub_sub.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package sub
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. "github.com/chrislusf/seaweedfs/weed/util"
  7. "github.com/golang/protobuf/proto"
  8. "github.com/streadway/amqp"
  9. "gocloud.dev/pubsub"
  10. _ "gocloud.dev/pubsub/awssnssqs"
  11. "gocloud.dev/pubsub/rabbitpubsub"
  12. "net/url"
  13. "os"
  14. "path"
  15. "strings"
  16. "time"
  17. // _ "gocloud.dev/pubsub/azuresb"
  18. _ "gocloud.dev/pubsub/gcppubsub"
  19. _ "gocloud.dev/pubsub/natspubsub"
  20. _ "gocloud.dev/pubsub/rabbitpubsub"
  21. )
  22. func init() {
  23. NotificationInputs = append(NotificationInputs, &GoCDKPubSubInput{})
  24. }
  25. func getPath(rawUrl string) string {
  26. parsedUrl, _ := url.Parse(rawUrl)
  27. return path.Join(parsedUrl.Host, parsedUrl.Path)
  28. }
  29. func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl string) error {
  30. exchangeName := getPath(exchangeUrl)
  31. queueName := getPath(queueUrl)
  32. exchangeNameDLX := "DLX." + exchangeName
  33. queueNameDLX := "DLX." + queueName
  34. ch, err := conn.Channel()
  35. if err != nil {
  36. glog.Error(err)
  37. return err
  38. }
  39. defer ch.Close()
  40. if err := ch.ExchangeDeclare(
  41. exchangeNameDLX, "fanout", false, false, false, false, nil); err != nil {
  42. glog.Error(err)
  43. return err
  44. }
  45. if err := ch.ExchangeDeclare(
  46. exchangeName, "fanout", false, false, false, false, nil); err != nil {
  47. glog.Error(err)
  48. return err
  49. }
  50. if _, err := ch.QueueDeclare(
  51. queueName, false, false, false, false,
  52. amqp.Table{"x-dead-letter-exchange": exchangeNameDLX}); err != nil {
  53. glog.Error(err)
  54. return err
  55. }
  56. if err := ch.QueueBind(queueName, "", exchangeName, false, nil); err != nil {
  57. glog.Error(err)
  58. return err
  59. }
  60. if _, err := ch.QueueDeclare(
  61. queueNameDLX, false, false, false, false,
  62. amqp.Table{"x-dead-letter-exchange": exchangeName, "x-message-ttl": 600000}); err != nil {
  63. glog.Error(err)
  64. return err
  65. }
  66. if err := ch.QueueBind(queueNameDLX, "", exchangeNameDLX, false, nil); err != nil {
  67. glog.Error(err)
  68. return err
  69. }
  70. return nil
  71. }
  72. type GoCDKPubSubInput struct {
  73. sub *pubsub.Subscription
  74. subURL string
  75. }
  76. func (k *GoCDKPubSubInput) GetName() string {
  77. return "gocdk_pub_sub"
  78. }
  79. func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
  80. topicUrl := configuration.GetString(prefix + "topic_url")
  81. k.subURL = configuration.GetString(prefix + "sub_url")
  82. glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL)
  83. sub, err := pubsub.OpenSubscription(context.Background(), k.subURL)
  84. if err != nil {
  85. return err
  86. }
  87. var conn *amqp.Connection
  88. if sub.As(&conn) {
  89. ch, err := conn.Channel()
  90. if err != nil {
  91. return err
  92. }
  93. defer ch.Close()
  94. _, err = ch.QueueInspect(getPath(k.subURL))
  95. if err != nil {
  96. if strings.HasPrefix(err.Error(), "Exception (404) Reason") {
  97. if err := QueueDeclareAndBind(conn, topicUrl, k.subURL); err != nil {
  98. return err
  99. }
  100. } else {
  101. return err
  102. }
  103. }
  104. }
  105. k.sub = sub
  106. return nil
  107. }
  108. func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
  109. ctx := context.Background()
  110. msg, err := k.sub.Receive(ctx)
  111. if err != nil {
  112. var conn *amqp.Connection
  113. if k.sub.As(&conn) && conn.IsClosed() {
  114. conn.Close()
  115. k.sub.Shutdown(ctx)
  116. conn, err = amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
  117. if err != nil {
  118. glog.Error(err)
  119. time.Sleep(time.Second)
  120. return
  121. }
  122. k.sub = rabbitpubsub.OpenSubscription(conn, getPath(k.subURL), nil)
  123. return
  124. }
  125. // This is permanent cached sub err
  126. glog.Fatal(err)
  127. }
  128. onFailureFn = func() {
  129. if msg.Nackable() {
  130. isRedelivered := false
  131. var delivery amqp.Delivery
  132. if msg.As(&delivery) {
  133. isRedelivered = delivery.Redelivered
  134. glog.Warningf("onFailureFn() metadata: %+v, redelivered: %v", msg.Metadata, delivery.Redelivered)
  135. }
  136. if isRedelivered {
  137. if err := delivery.Nack(false, false); err != nil {
  138. glog.Error(err)
  139. }
  140. } else {
  141. msg.Nack()
  142. }
  143. }
  144. }
  145. onSuccessFn = func() {
  146. msg.Ack()
  147. }
  148. key = msg.Metadata["key"]
  149. message = &filer_pb.EventNotification{}
  150. err = proto.Unmarshal(msg.Body, message)
  151. if err != nil {
  152. return "", nil, onSuccessFn, onFailureFn, err
  153. }
  154. return key, message, onSuccessFn, onFailureFn, nil
  155. }