client.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. /*
  2. *
  3. * Copyright 2020 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Binary client for xDS interop tests.
  19. package main
  20. import (
  21. "context"
  22. "flag"
  23. "fmt"
  24. "log"
  25. "net"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "google.golang.org/grpc"
  31. "google.golang.org/grpc/admin"
  32. "google.golang.org/grpc/credentials/insecure"
  33. "google.golang.org/grpc/credentials/xds"
  34. "google.golang.org/grpc/grpclog"
  35. "google.golang.org/grpc/metadata"
  36. "google.golang.org/grpc/peer"
  37. "google.golang.org/grpc/reflection"
  38. "google.golang.org/grpc/status"
  39. _ "google.golang.org/grpc/xds"
  40. testgrpc "google.golang.org/grpc/interop/grpc_testing"
  41. testpb "google.golang.org/grpc/interop/grpc_testing"
  42. _ "google.golang.org/grpc/interop/xds" // to register Custom LB.
  43. )
  44. func init() {
  45. rpcCfgs.Store([]*rpcConfig{{typ: unaryCall}})
  46. }
// statsWatcherKey identifies a statsWatcher by the half-open range of request
// IDs [startID, endID) it is interested in. It is used as the key of the
// watchers map.
type statsWatcherKey struct {
	// startID is the first request ID covered by the watcher (inclusive).
	startID int32
	// endID is the request ID at which the watcher stops (exclusive).
	endID int32
}
// rpcInfo describes one completed RPC: which kind of RPC it was and which
// backend answered it.
type rpcInfo struct {
	// typ is the RPC type, copied from rpcConfig.typ.
	typ string
	// hostname is the responding server's hostname, taken from the
	// "hostname" response header or, for UnaryCall, from the response body.
	hostname string
}
// statsWatcher accumulates the results of the RPCs that fall into one
// GetClientStats request window.
type statsWatcher struct {
	// rpcsByPeer counts recorded successful RPCs per responding hostname.
	rpcsByPeer map[string]int32
	// rpcsByType counts recorded successful RPCs per RPC type and, within
	// each type, per responding hostname.
	rpcsByType map[string]map[string]int32
	// numFailures counts RPC results that were reported as nil (failed).
	numFailures int32
	// remainingRPCs is the number of results still awaited; buildResp adds
	// it to numFailures, so results that never arrive are reported as
	// failures.
	remainingRPCs int32
	// chanHosts is the unbuffered channel on which RPC results are
	// delivered; a nil *rpcInfo denotes a failed RPC.
	chanHosts chan *rpcInfo
}
  64. func (watcher *statsWatcher) buildResp() *testpb.LoadBalancerStatsResponse {
  65. rpcsByType := make(map[string]*testpb.LoadBalancerStatsResponse_RpcsByPeer, len(watcher.rpcsByType))
  66. for t, rpcsByPeer := range watcher.rpcsByType {
  67. rpcsByType[t] = &testpb.LoadBalancerStatsResponse_RpcsByPeer{
  68. RpcsByPeer: rpcsByPeer,
  69. }
  70. }
  71. return &testpb.LoadBalancerStatsResponse{
  72. NumFailures: watcher.numFailures + watcher.remainingRPCs,
  73. RpcsByPeer: watcher.rpcsByPeer,
  74. RpcsByMethod: rpcsByType,
  75. }
  76. }
// accumulatedStats holds counters for all RPCs issued since the client
// started. The map keys are the names produced by convertRPCName.
type accumulatedStats struct {
	// mu is held by startRPC, finishRPC and buildResp while they touch the
	// maps below.
	mu sync.Mutex
	// numRPCsStartedByMethod counts RPCs started, per method name.
	numRPCsStartedByMethod map[string]int32
	// numRPCsSucceededByMethod counts RPCs that finished with a nil error.
	numRPCsSucceededByMethod map[string]int32
	// numRPCsFailedByMethod counts RPCs that finished with a non-nil error.
	numRPCsFailedByMethod map[string]int32
	// rpcStatusByMethod counts finished RPCs per method name and, within
	// each method, per numeric status code.
	rpcStatusByMethod map[string]map[int32]int32
}
  84. func convertRPCName(in string) string {
  85. switch in {
  86. case unaryCall:
  87. return testpb.ClientConfigureRequest_UNARY_CALL.String()
  88. case emptyCall:
  89. return testpb.ClientConfigureRequest_EMPTY_CALL.String()
  90. }
  91. logger.Warningf("unrecognized rpc type: %s", in)
  92. return in
  93. }
  94. // copyStatsMap makes a copy of the map.
  95. func copyStatsMap(originalMap map[string]int32) map[string]int32 {
  96. newMap := make(map[string]int32, len(originalMap))
  97. for k, v := range originalMap {
  98. newMap[k] = v
  99. }
  100. return newMap
  101. }
  102. // copyStatsIntMap makes a copy of the map.
  103. func copyStatsIntMap(originalMap map[int32]int32) map[int32]int32 {
  104. newMap := make(map[int32]int32, len(originalMap))
  105. for k, v := range originalMap {
  106. newMap[k] = v
  107. }
  108. return newMap
  109. }
  110. func (as *accumulatedStats) makeStatsMap() map[string]*testpb.LoadBalancerAccumulatedStatsResponse_MethodStats {
  111. m := make(map[string]*testpb.LoadBalancerAccumulatedStatsResponse_MethodStats)
  112. for k, v := range as.numRPCsStartedByMethod {
  113. m[k] = &testpb.LoadBalancerAccumulatedStatsResponse_MethodStats{RpcsStarted: v}
  114. }
  115. for k, v := range as.rpcStatusByMethod {
  116. if m[k] == nil {
  117. m[k] = &testpb.LoadBalancerAccumulatedStatsResponse_MethodStats{}
  118. }
  119. m[k].Result = copyStatsIntMap(v)
  120. }
  121. return m
  122. }
  123. func (as *accumulatedStats) buildResp() *testpb.LoadBalancerAccumulatedStatsResponse {
  124. as.mu.Lock()
  125. defer as.mu.Unlock()
  126. return &testpb.LoadBalancerAccumulatedStatsResponse{
  127. NumRpcsStartedByMethod: copyStatsMap(as.numRPCsStartedByMethod),
  128. NumRpcsSucceededByMethod: copyStatsMap(as.numRPCsSucceededByMethod),
  129. NumRpcsFailedByMethod: copyStatsMap(as.numRPCsFailedByMethod),
  130. StatsPerMethod: as.makeStatsMap(),
  131. }
  132. }
  133. func (as *accumulatedStats) startRPC(rpcType string) {
  134. as.mu.Lock()
  135. defer as.mu.Unlock()
  136. as.numRPCsStartedByMethod[convertRPCName(rpcType)]++
  137. }
  138. func (as *accumulatedStats) finishRPC(rpcType string, err error) {
  139. as.mu.Lock()
  140. defer as.mu.Unlock()
  141. name := convertRPCName(rpcType)
  142. if as.rpcStatusByMethod[name] == nil {
  143. as.rpcStatusByMethod[name] = make(map[int32]int32)
  144. }
  145. as.rpcStatusByMethod[name][int32(status.Convert(err).Code())]++
  146. if err != nil {
  147. as.numRPCsFailedByMethod[name]++
  148. return
  149. }
  150. as.numRPCsSucceededByMethod[name]++
  151. }
var (
	// Command-line flags.
	failOnFailedRPC = flag.Bool("fail_on_failed_rpc", false, "Fail client if any RPCs fail after first success")
	numChannels     = flag.Int("num_channels", 1, "Num of channels")
	printResponse   = flag.Bool("print_response", false, "Write RPC response to stdout")
	qps             = flag.Int("qps", 1, "QPS per channel, for each type of RPC")
	rpc             = flag.String("rpc", "UnaryCall", "Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.")
	rpcMetadata     = flag.String("metadata", "", "The metadata to send with RPC, in format EmptyCall:key1:value1,UnaryCall:key2:value2. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.")
	rpcTimeout      = flag.Duration("rpc_timeout", 20*time.Second, "Per RPC timeout")
	server          = flag.String("server", "localhost:8080", "Address of server to connect to")
	statsPort       = flag.Int("stats_port", 8081, "Port to expose peer distribution stats service")
	secureMode      = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.")

	// rpcCfgs holds the active []*rpcConfig. It is stored by init, main and
	// the Configure RPC, and loaded by sendRPCs on every tick.
	rpcCfgs atomic.Value

	// mu guards currentRequestID and watchers.
	mu sync.Mutex

	// currentRequestID is the ID assigned to the next tick in sendRPCs.
	currentRequestID int32

	// watchers holds the in-flight GetClientStats requests, keyed by the
	// request ID range each one covers.
	watchers = make(map[statsWatcherKey]*statsWatcher)

	// accStats accumulates per-method counters for the lifetime of the
	// process; it backs GetClientAccumulatedStats.
	accStats = accumulatedStats{
		numRPCsStartedByMethod:   make(map[string]int32),
		numRPCsSucceededByMethod: make(map[string]int32),
		numRPCsFailedByMethod:    make(map[string]int32),
		rpcStatusByMethod:        make(map[string]map[int32]int32),
	}

	// 0 or 1 representing an RPC has succeeded. Use hasRPCSucceeded and
	// setRPCSucceeded to access in a safe manner.
	rpcSucceeded uint32

	// logger is the grpclog component logger used throughout this binary.
	logger = grpclog.Component("interop")
)
// statsService implements the LoadBalancerStatsService RPCs defined on it in
// this file (GetClientStats and GetClientAccumulatedStats); the embedded
// Unimplemented server supplies the remaining methods of the interface.
type statsService struct {
	testgrpc.UnimplementedLoadBalancerStatsServiceServer
}
  181. func hasRPCSucceeded() bool {
  182. return atomic.LoadUint32(&rpcSucceeded) > 0
  183. }
  184. func setRPCSucceeded() {
  185. atomic.StoreUint32(&rpcSucceeded, 1)
  186. }
  187. // Wait for the next LoadBalancerStatsRequest.GetNumRpcs to start and complete,
  188. // and return the distribution of remote peers. This is essentially a clientside
  189. // LB reporting mechanism that is designed to be queried by an external test
  190. // driver when verifying that the client is distributing RPCs as expected.
  191. func (s *statsService) GetClientStats(ctx context.Context, in *testpb.LoadBalancerStatsRequest) (*testpb.LoadBalancerStatsResponse, error) {
  192. mu.Lock()
  193. watcherKey := statsWatcherKey{currentRequestID, currentRequestID + in.GetNumRpcs()}
  194. watcher, ok := watchers[watcherKey]
  195. if !ok {
  196. watcher = &statsWatcher{
  197. rpcsByPeer: make(map[string]int32),
  198. rpcsByType: make(map[string]map[string]int32),
  199. numFailures: 0,
  200. remainingRPCs: in.GetNumRpcs(),
  201. chanHosts: make(chan *rpcInfo),
  202. }
  203. watchers[watcherKey] = watcher
  204. }
  205. mu.Unlock()
  206. ctx, cancel := context.WithTimeout(ctx, time.Duration(in.GetTimeoutSec())*time.Second)
  207. defer cancel()
  208. defer func() {
  209. mu.Lock()
  210. delete(watchers, watcherKey)
  211. mu.Unlock()
  212. }()
  213. // Wait until the requested RPCs have all been recorded or timeout occurs.
  214. for {
  215. select {
  216. case info := <-watcher.chanHosts:
  217. if info != nil {
  218. watcher.rpcsByPeer[info.hostname]++
  219. rpcsByPeerForType := watcher.rpcsByType[info.typ]
  220. if rpcsByPeerForType == nil {
  221. rpcsByPeerForType = make(map[string]int32)
  222. watcher.rpcsByType[info.typ] = rpcsByPeerForType
  223. }
  224. rpcsByPeerForType[info.hostname]++
  225. } else {
  226. watcher.numFailures++
  227. }
  228. watcher.remainingRPCs--
  229. if watcher.remainingRPCs == 0 {
  230. return watcher.buildResp(), nil
  231. }
  232. case <-ctx.Done():
  233. logger.Info("Timed out, returning partial stats")
  234. return watcher.buildResp(), nil
  235. }
  236. }
  237. }
  238. func (s *statsService) GetClientAccumulatedStats(ctx context.Context, in *testpb.LoadBalancerAccumulatedStatsRequest) (*testpb.LoadBalancerAccumulatedStatsResponse, error) {
  239. return accStats.buildResp(), nil
  240. }
// configureService implements the Configure RPC of
// XdsUpdateClientConfigureService; the embedded Unimplemented server supplies
// the remaining methods of the interface.
type configureService struct {
	testgrpc.UnimplementedXdsUpdateClientConfigureServiceServer
}
  244. func (s *configureService) Configure(ctx context.Context, in *testpb.ClientConfigureRequest) (*testpb.ClientConfigureResponse, error) {
  245. rpcsToMD := make(map[testpb.ClientConfigureRequest_RpcType][]string)
  246. for _, typ := range in.GetTypes() {
  247. rpcsToMD[typ] = nil
  248. }
  249. for _, md := range in.GetMetadata() {
  250. typ := md.GetType()
  251. strs, ok := rpcsToMD[typ]
  252. if !ok {
  253. continue
  254. }
  255. rpcsToMD[typ] = append(strs, md.GetKey(), md.GetValue())
  256. }
  257. cfgs := make([]*rpcConfig, 0, len(rpcsToMD))
  258. for typ, md := range rpcsToMD {
  259. var rpcType string
  260. switch typ {
  261. case testpb.ClientConfigureRequest_UNARY_CALL:
  262. rpcType = unaryCall
  263. case testpb.ClientConfigureRequest_EMPTY_CALL:
  264. rpcType = emptyCall
  265. default:
  266. return nil, fmt.Errorf("unsupported RPC type: %v", typ)
  267. }
  268. cfgs = append(cfgs, &rpcConfig{
  269. typ: rpcType,
  270. md: metadata.Pairs(md...),
  271. timeout: in.GetTimeoutSec(),
  272. })
  273. }
  274. rpcCfgs.Store(cfgs)
  275. return &testpb.ClientConfigureResponse{}, nil
  276. }
  277. const (
  278. unaryCall string = "UnaryCall"
  279. emptyCall string = "EmptyCall"
  280. )
  281. func parseRPCTypes(rpcStr string) []string {
  282. if len(rpcStr) == 0 {
  283. return []string{unaryCall}
  284. }
  285. rpcs := strings.Split(rpcStr, ",")
  286. ret := make([]string, 0, len(rpcStr))
  287. for _, r := range rpcs {
  288. switch r {
  289. case unaryCall, emptyCall:
  290. ret = append(ret, r)
  291. default:
  292. flag.PrintDefaults()
  293. log.Fatalf("unsupported RPC type: %v", r)
  294. }
  295. }
  296. return ret
  297. }
// rpcConfig describes one kind of RPC the client issues on every tick.
type rpcConfig struct {
	// typ is the RPC type name (unaryCall or emptyCall).
	typ string
	// md is the metadata attached to the outgoing context when non-empty.
	md metadata.MD
	// timeout is the per-RPC timeout in seconds; 0 means fall back to the
	// rpc_timeout flag.
	timeout int32
}
  303. // parseRPCMetadata turns EmptyCall:key1:value1 into
  304. //
  305. // {typ: emptyCall, md: {key1:value1}}.
  306. func parseRPCMetadata(rpcMetadataStr string, rpcs []string) []*rpcConfig {
  307. rpcMetadataSplit := strings.Split(rpcMetadataStr, ",")
  308. rpcsToMD := make(map[string][]string)
  309. for _, rm := range rpcMetadataSplit {
  310. rmSplit := strings.Split(rm, ":")
  311. if len(rmSplit)%2 != 1 {
  312. log.Fatalf("invalid metadata config %v, want EmptyCall:key1:value1", rm)
  313. }
  314. rpcsToMD[rmSplit[0]] = append(rpcsToMD[rmSplit[0]], rmSplit[1:]...)
  315. }
  316. ret := make([]*rpcConfig, 0, len(rpcs))
  317. for _, rpcT := range rpcs {
  318. rpcC := &rpcConfig{
  319. typ: rpcT,
  320. }
  321. if md := rpcsToMD[string(rpcT)]; len(md) > 0 {
  322. rpcC.md = metadata.Pairs(md...)
  323. }
  324. ret = append(ret, rpcC)
  325. }
  326. return ret
  327. }
  328. func main() {
  329. flag.Parse()
  330. rpcCfgs.Store(parseRPCMetadata(*rpcMetadata, parseRPCTypes(*rpc)))
  331. lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *statsPort))
  332. if err != nil {
  333. logger.Fatalf("failed to listen: %v", err)
  334. }
  335. s := grpc.NewServer()
  336. defer s.Stop()
  337. testgrpc.RegisterLoadBalancerStatsServiceServer(s, &statsService{})
  338. testgrpc.RegisterXdsUpdateClientConfigureServiceServer(s, &configureService{})
  339. reflection.Register(s)
  340. cleanup, err := admin.Register(s)
  341. if err != nil {
  342. logger.Fatalf("Failed to register admin: %v", err)
  343. }
  344. defer cleanup()
  345. go s.Serve(lis)
  346. creds := insecure.NewCredentials()
  347. if *secureMode {
  348. var err error
  349. creds, err = xds.NewClientCredentials(xds.ClientOptions{FallbackCreds: insecure.NewCredentials()})
  350. if err != nil {
  351. logger.Fatalf("Failed to create xDS credentials: %v", err)
  352. }
  353. }
  354. clients := make([]testgrpc.TestServiceClient, *numChannels)
  355. for i := 0; i < *numChannels; i++ {
  356. conn, err := grpc.Dial(*server, grpc.WithTransportCredentials(creds))
  357. if err != nil {
  358. logger.Fatalf("Fail to dial: %v", err)
  359. }
  360. defer conn.Close()
  361. clients[i] = testgrpc.NewTestServiceClient(conn)
  362. }
  363. ticker := time.NewTicker(time.Second / time.Duration(*qps**numChannels))
  364. defer ticker.Stop()
  365. sendRPCs(clients, ticker)
  366. }
  367. func makeOneRPC(c testgrpc.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcInfo, error) {
  368. timeout := *rpcTimeout
  369. if cfg.timeout != 0 {
  370. timeout = time.Duration(cfg.timeout) * time.Second
  371. }
  372. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  373. defer cancel()
  374. if len(cfg.md) != 0 {
  375. ctx = metadata.NewOutgoingContext(ctx, cfg.md)
  376. }
  377. info := rpcInfo{typ: cfg.typ}
  378. var (
  379. p peer.Peer
  380. header metadata.MD
  381. err error
  382. )
  383. accStats.startRPC(cfg.typ)
  384. switch cfg.typ {
  385. case unaryCall:
  386. var resp *testpb.SimpleResponse
  387. resp, err = c.UnaryCall(ctx, &testpb.SimpleRequest{FillServerId: true}, grpc.Peer(&p), grpc.Header(&header))
  388. // For UnaryCall, also read hostname from response, in case the server
  389. // isn't updated to send headers.
  390. if resp != nil {
  391. info.hostname = resp.Hostname
  392. }
  393. case emptyCall:
  394. _, err = c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p), grpc.Header(&header))
  395. }
  396. accStats.finishRPC(cfg.typ, err)
  397. if err != nil {
  398. return nil, nil, err
  399. }
  400. hosts := header["hostname"]
  401. if len(hosts) > 0 {
  402. info.hostname = hosts[0]
  403. }
  404. return &p, &info, err
  405. }
  406. func sendRPCs(clients []testgrpc.TestServiceClient, ticker *time.Ticker) {
  407. var i int
  408. for range ticker.C {
  409. // Get and increment request ID, and save a list of watchers that are
  410. // interested in this RPC.
  411. mu.Lock()
  412. savedRequestID := currentRequestID
  413. currentRequestID++
  414. savedWatchers := []*statsWatcher{}
  415. for key, value := range watchers {
  416. if key.startID <= savedRequestID && savedRequestID < key.endID {
  417. savedWatchers = append(savedWatchers, value)
  418. }
  419. }
  420. mu.Unlock()
  421. // Get the RPC metadata configurations from the Configure RPC.
  422. cfgs := rpcCfgs.Load().([]*rpcConfig)
  423. c := clients[i]
  424. for _, cfg := range cfgs {
  425. go func(cfg *rpcConfig) {
  426. p, info, err := makeOneRPC(c, cfg)
  427. for _, watcher := range savedWatchers {
  428. // This sends an empty string if the RPC failed.
  429. watcher.chanHosts <- info
  430. }
  431. if err != nil && *failOnFailedRPC && hasRPCSucceeded() {
  432. logger.Fatalf("RPC failed: %v", err)
  433. }
  434. if err == nil {
  435. setRPCSucceeded()
  436. }
  437. if *printResponse {
  438. if err == nil {
  439. if cfg.typ == unaryCall {
  440. // Need to keep this format, because some tests are
  441. // relying on stdout.
  442. fmt.Printf("Greeting: Hello world, this is %s, from %v\n", info.hostname, p.Addr)
  443. } else {
  444. fmt.Printf("RPC %q, from host %s, addr %v\n", cfg.typ, info.hostname, p.Addr)
  445. }
  446. } else {
  447. fmt.Printf("RPC %q, failed with %v\n", cfg.typ, err)
  448. }
  449. }
  450. }(cfg)
  451. }
  452. i = (i + 1) % len(clients)
  453. }
  454. }