connect.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. "google.golang.org/grpc/codes"
  8. "google.golang.org/grpc/status"
  9. "log"
  10. )
  11. // broker => publish client
  12. // send init message
  13. // save the publishing client
  14. func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, err error) {
  15. log.Printf("connecting to %v for topic partition %+v", brokerAddress, partition)
  16. grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
  17. if err != nil {
  18. return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
  19. }
  20. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  21. stream, err := brokerClient.Publish(context.Background())
  22. if err != nil {
  23. return publishClient, fmt.Errorf("create publish client: %v", err)
  24. }
  25. publishClient = &PublishClient{
  26. SeaweedMessaging_PublishClient: stream,
  27. Broker: brokerAddress,
  28. }
  29. if err = publishClient.Send(&mq_pb.PublishRequest{
  30. Message: &mq_pb.PublishRequest_Init{
  31. Init: &mq_pb.PublishRequest_InitMessage{
  32. Topic: &mq_pb.Topic{
  33. Namespace: p.namespace,
  34. Name: p.topic,
  35. },
  36. Partition: &mq_pb.Partition{
  37. RingSize: partition.RingSize,
  38. RangeStart: partition.RangeStart,
  39. RangeStop: partition.RangeStop,
  40. },
  41. AckInterval: 128,
  42. },
  43. },
  44. }); err != nil {
  45. return publishClient, fmt.Errorf("send init message: %v", err)
  46. }
  47. resp, err := stream.Recv()
  48. if err != nil {
  49. return publishClient, fmt.Errorf("recv init response: %v", err)
  50. }
  51. if resp.Error != "" {
  52. return publishClient, fmt.Errorf("init response error: %v", resp.Error)
  53. }
  54. go func() {
  55. for {
  56. _, err := publishClient.Recv()
  57. if err != nil {
  58. e, ok := status.FromError(err)
  59. if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
  60. return
  61. }
  62. publishClient.Err = err
  63. fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
  64. return
  65. }
  66. }
  67. }()
  68. return publishClient, nil
  69. }