broker_server.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/cluster"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  16. )
  17. type MessageQueueBrokerOption struct {
  18. Masters map[string]pb.ServerAddress
  19. FilerGroup string
  20. DataCenter string
  21. Rack string
  22. DefaultReplication string
  23. MaxMB int
  24. Ip string
  25. Port int
  26. Cipher bool
  27. }
  28. type MessageQueueBroker struct {
  29. mq_pb.UnimplementedSeaweedMessagingServer
  30. option *MessageQueueBrokerOption
  31. grpcDialOption grpc.DialOption
  32. MasterClient *wdclient.MasterClient
  33. filers map[pb.ServerAddress]struct{}
  34. currentFiler pb.ServerAddress
  35. localTopicManager *topic.LocalTopicManager
  36. Balancer *pub_balancer.Balancer
  37. lockAsBalancer *cluster.LiveLock
  38. currentBalancer pb.ServerAddress
  39. Coordinator *sub_coordinator.Coordinator
  40. }
  41. func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
  42. pub_broker_balancer := pub_balancer.NewBalancer()
  43. mqBroker = &MessageQueueBroker{
  44. option: option,
  45. grpcDialOption: grpcDialOption,
  46. MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
  47. filers: make(map[pb.ServerAddress]struct{}),
  48. localTopicManager: topic.NewLocalTopicManager(),
  49. Balancer: pub_broker_balancer,
  50. Coordinator: sub_coordinator.NewCoordinator(pub_broker_balancer),
  51. }
  52. mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
  53. go mqBroker.MasterClient.KeepConnectedToMaster()
  54. existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType)
  55. for _, newNode := range existingNodes {
  56. mqBroker.OnBrokerUpdate(newNode, time.Now())
  57. }
  58. // keep connecting to balancer
  59. go func() {
  60. for mqBroker.currentFiler == "" {
  61. time.Sleep(time.Millisecond * 237)
  62. }
  63. self := fmt.Sprintf("%s:%d", option.Ip, option.Port)
  64. glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
  65. lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
  66. mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, self)
  67. for {
  68. err := mqBroker.BrokerConnectToBalancer(self)
  69. if err != nil {
  70. fmt.Printf("BrokerConnectToBalancer: %v\n", err)
  71. }
  72. time.Sleep(time.Second)
  73. }
  74. }()
  75. return mqBroker, nil
  76. }
  77. func (b *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
  78. if update.NodeType != cluster.FilerType {
  79. return
  80. }
  81. address := pb.ServerAddress(update.Address)
  82. if update.IsAdd {
  83. b.filers[address] = struct{}{}
  84. if b.currentFiler == "" {
  85. b.currentFiler = address
  86. }
  87. } else {
  88. delete(b.filers, address)
  89. if b.currentFiler == address {
  90. for filer := range b.filers {
  91. b.currentFiler = filer
  92. break
  93. }
  94. }
  95. }
  96. }
  97. func (b *MessageQueueBroker) GetFiler() pb.ServerAddress {
  98. return b.currentFiler
  99. }
  100. func (b *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  101. return pb.WithFilerClient(streamingMode, 0, b.GetFiler(), b.grpcDialOption, fn)
  102. }
  103. func (b *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string {
  104. return location.Url
  105. }
  106. func (b *MessageQueueBroker) GetDataCenter() string {
  107. return ""
  108. }
  109. func (b *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
  110. return pb.WithMasterClient(streamingMode, master, b.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  111. return fn(client)
  112. })
  113. }
  114. func (b *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error {
  115. return pb.WithBrokerGrpcClient(streamingMode, server.String(), b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  116. return fn(client)
  117. })
  118. }