google_pub_sub.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package google_pub_sub
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "cloud.google.com/go/pubsub"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/notification"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "google.golang.org/api/option"
  11. "google.golang.org/protobuf/proto"
  12. )
  13. func init() {
  14. notification.MessageQueues = append(notification.MessageQueues, &GooglePubSub{})
  15. }
  16. type GooglePubSub struct {
  17. topic *pubsub.Topic
  18. }
  19. func (k *GooglePubSub) GetName() string {
  20. return "google_pub_sub"
  21. }
  22. func (k *GooglePubSub) Initialize(configuration util.Configuration, prefix string) (err error) {
  23. glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
  24. glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
  25. return k.initialize(
  26. configuration.GetString(prefix+"google_application_credentials"),
  27. configuration.GetString(prefix+"project_id"),
  28. configuration.GetString(prefix+"topic"),
  29. )
  30. }
  31. func (k *GooglePubSub) initialize(google_application_credentials, projectId, topicName string) (err error) {
  32. ctx := context.Background()
  33. // Creates a client.
  34. if google_application_credentials == "" {
  35. var found bool
  36. google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
  37. if !found {
  38. glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml")
  39. }
  40. }
  41. client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(google_application_credentials))
  42. if err != nil {
  43. glog.Fatalf("Failed to create client: %v", err)
  44. }
  45. k.topic = client.Topic(topicName)
  46. if exists, err := k.topic.Exists(ctx); err == nil {
  47. if !exists {
  48. k.topic, err = client.CreateTopic(ctx, topicName)
  49. if err != nil {
  50. glog.Fatalf("Failed to create topic %s: %v", topicName, err)
  51. }
  52. }
  53. } else {
  54. glog.Fatalf("Failed to check topic %s: %v", topicName, err)
  55. }
  56. return nil
  57. }
  58. func (k *GooglePubSub) SendMessage(key string, message proto.Message) (err error) {
  59. bytes, err := proto.Marshal(message)
  60. if err != nil {
  61. return
  62. }
  63. ctx := context.Background()
  64. result := k.topic.Publish(ctx, &pubsub.Message{
  65. Data: bytes,
  66. Attributes: map[string]string{"key": key},
  67. })
  68. _, err = result.Get(ctx)
  69. if err != nil {
  70. return fmt.Errorf("send message to google pub sub %s: %v", k.topic.String(), err)
  71. }
  72. return nil
  73. }