broker_server.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package broker
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  5. "time"
  6. "google.golang.org/grpc"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  11. )
  12. type MessageBrokerOption struct {
  13. Filers []pb.ServerAddress
  14. DefaultReplication string
  15. MaxMB int
  16. Ip string
  17. Port int
  18. Cipher bool
  19. }
  20. type MessageBroker struct {
  21. messaging_pb.UnimplementedSeaweedMessagingServer
  22. option *MessageBrokerOption
  23. grpcDialOption grpc.DialOption
  24. topicManager *TopicManager
  25. }
  26. func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) {
  27. messageBroker = &MessageBroker{
  28. option: option,
  29. grpcDialOption: grpcDialOption,
  30. }
  31. messageBroker.topicManager = NewTopicManager(messageBroker)
  32. messageBroker.checkFilers()
  33. go messageBroker.keepConnectedToOneFiler()
  34. return messageBroker, nil
  35. }
  36. func (broker *MessageBroker) keepConnectedToOneFiler() {
  37. for {
  38. for _, filer := range broker.option.Filers {
  39. broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
  40. ctx, cancel := context.WithCancel(context.Background())
  41. defer cancel()
  42. stream, err := client.KeepConnected(ctx)
  43. if err != nil {
  44. glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
  45. return err
  46. }
  47. initRequest := &filer_pb.KeepConnectedRequest{
  48. Name: broker.option.Ip,
  49. GrpcPort: uint32(broker.option.Port),
  50. }
  51. for _, tp := range broker.topicManager.ListTopicPartitions() {
  52. initRequest.Resources = append(initRequest.Resources, tp.String())
  53. }
  54. if err := stream.Send(&filer_pb.KeepConnectedRequest{
  55. Name: broker.option.Ip,
  56. GrpcPort: uint32(broker.option.Port),
  57. }); err != nil {
  58. glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
  59. return err
  60. }
  61. // TODO send events of adding/removing topics
  62. glog.V(0).Infof("conntected with filer: %v", filer)
  63. for {
  64. if err := stream.Send(&filer_pb.KeepConnectedRequest{
  65. Name: broker.option.Ip,
  66. GrpcPort: uint32(broker.option.Port),
  67. }); err != nil {
  68. glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
  69. return err
  70. }
  71. // println("send heartbeat")
  72. if _, err := stream.Recv(); err != nil {
  73. glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
  74. return err
  75. }
  76. // println("received reply")
  77. time.Sleep(11 * time.Second)
  78. // println("woke up")
  79. }
  80. return nil
  81. })
  82. time.Sleep(3 * time.Second)
  83. }
  84. }
  85. }
  86. func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
  87. return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
  88. }
  89. func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
  90. return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
  91. return fn(client)
  92. })
  93. }