grpc_client_server.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package util
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "google.golang.org/grpc"
  11. "google.golang.org/grpc/keepalive"
  12. )
  13. var (
  14. // cache grpc connections
  15. grpcClients = make(map[string]*grpc.ClientConn)
  16. grpcClientsLock sync.Mutex
  17. )
  // init raises the per-host idle connection limit of the process-wide default
  // HTTP transport to 1024.
  //
  // http.DefaultTransport is an exported http.RoundTripper variable that other
  // code may replace, so the assertion uses the comma-ok form: when the default
  // is no longer an *http.Transport the tuning is skipped instead of panicking
  // during package initialization.
  func init() {
  	if transport, ok := http.DefaultTransport.(*http.Transport); ok {
  		transport.MaxIdleConnsPerHost = 1024
  	}
  }
  21. func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
  22. var options []grpc.ServerOption
  23. options = append(options, grpc.KeepaliveParams(keepalive.ServerParameters{
  24. Time: 10 * time.Second, // wait time before ping if no activity
  25. Timeout: 20 * time.Second, // ping timeout
  26. }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  27. MinTime: 60 * time.Second, // min time a client should wait before sending a ping
  28. }))
  29. for _, opt := range opts {
  30. if opt != nil {
  31. options = append(options, opt)
  32. }
  33. }
  34. return grpc.NewServer(options...)
  35. }
  36. func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
  37. // opts = append(opts, grpc.WithBlock())
  38. // opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second)))
  39. var options []grpc.DialOption
  40. options = append(options,
  41. // grpc.WithInsecure(),
  42. grpc.WithKeepaliveParams(keepalive.ClientParameters{
  43. Time: 30 * time.Second, // client ping server if no activity for this long
  44. Timeout: 20 * time.Second,
  45. }))
  46. for _, opt := range opts {
  47. if opt != nil {
  48. options = append(options, opt)
  49. }
  50. }
  51. return grpc.DialContext(ctx, address, options...)
  52. }
  53. func WithCachedGrpcClient(ctx context.Context, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
  54. grpcClientsLock.Lock()
  55. existingConnection, found := grpcClients[address]
  56. if found {
  57. grpcClientsLock.Unlock()
  58. return fn(existingConnection)
  59. }
  60. grpcConnection, err := GrpcDial(ctx, address, opts...)
  61. if err != nil {
  62. grpcClientsLock.Unlock()
  63. return fmt.Errorf("fail to dial %s: %v", address, err)
  64. }
  65. grpcClients[address] = grpcConnection
  66. grpcClientsLock.Unlock()
  67. err = fn(grpcConnection)
  68. if err != nil {
  69. grpcClientsLock.Lock()
  70. delete(grpcClients, address)
  71. grpcClientsLock.Unlock()
  72. grpcConnection.Close()
  73. }
  74. return err
  75. }
  // ParseServerToGrpcAddress converts a "host:port" server address into the
  // matching gRPC address, which uses the same host and the port plus 10000.
  //
  // The split happens at the last colon, so a bracketed IPv6 host such as
  // "[::1]:8080" is accepted.
  //
  // An error is returned when:
  //   - server contains no colon;
  //   - the text after the last colon is not a decimal number in 0..65535
  //     (the strconv error is wrapped with %w);
  //   - the derived gRPC port would exceed 65535.
  //
  // On error the returned address is the empty string.
  func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {
  	const (
  		grpcPortOffset = 10000
  		maxPort        = 65535
  	)

  	colonIndex := strings.LastIndex(server, ":")
  	if colonIndex < 0 {
  		return "", fmt.Errorf("server should have hostname:port format: %v", server)
  	}

  	// bitSize 16 makes ParseUint reject anything above 65535, so the int
  	// conversion below cannot wrap around.
  	port, parseErr := strconv.ParseUint(server[colonIndex+1:], 10, 16)
  	if parseErr != nil {
  		return "", fmt.Errorf("server port parse error: %w", parseErr)
  	}

  	grpcPort := int(port) + grpcPortOffset
  	if grpcPort > maxPort {
  		return "", fmt.Errorf("server port %d is too large to derive a grpc port: %v", port, server)
  	}
  	return fmt.Sprintf("%s:%d", server[:colonIndex], grpcPort), nil
  }
  // ServerToGrpcAddress converts a "host:port" server address into the matching
  // gRPC address, which uses the same host and the port plus 10000.
  //
  // The split happens at the last colon, matching ParseServerToGrpcAddress, so
  // a bracketed IPv6 host such as "[::1]:8080" is accepted.
  //
  // This function has no error return. When the input cannot be converted the
  // returned string is a diagnostic message rather than an address:
  //   - "unexpected server address: <server>" when there is no colon, or when
  //     the derived gRPC port would exceed 65535;
  //   - "failed to parse port for <host>:<port>" when the port is not a
  //     decimal number in 0..65535.
  func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
  	const (
  		grpcPortOffset = 10000
  		maxPort        = 65535
  	)

  	colonIndex := strings.LastIndex(server, ":")
  	if colonIndex < 0 {
  		return fmt.Sprintf("unexpected server address: %s", server)
  	}
  	host, portText := server[:colonIndex], server[colonIndex+1:]

  	// bitSize 16 makes ParseUint reject anything above 65535, so the int
  	// conversion below cannot wrap around.
  	port, parseErr := strconv.ParseUint(portText, 10, 16)
  	if parseErr != nil {
  		return fmt.Sprintf("failed to parse port for %s:%s", host, portText)
  	}

  	grpcPort := int(port) + grpcPortOffset
  	if grpcPort > maxPort {
  		return fmt.Sprintf("unexpected server address: %s", server)
  	}
  	return fmt.Sprintf("%s:%d", host, grpcPort)
  }