main.go

/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/*
Package main provides a benchmark whose behavior is configured through flags.

An example of running some benchmarks with profiling enabled:

    go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
        -compression=gzip -maxConcurrentCalls=1 -trace=off \
        -reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
        -cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result

As a suggestion, when creating a branch, run this benchmark and save the result
file with "-resultFile=basePerf"; later, while the work is still in progress or
once it is finished, run the benchmark again and compare the new result with the
base at any time.

Assume there are two result files, named "basePerf" and "curPerf", created by
passing -resultFile=basePerf and -resultFile=curPerf respectively.

To format curPerf, run:

    go run benchmark/benchresult/main.go curPerf

To observe how the performance changes relative to the base result, run:

    go run benchmark/benchresult/main.go basePerf curPerf
*/
package main

import (
    "context"
    "encoding/gob"
    "flag"
    "fmt"
    "io"
    "log"
    "net"
    "os"
    "reflect"
    "runtime"
    "runtime/pprof"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/benchmark"
    bm "google.golang.org/grpc/benchmark"
    "google.golang.org/grpc/benchmark/flags"
    "google.golang.org/grpc/benchmark/latency"
    "google.golang.org/grpc/benchmark/stats"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/internal/channelz"
    "google.golang.org/grpc/keepalive"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/test/bufconn"

    testgrpc "google.golang.org/grpc/interop/grpc_testing"
    testpb "google.golang.org/grpc/interop/grpc_testing"
)

var (
    workloads = flags.StringWithAllowedValues("workloads", workloadsAll,
        fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")), allWorkloads)
    traceMode = flags.StringWithAllowedValues("trace", toggleModeOff,
        fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
    preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff,
        fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
    channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff,
        fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
    compressorMode = flags.StringWithAllowedValues("compression", compModeOff,
        fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes)
    networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone,
        "Network mode includes LAN, WAN, Local and Longhaul", allNetworkModes)
    readLatency = flags.DurationSlice("latency", defaultReadLatency, "Simulated one-way network latency - may be a comma-separated list")
    readKbps = flags.IntSlice("kbps", defaultReadKbps, "Simulated network throughput (in kbps) - may be a comma-separated list")
    readMTU = flags.IntSlice("mtu", defaultReadMTU, "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list")
    maxConcurrentCalls = flags.IntSlice("maxConcurrentCalls", defaultMaxConcurrentCalls, "Number of concurrent RPCs during benchmarks")
    readReqSizeBytes = flags.IntSlice("reqSizeBytes", nil, "Request size in bytes - may be a comma-separated list")
    readRespSizeBytes = flags.IntSlice("respSizeBytes", nil, "Response size in bytes - may be a comma-separated list")
    reqPayloadCurveFiles = flags.StringSlice("reqPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape of a random distribution of request payload sizes")
    respPayloadCurveFiles = flags.StringSlice("respPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape of a random distribution of response payload sizes")
    benchTime = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark")
    memProfile = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.")
    memProfileRate = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
        "memProfile should be set before setting profile rate. To include every allocated block in the profile, "+
        "set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.")
    cpuProfile = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided")
    benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file")
    useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O")
    enableKeepalive = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+
        "Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.")
    clientReadBufferSize = flags.IntSlice("clientReadBufferSize", []int{-1}, "Configures the client read buffer size in bytes. If negative, use the default - may be a comma-separated list")
    clientWriteBufferSize = flags.IntSlice("clientWriteBufferSize", []int{-1}, "Configures the client write buffer size in bytes. If negative, use the default - may be a comma-separated list")
    serverReadBufferSize = flags.IntSlice("serverReadBufferSize", []int{-1}, "Configures the server read buffer size in bytes. If negative, use the default - may be a comma-separated list")
    serverWriteBufferSize = flags.IntSlice("serverWriteBufferSize", []int{-1}, "Configures the server write buffer size in bytes. If negative, use the default - may be a comma-separated list")

    logger = grpclog.Component("benchmark")
)

const (
    workloadsUnary = "unary"
    workloadsStreaming = "streaming"
    workloadsUnconstrained = "unconstrained"
    workloadsAll = "all"
    // Compression modes.
    compModeOff = "off"
    compModeGzip = "gzip"
    compModeNop = "nop"
    compModeAll = "all"
    // Toggle modes.
    toggleModeOff = "off"
    toggleModeOn = "on"
    toggleModeBoth = "both"
    // Network modes.
    networkModeNone = "none"
    networkModeLocal = "Local"
    networkModeLAN = "LAN"
    networkModeWAN = "WAN"
    networkLongHaul = "Longhaul"

    numStatsBuckets = 10
    warmupCallCount = 10
    warmuptime = time.Second
)

var (
    allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}
    allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
    allToggleModes = []string{toggleModeOff, toggleModeOn, toggleModeBoth}
    allNetworkModes = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul}
    defaultReadLatency = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
    defaultReadKbps = []int{0, 10240} // if non-positive, infinite
    defaultReadMTU = []int{0} // if non-positive, infinite
    defaultMaxConcurrentCalls = []int{1, 8, 64, 512}
    defaultReqSizeBytes = []int{1, 1024, 1024 * 1024}
    defaultRespSizeBytes = []int{1, 1024, 1024 * 1024}
    networks = map[string]latency.Network{
        networkModeLocal: latency.Local,
        networkModeLAN: latency.LAN,
        networkModeWAN: latency.WAN,
        networkLongHaul: latency.Longhaul,
    }
    keepaliveTime = 10 * time.Second
    keepaliveTimeout = 1 * time.Second
    // This is 0.8*keepaliveTime to prevent connection issues because of server
    // keepalive enforcement.
    keepaliveMinTime = 8 * time.Second
)

// runModes indicates the workloads to run. This is initialized with a call to
// `runModesFromWorkloads`, passing the workloads flag set by the user.
type runModes struct {
    unary, streaming, unconstrained bool
}

// runModesFromWorkloads determines the runModes based on the value of the
// workloads flag set by the user.
func runModesFromWorkloads(workload string) runModes {
    r := runModes{}
    switch workload {
    case workloadsUnary:
        r.unary = true
    case workloadsStreaming:
        r.streaming = true
    case workloadsUnconstrained:
        r.unconstrained = true
    case workloadsAll:
        r.unary = true
        r.streaming = true
        r.unconstrained = true
    default:
        log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
            workload, strings.Join(allWorkloads, ", "))
    }
    return r
}

type startFunc func(mode string, bf stats.Features)
type stopFunc func(count uint64)
type ucStopFunc func(req uint64, resp uint64)
type rpcCallFunc func(pos int)
type rpcSendFunc func(pos int)
type rpcRecvFunc func(pos int)
type rpcCleanupFunc func()
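
// unaryBenchmark runs the unary workload: it builds a unary caller from 'bf',
// runs the shared benchmark loop, and records per-call latencies in 's'.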
func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
    caller, cleanup := makeFuncUnary(bf)
    defer cleanup()
    runBenchmark(caller, start, stop, bf, s, workloadsUnary)
}
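
// streamBenchmark runs the streaming workload: each concurrent caller issues
// ping-pong round trips on its own pre-established stream, and per-call
// latencies are recorded in 's'.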
func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
    caller, cleanup := makeFuncStream(bf)
    defer cleanup()
    runBenchmark(caller, start, stop, bf, s, workloadsStreaming)
}
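
// unconstrainedStreamBenchmark runs the unconstrained (open-loop) streaming
// workload: for each concurrent call it starts one goroutine that sends
// messages as fast as possible and one that receives them, counting requests
// and responses separately. The counters are reset after the warmup period,
// and the totals are reported via 'stop' when the benchmark time elapses.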
func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features) {
    var sender rpcSendFunc
    var recver rpcRecvFunc
    var cleanup rpcCleanupFunc
    if bf.EnablePreloader {
        sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf)
    } else {
        sender, recver, cleanup = makeFuncUnconstrainedStream(bf)
    }
    defer cleanup()

    var req, resp uint64
    go func() {
        // Resets the counters once warmed up.
        <-time.NewTimer(warmuptime).C
        atomic.StoreUint64(&req, 0)
        atomic.StoreUint64(&resp, 0)
        start(workloadsUnconstrained, bf)
    }()

    bmEnd := time.Now().Add(bf.BenchTime + warmuptime)
    var wg sync.WaitGroup
    wg.Add(2 * bf.MaxConcurrentCalls)
    for i := 0; i < bf.MaxConcurrentCalls; i++ {
        go func(pos int) {
            defer wg.Done()
            for {
                t := time.Now()
                if t.After(bmEnd) {
                    return
                }
                sender(pos)
                atomic.AddUint64(&req, 1)
            }
        }(i)
        go func(pos int) {
            defer wg.Done()
            for {
                t := time.Now()
                if t.After(bmEnd) {
                    return
                }
                recver(pos)
                atomic.AddUint64(&resp, 1)
            }
        }(i)
    }
    wg.Wait()
    stop(req, resp)
}

// makeClient returns a gRPC client for the grpc.testing.BenchmarkService
// service. The client is configured using the different options in the passed
// 'bf'. Also returns a cleanup function to close the client and release
// resources.
func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) {
    nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
    opts := []grpc.DialOption{}
    sopts := []grpc.ServerOption{}
    if bf.ModeCompressor == compModeNop {
        sopts = append(sopts,
            grpc.RPCCompressor(nopCompressor{}),
            grpc.RPCDecompressor(nopDecompressor{}),
        )
        opts = append(opts,
            grpc.WithCompressor(nopCompressor{}),
            grpc.WithDecompressor(nopDecompressor{}),
        )
    }
    if bf.ModeCompressor == compModeGzip {
        sopts = append(sopts,
            grpc.RPCCompressor(grpc.NewGZIPCompressor()),
            grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
        )
        opts = append(opts,
            grpc.WithCompressor(grpc.NewGZIPCompressor()),
            grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
        )
    }
    if bf.EnableKeepalive {
        sopts = append(sopts,
            grpc.KeepaliveParams(keepalive.ServerParameters{
                Time: keepaliveTime,
                Timeout: keepaliveTimeout,
            }),
            grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
                MinTime: keepaliveMinTime,
                PermitWithoutStream: true,
            }),
        )
        opts = append(opts,
            grpc.WithKeepaliveParams(keepalive.ClientParameters{
                Time: keepaliveTime,
                Timeout: keepaliveTimeout,
                PermitWithoutStream: true,
            }),
        )
    }
    if bf.ClientReadBufferSize >= 0 {
        opts = append(opts, grpc.WithReadBufferSize(bf.ClientReadBufferSize))
    }
    if bf.ClientWriteBufferSize >= 0 {
        opts = append(opts, grpc.WithWriteBufferSize(bf.ClientWriteBufferSize))
    }
    if bf.ServerReadBufferSize >= 0 {
        sopts = append(sopts, grpc.ReadBufferSize(bf.ServerReadBufferSize))
    }
    if bf.ServerWriteBufferSize >= 0 {
        sopts = append(sopts, grpc.WriteBufferSize(bf.ServerWriteBufferSize))
    }
    sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1)))
    opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))

    var lis net.Listener
    if bf.UseBufConn {
        bcLis := bufconn.Listen(256 * 1024)
        lis = bcLis
        opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
            return nw.ContextDialer(func(context.Context, string, string) (net.Conn, error) {
                return bcLis.Dial()
            })(ctx, "", "")
        }))
    } else {
        var err error
        lis, err = net.Listen("tcp", "localhost:0")
        if err != nil {
            logger.Fatalf("Failed to listen: %v", err)
        }
        opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
            return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", lis.Addr().String())
        }))
    }
    lis = nw.Listener(lis)
    stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
    conn := bm.NewClientConn("" /* target not used */, opts...)
    return testgrpc.NewBenchmarkServiceClient(conn), func() {
        conn.Close()
        stopper()
    }
}
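
// makeFuncUnary returns a caller that issues a single unary RPC per
// invocation, along with a cleanup function for the underlying client. The
// request and response sizes come from 'bf', or are drawn from the configured
// payload curves when present.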
func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
    tc, cleanup := makeClient(bf)
    return func(int) {
        reqSizeBytes := bf.ReqSizeBytes
        respSizeBytes := bf.RespSizeBytes
        if bf.ReqPayloadCurve != nil {
            reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
        }
        if bf.RespPayloadCurve != nil {
            respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
        }
        unaryCaller(tc, reqSizeBytes, respSizeBytes)
    }, cleanup
}
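
// makeFuncStream returns a caller that performs one streaming round trip per
// invocation. It pre-establishes one stream per concurrent call so that the
// caller at position 'pos' always reuses the same stream.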
func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
    tc, cleanup := makeClient(bf)
    streams := make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
    for i := 0; i < bf.MaxConcurrentCalls; i++ {
        stream, err := tc.StreamingCall(context.Background())
        if err != nil {
            logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
        }
        streams[i] = stream
    }
    return func(pos int) {
        reqSizeBytes := bf.ReqSizeBytes
        respSizeBytes := bf.RespSizeBytes
        if bf.ReqPayloadCurve != nil {
            reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
        }
        if bf.RespPayloadCurve != nil {
            respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
        }
        streamCaller(streams[pos], reqSizeBytes, respSizeBytes)
    }, cleanup
}
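
// makeFuncUnconstrainedStreamPreloaded returns send and receive functions for
// the unconstrained workload with preloading enabled: the request is encoded
// once per stream into a grpc.PreparedMsg so that the send path skips
// per-message marshaling.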
func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
    streams, req, cleanup := setupUnconstrainedStream(bf)

    preparedMsg := make([]*grpc.PreparedMsg, len(streams))
    for i, stream := range streams {
        preparedMsg[i] = &grpc.PreparedMsg{}
        err := preparedMsg[i].Encode(stream, req)
        if err != nil {
            logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[i], stream, req, err)
        }
    }

    return func(pos int) {
        streams[pos].SendMsg(preparedMsg[pos])
    }, func(pos int) {
        streams[pos].Recv()
    }, cleanup
}
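
// makeFuncUnconstrainedStream returns send and receive functions for the
// unconstrained workload without preloading: each send marshals the same
// request message on the fly.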
func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
    streams, req, cleanup := setupUnconstrainedStream(bf)
    return func(pos int) {
        streams[pos].Send(req)
    }, func(pos int) {
        streams[pos].Recv()
    }, cleanup
}
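
// setupUnconstrainedStream creates the client, opens one streaming call per
// concurrent call with the benchmark.UnconstrainedStreamingHeader metadata
// set, and builds the shared request message used by every sender.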
func setupUnconstrainedStream(bf stats.Features) ([]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
    tc, cleanup := makeClient(bf)

    streams := make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
    md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1")
    ctx := metadata.NewOutgoingContext(context.Background(), md)
    for i := 0; i < bf.MaxConcurrentCalls; i++ {
        stream, err := tc.StreamingCall(ctx)
        if err != nil {
            logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
        }
        streams[i] = stream
    }

    pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes)
    req := &testpb.SimpleRequest{
        ResponseType: pl.Type,
        ResponseSize: int32(bf.RespSizeBytes),
        Payload: pl,
    }

    return streams, req, cleanup
}

// unaryCaller makes a UnaryCall gRPC request using the given
// BenchmarkServiceClient and request and response sizes.
func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) {
    if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil {
        logger.Fatalf("DoUnaryCall failed: %v", err)
    }
}
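
// streamCaller performs a single request/response round trip on the given
// stream using the given request and response sizes.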
func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
    if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
        logger.Fatalf("DoStreamingRoundTrip failed: %v", err)
    }
}
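
// runBenchmark is the shared driver for the unary and streaming workloads. It
// warms up the connection, then runs 'MaxConcurrentCalls' goroutines that each
// invoke 'caller' in a closed loop until the benchmark time elapses, recording
// the latency of every call in 's' and reporting the total call count via
// 'stop'.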
func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) {
    // Warm up connection.
    for i := 0; i < warmupCallCount; i++ {
        caller(0)
    }

    // Run benchmark.
    start(mode, bf)
    var wg sync.WaitGroup
    wg.Add(bf.MaxConcurrentCalls)
    bmEnd := time.Now().Add(bf.BenchTime)
    var count uint64
    for i := 0; i < bf.MaxConcurrentCalls; i++ {
        go func(pos int) {
            defer wg.Done()
            for {
                t := time.Now()
                if t.After(bmEnd) {
                    return
                }
                start := time.Now()
                caller(pos)
                elapse := time.Since(start)
                atomic.AddUint64(&count, 1)
                s.AddDuration(elapse)
            }
        }(i)
    }
    wg.Wait()
    stop(count)
}

// benchOpts represents all configurable options available while running this
// benchmark. This is built from the values passed as flags.
type benchOpts struct {
    rModes runModes
    benchTime time.Duration
    memProfileRate int
    memProfile string
    cpuProfile string
    networkMode string
    benchmarkResultFile string
    useBufconn bool
    enableKeepalive bool
    features *featureOpts
}

// featureOpts represents options which can have multiple values. The user
// usually provides a comma-separated list of options for each of these
// features through command line flags. We generate all possible combinations
// for the provided values and run the benchmarks for each combination.
type featureOpts struct {
    enableTrace []bool
    readLatencies []time.Duration
    readKbps []int
    readMTU []int
    maxConcurrentCalls []int
    reqSizeBytes []int
    respSizeBytes []int
    reqPayloadCurves []*stats.PayloadCurve
    respPayloadCurves []*stats.PayloadCurve
    compModes []string
    enableChannelz []bool
    enablePreloader []bool
    clientReadBufferSize []int
    clientWriteBufferSize []int
    serverReadBufferSize []int
    serverWriteBufferSize []int
}

// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
// element of the slice (indexed by the 'featureIndex' enum) contains the
// number of values provided for the corresponding feature.
// For example: index 0 of the returned slice contains the number of values for
// the enableTrace feature, while index 1 contains the number of values for the
// readLatencies feature, and so on.
func makeFeaturesNum(b *benchOpts) []int {
    featuresNum := make([]int, stats.MaxFeatureIndex)
    for i := 0; i < len(featuresNum); i++ {
        switch stats.FeatureIndex(i) {
        case stats.EnableTraceIndex:
            featuresNum[i] = len(b.features.enableTrace)
        case stats.ReadLatenciesIndex:
            featuresNum[i] = len(b.features.readLatencies)
        case stats.ReadKbpsIndex:
            featuresNum[i] = len(b.features.readKbps)
        case stats.ReadMTUIndex:
            featuresNum[i] = len(b.features.readMTU)
        case stats.MaxConcurrentCallsIndex:
            featuresNum[i] = len(b.features.maxConcurrentCalls)
        case stats.ReqSizeBytesIndex:
            featuresNum[i] = len(b.features.reqSizeBytes)
        case stats.RespSizeBytesIndex:
            featuresNum[i] = len(b.features.respSizeBytes)
        case stats.ReqPayloadCurveIndex:
            featuresNum[i] = len(b.features.reqPayloadCurves)
        case stats.RespPayloadCurveIndex:
            featuresNum[i] = len(b.features.respPayloadCurves)
        case stats.CompModesIndex:
            featuresNum[i] = len(b.features.compModes)
        case stats.EnableChannelzIndex:
            featuresNum[i] = len(b.features.enableChannelz)
        case stats.EnablePreloaderIndex:
            featuresNum[i] = len(b.features.enablePreloader)
        case stats.ClientReadBufferSize:
            featuresNum[i] = len(b.features.clientReadBufferSize)
        case stats.ClientWriteBufferSize:
            featuresNum[i] = len(b.features.clientWriteBufferSize)
        case stats.ServerReadBufferSize:
            featuresNum[i] = len(b.features.serverReadBufferSize)
        case stats.ServerWriteBufferSize:
            featuresNum[i] = len(b.features.serverWriteBufferSize)
        default:
            log.Fatalf("Unknown feature index %v in makeFeaturesNum. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
        }
    }
    return featuresNum
}

// sharedFeatures returns a bool slice which acts as a bitmask. Each item in
// the slice represents a feature, indexed by the 'featureIndex' enum. The bit
// is set to true if the corresponding feature does not have multiple values,
// and is therefore shared across all benchmarks.
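// For example, with featuresNum = [1, 3, 1] the result is [true, false, true]:
// only the feature with three values varies between benchmark runs.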
func sharedFeatures(featuresNum []int) []bool {
    result := make([]bool, len(featuresNum))
    for i, num := range featuresNum {
        if num <= 1 {
            result[i] = true
        }
    }
    return result
}

// generateFeatures generates all combinations of the provided feature options.
// While all the feature options are stored in the benchOpts struct, the input
// parameter 'featuresNum' is a slice indexed by the 'featureIndex' enum
// containing the number of values for each feature.
// For example, if the user sets -workloads=all and -maxConcurrentCalls=1,100,
// we end up with the following combinations:
// [workloads: unary, maxConcurrentCalls=1]
// [workloads: unary, maxConcurrentCalls=100]
// [workloads: streaming, maxConcurrentCalls=1]
// [workloads: streaming, maxConcurrentCalls=100]
// [workloads: unconstrained, maxConcurrentCalls=1]
// [workloads: unconstrained, maxConcurrentCalls=100]
func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
    // curPos and initialPos are two slices where each value acts as an index
    // into the appropriate feature slice maintained in benchOpts.features. This
    // loop generates all possible combinations of features by changing one value
    // at a time, and once curPos becomes equal to initialPos, we have explored
    // all options.
    var result []stats.Features
    var curPos []int
    initialPos := make([]int, stats.MaxFeatureIndex)
    for !reflect.DeepEqual(initialPos, curPos) {
        if curPos == nil {
            curPos = make([]int, stats.MaxFeatureIndex)
        }
        f := stats.Features{
            // These features stay the same for each iteration.
            NetworkMode: b.networkMode,
            UseBufConn: b.useBufconn,
            EnableKeepalive: b.enableKeepalive,
            BenchTime: b.benchTime,
            // These features can potentially change for each iteration.
            EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]],
            Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
            Kbps: b.features.readKbps[curPos[stats.ReadKbpsIndex]],
            MTU: b.features.readMTU[curPos[stats.ReadMTUIndex]],
            MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]],
            ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]],
            EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]],
            EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]],
            ClientReadBufferSize: b.features.clientReadBufferSize[curPos[stats.ClientReadBufferSize]],
            ClientWriteBufferSize: b.features.clientWriteBufferSize[curPos[stats.ClientWriteBufferSize]],
            ServerReadBufferSize: b.features.serverReadBufferSize[curPos[stats.ServerReadBufferSize]],
            ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]],
        }
        if len(b.features.reqPayloadCurves) == 0 {
            f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
        } else {
            f.ReqPayloadCurve = b.features.reqPayloadCurves[curPos[stats.ReqPayloadCurveIndex]]
        }
        if len(b.features.respPayloadCurves) == 0 {
            f.RespSizeBytes = b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]]
        } else {
            f.RespPayloadCurve = b.features.respPayloadCurves[curPos[stats.RespPayloadCurveIndex]]
        }
        result = append(result, f)
        addOne(curPos, featuresNum)
    }
    return result
}

// addOne mutates the input slice 'features' by changing one feature, thus
// arriving at the next combination of feature values. 'featuresMaxPosition'
// provides the number of allowed values for each feature, indexed by the
// 'featureIndex' enum.
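// For example, with featuresMaxPosition = [2, 3], repeated calls advance the
// positions odometer-style: [0 0] -> [0 1] -> [0 2] -> [1 0] -> [1 1] ->
// [1 2] -> [0 0], at which point all combinations have been visited.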
func addOne(features []int, featuresMaxPosition []int) {
    for i := len(features) - 1; i >= 0; i-- {
        if featuresMaxPosition[i] == 0 {
            continue
        }
        features[i] = (features[i] + 1)
        if features[i]/featuresMaxPosition[i] == 0 {
            break
        }
        features[i] = features[i] % featuresMaxPosition[i]
    }
}

// processFlags reads the command line flags and builds benchOpts. Specifying
// invalid values for certain flags will cause flag.Parse() to fail and the
// program to terminate.
// This *SHOULD* be the only place where the flags are accessed. All other
// parts of the benchmark code should rely on the returned benchOpts.
func processFlags() *benchOpts {
    flag.Parse()
    if flag.NArg() != 0 {
        log.Fatal("Error: unparsed arguments: ", flag.Args())
    }

    opts := &benchOpts{
        rModes: runModesFromWorkloads(*workloads),
        benchTime: *benchTime,
        memProfileRate: *memProfileRate,
        memProfile: *memProfile,
        cpuProfile: *cpuProfile,
        networkMode: *networkMode,
        benchmarkResultFile: *benchmarkResultFile,
        useBufconn: *useBufconn,
        enableKeepalive: *enableKeepalive,
        features: &featureOpts{
            enableTrace: setToggleMode(*traceMode),
            readLatencies: append([]time.Duration(nil), *readLatency...),
            readKbps: append([]int(nil), *readKbps...),
            readMTU: append([]int(nil), *readMTU...),
            maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...),
            reqSizeBytes: append([]int(nil), *readReqSizeBytes...),
            respSizeBytes: append([]int(nil), *readRespSizeBytes...),
            compModes: setCompressorMode(*compressorMode),
            enableChannelz: setToggleMode(*channelzOn),
            enablePreloader: setToggleMode(*preloaderMode),
            clientReadBufferSize: append([]int(nil), *clientReadBufferSize...),
            clientWriteBufferSize: append([]int(nil), *clientWriteBufferSize...),
            serverReadBufferSize: append([]int(nil), *serverReadBufferSize...),
            serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...),
        },
    }

    if len(*reqPayloadCurveFiles) == 0 {
        if len(opts.features.reqSizeBytes) == 0 {
            opts.features.reqSizeBytes = defaultReqSizeBytes
        }
    } else {
        if len(opts.features.reqSizeBytes) != 0 {
            log.Fatalf("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time")
        }
        for _, file := range *reqPayloadCurveFiles {
            pc, err := stats.NewPayloadCurve(file)
            if err != nil {
                log.Fatalf("cannot load payload curve file %s: %v", file, err)
            }
            opts.features.reqPayloadCurves = append(opts.features.reqPayloadCurves, pc)
        }
        opts.features.reqSizeBytes = nil
    }
    if len(*respPayloadCurveFiles) == 0 {
        if len(opts.features.respSizeBytes) == 0 {
            opts.features.respSizeBytes = defaultRespSizeBytes
        }
    } else {
        if len(opts.features.respSizeBytes) != 0 {
            log.Fatalf("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time")
        }
        for _, file := range *respPayloadCurveFiles {
            pc, err := stats.NewPayloadCurve(file)
            if err != nil {
                log.Fatalf("cannot load payload curve file %s: %v", file, err)
            }
            opts.features.respPayloadCurves = append(opts.features.respPayloadCurves, pc)
        }
        opts.features.respSizeBytes = nil
    }

    // Re-write latency, kbps and mtu if a network mode is set.
    if network, ok := networks[opts.networkMode]; ok {
        opts.features.readLatencies = []time.Duration{network.Latency}
        opts.features.readKbps = []int{network.Kbps}
        opts.features.readMTU = []int{network.MTU}
    }
    return opts
}
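
// setToggleMode maps a toggle-mode flag value ("on", "off" or "both") to the
// slice of boolean values that the benchmark should iterate over.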
func setToggleMode(val string) []bool {
    switch val {
    case toggleModeOn:
        return []bool{true}
    case toggleModeOff:
        return []bool{false}
    case toggleModeBoth:
        return []bool{false, true}
    default:
        // This should never happen because a wrong value passed to this flag
        // would be caught during flag.Parse().
        return []bool{}
    }
}
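
// setCompressorMode maps the compression flag value to the slice of
// compression modes that the benchmark should iterate over.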
func setCompressorMode(val string) []string {
    switch val {
    case compModeNop, compModeGzip, compModeOff:
        return []string{val}
    case compModeAll:
        return []string{compModeNop, compModeGzip, compModeOff}
    default:
        // This should never happen because a wrong value passed to this flag
        // would be caught during flag.Parse().
        return []string{}
    }
}

func main() {
    opts := processFlags()
    before(opts)

    s := stats.NewStats(numStatsBuckets)
    featuresNum := makeFeaturesNum(opts)
    sf := sharedFeatures(featuresNum)

    var (
        start = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) }
        stop = func(count uint64) { s.EndRun(count) }
        ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) }
    )

    for _, bf := range opts.generateFeatures(featuresNum) {
        grpc.EnableTracing = bf.EnableTrace
        if bf.EnableChannelz {
            channelz.TurnOn()
        }
        if opts.rModes.unary {
            unaryBenchmark(start, stop, bf, s)
        }
        if opts.rModes.streaming {
            streamBenchmark(start, stop, bf, s)
        }
        if opts.rModes.unconstrained {
            unconstrainedStreamBenchmark(start, ucStop, bf)
        }
    }
    after(opts, s.GetResults())
}
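
// before applies the memory profiling rate and starts CPU profiling when the
// corresponding flags are set. It is the counterpart of after(), which writes
// the collected profiles out.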
func before(opts *benchOpts) {
    if opts.memProfile != "" {
        runtime.MemProfileRate = opts.memProfileRate
    }
    if opts.cpuProfile != "" {
        f, err := os.Create(opts.cpuProfile)
        if err != nil {
            fmt.Fprintf(os.Stderr, "testing: %s\n", err)
            return
        }
        if err := pprof.StartCPUProfile(f); err != nil {
            fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err)
            f.Close()
            return
        }
    }
}
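
// after stops CPU profiling, writes the heap profile, and serializes the
// benchmark results to the result file, for whichever of the corresponding
// flags are set.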
func after(opts *benchOpts, data []stats.BenchResults) {
    if opts.cpuProfile != "" {
        pprof.StopCPUProfile() // flushes profile to disk
    }
    if opts.memProfile != "" {
        f, err := os.Create(opts.memProfile)
        if err != nil {
            fmt.Fprintf(os.Stderr, "testing: %s\n", err)
            os.Exit(2)
        }
        runtime.GC() // materialize all statistics
        if err = pprof.WriteHeapProfile(f); err != nil {
            fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", opts.memProfile, err)
            os.Exit(2)
        }
        f.Close()
    }
    if opts.benchmarkResultFile != "" {
        f, err := os.Create(opts.benchmarkResultFile)
        if err != nil {
            log.Fatalf("testing: can't write benchmark result %s: %s\n", opts.benchmarkResultFile, err)
        }
        dataEncoder := gob.NewEncoder(f)
        dataEncoder.Encode(data)
        f.Close()
    }
}

// nopCompressor is a compressor that just copies data.
type nopCompressor struct{}

func (nopCompressor) Do(w io.Writer, p []byte) error {
    n, err := w.Write(p)
    if err != nil {
        return err
    }
    if n != len(p) {
        return fmt.Errorf("nopCompressor.Write: wrote %d bytes; want %d", n, len(p))
    }
    return nil
}

func (nopCompressor) Type() string { return compModeNop }

// nopDecompressor is a decompressor that just copies data.
type nopDecompressor struct{}

func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return io.ReadAll(r) }
func (nopDecompressor) Type() string { return compModeNop }