grpc_client.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. var (
  17. connectionPool = make(map[string]*util.ResourcePool)
  18. connectionPoolLock sync.Mutex
  19. )
  20. func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(context.Context, volume_server_pb.VolumeServerClient) error) error {
  21. ctx := context.Background()
  22. grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
  23. if err != nil {
  24. return err
  25. }
  26. return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
  27. client := volume_server_pb.NewVolumeServerClient(grpcConnection)
  28. return fn(ctx2, client)
  29. }, grpcAddress, grpcDialOption)
  30. }
  31. func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err error) {
  32. sepIndex := strings.LastIndex(volumeServer, ":")
  33. port, err := strconv.Atoi(volumeServer[sepIndex+1:])
  34. if err != nil {
  35. glog.Errorf("failed to parse volume server address: %v", volumeServer)
  36. return "", err
  37. }
  38. return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
  39. }
  40. func WithVolumeServerTcpConnection(volumeServer string, fn func(conn net.Conn) error) error {
  41. tcpAddress, err := toVolumeServerTcpAddress(volumeServer)
  42. if err != nil {
  43. return err
  44. }
  45. conn, err := getConnection(tcpAddress)
  46. if err != nil {
  47. return err
  48. }
  49. defer releaseConnection(conn, tcpAddress)
  50. err = fn(conn)
  51. return err
  52. }
  53. func getConnection(tcpAddress string) (net.Conn, error) {
  54. connectionPoolLock.Lock()
  55. defer connectionPoolLock.Unlock()
  56. pool, found := connectionPool[tcpAddress]
  57. if !found {
  58. println("creating pool for", tcpAddress)
  59. raddr, err := net.ResolveTCPAddr("tcp", tcpAddress)
  60. if err != nil {
  61. glog.Fatal(err)
  62. }
  63. pool = util.NewResourcePool(16, func() (interface{}, error) {
  64. conn, err := net.DialTCP("tcp", nil, raddr)
  65. if err != nil {
  66. return conn, err
  67. }
  68. conn.SetKeepAlive(true)
  69. conn.SetNoDelay(true)
  70. println("connected", tcpAddress, "=>", conn.LocalAddr().String())
  71. return conn, nil
  72. })
  73. connectionPool[tcpAddress] = pool
  74. }
  75. connObj, err := pool.Get(time.Minute)
  76. if err != nil {
  77. return nil, err
  78. }
  79. // println("get connection", tcpAddress, "=>", conn.LocalAddr().String())
  80. return connObj.(net.Conn), nil
  81. }
  82. func releaseConnection(conn net.Conn, tcpAddress string) {
  83. connectionPoolLock.Lock()
  84. defer connectionPoolLock.Unlock()
  85. pool, found := connectionPool[tcpAddress]
  86. if !found {
  87. println("can not return connection", tcpAddress, "=>", conn.LocalAddr().String())
  88. return
  89. }
  90. pool.Release(conn)
  91. // println("returned connection", tcpAddress, "=>", conn.LocalAddr().String())
  92. }
  93. func toVolumeServerTcpAddress(volumeServer string) (grpcAddress string, err error) {
  94. sepIndex := strings.LastIndex(volumeServer, ":")
  95. port, err := strconv.Atoi(volumeServer[sepIndex+1:])
  96. if err != nil {
  97. glog.Errorf("failed to parse volume server address: %v", volumeServer)
  98. return "", err
  99. }
  100. return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+20000), nil
  101. }
  102. func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(ctx2 context.Context, masterClient master_pb.SeaweedClient) error) error {
  103. ctx := context.Background()
  104. masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer)
  105. if parseErr != nil {
  106. return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr)
  107. }
  108. return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
  109. client := master_pb.NewSeaweedClient(grpcConnection)
  110. return fn(ctx2, client)
  111. }, masterGrpcAddress, grpcDialOption)
  112. }