// msg_broker.go (3.5 KB)

  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "google.golang.org/grpc/reflection"
  8. "github.com/chrislusf/seaweedfs/weed/util/grace"
  9. "github.com/chrislusf/seaweedfs/weed/util/log"
  10. "github.com/chrislusf/seaweedfs/weed/messaging/broker"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  14. "github.com/chrislusf/seaweedfs/weed/security"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. )
  17. var (
  18. messageBrokerStandaloneOptions MessageBrokerOptions
  19. )
  20. type MessageBrokerOptions struct {
  21. filer *string
  22. ip *string
  23. port *int
  24. cpuprofile *string
  25. memprofile *string
  26. }
  27. func init() {
  28. cmdMsgBroker.Run = runMsgBroker // break init cycle
  29. messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
  30. messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
  31. messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port")
  32. messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file")
  33. messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file")
  34. }
// cmdMsgBroker describes the msgBroker command: its usage line and its short
// and long help text. Its Run field and flags are attached in init.
var cmdMsgBroker = &Command{
	UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]",
	Short: "start a message queue broker",
	Long: `start a message queue broker
The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
The brokers are stateless. To scale up, just add more brokers.
`,
}
  43. func runMsgBroker(cmd *Command, args []string) bool {
  44. util.LoadConfiguration("security", false)
  45. return messageBrokerStandaloneOptions.startQueueServer()
  46. }
  47. func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
  48. grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
  49. filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer)
  50. if err != nil {
  51. log.Fatal(err)
  52. return false
  53. }
  54. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
  55. cipher := false
  56. for {
  57. err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  58. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  59. if err != nil {
  60. return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
  61. }
  62. cipher = resp.Cipher
  63. return nil
  64. })
  65. if err != nil {
  66. log.Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
  67. time.Sleep(time.Second)
  68. } else {
  69. log.Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
  70. break
  71. }
  72. }
  73. qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{
  74. Filers: []string{*msgBrokerOpt.filer},
  75. DefaultReplication: "",
  76. MaxMB: 0,
  77. Ip: *msgBrokerOpt.ip,
  78. Port: *msgBrokerOpt.port,
  79. Cipher: cipher,
  80. }, grpcDialOption)
  81. // start grpc listener
  82. grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0)
  83. if err != nil {
  84. log.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
  85. }
  86. grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
  87. messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)
  88. reflection.Register(grpcS)
  89. grpcS.Serve(grpcL)
  90. return true
  91. }