broker_stats.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  10. "io"
  11. "math/rand"
  12. "time"
  13. )
  14. // BrokerConnectToBalancer connects to the broker balancer and sends stats
  15. func (b *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
  16. // find the lock owner
  17. var brokerBalancer string
  18. err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  19. resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
  20. Name: pub_balancer.LockBrokerBalancer,
  21. })
  22. if err != nil {
  23. return err
  24. }
  25. brokerBalancer = resp.Owner
  26. return nil
  27. })
  28. if err != nil {
  29. return err
  30. }
  31. b.currentBalancer = pb.ServerAddress(brokerBalancer)
  32. glog.V(0).Infof("broker %s found balancer %s", self, brokerBalancer)
  33. if brokerBalancer == "" {
  34. return fmt.Errorf("no balancer found")
  35. }
  36. // connect to the lock owner
  37. err = pb.WithBrokerGrpcClient(false, brokerBalancer, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  38. stream, err := client.PublisherToPubBalancer(context.Background())
  39. if err != nil {
  40. return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err)
  41. }
  42. defer stream.CloseSend()
  43. err = stream.Send(&mq_pb.PublisherToPubBalancerRequest{
  44. Message: &mq_pb.PublisherToPubBalancerRequest_Init{
  45. Init: &mq_pb.PublisherToPubBalancerRequest_InitMessage{
  46. Broker: self,
  47. },
  48. },
  49. })
  50. if err != nil {
  51. return fmt.Errorf("send init message: %v", err)
  52. }
  53. for {
  54. stats := b.localTopicManager.CollectStats(time.Second * 5)
  55. err = stream.Send(&mq_pb.PublisherToPubBalancerRequest{
  56. Message: &mq_pb.PublisherToPubBalancerRequest_Stats{
  57. Stats: stats,
  58. },
  59. })
  60. if err != nil {
  61. if err == io.EOF {
  62. return err
  63. }
  64. return fmt.Errorf("send stats message: %v", err)
  65. }
  66. glog.V(3).Infof("sent stats: %+v", stats)
  67. time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond)
  68. }
  69. return nil
  70. })
  71. return err
  72. }