test_utils.go 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package interop contains functions used by interop client/server.
  19. //
  20. // See interop test case descriptions [here].
  21. //
  22. // [here]: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
  23. package interop
  24. import (
  25. "bytes"
  26. "context"
  27. "fmt"
  28. "io"
  29. "os"
  30. "strings"
  31. "sync"
  32. "time"
  33. "github.com/golang/protobuf/proto"
  34. "golang.org/x/oauth2"
  35. "golang.org/x/oauth2/google"
  36. "google.golang.org/grpc"
  37. "google.golang.org/grpc/benchmark/stats"
  38. "google.golang.org/grpc/codes"
  39. "google.golang.org/grpc/grpclog"
  40. "google.golang.org/grpc/metadata"
  41. "google.golang.org/grpc/orca"
  42. "google.golang.org/grpc/peer"
  43. "google.golang.org/grpc/status"
  44. v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
  45. testgrpc "google.golang.org/grpc/interop/grpc_testing"
  46. testpb "google.golang.org/grpc/interop/grpc_testing"
  47. )
  48. var (
  49. reqSizes = []int{27182, 8, 1828, 45904}
  50. respSizes = []int{31415, 9, 2653, 58979}
  51. largeReqSize = 271828
  52. largeRespSize = 314159
  53. initialMetadataKey = "x-grpc-test-echo-initial"
  54. trailingMetadataKey = "x-grpc-test-echo-trailing-bin"
  55. logger = grpclog.Component("interop")
  56. )
  57. // ClientNewPayload returns a payload of the given type and size.
  58. func ClientNewPayload(t testpb.PayloadType, size int) *testpb.Payload {
  59. if size < 0 {
  60. logger.Fatalf("Requested a response with invalid length %d", size)
  61. }
  62. body := make([]byte, size)
  63. switch t {
  64. case testpb.PayloadType_COMPRESSABLE:
  65. default:
  66. logger.Fatalf("Unsupported payload type: %d", t)
  67. }
  68. return &testpb.Payload{
  69. Type: t,
  70. Body: body,
  71. }
  72. }
  73. // DoEmptyUnaryCall performs a unary RPC with empty request and response messages.
  74. func DoEmptyUnaryCall(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
  75. reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, args...)
  76. if err != nil {
  77. logger.Fatal("/TestService/EmptyCall RPC failed: ", err)
  78. }
  79. if !proto.Equal(&testpb.Empty{}, reply) {
  80. logger.Fatalf("/TestService/EmptyCall receives %v, want %v", reply, testpb.Empty{})
  81. }
  82. }
  83. // DoLargeUnaryCall performs a unary RPC with large payload in the request and response.
  84. func DoLargeUnaryCall(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
  85. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  86. req := &testpb.SimpleRequest{
  87. ResponseType: testpb.PayloadType_COMPRESSABLE,
  88. ResponseSize: int32(largeRespSize),
  89. Payload: pl,
  90. }
  91. reply, err := tc.UnaryCall(context.Background(), req, args...)
  92. if err != nil {
  93. logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
  94. }
  95. t := reply.GetPayload().GetType()
  96. s := len(reply.GetPayload().GetBody())
  97. if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
  98. logger.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
  99. }
  100. }
  101. // DoClientStreaming performs a client streaming RPC.
  102. func DoClientStreaming(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
  103. stream, err := tc.StreamingInputCall(context.Background(), args...)
  104. if err != nil {
  105. logger.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
  106. }
  107. var sum int
  108. for _, s := range reqSizes {
  109. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, s)
  110. req := &testpb.StreamingInputCallRequest{
  111. Payload: pl,
  112. }
  113. if err := stream.Send(req); err != nil {
  114. logger.Fatalf("%v has error %v while sending %v", stream, err, req)
  115. }
  116. sum += s
  117. }
  118. reply, err := stream.CloseAndRecv()
  119. if err != nil {
  120. logger.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
  121. }
  122. if reply.GetAggregatedPayloadSize() != int32(sum) {
  123. logger.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
  124. }
  125. }
  126. // DoServerStreaming performs a server streaming RPC.
  127. func DoServerStreaming(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
  128. respParam := make([]*testpb.ResponseParameters, len(respSizes))
  129. for i, s := range respSizes {
  130. respParam[i] = &testpb.ResponseParameters{
  131. Size: int32(s),
  132. }
  133. }
  134. req := &testpb.StreamingOutputCallRequest{
  135. ResponseType: testpb.PayloadType_COMPRESSABLE,
  136. ResponseParameters: respParam,
  137. }
  138. stream, err := tc.StreamingOutputCall(context.Background(), req, args...)
  139. if err != nil {
  140. logger.Fatalf("%v.StreamingOutputCall(_) = _, %v", tc, err)
  141. }
  142. var rpcStatus error
  143. var respCnt int
  144. var index int
  145. for {
  146. reply, err := stream.Recv()
  147. if err != nil {
  148. rpcStatus = err
  149. break
  150. }
  151. t := reply.GetPayload().GetType()
  152. if t != testpb.PayloadType_COMPRESSABLE {
  153. logger.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
  154. }
  155. size := len(reply.GetPayload().GetBody())
  156. if size != respSizes[index] {
  157. logger.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
  158. }
  159. index++
  160. respCnt++
  161. }
  162. if rpcStatus != io.EOF {
  163. logger.Fatalf("Failed to finish the server streaming rpc: %v", rpcStatus)
  164. }
  165. if respCnt != len(respSizes) {
  166. logger.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
  167. }
  168. }
  169. // DoPingPong performs ping-pong style bi-directional streaming RPC.
  170. func DoPingPong(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
  171. stream, err := tc.FullDuplexCall(context.Background(), args...)
  172. if err != nil {
  173. logger.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  174. }
  175. var index int
  176. for index < len(reqSizes) {
  177. respParam := []*testpb.ResponseParameters{
  178. {
  179. Size: int32(respSizes[index]),
  180. },
  181. }
  182. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSizes[index])
  183. req := &testpb.StreamingOutputCallRequest{
  184. ResponseType: testpb.PayloadType_COMPRESSABLE,
  185. ResponseParameters: respParam,
  186. Payload: pl,
  187. }
  188. if err := stream.Send(req); err != nil {
  189. logger.Fatalf("%v has error %v while sending %v", stream, err, req)
  190. }
  191. reply, err := stream.Recv()
  192. if err != nil {
  193. logger.Fatalf("%v.Recv() = %v", stream, err)
  194. }
  195. t := reply.GetPayload().GetType()
  196. if t != testpb.PayloadType_COMPRESSABLE {
  197. logger.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
  198. }
  199. size := len(reply.GetPayload().GetBody())
  200. if size != respSizes[index] {
  201. logger.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
  202. }
  203. index++
  204. }
  205. if err := stream.CloseSend(); err != nil {
  206. logger.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  207. }
  208. if _, err := stream.Recv(); err != io.EOF {
  209. logger.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
  210. }
  211. }
  212. // DoEmptyStream sets up a bi-directional streaming with zero message.
  213. func DoEmptyStream(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
  214. stream, err := tc.FullDuplexCall(context.Background(), args...)
  215. if err != nil {
  216. logger.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  217. }
  218. if err := stream.CloseSend(); err != nil {
  219. logger.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  220. }
  221. if _, err := stream.Recv(); err != io.EOF {
  222. logger.Fatalf("%v failed to complete the empty stream test: %v", stream, err)
  223. }
  224. }
  225. // DoTimeoutOnSleepingServer performs an RPC on a sleep server which causes RPC timeout.
  226. func DoTimeoutOnSleepingServer(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
  227. ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
  228. defer cancel()
  229. stream, err := tc.FullDuplexCall(ctx, args...)
  230. if err != nil {
  231. if status.Code(err) == codes.DeadlineExceeded {
  232. return
  233. }
  234. logger.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  235. }
  236. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
  237. req := &testpb.StreamingOutputCallRequest{
  238. ResponseType: testpb.PayloadType_COMPRESSABLE,
  239. Payload: pl,
  240. }
  241. if err := stream.Send(req); err != nil && err != io.EOF {
  242. logger.Fatalf("%v.Send(_) = %v", stream, err)
  243. }
  244. if _, err := stream.Recv(); status.Code(err) != codes.DeadlineExceeded {
  245. logger.Fatalf("%v.Recv() = _, %v, want error code %d", stream, err, codes.DeadlineExceeded)
  246. }
  247. }
  248. // DoComputeEngineCreds performs a unary RPC with compute engine auth.
  249. func DoComputeEngineCreds(tc testgrpc.TestServiceClient, serviceAccount, oauthScope string) {
  250. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  251. req := &testpb.SimpleRequest{
  252. ResponseType: testpb.PayloadType_COMPRESSABLE,
  253. ResponseSize: int32(largeRespSize),
  254. Payload: pl,
  255. FillUsername: true,
  256. FillOauthScope: true,
  257. }
  258. reply, err := tc.UnaryCall(context.Background(), req)
  259. if err != nil {
  260. logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
  261. }
  262. user := reply.GetUsername()
  263. scope := reply.GetOauthScope()
  264. if user != serviceAccount {
  265. logger.Fatalf("Got user name %q, want %q.", user, serviceAccount)
  266. }
  267. if !strings.Contains(oauthScope, scope) {
  268. logger.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  269. }
  270. }
  271. func getServiceAccountJSONKey(keyFile string) []byte {
  272. jsonKey, err := os.ReadFile(keyFile)
  273. if err != nil {
  274. logger.Fatalf("Failed to read the service account key file: %v", err)
  275. }
  276. return jsonKey
  277. }
  278. // DoServiceAccountCreds performs a unary RPC with service account auth.
  279. func DoServiceAccountCreds(tc testgrpc.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
  280. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  281. req := &testpb.SimpleRequest{
  282. ResponseType: testpb.PayloadType_COMPRESSABLE,
  283. ResponseSize: int32(largeRespSize),
  284. Payload: pl,
  285. FillUsername: true,
  286. FillOauthScope: true,
  287. }
  288. reply, err := tc.UnaryCall(context.Background(), req)
  289. if err != nil {
  290. logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
  291. }
  292. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  293. user := reply.GetUsername()
  294. scope := reply.GetOauthScope()
  295. if !strings.Contains(string(jsonKey), user) {
  296. logger.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  297. }
  298. if !strings.Contains(oauthScope, scope) {
  299. logger.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  300. }
  301. }
  302. // DoJWTTokenCreds performs a unary RPC with JWT token auth.
  303. func DoJWTTokenCreds(tc testgrpc.TestServiceClient, serviceAccountKeyFile string) {
  304. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  305. req := &testpb.SimpleRequest{
  306. ResponseType: testpb.PayloadType_COMPRESSABLE,
  307. ResponseSize: int32(largeRespSize),
  308. Payload: pl,
  309. FillUsername: true,
  310. }
  311. reply, err := tc.UnaryCall(context.Background(), req)
  312. if err != nil {
  313. logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
  314. }
  315. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  316. user := reply.GetUsername()
  317. if !strings.Contains(string(jsonKey), user) {
  318. logger.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  319. }
  320. }
  321. // GetToken obtains an OAUTH token from the input.
  322. func GetToken(serviceAccountKeyFile string, oauthScope string) *oauth2.Token {
  323. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  324. config, err := google.JWTConfigFromJSON(jsonKey, oauthScope)
  325. if err != nil {
  326. logger.Fatalf("Failed to get the config: %v", err)
  327. }
  328. token, err := config.TokenSource(context.Background()).Token()
  329. if err != nil {
  330. logger.Fatalf("Failed to get the token: %v", err)
  331. }
  332. return token
  333. }
  334. // DoOauth2TokenCreds performs a unary RPC with OAUTH2 token auth.
  335. func DoOauth2TokenCreds(tc testgrpc.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
  336. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  337. req := &testpb.SimpleRequest{
  338. ResponseType: testpb.PayloadType_COMPRESSABLE,
  339. ResponseSize: int32(largeRespSize),
  340. Payload: pl,
  341. FillUsername: true,
  342. FillOauthScope: true,
  343. }
  344. reply, err := tc.UnaryCall(context.Background(), req)
  345. if err != nil {
  346. logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
  347. }
  348. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  349. user := reply.GetUsername()
  350. scope := reply.GetOauthScope()
  351. if !strings.Contains(string(jsonKey), user) {
  352. logger.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  353. }
  354. if !strings.Contains(oauthScope, scope) {
  355. logger.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  356. }
  357. }
  358. // DoPerRPCCreds performs a unary RPC with per RPC OAUTH2 token.
  359. func DoPerRPCCreds(tc testgrpc.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
  360. jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
  361. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  362. req := &testpb.SimpleRequest{
  363. ResponseType: testpb.PayloadType_COMPRESSABLE,
  364. ResponseSize: int32(largeRespSize),
  365. Payload: pl,
  366. FillUsername: true,
  367. FillOauthScope: true,
  368. }
  369. token := GetToken(serviceAccountKeyFile, oauthScope)
  370. kv := map[string]string{"authorization": token.Type() + " " + token.AccessToken}
  371. ctx := metadata.NewOutgoingContext(context.Background(), metadata.MD{"authorization": []string{kv["authorization"]}})
  372. reply, err := tc.UnaryCall(ctx, req)
  373. if err != nil {
  374. logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
  375. }
  376. user := reply.GetUsername()
  377. scope := reply.GetOauthScope()
  378. if !strings.Contains(string(jsonKey), user) {
  379. logger.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
  380. }
  381. if !strings.Contains(oauthScope, scope) {
  382. logger.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
  383. }
  384. }
  385. // DoGoogleDefaultCredentials performs an unary RPC with google default credentials
  386. func DoGoogleDefaultCredentials(tc testgrpc.TestServiceClient, defaultServiceAccount string) {
  387. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  388. req := &testpb.SimpleRequest{
  389. ResponseType: testpb.PayloadType_COMPRESSABLE,
  390. ResponseSize: int32(largeRespSize),
  391. Payload: pl,
  392. FillUsername: true,
  393. FillOauthScope: true,
  394. }
  395. reply, err := tc.UnaryCall(context.Background(), req)
  396. if err != nil {
  397. logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
  398. }
  399. if reply.GetUsername() != defaultServiceAccount {
  400. logger.Fatalf("Got user name %q; wanted %q. ", reply.GetUsername(), defaultServiceAccount)
  401. }
  402. }
  403. // DoComputeEngineChannelCredentials performs an unary RPC with compute engine channel credentials
  404. func DoComputeEngineChannelCredentials(tc testgrpc.TestServiceClient, defaultServiceAccount string) {
  405. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  406. req := &testpb.SimpleRequest{
  407. ResponseType: testpb.PayloadType_COMPRESSABLE,
  408. ResponseSize: int32(largeRespSize),
  409. Payload: pl,
  410. FillUsername: true,
  411. FillOauthScope: true,
  412. }
  413. reply, err := tc.UnaryCall(context.Background(), req)
  414. if err != nil {
  415. logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
  416. }
  417. if reply.GetUsername() != defaultServiceAccount {
  418. logger.Fatalf("Got user name %q; wanted %q. ", reply.GetUsername(), defaultServiceAccount)
  419. }
  420. }
  421. var testMetadata = metadata.MD{
  422. "key1": []string{"value1"},
  423. "key2": []string{"value2"},
  424. }
  425. // DoCancelAfterBegin cancels the RPC after metadata has been sent but before payloads are sent.
  426. func DoCancelAfterBegin(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
  427. ctx, cancel := context.WithCancel(metadata.NewOutgoingContext(context.Background(), testMetadata))
  428. stream, err := tc.StreamingInputCall(ctx, args...)
  429. if err != nil {
  430. logger.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
  431. }
  432. cancel()
  433. _, err = stream.CloseAndRecv()
  434. if status.Code(err) != codes.Canceled {
  435. logger.Fatalf("%v.CloseAndRecv() got error code %d, want %d", stream, status.Code(err), codes.Canceled)
  436. }
  437. }
  438. // DoCancelAfterFirstResponse cancels the RPC after receiving the first message from the server.
  439. func DoCancelAfterFirstResponse(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
  440. ctx, cancel := context.WithCancel(context.Background())
  441. stream, err := tc.FullDuplexCall(ctx, args...)
  442. if err != nil {
  443. logger.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
  444. }
  445. respParam := []*testpb.ResponseParameters{
  446. {
  447. Size: 31415,
  448. },
  449. }
  450. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
  451. req := &testpb.StreamingOutputCallRequest{
  452. ResponseType: testpb.PayloadType_COMPRESSABLE,
  453. ResponseParameters: respParam,
  454. Payload: pl,
  455. }
  456. if err := stream.Send(req); err != nil {
  457. logger.Fatalf("%v has error %v while sending %v", stream, err, req)
  458. }
  459. if _, err := stream.Recv(); err != nil {
  460. logger.Fatalf("%v.Recv() = %v", stream, err)
  461. }
  462. cancel()
  463. if _, err := stream.Recv(); status.Code(err) != codes.Canceled {
  464. logger.Fatalf("%v compleled with error code %d, want %d", stream, status.Code(err), codes.Canceled)
  465. }
  466. }
  467. var (
  468. initialMetadataValue = "test_initial_metadata_value"
  469. trailingMetadataValue = "\x0a\x0b\x0a\x0b\x0a\x0b"
  470. customMetadata = metadata.Pairs(
  471. initialMetadataKey, initialMetadataValue,
  472. trailingMetadataKey, trailingMetadataValue,
  473. )
  474. )
  475. func validateMetadata(header, trailer metadata.MD) {
  476. if len(header[initialMetadataKey]) != 1 {
  477. logger.Fatalf("Expected exactly one header from server. Received %d", len(header[initialMetadataKey]))
  478. }
  479. if header[initialMetadataKey][0] != initialMetadataValue {
  480. logger.Fatalf("Got header %s; want %s", header[initialMetadataKey][0], initialMetadataValue)
  481. }
  482. if len(trailer[trailingMetadataKey]) != 1 {
  483. logger.Fatalf("Expected exactly one trailer from server. Received %d", len(trailer[trailingMetadataKey]))
  484. }
  485. if trailer[trailingMetadataKey][0] != trailingMetadataValue {
  486. logger.Fatalf("Got trailer %s; want %s", trailer[trailingMetadataKey][0], trailingMetadataValue)
  487. }
  488. }
  489. // DoCustomMetadata checks that metadata is echoed back to the client.
  490. func DoCustomMetadata(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
  491. // Testing with UnaryCall.
  492. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
  493. req := &testpb.SimpleRequest{
  494. ResponseType: testpb.PayloadType_COMPRESSABLE,
  495. ResponseSize: int32(1),
  496. Payload: pl,
  497. }
  498. ctx := metadata.NewOutgoingContext(context.Background(), customMetadata)
  499. var header, trailer metadata.MD
  500. args = append(args, grpc.Header(&header), grpc.Trailer(&trailer))
  501. reply, err := tc.UnaryCall(
  502. ctx,
  503. req,
  504. args...,
  505. )
  506. if err != nil {
  507. logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
  508. }
  509. t := reply.GetPayload().GetType()
  510. s := len(reply.GetPayload().GetBody())
  511. if t != testpb.PayloadType_COMPRESSABLE || s != 1 {
  512. logger.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, 1)
  513. }
  514. validateMetadata(header, trailer)
  515. // Testing with FullDuplex.
  516. stream, err := tc.FullDuplexCall(ctx, args...)
  517. if err != nil {
  518. logger.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  519. }
  520. respParam := []*testpb.ResponseParameters{
  521. {
  522. Size: 1,
  523. },
  524. }
  525. streamReq := &testpb.StreamingOutputCallRequest{
  526. ResponseType: testpb.PayloadType_COMPRESSABLE,
  527. ResponseParameters: respParam,
  528. Payload: pl,
  529. }
  530. if err := stream.Send(streamReq); err != nil {
  531. logger.Fatalf("%v has error %v while sending %v", stream, err, streamReq)
  532. }
  533. streamHeader, err := stream.Header()
  534. if err != nil {
  535. logger.Fatalf("%v.Header() = %v", stream, err)
  536. }
  537. if _, err := stream.Recv(); err != nil {
  538. logger.Fatalf("%v.Recv() = %v", stream, err)
  539. }
  540. if err := stream.CloseSend(); err != nil {
  541. logger.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
  542. }
  543. if _, err := stream.Recv(); err != io.EOF {
  544. logger.Fatalf("%v failed to complete the custom metadata test: %v", stream, err)
  545. }
  546. streamTrailer := stream.Trailer()
  547. validateMetadata(streamHeader, streamTrailer)
  548. }
  549. // DoStatusCodeAndMessage checks that the status code is propagated back to the client.
  550. func DoStatusCodeAndMessage(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
  551. var code int32 = 2
  552. msg := "test status message"
  553. expectedErr := status.Error(codes.Code(code), msg)
  554. respStatus := &testpb.EchoStatus{
  555. Code: code,
  556. Message: msg,
  557. }
  558. // Test UnaryCall.
  559. req := &testpb.SimpleRequest{
  560. ResponseStatus: respStatus,
  561. }
  562. if _, err := tc.UnaryCall(context.Background(), req, args...); err == nil || err.Error() != expectedErr.Error() {
  563. logger.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
  564. }
  565. // Test FullDuplexCall.
  566. stream, err := tc.FullDuplexCall(context.Background(), args...)
  567. if err != nil {
  568. logger.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  569. }
  570. streamReq := &testpb.StreamingOutputCallRequest{
  571. ResponseStatus: respStatus,
  572. }
  573. if err := stream.Send(streamReq); err != nil {
  574. logger.Fatalf("%v has error %v while sending %v, want <nil>", stream, err, streamReq)
  575. }
  576. if err := stream.CloseSend(); err != nil {
  577. logger.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
  578. }
  579. if _, err = stream.Recv(); err.Error() != expectedErr.Error() {
  580. logger.Fatalf("%v.Recv() returned error %v, want %v", stream, err, expectedErr)
  581. }
  582. }
  583. // DoSpecialStatusMessage verifies Unicode and whitespace is correctly processed
  584. // in status message.
  585. func DoSpecialStatusMessage(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
  586. const (
  587. code int32 = 2
  588. msg string = "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP 😈\t\n"
  589. )
  590. expectedErr := status.Error(codes.Code(code), msg)
  591. req := &testpb.SimpleRequest{
  592. ResponseStatus: &testpb.EchoStatus{
  593. Code: code,
  594. Message: msg,
  595. },
  596. }
  597. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  598. defer cancel()
  599. if _, err := tc.UnaryCall(ctx, req, args...); err == nil || err.Error() != expectedErr.Error() {
  600. logger.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
  601. }
  602. }
  603. // DoUnimplementedService attempts to call a method from an unimplemented service.
  604. func DoUnimplementedService(tc testgrpc.UnimplementedServiceClient) {
  605. _, err := tc.UnimplementedCall(context.Background(), &testpb.Empty{})
  606. if status.Code(err) != codes.Unimplemented {
  607. logger.Fatalf("%v.UnimplementedCall() = _, %v, want _, %v", tc, status.Code(err), codes.Unimplemented)
  608. }
  609. }
  610. // DoUnimplementedMethod attempts to call an unimplemented method.
  611. func DoUnimplementedMethod(cc *grpc.ClientConn) {
  612. var req, reply proto.Message
  613. if err := cc.Invoke(context.Background(), "/grpc.testing.TestService/UnimplementedCall", req, reply); err == nil || status.Code(err) != codes.Unimplemented {
  614. logger.Fatalf("ClientConn.Invoke(_, _, _, _, _) = %v, want error code %s", err, codes.Unimplemented)
  615. }
  616. }
  617. // DoPickFirstUnary runs multiple RPCs (rpcCount) and checks that all requests
  618. // are sent to the same backend.
  619. func DoPickFirstUnary(tc testgrpc.TestServiceClient) {
  620. const rpcCount = 100
  621. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
  622. req := &testpb.SimpleRequest{
  623. ResponseType: testpb.PayloadType_COMPRESSABLE,
  624. ResponseSize: int32(1),
  625. Payload: pl,
  626. FillServerId: true,
  627. }
  628. // TODO(mohanli): Revert the timeout back to 10s once TD migrates to xdstp.
  629. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  630. defer cancel()
  631. var serverID string
  632. for i := 0; i < rpcCount; i++ {
  633. resp, err := tc.UnaryCall(ctx, req)
  634. if err != nil {
  635. logger.Fatalf("iteration %d, failed to do UnaryCall: %v", i, err)
  636. }
  637. id := resp.ServerId
  638. if id == "" {
  639. logger.Fatalf("iteration %d, got empty server ID", i)
  640. }
  641. if i == 0 {
  642. serverID = id
  643. continue
  644. }
  645. if serverID != id {
  646. logger.Fatalf("iteration %d, got different server ids: %q vs %q", i, serverID, id)
  647. }
  648. }
  649. }
  650. func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, dopts []grpc.DialOption, copts []grpc.CallOption) (latency time.Duration, err error) {
  651. start := time.Now()
  652. client := tc
  653. if resetChannel {
  654. var conn *grpc.ClientConn
  655. conn, err = grpc.Dial(serverAddr, dopts...)
  656. if err != nil {
  657. return
  658. }
  659. defer conn.Close()
  660. client = testgrpc.NewTestServiceClient(conn)
  661. }
  662. // per test spec, don't include channel shutdown in latency measurement
  663. defer func() { latency = time.Since(start) }()
  664. // do a large-unary RPC
  665. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
  666. req := &testpb.SimpleRequest{
  667. ResponseType: testpb.PayloadType_COMPRESSABLE,
  668. ResponseSize: int32(largeRespSize),
  669. Payload: pl,
  670. }
  671. var reply *testpb.SimpleResponse
  672. reply, err = client.UnaryCall(ctx, req, copts...)
  673. if err != nil {
  674. err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
  675. return
  676. }
  677. t := reply.GetPayload().GetType()
  678. s := len(reply.GetPayload().GetBody())
  679. if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
  680. err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
  681. return
  682. }
  683. return
  684. }
  685. // DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds.
  686. // If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new
  687. // stub that is created with the provided server address and dial options.
  688. func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration, overallDeadline time.Time) {
  689. start := time.Now()
  690. ctx, cancel := context.WithDeadline(context.Background(), overallDeadline)
  691. defer cancel()
  692. iterationsDone := 0
  693. totalFailures := 0
  694. hopts := stats.HistogramOptions{
  695. NumBuckets: 20,
  696. GrowthFactor: 1,
  697. BaseBucketSize: 1,
  698. MinValue: 0,
  699. }
  700. h := stats.NewHistogram(hopts)
  701. for i := 0; i < soakIterations; i++ {
  702. if time.Now().After(overallDeadline) {
  703. break
  704. }
  705. earliestNextStart := time.After(minTimeBetweenRPCs)
  706. iterationsDone++
  707. var p peer.Peer
  708. latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, dopts, []grpc.CallOption{grpc.Peer(&p)})
  709. latencyMs := int64(latency / time.Millisecond)
  710. h.Add(latencyMs)
  711. if err != nil {
  712. totalFailures++
  713. addrStr := "nil"
  714. if p.Addr != nil {
  715. addrStr = p.Addr.String()
  716. }
  717. fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s failed: %s\n", i, latencyMs, addrStr, serverAddr, err)
  718. <-earliestNextStart
  719. continue
  720. }
  721. if latency > perIterationMaxAcceptableLatency {
  722. totalFailures++
  723. fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s exceeds max acceptable latency: %d\n", i, latencyMs, p.Addr.String(), serverAddr, perIterationMaxAcceptableLatency.Milliseconds())
  724. <-earliestNextStart
  725. continue
  726. }
  727. fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s succeeded\n", i, latencyMs, p.Addr.String(), serverAddr)
  728. <-earliestNextStart
  729. }
  730. var b bytes.Buffer
  731. h.Print(&b)
  732. fmt.Fprintf(os.Stderr, "(server_uri: %s) histogram of per-iteration latencies in milliseconds: %s\n", serverAddr, b.String())
  733. fmt.Fprintf(os.Stderr, "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. max failures threshold: %d. See breakdown above for which iterations succeeded, failed, and why for more info.\n", serverAddr, iterationsDone, soakIterations, totalFailures, maxFailures)
  734. if iterationsDone < soakIterations {
  735. logger.Fatalf("(server_uri: %s) soak test consumed all %f seconds of time and quit early, only having ran %d out of desired %d iterations.", serverAddr, overallDeadline.Sub(start).Seconds(), iterationsDone, soakIterations)
  736. }
  737. if totalFailures > maxFailures {
  738. logger.Fatalf("(server_uri: %s) soak test total failures: %d exceeds max failures threshold: %d.", serverAddr, totalFailures, maxFailures)
  739. }
  740. }
  741. type testServer struct {
  742. testgrpc.UnimplementedTestServiceServer
  743. orcaMu sync.Mutex
  744. metricsRecorder orca.ServerMetricsRecorder
  745. }
  746. // NewTestServerOptions contains options that control the behavior of the test
  747. // server returned by NewTestServer.
  748. type NewTestServerOptions struct {
  749. MetricsRecorder orca.ServerMetricsRecorder
  750. }
  751. // NewTestServer creates a test server for test service. opts carries optional
  752. // settings and does not need to be provided. If multiple opts are provided,
  753. // only the first one is used.
  754. func NewTestServer(opts ...NewTestServerOptions) testgrpc.TestServiceServer {
  755. if len(opts) > 0 {
  756. return &testServer{metricsRecorder: opts[0].MetricsRecorder}
  757. }
  758. return &testServer{}
  759. }
  760. func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  761. return new(testpb.Empty), nil
  762. }
  763. func serverNewPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
  764. if size < 0 {
  765. return nil, fmt.Errorf("requested a response with invalid length %d", size)
  766. }
  767. body := make([]byte, size)
  768. switch t {
  769. case testpb.PayloadType_COMPRESSABLE:
  770. default:
  771. return nil, fmt.Errorf("unsupported payload type: %d", t)
  772. }
  773. return &testpb.Payload{
  774. Type: t,
  775. Body: body,
  776. }, nil
  777. }
  778. func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  779. st := in.GetResponseStatus()
  780. if md, ok := metadata.FromIncomingContext(ctx); ok {
  781. if initialMetadata, ok := md[initialMetadataKey]; ok {
  782. header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
  783. grpc.SendHeader(ctx, header)
  784. }
  785. if trailingMetadata, ok := md[trailingMetadataKey]; ok {
  786. trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
  787. grpc.SetTrailer(ctx, trailer)
  788. }
  789. }
  790. if st != nil && st.Code != 0 {
  791. return nil, status.Error(codes.Code(st.Code), st.Message)
  792. }
  793. pl, err := serverNewPayload(in.GetResponseType(), in.GetResponseSize())
  794. if err != nil {
  795. return nil, err
  796. }
  797. if r, orcaData := orca.CallMetricsRecorderFromContext(ctx), in.GetOrcaPerQueryReport(); r != nil && orcaData != nil {
  798. // Transfer the request's per-Call ORCA data to the call metrics
  799. // recorder in the context, if present.
  800. setORCAMetrics(r, orcaData)
  801. }
  802. return &testpb.SimpleResponse{
  803. Payload: pl,
  804. }, nil
  805. }
  806. func setORCAMetrics(r orca.ServerMetricsRecorder, orcaData *testpb.TestOrcaReport) {
  807. r.SetCPUUtilization(orcaData.CpuUtilization)
  808. r.SetMemoryUtilization(orcaData.MemoryUtilization)
  809. if rq, ok := r.(orca.CallMetricsRecorder); ok {
  810. for k, v := range orcaData.RequestCost {
  811. rq.SetRequestCost(k, v)
  812. }
  813. }
  814. for k, v := range orcaData.Utilization {
  815. r.SetNamedUtilization(k, v)
  816. }
  817. }
  818. func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
  819. cs := args.GetResponseParameters()
  820. for _, c := range cs {
  821. if us := c.GetIntervalUs(); us > 0 {
  822. time.Sleep(time.Duration(us) * time.Microsecond)
  823. }
  824. pl, err := serverNewPayload(args.GetResponseType(), c.GetSize())
  825. if err != nil {
  826. return err
  827. }
  828. if err := stream.Send(&testpb.StreamingOutputCallResponse{
  829. Payload: pl,
  830. }); err != nil {
  831. return err
  832. }
  833. }
  834. return nil
  835. }
  836. func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error {
  837. var sum int
  838. for {
  839. in, err := stream.Recv()
  840. if err == io.EOF {
  841. return stream.SendAndClose(&testpb.StreamingInputCallResponse{
  842. AggregatedPayloadSize: int32(sum),
  843. })
  844. }
  845. if err != nil {
  846. return err
  847. }
  848. p := in.GetPayload().GetBody()
  849. sum += len(p)
  850. }
  851. }
  852. func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
  853. if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
  854. if initialMetadata, ok := md[initialMetadataKey]; ok {
  855. header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
  856. stream.SendHeader(header)
  857. }
  858. if trailingMetadata, ok := md[trailingMetadataKey]; ok {
  859. trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
  860. stream.SetTrailer(trailer)
  861. }
  862. }
  863. hasORCALock := false
  864. for {
  865. in, err := stream.Recv()
  866. if err == io.EOF {
  867. // read done.
  868. return nil
  869. }
  870. if err != nil {
  871. return err
  872. }
  873. st := in.GetResponseStatus()
  874. if st != nil && st.Code != 0 {
  875. return status.Error(codes.Code(st.Code), st.Message)
  876. }
  877. if r, orcaData := s.metricsRecorder, in.GetOrcaOobReport(); r != nil && orcaData != nil {
  878. // Transfer the request's OOB ORCA data to the server metrics recorder
  879. // in the server, if present.
  880. if !hasORCALock {
  881. s.orcaMu.Lock()
  882. defer s.orcaMu.Unlock()
  883. hasORCALock = true
  884. }
  885. setORCAMetrics(r, orcaData)
  886. }
  887. cs := in.GetResponseParameters()
  888. for _, c := range cs {
  889. if us := c.GetIntervalUs(); us > 0 {
  890. time.Sleep(time.Duration(us) * time.Microsecond)
  891. }
  892. pl, err := serverNewPayload(in.GetResponseType(), c.GetSize())
  893. if err != nil {
  894. return err
  895. }
  896. if err := stream.Send(&testpb.StreamingOutputCallResponse{
  897. Payload: pl,
  898. }); err != nil {
  899. return err
  900. }
  901. }
  902. }
  903. }
  904. func (s *testServer) HalfDuplexCall(stream testgrpc.TestService_HalfDuplexCallServer) error {
  905. var msgBuf []*testpb.StreamingOutputCallRequest
  906. for {
  907. in, err := stream.Recv()
  908. if err == io.EOF {
  909. // read done.
  910. break
  911. }
  912. if err != nil {
  913. return err
  914. }
  915. msgBuf = append(msgBuf, in)
  916. }
  917. for _, m := range msgBuf {
  918. cs := m.GetResponseParameters()
  919. for _, c := range cs {
  920. if us := c.GetIntervalUs(); us > 0 {
  921. time.Sleep(time.Duration(us) * time.Microsecond)
  922. }
  923. pl, err := serverNewPayload(m.GetResponseType(), c.GetSize())
  924. if err != nil {
  925. return err
  926. }
  927. if err := stream.Send(&testpb.StreamingOutputCallResponse{
  928. Payload: pl,
  929. }); err != nil {
  930. return err
  931. }
  932. }
  933. }
  934. return nil
  935. }
  936. // DoORCAPerRPCTest performs a unary RPC that enables ORCA per-call reporting
  937. // and verifies the load report sent back to the LB policy's Done callback.
  938. func DoORCAPerRPCTest(tc testgrpc.TestServiceClient) {
  939. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  940. defer cancel()
  941. orcaRes := &v3orcapb.OrcaLoadReport{}
  942. _, err := tc.UnaryCall(contextWithORCAResult(ctx, &orcaRes), &testpb.SimpleRequest{
  943. OrcaPerQueryReport: &testpb.TestOrcaReport{
  944. CpuUtilization: 0.8210,
  945. MemoryUtilization: 0.5847,
  946. RequestCost: map[string]float64{"cost": 3456.32},
  947. Utilization: map[string]float64{"util": 0.30499},
  948. },
  949. })
  950. if err != nil {
  951. logger.Fatalf("/TestService/UnaryCall RPC failed: ", err)
  952. }
  953. want := &v3orcapb.OrcaLoadReport{
  954. CpuUtilization: 0.8210,
  955. MemUtilization: 0.5847,
  956. RequestCost: map[string]float64{"cost": 3456.32},
  957. Utilization: map[string]float64{"util": 0.30499},
  958. }
  959. if !proto.Equal(orcaRes, want) {
  960. logger.Fatalf("/TestService/UnaryCall RPC received ORCA load report %+v; want %+v", orcaRes, want)
  961. }
  962. }
  963. // DoORCAOOBTest performs a streaming RPC that enables ORCA OOB reporting and
  964. // verifies the load report sent to the LB policy's OOB listener.
  965. func DoORCAOOBTest(tc testgrpc.TestServiceClient) {
  966. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  967. defer cancel()
  968. stream, err := tc.FullDuplexCall(ctx)
  969. if err != nil {
  970. logger.Fatalf("/TestService/FullDuplexCall received error starting stream: %v", err)
  971. }
  972. err = stream.Send(&testpb.StreamingOutputCallRequest{
  973. OrcaOobReport: &testpb.TestOrcaReport{
  974. CpuUtilization: 0.8210,
  975. MemoryUtilization: 0.5847,
  976. Utilization: map[string]float64{"util": 0.30499},
  977. },
  978. ResponseParameters: []*testpb.ResponseParameters{{Size: 1}},
  979. })
  980. if err != nil {
  981. logger.Fatalf("/TestService/FullDuplexCall received error sending: %v", err)
  982. }
  983. _, err = stream.Recv()
  984. if err != nil {
  985. logger.Fatalf("/TestService/FullDuplexCall received error receiving: %v", err)
  986. }
  987. want := &v3orcapb.OrcaLoadReport{
  988. CpuUtilization: 0.8210,
  989. MemUtilization: 0.5847,
  990. Utilization: map[string]float64{"util": 0.30499},
  991. }
  992. checkORCAMetrics(ctx, tc, want)
  993. err = stream.Send(&testpb.StreamingOutputCallRequest{
  994. OrcaOobReport: &testpb.TestOrcaReport{
  995. CpuUtilization: 0.29309,
  996. MemoryUtilization: 0.2,
  997. Utilization: map[string]float64{"util": 0.2039},
  998. },
  999. ResponseParameters: []*testpb.ResponseParameters{{Size: 1}},
  1000. })
  1001. if err != nil {
  1002. logger.Fatalf("/TestService/FullDuplexCall received error sending: %v", err)
  1003. }
  1004. _, err = stream.Recv()
  1005. if err != nil {
  1006. logger.Fatalf("/TestService/FullDuplexCall received error receiving: %v", err)
  1007. }
  1008. want = &v3orcapb.OrcaLoadReport{
  1009. CpuUtilization: 0.29309,
  1010. MemUtilization: 0.2,
  1011. Utilization: map[string]float64{"util": 0.2039},
  1012. }
  1013. checkORCAMetrics(ctx, tc, want)
  1014. }
  1015. func checkORCAMetrics(ctx context.Context, tc testgrpc.TestServiceClient, want *v3orcapb.OrcaLoadReport) {
  1016. for ctx.Err() == nil {
  1017. orcaRes := &v3orcapb.OrcaLoadReport{}
  1018. if _, err := tc.UnaryCall(contextWithORCAResult(ctx, &orcaRes), &testpb.SimpleRequest{}); err != nil {
  1019. logger.Fatalf("/TestService/UnaryCall RPC failed: ", err)
  1020. }
  1021. if proto.Equal(orcaRes, want) {
  1022. return
  1023. }
  1024. logger.Infof("/TestService/UnaryCall RPC received ORCA load report %+v; want %+v", orcaRes, want)
  1025. time.Sleep(time.Second)
  1026. }
  1027. logger.Fatalf("timed out waiting for expected ORCA load report")
  1028. }