grpc_client_server.go 3.3 KB

  1. package util
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. ""
  11. ""
  12. )
  13. var (
  14. // cache grpc connections
  15. grpcClients = make(map[string]*grpc.ClientConn)
  16. grpcClientsLock sync.Mutex
  17. )
  18. func init() {
  19. http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 1024
  20. }
  21. func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
  22. var options []grpc.ServerOption
  23. options = append(options, grpc.KeepaliveParams(keepalive.ServerParameters{
  24. Time: 10 * time.Second, // wait time before ping if no activity
  25. Timeout: 20 * time.Second, // ping timeout
  26. }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  27. MinTime: 60 * time.Second, // min time a client should wait before sending a ping
  28. }))
  29. for _, opt := range opts {
  30. if opt != nil {
  31. options = append(options, opt)
  32. }
  33. }
  34. return grpc.NewServer(options...)
  35. }
  36. func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
  37. // opts = append(opts, grpc.WithBlock())
  38. // opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second)))
  39. var options []grpc.DialOption
  40. options = append(options,
  41. // grpc.WithInsecure(),
  42. grpc.WithKeepaliveParams(keepalive.ClientParameters{
  43. Time: 30 * time.Second, // client ping server if no activity for this long
  44. Timeout: 20 * time.Second,
  45. }))
  46. for _, opt := range opts {
  47. if opt != nil {
  48. options = append(options, opt)
  49. }
  50. }
  51. return grpc.DialContext(ctx, address, options...)
  52. }
  53. func WithCachedGrpcClient(ctx context.Context, fn func(context.Context, *grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
  54. grpcClientsLock.Lock()
  55. existingConnection, found := grpcClients[address]
  56. if found {
  57. grpcClientsLock.Unlock()
  58. err := fn(ctx, existingConnection)
  59. if err != nil {
  60. grpcClientsLock.Lock()
  61. delete(grpcClients, address)
  62. grpcClientsLock.Unlock()
  63. existingConnection.Close()
  64. }
  65. return err
  66. }
  67. grpcConnection, err := GrpcDial(ctx, address, opts...)
  68. if err != nil {
  69. grpcClientsLock.Unlock()
  70. return fmt.Errorf("fail to dial %s: %v", address, err)
  71. }
  72. grpcClients[address] = grpcConnection
  73. grpcClientsLock.Unlock()
  74. err = fn(ctx, grpcConnection)
  75. if err != nil {
  76. grpcClientsLock.Lock()
  77. delete(grpcClients, address)
  78. grpcClientsLock.Unlock()
  79. grpcConnection.Close()
  80. }
  81. return err
  82. }
  83. func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {
  84. colonIndex := strings.LastIndex(server, ":")
  85. if colonIndex < 0 {
  86. return "", fmt.Errorf("server should have hostname:port format: %v", server)
  87. }
  88. port, parseErr := strconv.ParseUint(server[colonIndex+1:], 10, 64)
  89. if parseErr != nil {
  90. return "", fmt.Errorf("server port parse error: %v", parseErr)
  91. }
  92. grpcPort := int(port) + 10000
  93. return fmt.Sprintf("%s:%d", server[:colonIndex], grpcPort), nil
  94. }
  95. func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
  96. hostnameAndPort := strings.Split(server, ":")
  97. if len(hostnameAndPort) != 2 {
  98. return fmt.Sprintf("unexpected server address: %s", server)
  99. }
  100. port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
  101. if parseErr != nil {
  102. return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1])
  103. }
  104. grpcPort := int(port) + 10000
  105. return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort)
  106. }