client.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. package msgclient
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "google.golang.org/grpc"
  7. "github.com/chrislusf/seaweedfs/weed/messaging/broker"
  8. "github.com/chrislusf/seaweedfs/weed/pb"
  9. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  10. "github.com/chrislusf/seaweedfs/weed/security"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. type MessagingClient struct {
  14. bootstrapBrokers []string
  15. grpcConnections map[broker.TopicPartition]*grpc.ClientConn
  16. grpcDialOption grpc.DialOption
  17. }
  18. func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
  19. return &MessagingClient{
  20. bootstrapBrokers: bootstrapBrokers,
  21. grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn),
  22. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
  23. }
  24. }
  25. func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
  26. for _, broker := range mc.bootstrapBrokers {
  27. grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
  28. if err != nil {
  29. log.Printf("dial broker %s: %v", broker, err)
  30. continue
  31. }
  32. defer grpcConnection.Close()
  33. resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
  34. &messaging_pb.FindBrokerRequest{
  35. Namespace: tp.Namespace,
  36. Topic: tp.Topic,
  37. Parition: tp.Partition,
  38. })
  39. if err != nil {
  40. return nil, err
  41. }
  42. targetBroker := resp.Broker
  43. return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
  44. }
  45. return nil, fmt.Errorf("no broker found for %+v", tp)
  46. }