123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506 |
- /*
- *
- * Copyright 2020 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- // Binary client for xDS interop tests.
- package main
- import (
- "context"
- "flag"
- "fmt"
- "log"
- "net"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "google.golang.org/grpc"
- "google.golang.org/grpc/admin"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/credentials/xds"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/peer"
- "google.golang.org/grpc/reflection"
- "google.golang.org/grpc/status"
- _ "google.golang.org/grpc/xds"
- testgrpc "google.golang.org/grpc/interop/grpc_testing"
- testpb "google.golang.org/grpc/interop/grpc_testing"
- _ "google.golang.org/grpc/interop/xds" // to register Custom LB.
- )
- func init() {
- rpcCfgs.Store([]*rpcConfig{{typ: unaryCall}})
- }
// statsWatcherKey identifies a statsWatcher by the range of request IDs it is
// interested in. sendRPCs treats the range as half-open: a request ID matches
// when startID <= ID < endID.
type statsWatcherKey struct {
	startID, endID int32
}
// rpcInfo describes one completed RPC: typ is the RPC type that was issued and
// hostname is the host the response was received from.
type rpcInfo struct {
	typ, hostname string
}
- type statsWatcher struct {
- rpcsByPeer map[string]int32
- rpcsByType map[string]map[string]int32
- numFailures int32
- remainingRPCs int32
- chanHosts chan *rpcInfo
- }
- func (watcher *statsWatcher) buildResp() *testpb.LoadBalancerStatsResponse {
- rpcsByType := make(map[string]*testpb.LoadBalancerStatsResponse_RpcsByPeer, len(watcher.rpcsByType))
- for t, rpcsByPeer := range watcher.rpcsByType {
- rpcsByType[t] = &testpb.LoadBalancerStatsResponse_RpcsByPeer{
- RpcsByPeer: rpcsByPeer,
- }
- }
- return &testpb.LoadBalancerStatsResponse{
- NumFailures: watcher.numFailures + watcher.remainingRPCs,
- RpcsByPeer: watcher.rpcsByPeer,
- RpcsByMethod: rpcsByType,
- }
- }
// accumulatedStats holds per-method RPC counters collected over the lifetime
// of the process. startRPC, finishRPC and buildResp take mu before touching
// the maps.
type accumulatedStats struct {
	mu sync.Mutex

	// Started / succeeded / failed counts, keyed by the name produced by
	// convertRPCName.
	numRPCsStartedByMethod, numRPCsSucceededByMethod, numRPCsFailedByMethod map[string]int32

	// rpcStatusByMethod maps method name -> status code -> count.
	rpcStatusByMethod map[string]map[int32]int32
}
- func convertRPCName(in string) string {
- switch in {
- case unaryCall:
- return testpb.ClientConfigureRequest_UNARY_CALL.String()
- case emptyCall:
- return testpb.ClientConfigureRequest_EMPTY_CALL.String()
- }
- logger.Warningf("unrecognized rpc type: %s", in)
- return in
- }
// copyStatsMap returns a new map holding the same entries as originalMap.
// The result is always non-nil, even when originalMap is nil.
func copyStatsMap(originalMap map[string]int32) map[string]int32 {
	dup := make(map[string]int32, len(originalMap))
	for key := range originalMap {
		dup[key] = originalMap[key]
	}
	return dup
}
// copyStatsIntMap returns a new map holding the same entries as originalMap.
// The result is always non-nil, even when originalMap is nil.
func copyStatsIntMap(originalMap map[int32]int32) map[int32]int32 {
	dup := make(map[int32]int32, len(originalMap))
	for key := range originalMap {
		dup[key] = originalMap[key]
	}
	return dup
}
- func (as *accumulatedStats) makeStatsMap() map[string]*testpb.LoadBalancerAccumulatedStatsResponse_MethodStats {
- m := make(map[string]*testpb.LoadBalancerAccumulatedStatsResponse_MethodStats)
- for k, v := range as.numRPCsStartedByMethod {
- m[k] = &testpb.LoadBalancerAccumulatedStatsResponse_MethodStats{RpcsStarted: v}
- }
- for k, v := range as.rpcStatusByMethod {
- if m[k] == nil {
- m[k] = &testpb.LoadBalancerAccumulatedStatsResponse_MethodStats{}
- }
- m[k].Result = copyStatsIntMap(v)
- }
- return m
- }
- func (as *accumulatedStats) buildResp() *testpb.LoadBalancerAccumulatedStatsResponse {
- as.mu.Lock()
- defer as.mu.Unlock()
- return &testpb.LoadBalancerAccumulatedStatsResponse{
- NumRpcsStartedByMethod: copyStatsMap(as.numRPCsStartedByMethod),
- NumRpcsSucceededByMethod: copyStatsMap(as.numRPCsSucceededByMethod),
- NumRpcsFailedByMethod: copyStatsMap(as.numRPCsFailedByMethod),
- StatsPerMethod: as.makeStatsMap(),
- }
- }
- func (as *accumulatedStats) startRPC(rpcType string) {
- as.mu.Lock()
- defer as.mu.Unlock()
- as.numRPCsStartedByMethod[convertRPCName(rpcType)]++
- }
- func (as *accumulatedStats) finishRPC(rpcType string, err error) {
- as.mu.Lock()
- defer as.mu.Unlock()
- name := convertRPCName(rpcType)
- if as.rpcStatusByMethod[name] == nil {
- as.rpcStatusByMethod[name] = make(map[int32]int32)
- }
- as.rpcStatusByMethod[name][int32(status.Convert(err).Code())]++
- if err != nil {
- as.numRPCsFailedByMethod[name]++
- return
- }
- as.numRPCsSucceededByMethod[name]++
- }
var (
	// Command-line flags.
	failOnFailedRPC = flag.Bool("fail_on_failed_rpc", false, "Fail client if any RPCs fail after first success")
	numChannels     = flag.Int("num_channels", 1, "Num of channels")
	printResponse   = flag.Bool("print_response", false, "Write RPC response to stdout")
	qps             = flag.Int("qps", 1, "QPS per channel, for each type of RPC")
	rpc             = flag.String("rpc", "UnaryCall", "Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.")
	rpcMetadata     = flag.String("metadata", "", "The metadata to send with RPC, in format EmptyCall:key1:value1,UnaryCall:key2:value2. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.")
	rpcTimeout      = flag.Duration("rpc_timeout", 20*time.Second, "Per RPC timeout")
	server          = flag.String("server", "localhost:8080", "Address of server to connect to")
	statsPort       = flag.Int("stats_port", 8081, "Port to expose peer distribution stats service")
	secureMode      = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.")

	// rpcCfgs holds the current []*rpcConfig. It is stored by init, main and
	// Configure, and loaded by sendRPCs.
	rpcCfgs atomic.Value

	// mu guards currentRequestID and watchers.
	mu sync.Mutex
	// currentRequestID is the ID handed to the next RPC round in sendRPCs.
	currentRequestID int32
	// watchers holds the in-flight GetClientStats watchers, keyed by the
	// request ID range each one covers.
	watchers = make(map[statsWatcherKey]*statsWatcher)
	// accStats accumulates per-method RPC counters for
	// GetClientAccumulatedStats.
	accStats = accumulatedStats{
		numRPCsStartedByMethod:   make(map[string]int32),
		numRPCsSucceededByMethod: make(map[string]int32),
		numRPCsFailedByMethod:    make(map[string]int32),
		rpcStatusByMethod:        make(map[string]map[int32]int32),
	}

	// 0 or 1 representing an RPC has succeeded. Use hasRPCSucceeded and
	// setRPCSucceeded to access in a safe manner.
	rpcSucceeded uint32

	// logger is the grpclog component logger used throughout this binary.
	logger = grpclog.Component("interop")
)
// statsService is the LoadBalancerStatsService implementation registered by
// main. It embeds the generated Unimplemented server and defines
// GetClientStats and GetClientAccumulatedStats on top of it.
type statsService struct {
	testgrpc.UnimplementedLoadBalancerStatsServiceServer
}
- func hasRPCSucceeded() bool {
- return atomic.LoadUint32(&rpcSucceeded) > 0
- }
- func setRPCSucceeded() {
- atomic.StoreUint32(&rpcSucceeded, 1)
- }
- // Wait for the next LoadBalancerStatsRequest.GetNumRpcs to start and complete,
- // and return the distribution of remote peers. This is essentially a clientside
- // LB reporting mechanism that is designed to be queried by an external test
- // driver when verifying that the client is distributing RPCs as expected.
- func (s *statsService) GetClientStats(ctx context.Context, in *testpb.LoadBalancerStatsRequest) (*testpb.LoadBalancerStatsResponse, error) {
- mu.Lock()
- watcherKey := statsWatcherKey{currentRequestID, currentRequestID + in.GetNumRpcs()}
- watcher, ok := watchers[watcherKey]
- if !ok {
- watcher = &statsWatcher{
- rpcsByPeer: make(map[string]int32),
- rpcsByType: make(map[string]map[string]int32),
- numFailures: 0,
- remainingRPCs: in.GetNumRpcs(),
- chanHosts: make(chan *rpcInfo),
- }
- watchers[watcherKey] = watcher
- }
- mu.Unlock()
- ctx, cancel := context.WithTimeout(ctx, time.Duration(in.GetTimeoutSec())*time.Second)
- defer cancel()
- defer func() {
- mu.Lock()
- delete(watchers, watcherKey)
- mu.Unlock()
- }()
- // Wait until the requested RPCs have all been recorded or timeout occurs.
- for {
- select {
- case info := <-watcher.chanHosts:
- if info != nil {
- watcher.rpcsByPeer[info.hostname]++
- rpcsByPeerForType := watcher.rpcsByType[info.typ]
- if rpcsByPeerForType == nil {
- rpcsByPeerForType = make(map[string]int32)
- watcher.rpcsByType[info.typ] = rpcsByPeerForType
- }
- rpcsByPeerForType[info.hostname]++
- } else {
- watcher.numFailures++
- }
- watcher.remainingRPCs--
- if watcher.remainingRPCs == 0 {
- return watcher.buildResp(), nil
- }
- case <-ctx.Done():
- logger.Info("Timed out, returning partial stats")
- return watcher.buildResp(), nil
- }
- }
- }
- func (s *statsService) GetClientAccumulatedStats(ctx context.Context, in *testpb.LoadBalancerAccumulatedStatsRequest) (*testpb.LoadBalancerAccumulatedStatsResponse, error) {
- return accStats.buildResp(), nil
- }
// configureService is the XdsUpdateClientConfigureService implementation
// registered by main. It embeds the generated Unimplemented server and defines
// Configure on top of it.
type configureService struct {
	testgrpc.UnimplementedXdsUpdateClientConfigureServiceServer
}
- func (s *configureService) Configure(ctx context.Context, in *testpb.ClientConfigureRequest) (*testpb.ClientConfigureResponse, error) {
- rpcsToMD := make(map[testpb.ClientConfigureRequest_RpcType][]string)
- for _, typ := range in.GetTypes() {
- rpcsToMD[typ] = nil
- }
- for _, md := range in.GetMetadata() {
- typ := md.GetType()
- strs, ok := rpcsToMD[typ]
- if !ok {
- continue
- }
- rpcsToMD[typ] = append(strs, md.GetKey(), md.GetValue())
- }
- cfgs := make([]*rpcConfig, 0, len(rpcsToMD))
- for typ, md := range rpcsToMD {
- var rpcType string
- switch typ {
- case testpb.ClientConfigureRequest_UNARY_CALL:
- rpcType = unaryCall
- case testpb.ClientConfigureRequest_EMPTY_CALL:
- rpcType = emptyCall
- default:
- return nil, fmt.Errorf("unsupported RPC type: %v", typ)
- }
- cfgs = append(cfgs, &rpcConfig{
- typ: rpcType,
- md: metadata.Pairs(md...),
- timeout: in.GetTimeoutSec(),
- })
- }
- rpcCfgs.Store(cfgs)
- return &testpb.ClientConfigureResponse{}, nil
- }
// RPC type names as accepted by the rpc and metadata flags and stored in
// rpcConfig.typ.
const (
	unaryCall string = "UnaryCall"
	emptyCall string = "EmptyCall"
)
- func parseRPCTypes(rpcStr string) []string {
- if len(rpcStr) == 0 {
- return []string{unaryCall}
- }
- rpcs := strings.Split(rpcStr, ",")
- ret := make([]string, 0, len(rpcStr))
- for _, r := range rpcs {
- switch r {
- case unaryCall, emptyCall:
- ret = append(ret, r)
- default:
- flag.PrintDefaults()
- log.Fatalf("unsupported RPC type: %v", r)
- }
- }
- return ret
- }
// rpcConfig describes one kind of RPC the client sends on every tick.
type rpcConfig struct {
	// typ is the RPC type name, unaryCall or emptyCall.
	typ string
	// md is attached to the outgoing context when it is non-empty.
	md metadata.MD
	// timeout is the per-RPC timeout in seconds; 0 means fall back to the
	// rpc_timeout flag.
	timeout int32
}
- // parseRPCMetadata turns EmptyCall:key1:value1 into
- //
- // {typ: emptyCall, md: {key1:value1}}.
- func parseRPCMetadata(rpcMetadataStr string, rpcs []string) []*rpcConfig {
- rpcMetadataSplit := strings.Split(rpcMetadataStr, ",")
- rpcsToMD := make(map[string][]string)
- for _, rm := range rpcMetadataSplit {
- rmSplit := strings.Split(rm, ":")
- if len(rmSplit)%2 != 1 {
- log.Fatalf("invalid metadata config %v, want EmptyCall:key1:value1", rm)
- }
- rpcsToMD[rmSplit[0]] = append(rpcsToMD[rmSplit[0]], rmSplit[1:]...)
- }
- ret := make([]*rpcConfig, 0, len(rpcs))
- for _, rpcT := range rpcs {
- rpcC := &rpcConfig{
- typ: rpcT,
- }
- if md := rpcsToMD[string(rpcT)]; len(md) > 0 {
- rpcC.md = metadata.Pairs(md...)
- }
- ret = append(ret, rpcC)
- }
- return ret
- }
- func main() {
- flag.Parse()
- rpcCfgs.Store(parseRPCMetadata(*rpcMetadata, parseRPCTypes(*rpc)))
- lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *statsPort))
- if err != nil {
- logger.Fatalf("failed to listen: %v", err)
- }
- s := grpc.NewServer()
- defer s.Stop()
- testgrpc.RegisterLoadBalancerStatsServiceServer(s, &statsService{})
- testgrpc.RegisterXdsUpdateClientConfigureServiceServer(s, &configureService{})
- reflection.Register(s)
- cleanup, err := admin.Register(s)
- if err != nil {
- logger.Fatalf("Failed to register admin: %v", err)
- }
- defer cleanup()
- go s.Serve(lis)
- creds := insecure.NewCredentials()
- if *secureMode {
- var err error
- creds, err = xds.NewClientCredentials(xds.ClientOptions{FallbackCreds: insecure.NewCredentials()})
- if err != nil {
- logger.Fatalf("Failed to create xDS credentials: %v", err)
- }
- }
- clients := make([]testgrpc.TestServiceClient, *numChannels)
- for i := 0; i < *numChannels; i++ {
- conn, err := grpc.Dial(*server, grpc.WithTransportCredentials(creds))
- if err != nil {
- logger.Fatalf("Fail to dial: %v", err)
- }
- defer conn.Close()
- clients[i] = testgrpc.NewTestServiceClient(conn)
- }
- ticker := time.NewTicker(time.Second / time.Duration(*qps**numChannels))
- defer ticker.Stop()
- sendRPCs(clients, ticker)
- }
- func makeOneRPC(c testgrpc.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcInfo, error) {
- timeout := *rpcTimeout
- if cfg.timeout != 0 {
- timeout = time.Duration(cfg.timeout) * time.Second
- }
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
- if len(cfg.md) != 0 {
- ctx = metadata.NewOutgoingContext(ctx, cfg.md)
- }
- info := rpcInfo{typ: cfg.typ}
- var (
- p peer.Peer
- header metadata.MD
- err error
- )
- accStats.startRPC(cfg.typ)
- switch cfg.typ {
- case unaryCall:
- var resp *testpb.SimpleResponse
- resp, err = c.UnaryCall(ctx, &testpb.SimpleRequest{FillServerId: true}, grpc.Peer(&p), grpc.Header(&header))
- // For UnaryCall, also read hostname from response, in case the server
- // isn't updated to send headers.
- if resp != nil {
- info.hostname = resp.Hostname
- }
- case emptyCall:
- _, err = c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p), grpc.Header(&header))
- }
- accStats.finishRPC(cfg.typ, err)
- if err != nil {
- return nil, nil, err
- }
- hosts := header["hostname"]
- if len(hosts) > 0 {
- info.hostname = hosts[0]
- }
- return &p, &info, err
- }
- func sendRPCs(clients []testgrpc.TestServiceClient, ticker *time.Ticker) {
- var i int
- for range ticker.C {
- // Get and increment request ID, and save a list of watchers that are
- // interested in this RPC.
- mu.Lock()
- savedRequestID := currentRequestID
- currentRequestID++
- savedWatchers := []*statsWatcher{}
- for key, value := range watchers {
- if key.startID <= savedRequestID && savedRequestID < key.endID {
- savedWatchers = append(savedWatchers, value)
- }
- }
- mu.Unlock()
- // Get the RPC metadata configurations from the Configure RPC.
- cfgs := rpcCfgs.Load().([]*rpcConfig)
- c := clients[i]
- for _, cfg := range cfgs {
- go func(cfg *rpcConfig) {
- p, info, err := makeOneRPC(c, cfg)
- for _, watcher := range savedWatchers {
- // This sends an empty string if the RPC failed.
- watcher.chanHosts <- info
- }
- if err != nil && *failOnFailedRPC && hasRPCSucceeded() {
- logger.Fatalf("RPC failed: %v", err)
- }
- if err == nil {
- setRPCSucceeded()
- }
- if *printResponse {
- if err == nil {
- if cfg.typ == unaryCall {
- // Need to keep this format, because some tests are
- // relying on stdout.
- fmt.Printf("Greeting: Hello world, this is %s, from %v\n", info.hostname, p.Addr)
- } else {
- fmt.Printf("RPC %q, from host %s, addr %v\n", cfg.typ, info.hostname, p.Addr)
- }
- } else {
- fmt.Printf("RPC %q, failed with %v\n", cfg.typ, err)
- }
- }
- }(cfg)
- }
- i = (i + 1) % len(clients)
- }
- }
|