server.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. /*
  2. *
  3. * Copyright 2021 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Binary server is the server used for xDS interop tests.
  19. package main
  20. import (
  21. "context"
  22. "flag"
  23. "fmt"
  24. "log"
  25. "net"
  26. "os"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/admin"
  29. "google.golang.org/grpc/credentials/insecure"
  30. "google.golang.org/grpc/grpclog"
  31. "google.golang.org/grpc/health"
  32. "google.golang.org/grpc/metadata"
  33. "google.golang.org/grpc/reflection"
  34. "google.golang.org/grpc/xds"
  35. xdscreds "google.golang.org/grpc/credentials/xds"
  36. healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
  37. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  38. testgrpc "google.golang.org/grpc/interop/grpc_testing"
  39. testpb "google.golang.org/grpc/interop/grpc_testing"
  40. )
  41. var (
  42. port = flag.Int("port", 8080, "Listening port for test service")
  43. maintenancePort = flag.Int("maintenance_port", 8081, "Listening port for maintenance services like health, reflection, channelz etc when -secure_mode is true. When -secure_mode is false, all these services will be registered on -port")
  44. serverID = flag.String("server_id", "go_server", "Server ID included in response")
  45. secureMode = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.")
  46. hostNameOverride = flag.String("host_name_override", "", "If set, use this as the hostname instead of the real hostname")
  47. logger = grpclog.Component("interop")
  48. )
  49. func getHostname() string {
  50. if *hostNameOverride != "" {
  51. return *hostNameOverride
  52. }
  53. hostname, err := os.Hostname()
  54. if err != nil {
  55. log.Fatalf("failed to get hostname: %v", err)
  56. }
  57. return hostname
  58. }
  59. // testServiceImpl provides an implementation of the TestService defined in
  60. // grpc.testing package.
  61. type testServiceImpl struct {
  62. testgrpc.UnimplementedTestServiceServer
  63. hostname string
  64. serverID string
  65. }
  66. func (s *testServiceImpl) EmptyCall(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
  67. grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
  68. return &testpb.Empty{}, nil
  69. }
  70. func (s *testServiceImpl) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  71. grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
  72. return &testpb.SimpleResponse{ServerId: s.serverID, Hostname: s.hostname}, nil
  73. }
  74. // xdsUpdateHealthServiceImpl provides an implementation of the
  75. // XdsUpdateHealthService defined in grpc.testing package.
  76. type xdsUpdateHealthServiceImpl struct {
  77. testgrpc.UnimplementedXdsUpdateHealthServiceServer
  78. healthServer *health.Server
  79. }
  80. func (x *xdsUpdateHealthServiceImpl) SetServing(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
  81. x.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
  82. return &testpb.Empty{}, nil
  83. }
  84. func (x *xdsUpdateHealthServiceImpl) SetNotServing(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
  85. x.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
  86. return &testpb.Empty{}, nil
  87. }
  88. func xdsServingModeCallback(addr net.Addr, args xds.ServingModeChangeArgs) {
  89. logger.Infof("Serving mode callback for xDS server at %q invoked with mode: %q, err: %v", addr.String(), args.Mode, args.Err)
  90. }
  91. func main() {
  92. flag.Parse()
  93. if *secureMode && *port == *maintenancePort {
  94. logger.Fatal("-port and -maintenance_port must be different when -secure_mode is set")
  95. }
  96. testService := &testServiceImpl{hostname: getHostname(), serverID: *serverID}
  97. healthServer := health.NewServer()
  98. updateHealthService := &xdsUpdateHealthServiceImpl{healthServer: healthServer}
  99. // If -secure_mode is not set, expose all services on -port with a regular
  100. // gRPC server.
  101. if !*secureMode {
  102. addr := fmt.Sprintf(":%d", *port)
  103. lis, err := net.Listen("tcp4", addr)
  104. if err != nil {
  105. logger.Fatalf("net.Listen(%s) failed: %v", addr, err)
  106. }
  107. server := grpc.NewServer()
  108. testgrpc.RegisterTestServiceServer(server, testService)
  109. healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
  110. healthgrpc.RegisterHealthServer(server, healthServer)
  111. testgrpc.RegisterXdsUpdateHealthServiceServer(server, updateHealthService)
  112. reflection.Register(server)
  113. cleanup, err := admin.Register(server)
  114. if err != nil {
  115. logger.Fatalf("Failed to register admin services: %v", err)
  116. }
  117. defer cleanup()
  118. if err := server.Serve(lis); err != nil {
  119. logger.Errorf("Serve() failed: %v", err)
  120. }
  121. return
  122. }
  123. // Create a listener on -port to expose the test service.
  124. addr := fmt.Sprintf(":%d", *port)
  125. testLis, err := net.Listen("tcp4", addr)
  126. if err != nil {
  127. logger.Fatalf("net.Listen(%s) failed: %v", addr, err)
  128. }
  129. // Create server-side xDS credentials with a plaintext fallback.
  130. creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
  131. if err != nil {
  132. logger.Fatalf("Failed to create xDS credentials: %v", err)
  133. }
  134. // Create an xDS enabled gRPC server, register the test service
  135. // implementation and start serving.
  136. testServer := xds.NewGRPCServer(grpc.Creds(creds), xds.ServingModeCallback(xdsServingModeCallback))
  137. testgrpc.RegisterTestServiceServer(testServer, testService)
  138. go func() {
  139. if err := testServer.Serve(testLis); err != nil {
  140. logger.Errorf("test server Serve() failed: %v", err)
  141. }
  142. }()
  143. defer testServer.Stop()
  144. // Create a listener on -maintenance_port to expose other services.
  145. addr = fmt.Sprintf(":%d", *maintenancePort)
  146. maintenanceLis, err := net.Listen("tcp4", addr)
  147. if err != nil {
  148. logger.Fatalf("net.Listen(%s) failed: %v", addr, err)
  149. }
  150. // Create a regular gRPC server and register the maintenance services on
  151. // it and start serving.
  152. maintenanceServer := grpc.NewServer()
  153. healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
  154. healthgrpc.RegisterHealthServer(maintenanceServer, healthServer)
  155. testgrpc.RegisterXdsUpdateHealthServiceServer(maintenanceServer, updateHealthService)
  156. reflection.Register(maintenanceServer)
  157. cleanup, err := admin.Register(maintenanceServer)
  158. if err != nil {
  159. logger.Fatalf("Failed to register admin services: %v", err)
  160. }
  161. defer cleanup()
  162. if err := maintenanceServer.Serve(maintenanceLis); err != nil {
  163. logger.Errorf("maintenance server Serve() failed: %v", err)
  164. }
  165. }