123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- /*
- *
- * Copyright 2022 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- // Package fakegrpclb provides a fake implementation of the grpclb server.
- package fakegrpclb
- import (
- "errors"
- "fmt"
- "io"
- "net"
- "strconv"
- "sync"
- "time"
- "google.golang.org/grpc"
- lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
- lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal/pretty"
- "google.golang.org/grpc/status"
- )
- var logger = grpclog.Component("fake_grpclb")
- // ServerParams wraps options passed while creating a Server.
- type ServerParams struct {
- ListenPort int // Listening port for the balancer server.
- ServerOptions []grpc.ServerOption // gRPC options for the balancer server.
- LoadBalancedServiceName string // Service name being load balanced for.
- LoadBalancedServicePort int // Service port being load balanced for.
- BackendAddresses []string // Service backends to balance load across.
- ShortStream bool // End balancer stream after sending server list.
- }
- // Server is a fake implementation of the grpclb LoadBalancer service. It does
- // not support stats reporting from clients, and always sends back a static list
- // of backends to the client to balance load across.
- //
- // It is safe for concurrent access.
- type Server struct {
- lbgrpc.UnimplementedLoadBalancerServer
- // Options copied over from ServerParams passed to NewServer.
- sOpts []grpc.ServerOption // gRPC server options.
- serviceName string // Service name being load balanced for.
- servicePort int // Service port being load balanced for.
- shortStream bool // End balancer stream after sending server list.
- // Values initialized using ServerParams passed to NewServer.
- backends []*lbpb.Server // Service backends to balance load across.
- lis net.Listener // Listener for grpc connections to the LoadBalancer service.
- // mu guards access to below fields.
- mu sync.Mutex
- grpcServer *grpc.Server // Underlying grpc server.
- address string // Actual listening address.
- stopped chan struct{} // Closed when Stop() is called.
- }
- // NewServer creates a new Server with passed in params. Returns a non-nil error
- // if the params are invalid.
- func NewServer(params ServerParams) (*Server, error) {
- var servers []*lbpb.Server
- for _, addr := range params.BackendAddresses {
- ipStr, portStr, err := net.SplitHostPort(addr)
- if err != nil {
- return nil, fmt.Errorf("failed to parse list of backend address %q: %v", addr, err)
- }
- ip := net.ParseIP(ipStr)
- if ip == nil {
- return nil, fmt.Errorf("failed to parse ip: %q", ipStr)
- }
- port, err := strconv.Atoi(portStr)
- if err != nil {
- return nil, fmt.Errorf("failed to convert port %q to int", portStr)
- }
- logger.Infof("Adding backend ip: %q, port: %d to server list", ip.String(), port)
- servers = append(servers, &lbpb.Server{
- IpAddress: ip,
- Port: int32(port),
- })
- }
- lis, err := net.Listen("tcp", "localhost:"+strconv.Itoa(params.ListenPort))
- if err != nil {
- return nil, fmt.Errorf("failed to listen on port %q: %v", params.ListenPort, err)
- }
- return &Server{
- sOpts: params.ServerOptions,
- serviceName: params.LoadBalancedServiceName,
- servicePort: params.LoadBalancedServicePort,
- shortStream: params.ShortStream,
- backends: servers,
- lis: lis,
- address: lis.Addr().String(),
- stopped: make(chan struct{}),
- }, nil
- }
- // Serve starts serving the LoadBalancer service on a gRPC server.
- //
- // It returns early with a non-nil error if it is unable to start serving.
- // Otherwise, it blocks until Stop() is called, at which point it returns the
- // error returned by the underlying grpc.Server's Serve() method.
- func (s *Server) Serve() error {
- s.mu.Lock()
- if s.grpcServer != nil {
- s.mu.Unlock()
- return errors.New("Serve() called multiple times")
- }
- server := grpc.NewServer(s.sOpts...)
- s.grpcServer = server
- s.mu.Unlock()
- logger.Infof("Begin listening on %s", s.lis.Addr().String())
- lbgrpc.RegisterLoadBalancerServer(server, s)
- return server.Serve(s.lis) // This call will block.
- }
- // Stop stops serving the LoadBalancer service and unblocks the preceding call
- // to Serve().
- func (s *Server) Stop() {
- defer close(s.stopped)
- s.mu.Lock()
- if s.grpcServer != nil {
- s.grpcServer.Stop()
- s.grpcServer = nil
- }
- s.mu.Unlock()
- }
- // Address returns the host:port on which the LoadBalancer service is serving.
- func (s *Server) Address() string {
- s.mu.Lock()
- defer s.mu.Unlock()
- return s.address
- }
- // BalanceLoad provides a fake implementation of the LoadBalancer service.
- func (s *Server) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServer) error {
- logger.Info("New BalancerLoad stream started")
- req, err := stream.Recv()
- if err == io.EOF {
- logger.Warning("Received EOF when reading from the stream")
- return nil
- }
- if err != nil {
- logger.Warning("Failed to read LoadBalanceRequest from stream: %v", err)
- return err
- }
- logger.Infof("Received LoadBalancerRequest:\n%s", pretty.ToJSON(req))
- // Initial request contains the service being load balanced for.
- initialReq := req.GetInitialRequest()
- if initialReq == nil {
- logger.Info("First message on the stream does not contain an InitialLoadBalanceRequest")
- return status.Error(codes.Unknown, "First request not an InitialLoadBalanceRequest")
- }
- // Basic validation of the service name and port from the incoming request.
- //
- // Clients targeting service:port can sometimes include the ":port" suffix in
- // their requested names; handle this case.
- serviceName, port, err := net.SplitHostPort(initialReq.Name)
- if err != nil {
- // Requested name did not contain a port. So, use the name as is.
- serviceName = initialReq.Name
- } else {
- p, err := strconv.Atoi(port)
- if err != nil {
- logger.Info("Failed to parse requested service port %q to integer", port)
- return status.Error(codes.Unknown, "Bad requested service port number")
- }
- if p != s.servicePort {
- logger.Info("Requested service port number %q does not match expected", port, s.servicePort)
- return status.Error(codes.Unknown, "Bad requested service port number")
- }
- }
- if serviceName != s.serviceName {
- logger.Info("Requested service name %q does not match expected %q", serviceName, s.serviceName)
- return status.Error(codes.NotFound, "Bad requested service name")
- }
- // Empty initial response disables stats reporting from the client. Stats
- // reporting from the client is used to determine backend load and is not
- // required for the purposes of this fake.
- initResp := &lbpb.LoadBalanceResponse{
- LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{
- InitialResponse: &lbpb.InitialLoadBalanceResponse{},
- },
- }
- if err := stream.Send(initResp); err != nil {
- logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err)
- return err
- }
- resp := &lbpb.LoadBalanceResponse{
- LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
- ServerList: &lbpb.ServerList{Servers: s.backends},
- },
- }
- logger.Infof("Sending response with server list: %s", pretty.ToJSON(resp))
- if err := stream.Send(resp); err != nil {
- logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err)
- return err
- }
- if s.shortStream {
- logger.Info("Ending stream early as the short stream option was set")
- return nil
- }
- for {
- select {
- case <-stream.Context().Done():
- return nil
- case <-s.stopped:
- return nil
- case <-time.After(10 * time.Second):
- logger.Infof("Sending response with server list: %s", pretty.ToJSON(resp))
- if err := stream.Send(resp); err != nil {
- logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err)
- return err
- }
- }
- }
- }
|