msg_broker.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "google.golang.org/grpc/reflection"
  8. "github.com/chrislusf/seaweedfs/weed/pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. weed_server "github.com/chrislusf/seaweedfs/weed/server"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. var (
  17. messageBrokerStandaloneOptions QueueOptions
  18. )
  19. type QueueOptions struct {
  20. filer *string
  21. port *int
  22. defaultTtl *string
  23. }
  24. func init() {
  25. cmdMsgBroker.Run = runMsgBroker // break init cycle
  26. messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
  27. messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port")
  28. messageBrokerStandaloneOptions.defaultTtl = cmdMsgBroker.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
  29. }
  30. var cmdMsgBroker = &Command{
  31. UsageLine: "msg.broker [-port=17777] [-filer=<ip:port>]",
  32. Short: "<WIP> start a message queue broker",
  33. Long: `start a message queue broker
  34. The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
  35. The brokers are stateless. To scale up, just add more brokers.
  36. `,
  37. }
  38. func runMsgBroker(cmd *Command, args []string) bool {
  39. util.LoadConfiguration("security", false)
  40. return messageBrokerStandaloneOptions.startQueueServer()
  41. }
  42. func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
  43. filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer)
  44. if err != nil {
  45. glog.Fatal(err)
  46. return false
  47. }
  48. filerQueuesPath := "/queues"
  49. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  50. for {
  51. err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  52. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  53. if err != nil {
  54. return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
  55. }
  56. filerQueuesPath = resp.DirQueues
  57. glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath)
  58. return nil
  59. })
  60. if err != nil {
  61. glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
  62. time.Sleep(time.Second)
  63. } else {
  64. glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
  65. break
  66. }
  67. }
  68. qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{
  69. Filers: []string{*msgBrokerOpt.filer},
  70. DefaultReplication: "",
  71. MaxMB: 0,
  72. Port: *msgBrokerOpt.port,
  73. })
  74. // start grpc listener
  75. grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0)
  76. if err != nil {
  77. glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
  78. }
  79. grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
  80. queue_pb.RegisterSeaweedQueueServer(grpcS, qs)
  81. reflection.Register(grpcS)
  82. grpcS.Serve(grpcL)
  83. return true
  84. }