12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- package pub_client
- import (
- "context"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "log"
- )
- // broker => publish client
- // send init message
- // save the publishing client
- func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, err error) {
- log.Printf("connecting to %v for topic partition %+v", brokerAddress, partition)
- grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
- if err != nil {
- return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
- }
- brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
- stream, err := brokerClient.Publish(context.Background())
- if err != nil {
- return publishClient, fmt.Errorf("create publish client: %v", err)
- }
- publishClient = &PublishClient{
- SeaweedMessaging_PublishClient: stream,
- Broker: brokerAddress,
- }
- if err = publishClient.Send(&mq_pb.PublishRequest{
- Message: &mq_pb.PublishRequest_Init{
- Init: &mq_pb.PublishRequest_InitMessage{
- Topic: &mq_pb.Topic{
- Namespace: p.namespace,
- Name: p.topic,
- },
- Partition: &mq_pb.Partition{
- RingSize: partition.RingSize,
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
- },
- AckInterval: 128,
- },
- },
- }); err != nil {
- return publishClient, fmt.Errorf("send init message: %v", err)
- }
- resp, err := stream.Recv()
- if err != nil {
- return publishClient, fmt.Errorf("recv init response: %v", err)
- }
- if resp.Error != "" {
- return publishClient, fmt.Errorf("init response error: %v", resp.Error)
- }
- go func() {
- for {
- _, err := publishClient.Recv()
- if err != nil {
- e, ok := status.FromError(err)
- if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
- return
- }
- publishClient.Err = err
- fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
- return
- }
- }
- }()
- return publishClient, nil
- }
|