msg_broker_server.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "google.golang.org/grpc"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/security"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  // MessageBrokerOption carries the settings handed to NewMessageBroker.
  type MessageBrokerOption struct {
  	// Filers lists the filer addresses that peer discovery contacts first
  	// to learn which masters exist.
  	Filers []string
  	// DefaultReplication is not read by the code in this file section;
  	// presumably a replication setting for stored messages — TODO confirm.
  	DefaultReplication string
  	// MaxMB is not read by the code in this file section; presumably a size
  	// limit in megabytes — TODO confirm.
  	MaxMB int
  	// Port is not read by the code in this file section; presumably the port
  	// the broker listens on — TODO confirm.
  	Port int
  }
  19. type MessageBroker struct {
  20. option *MessageBrokerOption
  21. grpcDialOption grpc.DialOption
  22. }
  23. func NewMessageBroker(option *MessageBrokerOption) (messageBroker *MessageBroker, err error) {
  24. messageBroker = &MessageBroker{
  25. option: option,
  26. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_broker"),
  27. }
  28. go messageBroker.loopForEver()
  29. return messageBroker, nil
  30. }
  31. func (broker *MessageBroker) loopForEver() {
  32. for {
  33. broker.checkPeers()
  34. time.Sleep(3 * time.Second)
  35. }
  36. }
  37. func (broker *MessageBroker) checkPeers() {
  38. // contact a filer about masters
  39. var masters []string
  40. for _, filer := range broker.option.Filers {
  41. err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
  42. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  43. if err != nil {
  44. return err
  45. }
  46. masters = append(masters, resp.Masters...)
  47. return nil
  48. })
  49. if err != nil {
  50. fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
  51. return
  52. }
  53. }
  54. // contact each masters for filers
  55. var filers []string
  56. for _, master := range masters {
  57. err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
  58. resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
  59. ClientType: "filer",
  60. })
  61. if err != nil {
  62. return err
  63. }
  64. fmt.Printf("filers: %+v\n", resp.GrpcAddresses)
  65. filers = append(filers, resp.GrpcAddresses...)
  66. return nil
  67. })
  68. if err != nil {
  69. fmt.Printf("failed to list filers: %v\n", err)
  70. return
  71. }
  72. }
  73. // contact each filer about brokers
  74. for _, filer := range filers {
  75. err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
  76. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  77. if err != nil {
  78. return err
  79. }
  80. masters = append(masters, resp.Masters...)
  81. return nil
  82. })
  83. if err != nil {
  84. fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
  85. return
  86. }
  87. }
  88. }
  89. func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error {
  90. return pb.WithFilerClient(filer, broker.grpcDialOption, fn)
  91. }
  92. func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
  93. return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
  94. return fn(client)
  95. })
  96. }