broker_server.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/cluster"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  11. "google.golang.org/grpc"
  12. "github.com/seaweedfs/seaweedfs/weed/pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  15. )
// MessageQueueBrokerOption holds the settings a message queue broker is
// created with. NewMessageBroker keeps a pointer to it on the broker.
type MessageQueueBrokerOption struct {
	// Masters is turned into a service discovery (pb.NewServiceDiscoveryFromMap)
	// for the master client.
	Masters map[string]pb.ServerAddress
	// FilerGroup is passed to the master client and used when listing the
	// already-known filer peers.
	FilerGroup string
	// DataCenter is passed to the master client when it is created.
	DataCenter string
	// Rack is passed to the master client when it is created.
	Rack string
	// DefaultReplication is not read in this file.
	// NOTE(review): presumably the default replication setting — confirm against callers.
	DefaultReplication string
	// MaxMB is not read in this file.
	// NOTE(review): presumably a size limit in megabytes — confirm against callers.
	MaxMB int
	// Ip is combined with Port to form this broker's own address.
	Ip string
	// Port is combined with Ip to form this broker's own address.
	Port int
	// Cipher is not read in this file.
	// NOTE(review): presumably toggles encryption of stored data — confirm against callers.
	Cipher bool
}
// MessageQueueBroker is the message queue broker server. It embeds
// mq_pb.UnimplementedSeaweedMessagingServer and tracks the filers announced
// by the master so that it always has one filer to talk to.
type MessageQueueBroker struct {
	mq_pb.UnimplementedSeaweedMessagingServer
	// option is the configuration handed to NewMessageBroker.
	option *MessageQueueBrokerOption
	// grpcDialOption is used for every outgoing master, filer and broker connection.
	grpcDialOption grpc.DialOption
	// MasterClient is created in NewMessageBroker and delivers peer updates
	// to OnBrokerUpdate.
	MasterClient *wdclient.MasterClient
	// filers is the set of filer addresses currently known; maintained by OnBrokerUpdate.
	filers map[pb.ServerAddress]struct{}
	// currentFiler is the filer returned by GetFiler; empty until a filer is known.
	currentFiler pb.ServerAddress
	// localTopicManager is created with topic.NewLocalTopicManager in NewMessageBroker.
	localTopicManager *topic.LocalTopicManager
	// Balancer is created with balancer.NewBalancer in NewMessageBroker.
	Balancer *balancer.Balancer
	// lockAsBalancer is the live lock started under balancer.LockBrokerBalancer
	// once a filer has been found.
	lockAsBalancer *cluster.LiveLock
	// currentBalancer is not assigned in this file.
	// NOTE(review): presumably the broker currently holding the balancer lock — confirm.
	currentBalancer pb.ServerAddress
}
  39. func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
  40. mqBroker = &MessageQueueBroker{
  41. option: option,
  42. grpcDialOption: grpcDialOption,
  43. MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
  44. filers: make(map[pb.ServerAddress]struct{}),
  45. localTopicManager: topic.NewLocalTopicManager(),
  46. Balancer: balancer.NewBalancer(),
  47. }
  48. mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
  49. go mqBroker.MasterClient.KeepConnectedToMaster()
  50. existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType)
  51. for _, newNode := range existingNodes {
  52. mqBroker.OnBrokerUpdate(newNode, time.Now())
  53. }
  54. // keep connecting to balancer
  55. go func() {
  56. for mqBroker.currentFiler == "" {
  57. time.Sleep(time.Millisecond * 237)
  58. }
  59. self := fmt.Sprintf("%s:%d", option.Ip, option.Port)
  60. glog.V(1).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
  61. lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
  62. mqBroker.lockAsBalancer = lockClient.StartLock(balancer.LockBrokerBalancer, self)
  63. for {
  64. err := mqBroker.BrokerConnectToBalancer(self)
  65. if err != nil {
  66. fmt.Printf("BrokerConnectToBalancer: %v\n", err)
  67. }
  68. time.Sleep(time.Second)
  69. }
  70. }()
  71. return mqBroker, nil
  72. }
  73. func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
  74. if update.NodeType != cluster.FilerType {
  75. return
  76. }
  77. address := pb.ServerAddress(update.Address)
  78. if update.IsAdd {
  79. broker.filers[address] = struct{}{}
  80. if broker.currentFiler == "" {
  81. broker.currentFiler = address
  82. }
  83. } else {
  84. delete(broker.filers, address)
  85. if broker.currentFiler == address {
  86. for filer := range broker.filers {
  87. broker.currentFiler = filer
  88. break
  89. }
  90. }
  91. }
  92. }
  93. func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress {
  94. return broker.currentFiler
  95. }
  96. func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  97. return pb.WithFilerClient(streamingMode, 0, broker.GetFiler(), broker.grpcDialOption, fn)
  98. }
  99. func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string {
  100. return location.Url
  101. }
  102. func (broker *MessageQueueBroker) GetDataCenter() string {
  103. return ""
  104. }
  105. func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
  106. return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  107. return fn(client)
  108. })
  109. }
  110. func (broker *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error {
  111. return pb.WithBrokerGrpcClient(streamingMode, server.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  112. return fn(client)
  113. })
  114. }