123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- package pub_client
- import (
- "context"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- )
- func (p *TopicPublisher) doLookup(brokerAddress string) error {
- err := pb.WithBrokerGrpcClient(true,
- brokerAddress,
- p.grpcDialOption,
- func(client mq_pb.SeaweedMessagingClient) error {
- lookupResp, err := client.LookupTopicBrokers(context.Background(),
- &mq_pb.LookupTopicBrokersRequest{
- Topic: &mq_pb.Topic{
- Namespace: p.namespace,
- Name: p.topic,
- },
- IsForPublish: true,
- })
- if err != nil {
- return err
- }
- for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
- // partition => publishClient
- publishClient, redirectTo, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker)
- if err != nil {
- return err
- }
- for redirectTo != "" {
- publishClient, redirectTo, err = p.doConnect(brokerPartitionAssignment.Partition, redirectTo)
- if err != nil {
- return err
- }
- }
- p.partition2Broker.Insert(
- brokerPartitionAssignment.Partition.RangeStart,
- brokerPartitionAssignment.Partition.RangeStop,
- publishClient)
- }
- return nil
- })
- if err != nil {
- return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err)
- }
- return nil
- }
- // broker => publish client
- // send init message
- // save the publishing client
- func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, redirectTo string, err error) {
- grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
- if err != nil {
- return publishClient, redirectTo, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
- }
- brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
- stream, err := brokerClient.Publish(context.Background())
- if err != nil {
- return publishClient, redirectTo, fmt.Errorf("create publish client: %v", err)
- }
- publishClient = &PublishClient{
- SeaweedMessaging_PublishClient: stream,
- Broker: brokerAddress,
- }
- if err = publishClient.Send(&mq_pb.PublishRequest{
- Message: &mq_pb.PublishRequest_Init{
- Init: &mq_pb.PublishRequest_InitMessage{
- Topic: &mq_pb.Topic{
- Namespace: p.namespace,
- Name: p.topic,
- },
- Partition: &mq_pb.Partition{
- RingSize: partition.RingSize,
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
- },
- AckInterval: 128,
- },
- },
- }); err != nil {
- return publishClient, redirectTo, fmt.Errorf("send init message: %v", err)
- }
- resp, err := stream.Recv()
- if err != nil {
- return publishClient, redirectTo, fmt.Errorf("recv init response: %v", err)
- }
- if resp.Error != "" {
- return publishClient, redirectTo, fmt.Errorf("init response error: %v", resp.Error)
- }
- if resp.RedirectToBroker != "" {
- redirectTo = resp.RedirectToBroker
- return publishClient, redirectTo, nil
- }
- go func() {
- for {
- _, err := publishClient.Recv()
- if err != nil {
- e, ok := status.FromError(err)
- if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
- return
- }
- publishClient.Err = err
- fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
- return
- }
- }
- }()
- return publishClient, redirectTo, nil
- }
|