gocdk_pub_sub.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. // Package gocdk_pub_sub supports the Go CDK (Cloud Development Kit) PubSub API,
  2. // which in turn supports many providers, including Amazon SNS/SQS, Azure Service Bus,
  3. // Google Cloud PubSub, and RabbitMQ.
  4. //
  5. // In the config, select a provider and topic using a URL. See
  6. // https://godoc.org/gocloud.dev/pubsub and its sub-packages for details.
  7. //
  8. // The Go CDK PubSub API does not support administrative operations like topic
  9. // creation. Create the topic using a UI, CLI or provider-specific API before running
  10. // weed.
  11. //
  12. // The Go CDK obtains credentials via environment variables and other
  13. // provider-specific default mechanisms. See the provider's documentation for
  14. // details.
  15. package gocdk_pub_sub
  16. import (
  17. "context"
  18. "fmt"
  19. "github.com/golang/protobuf/proto"
  20. "github.com/streadway/amqp"
  21. "gocloud.dev/pubsub"
  22. _ "gocloud.dev/pubsub/awssnssqs"
  23. "gocloud.dev/pubsub/rabbitpubsub"
  24. "net/url"
  25. "path"
  26. "time"
  27. "github.com/chrislusf/seaweedfs/weed/glog"
  28. "github.com/chrislusf/seaweedfs/weed/notification"
  29. "github.com/chrislusf/seaweedfs/weed/util"
  30. // _ "gocloud.dev/pubsub/azuresb"
  31. _ "gocloud.dev/pubsub/gcppubsub"
  32. _ "gocloud.dev/pubsub/natspubsub"
  33. _ "gocloud.dev/pubsub/rabbitpubsub"
  34. "os"
  35. )
  36. func init() {
  37. notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{})
  38. }
  39. func getPath(rawUrl string) string {
  40. parsedUrl, _ := url.Parse(rawUrl)
  41. return path.Join(parsedUrl.Host, parsedUrl.Path)
  42. }
  43. type GoCDKPubSub struct {
  44. topicURL string
  45. topic *pubsub.Topic
  46. }
  47. func (k *GoCDKPubSub) GetName() string {
  48. return "gocdk_pub_sub"
  49. }
  50. func (k *GoCDKPubSub) doReconnect() {
  51. var conn *amqp.Connection
  52. if k.topic.As(&conn) {
  53. go func() {
  54. <-conn.NotifyClose(make(chan *amqp.Error))
  55. conn.Close()
  56. k.topic.Shutdown(context.Background())
  57. for {
  58. glog.Info("Try reconnect")
  59. conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
  60. if err == nil {
  61. k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil)
  62. k.doReconnect()
  63. break
  64. }
  65. glog.Error(err)
  66. time.Sleep(time.Second)
  67. }
  68. }()
  69. }
  70. }
  71. func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error {
  72. k.topicURL = configuration.GetString(prefix + "topic_url")
  73. glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL)
  74. topic, err := pubsub.OpenTopic(context.Background(), k.topicURL)
  75. if err != nil {
  76. glog.Fatalf("Failed to open topic: %v", err)
  77. }
  78. k.topic = topic
  79. k.doReconnect()
  80. return nil
  81. }
  82. func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error {
  83. bytes, err := proto.Marshal(message)
  84. if err != nil {
  85. return err
  86. }
  87. err = k.topic.Send(context.Background(), &pubsub.Message{
  88. Body: bytes,
  89. Metadata: map[string]string{"key": key},
  90. })
  91. if err != nil {
  92. return fmt.Errorf("send message via Go CDK pubsub %s: %v", k.topicURL, err)
  93. }
  94. return nil
  95. }