gocdk_pub_sub.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. // Package gocdk_pub_sub supports the Go CDK (Cloud Development Kit) PubSub API,
  2. // which in turn supports many providers, including Amazon SNS/SQS, Azure Service Bus,
  3. // Google Cloud PubSub, and RabbitMQ.
  4. //
  5. // In the config, select a provider and topic using a URL. See
  6. // https://godoc.org/gocloud.dev/pubsub and its sub-packages for details.
  7. //
  8. // The Go CDK PubSub API does not support administrative operations like topic
  9. // creation. Create the topic using a UI, CLI or provider-specific API before running
  10. // weed.
  11. //
  12. // The Go CDK obtains credentials via environment variables and other
  13. // provider-specific default mechanisms. See the provider's documentation for
  14. // details.
  15. package gocdk_pub_sub
  16. import (
  17. "context"
  18. "fmt"
  19. "github.com/chrislusf/seaweedfs/weed/glog"
  20. "github.com/chrislusf/seaweedfs/weed/notification"
  21. "github.com/chrislusf/seaweedfs/weed/util"
  22. "github.com/golang/protobuf/proto"
  23. "gocloud.dev/pubsub"
  24. _ "gocloud.dev/pubsub/awssnssqs"
  25. // _ "gocloud.dev/pubsub/azuresb"
  26. _ "gocloud.dev/pubsub/gcppubsub"
  27. _ "gocloud.dev/pubsub/natspubsub"
  28. _ "gocloud.dev/pubsub/rabbitpubsub"
  29. )
  30. func init() {
  31. notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{})
  32. }
  33. type GoCDKPubSub struct {
  34. topicURL string
  35. topic *pubsub.Topic
  36. }
  37. func (k *GoCDKPubSub) GetName() string {
  38. return "gocdk_pub_sub"
  39. }
  40. func (k *GoCDKPubSub) Initialize(config util.Configuration) error {
  41. k.topicURL = config.GetString("topic_url")
  42. glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL)
  43. topic, err := pubsub.OpenTopic(context.Background(), k.topicURL)
  44. if err != nil {
  45. glog.Fatalf("Failed to open topic: %v", err)
  46. }
  47. k.topic = topic
  48. return nil
  49. }
  50. func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error {
  51. bytes, err := proto.Marshal(message)
  52. if err != nil {
  53. return err
  54. }
  55. ctx := context.Background()
  56. err = k.topic.Send(ctx, &pubsub.Message{
  57. Body: bytes,
  58. Metadata: map[string]string{"key": key},
  59. })
  60. if err != nil {
  61. return fmt.Errorf("send message via Go CDK pubsub %s: %v", k.topicURL, err)
  62. }
  63. return nil
  64. }