123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- #pragma once
- #include "log_histogram_snapshot.h"
- #include <util/generic/algorithm.h>
- #include <util/generic/utility.h>
- #include <util/generic/yexception.h>
- #include <algorithm>
- #include <cmath>
- #include <mutex>
- #include <numeric>
- #include <utility>
- namespace NMonitoring {
-     /// Accumulates observations into a log-scale histogram with base BASE.
-     ///
-     /// Bucket i counts positive values v with floor(log_BASE(v)) == StartPower_ + i.
-     /// Non-positive values are counted in CountZero_; NaN and infinities are
-     /// rejected. Buckets_ never grows beyond MAX_BUCKETS: when a value or a merged
-     /// snapshot would need more room, the lowest buckets are folded into bucket 0,
-     /// which therefore also absorbs everything below its own power.
-     ///
-     /// Every public method locks Mutex_, so one instance may be shared between threads.
-     class TLogHistogramCollector {
-     public:
-         static constexpr int DEFAULT_START_POWER = -1;
-         /// @param startPower power of BASE initially represented by bucket 0.
-         explicit TLogHistogramCollector(int startPower = DEFAULT_START_POWER)
-             : StartPower_(startPower)
-             , CountZero_(0u)
-         {}
-         /// Merges the zero count and buckets of @p logHist into this collector.
-         /// @throws yexception if the snapshot's first or last bucket index, relative
-         ///         to this collector, does not fit into i16. The collector is left
-         ///         unchanged in that case.
-         void Collect(TLogHistogramSnapshot* logHist) {
-             std::lock_guard guard(Mutex_);
-             Merge(logHist);
-         }
-         /// Records a single observation.
-         /// @return false (recording nothing) if @p value is NaN or infinite.
-         bool Collect(double value) {
-             std::lock_guard guard(Mutex_);
-             return CollectDouble(value);
-         }
-         /// Builds a new snapshot from the current base, zero count, start power and buckets.
-         TLogHistogramSnapshotPtr Snapshot() const {
-             std::lock_guard guard(Mutex_);
-             return MakeIntrusive<TLogHistogramSnapshot>(BASE, CountZero_, StartPower_, Buckets_);
-         }
-         /// Adds @p zerosCount observations to the non-positive (zero) counter.
-         void AddZeros(ui64 zerosCount) noexcept {
-             std::lock_guard guard(Mutex_);
-             CountZero_ += zerosCount;
-         }
-     private:
-         int StartPower_;            // power of BASE represented by Buckets_[0]
-         ui64 CountZero_;            // number of non-positive observations
-         TVector<double> Buckets_;   // per-power counts, never more than MAX_BUCKETS entries
-         mutable std::mutex Mutex_;  // guards the fields above; mutable so Snapshot() can be const
-         static constexpr size_t MAX_BUCKETS = LOG_HIST_MAX_BUCKETS;
-         static constexpr double BASE = 1.5;
-     private:
-         /// Bucket index for a positive @p value relative to the current StartPower_.
-         /// The result may be negative or >= Buckets_.size().
-         int EstimateBucketIndex(double value) const {
-             return static_cast<int>(std::floor(std::log(value) / std::log(BASE)) - StartPower_);
-         }
-         /// Adds one observation of a strictly positive, finite @p value,
-         /// extending the bucket range as needed.
-         void CollectPositiveDouble(double value) {
-             ssize_t idx = EstimateBucketIndex(value);
-             if (idx >= Buckets_.ysize()) {
-                 idx = ExtendUp(idx);
-             } else if (idx <= 0) {
-                 // Extend down with margin 1 so the value lands at index 1. If the
-                 // bucket limit prevents the full extension it falls into bucket 0.
-                 idx = Max<ssize_t>(0, ExtendDown(idx, 1));
-             }
-             ++Buckets_[idx];
-         }
-         /// Records @p value: non-positive values increment CountZero_,
-         /// positive values go into Buckets_, NaN/infinity are rejected.
-         /// @return true if the value was recorded.
-         bool CollectDouble(double value) {
-             if (Y_UNLIKELY(!std::isfinite(value))) {
-                 return false;
-             }
-             if (value <= 0.0) {
-                 ++CountZero_;
-             } else {
-                 CollectPositiveDouble(value);
-             }
-             return true;
-         }
-         /// Adds the contents of @p logHist, re-basing its buckets onto StartPower_.
-         /// Snapshot buckets that still map below index 0 after extending (because
-         /// of the bucket limit) are folded into bucket 0.
-         /// All validation happens before any member is modified.
-         void Merge(TLogHistogramSnapshot* logHist) {
-             const i32 firstIdxBeforeExtend = logHist->StartPower() - StartPower_;
-             const i32 lastIdxBeforeExtend = firstIdxBeforeExtend + logHist->Count() - 1;
-             if (firstIdxBeforeExtend > Max<i16>() || firstIdxBeforeExtend < Min<i16>()) {
-                 ythrow yexception() << "i16 overflow on first index";
-             }
-             if (lastIdxBeforeExtend > Max<i16>() || lastIdxBeforeExtend < Min<i16>()) {
-                 ythrow yexception() << "i16 overflow on last index";
-             }
-             // Only mutate once the snapshot is known to be mergeable.
-             CountZero_ += logHist->ZerosCount();
-             if (logHist->Count() == 0) {
-                 // No buckets to merge: extending toward an empty range could
-                 // needlessly shift and fold our existing buckets.
-                 return;
-             }
-             i64 firstIdx = ExtendBounds(firstIdxBeforeExtend, lastIdxBeforeExtend, 0).first;
-             // Snapshot buckets mapped below index 0 are collapsed into bucket 0.
-             const size_t toMerge = std::min<ui32>(std::max<i64>(-firstIdx, i64{0}), logHist->Count());
-             if (toMerge > 0) {
-                 for (size_t i = 0; i < toMerge; ++i) {
-                     Buckets_[0] += logHist->Bucket(i);
-                 }
-                 firstIdx = 0;
-             }
-             for (size_t i = toMerge; i < logHist->Count(); ++i) {
-                 Buckets_[firstIdx] += logHist->Bucket(i);
-                 ++firstIdx;
-             }
-         }
-         /// Grows Buckets_ so that @p expectedIndex (>= current size) is addressable.
-         /// If that would exceed MAX_BUCKETS, the lowest buckets are folded into the
-         /// new bucket 0 and StartPower_ is raised by the number of removed buckets.
-         /// @return the index corresponding to @p expectedIndex after any shift.
-         int ExtendUp(int expectedIndex) {
-             Y_DEBUG_ABORT_UNLESS(expectedIndex >= static_cast<int>(Buckets_.size()));
-             const size_t toAdd = expectedIndex - Buckets_.size() + 1;
-             const size_t newSize = Buckets_.size() + toAdd;
-             if (newSize <= MAX_BUCKETS) {
-                 Buckets_.resize(newSize, 0.0);
-                 return expectedIndex;
-             }
-             const size_t toRemove = newSize - MAX_BUCKETS;
-             const size_t actualToRemove = std::min<size_t>(toRemove, Buckets_.size());
-             if (actualToRemove > 0) {
-                 const double firstWeight = std::accumulate(Buckets_.cbegin(), Buckets_.cbegin() + actualToRemove, 0.0);
-                 Buckets_.erase(Buckets_.cbegin(), Buckets_.cbegin() + actualToRemove);
-                 if (Buckets_.empty()) {
-                     Buckets_.push_back(firstWeight);
-                 } else {
-                     // Add, don't overwrite: the bucket that became index 0 already
-                     // holds its own observations.
-                     Buckets_[0] += firstWeight;
-                 }
-             }
-             Buckets_.resize(MAX_BUCKETS, 0.0);
-             StartPower_ += static_cast<int>(toRemove);
-             return expectedIndex - static_cast<int>(toRemove);
-         }
-         /// Prepends empty buckets so that @p expectedIndex (<= 0) moves up to
-         /// @p margin, lowering StartPower_ by the same amount. The number of added
-         /// buckets is capped by the room left below MAX_BUCKETS, so the returned
-         /// index may stay below @p margin.
-         /// @return the index corresponding to @p expectedIndex after the shift.
-         int ExtendDown(int expectedIndex, int margin) {
-             Y_DEBUG_ABORT_UNLESS(expectedIndex <= 0);
-             const int toAdd = std::min<int>(MAX_BUCKETS - Buckets_.size(), margin - expectedIndex);
-             if (toAdd > 0) {
-                 Buckets_.insert(Buckets_.begin(), toAdd, 0.0);
-                 StartPower_ -= toAdd;
-             }
-             return expectedIndex + toAdd;
-         }
-         /// Makes the index range [startIdx, endIdx] addressable: first extends up
-         /// (which may shift existing indices down), then down with @p margin.
-         /// @return the shifted {start, end} pair; start may remain negative when
-         ///         the bucket limit prevents extending down far enough.
-         std::pair<ssize_t, ssize_t> ExtendBounds(ssize_t startIdx, ssize_t endIdx, ui8 margin) {
-             ssize_t realEndIdx = endIdx;
-             if (endIdx >= Buckets_.ysize()) {
-                 Buckets_.reserve(std::min<ui32>(endIdx - startIdx + 1ul, MAX_BUCKETS));
-                 realEndIdx = ExtendUp(endIdx);
-                 startIdx += realEndIdx - endIdx;
-             }
-             ssize_t realStartIdx = startIdx;
-             if (startIdx < 1) {
-                 realStartIdx = ExtendDown(startIdx, margin);
-                 realEndIdx += realStartIdx - startIdx;
-             }
-             return std::make_pair(realStartIdx, realEndIdx);
-         }
-     };
- } // namespace NMonitoring
|