gocdk_pub_sub.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. //go:build gocdk
  2. // +build gocdk
  3. // Package gocdk_pub_sub supports the Go CDK (Cloud Development Kit) PubSub API,
  4. // which in turn supports many providers, including Amazon SNS/SQS, Azure Service Bus,
  5. // Google Cloud PubSub, and RabbitMQ.
  6. //
  7. // In the config, select a provider and topic using a URL. See
  8. // https://godoc.org/gocloud.dev/pubsub and its sub-packages for details.
  9. //
  10. // The Go CDK PubSub API does not support administrative operations like topic
  11. // creation. Create the topic using a UI, CLI or provider-specific API before running
  12. // weed.
  13. //
  14. // The Go CDK obtains credentials via environment variables and other
  15. // provider-specific default mechanisms. See the provider's documentation for
  16. // details.
  17. package gocdk_pub_sub
  18. import (
  19. "context"
  20. "fmt"
  21. amqp "github.com/rabbitmq/amqp091-go"
  22. "gocloud.dev/pubsub"
  23. _ "gocloud.dev/pubsub/awssnssqs"
  24. "gocloud.dev/pubsub/rabbitpubsub"
  25. "google.golang.org/protobuf/proto"
  26. "net/url"
  27. "path"
  28. "sync"
  29. "time"
  30. "github.com/seaweedfs/seaweedfs/weed/glog"
  31. "github.com/seaweedfs/seaweedfs/weed/notification"
  32. "github.com/seaweedfs/seaweedfs/weed/util"
  33. // _ "gocloud.dev/pubsub/azuresb"
  34. _ "gocloud.dev/pubsub/gcppubsub"
  35. _ "gocloud.dev/pubsub/natspubsub"
  36. _ "gocloud.dev/pubsub/rabbitpubsub"
  37. "os"
  38. )
  39. func init() {
  40. notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{})
  41. }
  42. func getPath(rawUrl string) string {
  43. parsedUrl, _ := url.Parse(rawUrl)
  44. return path.Join(parsedUrl.Host, parsedUrl.Path)
  45. }
  46. type GoCDKPubSub struct {
  47. topicURL string
  48. topic *pubsub.Topic
  49. topicLock sync.RWMutex
  50. }
  51. func (k *GoCDKPubSub) GetName() string {
  52. return "gocdk_pub_sub"
  53. }
  54. func (k *GoCDKPubSub) setTopic(topic *pubsub.Topic) {
  55. k.topicLock.Lock()
  56. k.topic = topic
  57. k.topicLock.Unlock()
  58. k.doReconnect()
  59. }
  60. func (k *GoCDKPubSub) doReconnect() {
  61. var conn *amqp.Connection
  62. k.topicLock.RLock()
  63. defer k.topicLock.RUnlock()
  64. if k.topic.As(&conn) {
  65. go func(c *amqp.Connection) {
  66. <-c.NotifyClose(make(chan *amqp.Error))
  67. c.Close()
  68. k.topicLock.RLock()
  69. k.topic.Shutdown(context.Background())
  70. k.topicLock.RUnlock()
  71. for {
  72. glog.Info("Try reconnect")
  73. conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
  74. if err == nil {
  75. k.setTopic(rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil))
  76. break
  77. }
  78. glog.Error(err)
  79. time.Sleep(time.Second)
  80. }
  81. }(conn)
  82. }
  83. }
  84. func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error {
  85. k.topicURL = configuration.GetString(prefix + "topic_url")
  86. glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL)
  87. topic, err := pubsub.OpenTopic(context.Background(), k.topicURL)
  88. if err != nil {
  89. glog.Fatalf("Failed to open topic: %v", err)
  90. }
  91. k.setTopic(topic)
  92. return nil
  93. }
  94. func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error {
  95. bytes, err := proto.Marshal(message)
  96. if err != nil {
  97. return err
  98. }
  99. k.topicLock.RLock()
  100. defer k.topicLock.RUnlock()
  101. err = k.topic.Send(context.Background(), &pubsub.Message{
  102. Body: bytes,
  103. Metadata: map[string]string{"key": key},
  104. })
  105. if err != nil {
  106. return fmt.Errorf("send message via Go CDK pubsub %s: %v", k.topicURL, err)
  107. }
  108. return nil
  109. }