server.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. /*
  2. *
  3. * Copyright 2022 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package fakegrpclb provides a fake implementation of the grpclb server.
  19. package fakegrpclb
  20. import (
  21. "errors"
  22. "fmt"
  23. "io"
  24. "net"
  25. "strconv"
  26. "sync"
  27. "time"
  28. "google.golang.org/grpc"
  29. lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
  30. lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/grpclog"
  33. "google.golang.org/grpc/internal/pretty"
  34. "google.golang.org/grpc/status"
  35. )
  36. var logger = grpclog.Component("fake_grpclb")
  37. // ServerParams wraps options passed while creating a Server.
  38. type ServerParams struct {
  39. ListenPort int // Listening port for the balancer server.
  40. ServerOptions []grpc.ServerOption // gRPC options for the balancer server.
  41. LoadBalancedServiceName string // Service name being load balanced for.
  42. LoadBalancedServicePort int // Service port being load balanced for.
  43. BackendAddresses []string // Service backends to balance load across.
  44. ShortStream bool // End balancer stream after sending server list.
  45. }
  46. // Server is a fake implementation of the grpclb LoadBalancer service. It does
  47. // not support stats reporting from clients, and always sends back a static list
  48. // of backends to the client to balance load across.
  49. //
  50. // It is safe for concurrent access.
  51. type Server struct {
  52. lbgrpc.UnimplementedLoadBalancerServer
  53. // Options copied over from ServerParams passed to NewServer.
  54. sOpts []grpc.ServerOption // gRPC server options.
  55. serviceName string // Service name being load balanced for.
  56. servicePort int // Service port being load balanced for.
  57. shortStream bool // End balancer stream after sending server list.
  58. // Values initialized using ServerParams passed to NewServer.
  59. backends []*lbpb.Server // Service backends to balance load across.
  60. lis net.Listener // Listener for grpc connections to the LoadBalancer service.
  61. // mu guards access to below fields.
  62. mu sync.Mutex
  63. grpcServer *grpc.Server // Underlying grpc server.
  64. address string // Actual listening address.
  65. stopped chan struct{} // Closed when Stop() is called.
  66. }
  67. // NewServer creates a new Server with passed in params. Returns a non-nil error
  68. // if the params are invalid.
  69. func NewServer(params ServerParams) (*Server, error) {
  70. var servers []*lbpb.Server
  71. for _, addr := range params.BackendAddresses {
  72. ipStr, portStr, err := net.SplitHostPort(addr)
  73. if err != nil {
  74. return nil, fmt.Errorf("failed to parse list of backend address %q: %v", addr, err)
  75. }
  76. ip := net.ParseIP(ipStr)
  77. if ip == nil {
  78. return nil, fmt.Errorf("failed to parse ip: %q", ipStr)
  79. }
  80. port, err := strconv.Atoi(portStr)
  81. if err != nil {
  82. return nil, fmt.Errorf("failed to convert port %q to int", portStr)
  83. }
  84. logger.Infof("Adding backend ip: %q, port: %d to server list", ip.String(), port)
  85. servers = append(servers, &lbpb.Server{
  86. IpAddress: ip,
  87. Port: int32(port),
  88. })
  89. }
  90. lis, err := net.Listen("tcp", "localhost:"+strconv.Itoa(params.ListenPort))
  91. if err != nil {
  92. return nil, fmt.Errorf("failed to listen on port %q: %v", params.ListenPort, err)
  93. }
  94. return &Server{
  95. sOpts: params.ServerOptions,
  96. serviceName: params.LoadBalancedServiceName,
  97. servicePort: params.LoadBalancedServicePort,
  98. shortStream: params.ShortStream,
  99. backends: servers,
  100. lis: lis,
  101. address: lis.Addr().String(),
  102. stopped: make(chan struct{}),
  103. }, nil
  104. }
  105. // Serve starts serving the LoadBalancer service on a gRPC server.
  106. //
  107. // It returns early with a non-nil error if it is unable to start serving.
  108. // Otherwise, it blocks until Stop() is called, at which point it returns the
  109. // error returned by the underlying grpc.Server's Serve() method.
  110. func (s *Server) Serve() error {
  111. s.mu.Lock()
  112. if s.grpcServer != nil {
  113. s.mu.Unlock()
  114. return errors.New("Serve() called multiple times")
  115. }
  116. server := grpc.NewServer(s.sOpts...)
  117. s.grpcServer = server
  118. s.mu.Unlock()
  119. logger.Infof("Begin listening on %s", s.lis.Addr().String())
  120. lbgrpc.RegisterLoadBalancerServer(server, s)
  121. return server.Serve(s.lis) // This call will block.
  122. }
  123. // Stop stops serving the LoadBalancer service and unblocks the preceding call
  124. // to Serve().
  125. func (s *Server) Stop() {
  126. defer close(s.stopped)
  127. s.mu.Lock()
  128. if s.grpcServer != nil {
  129. s.grpcServer.Stop()
  130. s.grpcServer = nil
  131. }
  132. s.mu.Unlock()
  133. }
  134. // Address returns the host:port on which the LoadBalancer service is serving.
  135. func (s *Server) Address() string {
  136. s.mu.Lock()
  137. defer s.mu.Unlock()
  138. return s.address
  139. }
  140. // BalanceLoad provides a fake implementation of the LoadBalancer service.
  141. func (s *Server) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServer) error {
  142. logger.Info("New BalancerLoad stream started")
  143. req, err := stream.Recv()
  144. if err == io.EOF {
  145. logger.Warning("Received EOF when reading from the stream")
  146. return nil
  147. }
  148. if err != nil {
  149. logger.Warning("Failed to read LoadBalanceRequest from stream: %v", err)
  150. return err
  151. }
  152. logger.Infof("Received LoadBalancerRequest:\n%s", pretty.ToJSON(req))
  153. // Initial request contains the service being load balanced for.
  154. initialReq := req.GetInitialRequest()
  155. if initialReq == nil {
  156. logger.Info("First message on the stream does not contain an InitialLoadBalanceRequest")
  157. return status.Error(codes.Unknown, "First request not an InitialLoadBalanceRequest")
  158. }
  159. // Basic validation of the service name and port from the incoming request.
  160. //
  161. // Clients targeting service:port can sometimes include the ":port" suffix in
  162. // their requested names; handle this case.
  163. serviceName, port, err := net.SplitHostPort(initialReq.Name)
  164. if err != nil {
  165. // Requested name did not contain a port. So, use the name as is.
  166. serviceName = initialReq.Name
  167. } else {
  168. p, err := strconv.Atoi(port)
  169. if err != nil {
  170. logger.Info("Failed to parse requested service port %q to integer", port)
  171. return status.Error(codes.Unknown, "Bad requested service port number")
  172. }
  173. if p != s.servicePort {
  174. logger.Info("Requested service port number %q does not match expected", port, s.servicePort)
  175. return status.Error(codes.Unknown, "Bad requested service port number")
  176. }
  177. }
  178. if serviceName != s.serviceName {
  179. logger.Info("Requested service name %q does not match expected %q", serviceName, s.serviceName)
  180. return status.Error(codes.NotFound, "Bad requested service name")
  181. }
  182. // Empty initial response disables stats reporting from the client. Stats
  183. // reporting from the client is used to determine backend load and is not
  184. // required for the purposes of this fake.
  185. initResp := &lbpb.LoadBalanceResponse{
  186. LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{
  187. InitialResponse: &lbpb.InitialLoadBalanceResponse{},
  188. },
  189. }
  190. if err := stream.Send(initResp); err != nil {
  191. logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err)
  192. return err
  193. }
  194. resp := &lbpb.LoadBalanceResponse{
  195. LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
  196. ServerList: &lbpb.ServerList{Servers: s.backends},
  197. },
  198. }
  199. logger.Infof("Sending response with server list: %s", pretty.ToJSON(resp))
  200. if err := stream.Send(resp); err != nil {
  201. logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err)
  202. return err
  203. }
  204. if s.shortStream {
  205. logger.Info("Ending stream early as the short stream option was set")
  206. return nil
  207. }
  208. for {
  209. select {
  210. case <-stream.Context().Done():
  211. return nil
  212. case <-s.stopped:
  213. return nil
  214. case <-time.After(10 * time.Second):
  215. logger.Infof("Sending response with server list: %s", pretty.ToJSON(resp))
  216. if err := stream.Send(resp); err != nil {
  217. logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err)
  218. return err
  219. }
  220. }
  221. }
  222. }