123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 |
- package main
- import (
- "github.com/beorn7/perks/histogram"
- "github.com/beorn7/perks/quantile"
- "math"
- "sync"
- "time"
- )
- var quantiles = []float64{0.50, 0.75, 0.90, 0.95, 0.99, 0.999, 0.9999}
- var quantilesTarget = map[float64]float64{
- 0.50: 0.01,
- 0.75: 0.01,
- 0.90: 0.001,
- 0.95: 0.001,
- 0.99: 0.001,
- 0.999: 0.0001,
- 0.9999: 0.00001,
- }
- var httpStatusSectionLabelMap = map[int]string{
- 1: "1xx",
- 2: "2xx",
- 3: "3xx",
- 4: "4xx",
- 5: "5xx",
- }
- type Stats struct {
- count int64
- sum float64
- sumSq float64
- min float64
- max float64
- }
- func (s *Stats) Update(v float64) {
- s.count++
- s.sum += v
- s.sumSq += v * v
- if v < s.min || s.count == 1 {
- s.min = v
- }
- if v > s.max || s.count == 1 {
- s.max = v
- }
- }
- func (s *Stats) Stddev() float64 {
- num := (float64(s.count) * s.sumSq) - math.Pow(s.sum, 2)
- div := float64(s.count * (s.count - 1))
- if div == 0 {
- return 0
- }
- return math.Sqrt(num / div)
- }
- func (s *Stats) Mean() float64 {
- if s.count == 0 {
- return 0
- }
- return s.sum / float64(s.count)
- }
- func (s *Stats) Reset() {
- s.count = 0
- s.sum = 0
- s.sumSq = 0
- s.min = 0
- s.max = 0
- }
- type StreamReport struct {
- lock sync.Mutex
- latencyStats *Stats
- rpsStats *Stats
- latencyQuantile *quantile.Stream
- latencyHistogram *histogram.Histogram
- codes map[int]int64
- errors map[string]int64
- latencyWithinSec *Stats
- rpsWithinSec float64
- noDateWithinSec bool
- readBytes int64
- writeBytes int64
- doneChan chan struct{}
- }
- func NewStreamReport() *StreamReport {
- return &StreamReport{
- latencyQuantile: quantile.NewTargeted(quantilesTarget),
- latencyHistogram: histogram.New(8),
- codes: make(map[int]int64, 1),
- errors: make(map[string]int64, 1),
- doneChan: make(chan struct{}, 1),
- latencyStats: &Stats{},
- rpsStats: &Stats{},
- latencyWithinSec: &Stats{},
- }
- }
- func (s *StreamReport) insert(v float64) {
- s.latencyQuantile.Insert(v)
- s.latencyHistogram.Insert(v)
- s.latencyStats.Update(v)
- }
- func (s *StreamReport) Collect(records <-chan *ReportRecord) {
- latencyWithinSecTemp := &Stats{}
- go func() {
- ticker := time.NewTicker(time.Second)
- lastCount := int64(0)
- lastTime := startTime
- for {
- select {
- case <-ticker.C:
- s.lock.Lock()
- dc := s.latencyStats.count - lastCount
- if dc > 0 {
- rps := float64(dc) / time.Since(lastTime).Seconds()
- s.rpsStats.Update(rps)
- lastCount = s.latencyStats.count
- lastTime = time.Now()
- *s.latencyWithinSec = *latencyWithinSecTemp
- s.rpsWithinSec = rps
- latencyWithinSecTemp.Reset()
- s.noDateWithinSec = false
- } else {
- s.noDateWithinSec = true
- }
- s.lock.Unlock()
- case <-s.doneChan:
- return
- }
- }
- }()
- for {
- r, ok := <-records
- if !ok {
- close(s.doneChan)
- break
- }
- s.lock.Lock()
- latencyWithinSecTemp.Update(float64(r.cost))
- s.insert(float64(r.cost))
- if r.code != 0 {
- s.codes[r.code]++
- }
- if r.error != "" {
- s.errors[r.error]++
- }
- s.readBytes = r.readBytes
- s.writeBytes = r.writeBytes
- s.lock.Unlock()
- recordPool.Put(r)
- }
- }
- type SnapshotReport struct {
- Elapsed time.Duration
- Count int64
- Codes map[string]int64
- Errors map[string]int64
- RPS float64
- ReadThroughput float64
- WriteThroughput float64
- Stats *struct {
- Min time.Duration
- Mean time.Duration
- StdDev time.Duration
- Max time.Duration
- }
- RpsStats *struct {
- Min float64
- Mean float64
- StdDev float64
- Max float64
- }
- Percentiles []*struct {
- Percentile float64
- Latency time.Duration
- }
- Histograms []*struct {
- Mean time.Duration
- Count int
- }
- }
- func (s *StreamReport) Snapshot() *SnapshotReport {
- s.lock.Lock()
- rs := &SnapshotReport{
- Elapsed: time.Since(startTime),
- Count: s.latencyStats.count,
- Stats: &struct {
- Min time.Duration
- Mean time.Duration
- StdDev time.Duration
- Max time.Duration
- }{time.Duration(s.latencyStats.min), time.Duration(s.latencyStats.Mean()),
- time.Duration(s.latencyStats.Stddev()), time.Duration(s.latencyStats.max)},
- }
- if s.rpsStats.count > 0 {
- rs.RpsStats = &struct {
- Min float64
- Mean float64
- StdDev float64
- Max float64
- }{s.rpsStats.min, s.rpsStats.Mean(),
- s.rpsStats.Stddev(), s.rpsStats.max}
- }
- elapseInSec := rs.Elapsed.Seconds()
- rs.RPS = float64(rs.Count) / elapseInSec
- rs.ReadThroughput = float64(s.readBytes) / 1024.0 / 1024.0 / elapseInSec
- rs.WriteThroughput = float64(s.writeBytes) / 1024.0 / 1024.0 / elapseInSec
- rs.Codes = make(map[string]int64, len(s.codes))
- for k, v := range s.codes {
- section := k / 100
- rs.Codes[httpStatusSectionLabelMap[section]] = v
- }
- rs.Errors = make(map[string]int64, len(s.errors))
- for k, v := range s.errors {
- rs.Errors[k] = v
- }
- rs.Percentiles = make([]*struct {
- Percentile float64
- Latency time.Duration
- }, len(quantiles))
- for i, p := range quantiles {
- rs.Percentiles[i] = &struct {
- Percentile float64
- Latency time.Duration
- }{p, time.Duration(s.latencyQuantile.Query(p))}
- }
- hisBins := s.latencyHistogram.Bins()
- rs.Histograms = make([]*struct {
- Mean time.Duration
- Count int
- }, len(hisBins))
- for i, b := range hisBins {
- rs.Histograms[i] = &struct {
- Mean time.Duration
- Count int
- }{time.Duration(b.Mean()), b.Count}
- }
- s.lock.Unlock()
- return rs
- }
- func (s *StreamReport) Done() <-chan struct{} {
- return s.doneChan
- }
- type ChartsReport struct {
- RPS float64
- Latency Stats
- CodeMap map[int]int64
- }
- func (s *StreamReport) Charts() *ChartsReport {
- s.lock.Lock()
- var cr *ChartsReport
- if s.noDateWithinSec {
- cr = nil
- } else {
- cr = &ChartsReport{
- RPS: s.rpsWithinSec,
- Latency: *s.latencyWithinSec,
- CodeMap: s.codes,
- }
- }
- s.lock.Unlock()
- return cr
- }
|