report.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. package main
  2. import (
  3. "github.com/beorn7/perks/histogram"
  4. "github.com/beorn7/perks/quantile"
  5. "math"
  6. "sync"
  7. "time"
  8. )
  9. var quantiles = []float64{0.50, 0.75, 0.90, 0.95, 0.99, 0.999, 0.9999}
  10. var quantilesTarget = map[float64]float64{
  11. 0.50: 0.01,
  12. 0.75: 0.01,
  13. 0.90: 0.001,
  14. 0.95: 0.001,
  15. 0.99: 0.001,
  16. 0.999: 0.0001,
  17. 0.9999: 0.00001,
  18. }
  19. var httpStatusSectionLabelMap = map[int]string{
  20. 1: "1xx",
  21. 2: "2xx",
  22. 3: "3xx",
  23. 4: "4xx",
  24. 5: "5xx",
  25. }
  26. type Stats struct {
  27. count int64
  28. sum float64
  29. sumSq float64
  30. min float64
  31. max float64
  32. }
  33. func (s *Stats) Update(v float64) {
  34. s.count++
  35. s.sum += v
  36. s.sumSq += v * v
  37. if v < s.min || s.count == 1 {
  38. s.min = v
  39. }
  40. if v > s.max || s.count == 1 {
  41. s.max = v
  42. }
  43. }
  44. func (s *Stats) Stddev() float64 {
  45. num := (float64(s.count) * s.sumSq) - math.Pow(s.sum, 2)
  46. div := float64(s.count * (s.count - 1))
  47. if div == 0 {
  48. return 0
  49. }
  50. return math.Sqrt(num / div)
  51. }
  52. func (s *Stats) Mean() float64 {
  53. if s.count == 0 {
  54. return 0
  55. }
  56. return s.sum / float64(s.count)
  57. }
  58. func (s *Stats) Reset() {
  59. s.count = 0
  60. s.sum = 0
  61. s.sumSq = 0
  62. s.min = 0
  63. s.max = 0
  64. }
  65. type StreamReport struct {
  66. lock sync.Mutex
  67. latencyStats *Stats
  68. rpsStats *Stats
  69. latencyQuantile *quantile.Stream
  70. latencyHistogram *histogram.Histogram
  71. codes map[int]int64
  72. errors map[string]int64
  73. latencyWithinSec *Stats
  74. rpsWithinSec float64
  75. noDateWithinSec bool
  76. readBytes int64
  77. writeBytes int64
  78. doneChan chan struct{}
  79. }
  80. func NewStreamReport() *StreamReport {
  81. return &StreamReport{
  82. latencyQuantile: quantile.NewTargeted(quantilesTarget),
  83. latencyHistogram: histogram.New(8),
  84. codes: make(map[int]int64, 1),
  85. errors: make(map[string]int64, 1),
  86. doneChan: make(chan struct{}, 1),
  87. latencyStats: &Stats{},
  88. rpsStats: &Stats{},
  89. latencyWithinSec: &Stats{},
  90. }
  91. }
  92. func (s *StreamReport) insert(v float64) {
  93. s.latencyQuantile.Insert(v)
  94. s.latencyHistogram.Insert(v)
  95. s.latencyStats.Update(v)
  96. }
  97. func (s *StreamReport) Collect(records <-chan *ReportRecord) {
  98. latencyWithinSecTemp := &Stats{}
  99. go func() {
  100. ticker := time.NewTicker(time.Second)
  101. lastCount := int64(0)
  102. lastTime := startTime
  103. for {
  104. select {
  105. case <-ticker.C:
  106. s.lock.Lock()
  107. dc := s.latencyStats.count - lastCount
  108. if dc > 0 {
  109. rps := float64(dc) / time.Since(lastTime).Seconds()
  110. s.rpsStats.Update(rps)
  111. lastCount = s.latencyStats.count
  112. lastTime = time.Now()
  113. *s.latencyWithinSec = *latencyWithinSecTemp
  114. s.rpsWithinSec = rps
  115. latencyWithinSecTemp.Reset()
  116. s.noDateWithinSec = false
  117. } else {
  118. s.noDateWithinSec = true
  119. }
  120. s.lock.Unlock()
  121. case <-s.doneChan:
  122. return
  123. }
  124. }
  125. }()
  126. for {
  127. r, ok := <-records
  128. if !ok {
  129. close(s.doneChan)
  130. break
  131. }
  132. s.lock.Lock()
  133. latencyWithinSecTemp.Update(float64(r.cost))
  134. s.insert(float64(r.cost))
  135. if r.code != 0 {
  136. s.codes[r.code]++
  137. }
  138. if r.error != "" {
  139. s.errors[r.error]++
  140. }
  141. s.readBytes = r.readBytes
  142. s.writeBytes = r.writeBytes
  143. s.lock.Unlock()
  144. recordPool.Put(r)
  145. }
  146. }
  147. type SnapshotReport struct {
  148. Elapsed time.Duration
  149. Count int64
  150. Codes map[string]int64
  151. Errors map[string]int64
  152. RPS float64
  153. ReadThroughput float64
  154. WriteThroughput float64
  155. Stats *struct {
  156. Min time.Duration
  157. Mean time.Duration
  158. StdDev time.Duration
  159. Max time.Duration
  160. }
  161. RpsStats *struct {
  162. Min float64
  163. Mean float64
  164. StdDev float64
  165. Max float64
  166. }
  167. Percentiles []*struct {
  168. Percentile float64
  169. Latency time.Duration
  170. }
  171. Histograms []*struct {
  172. Mean time.Duration
  173. Count int
  174. }
  175. }
  176. func (s *StreamReport) Snapshot() *SnapshotReport {
  177. s.lock.Lock()
  178. rs := &SnapshotReport{
  179. Elapsed: time.Since(startTime),
  180. Count: s.latencyStats.count,
  181. Stats: &struct {
  182. Min time.Duration
  183. Mean time.Duration
  184. StdDev time.Duration
  185. Max time.Duration
  186. }{time.Duration(s.latencyStats.min), time.Duration(s.latencyStats.Mean()),
  187. time.Duration(s.latencyStats.Stddev()), time.Duration(s.latencyStats.max)},
  188. }
  189. if s.rpsStats.count > 0 {
  190. rs.RpsStats = &struct {
  191. Min float64
  192. Mean float64
  193. StdDev float64
  194. Max float64
  195. }{s.rpsStats.min, s.rpsStats.Mean(),
  196. s.rpsStats.Stddev(), s.rpsStats.max}
  197. }
  198. elapseInSec := rs.Elapsed.Seconds()
  199. rs.RPS = float64(rs.Count) / elapseInSec
  200. rs.ReadThroughput = float64(s.readBytes) / 1024.0 / 1024.0 / elapseInSec
  201. rs.WriteThroughput = float64(s.writeBytes) / 1024.0 / 1024.0 / elapseInSec
  202. rs.Codes = make(map[string]int64, len(s.codes))
  203. for k, v := range s.codes {
  204. section := k / 100
  205. rs.Codes[httpStatusSectionLabelMap[section]] = v
  206. }
  207. rs.Errors = make(map[string]int64, len(s.errors))
  208. for k, v := range s.errors {
  209. rs.Errors[k] = v
  210. }
  211. rs.Percentiles = make([]*struct {
  212. Percentile float64
  213. Latency time.Duration
  214. }, len(quantiles))
  215. for i, p := range quantiles {
  216. rs.Percentiles[i] = &struct {
  217. Percentile float64
  218. Latency time.Duration
  219. }{p, time.Duration(s.latencyQuantile.Query(p))}
  220. }
  221. hisBins := s.latencyHistogram.Bins()
  222. rs.Histograms = make([]*struct {
  223. Mean time.Duration
  224. Count int
  225. }, len(hisBins))
  226. for i, b := range hisBins {
  227. rs.Histograms[i] = &struct {
  228. Mean time.Duration
  229. Count int
  230. }{time.Duration(b.Mean()), b.Count}
  231. }
  232. s.lock.Unlock()
  233. return rs
  234. }
  235. func (s *StreamReport) Done() <-chan struct{} {
  236. return s.doneChan
  237. }
  238. type ChartsReport struct {
  239. RPS float64
  240. Latency Stats
  241. CodeMap map[int]int64
  242. }
  243. func (s *StreamReport) Charts() *ChartsReport {
  244. s.lock.Lock()
  245. var cr *ChartsReport
  246. if s.noDateWithinSec {
  247. cr = nil
  248. } else {
  249. cr = &ChartsReport{
  250. RPS: s.rpsWithinSec,
  251. Latency: *s.latencyWithinSec,
  252. CodeMap: s.codes,
  253. }
  254. }
  255. s.lock.Unlock()
  256. return cr
  257. }