#pragma once #include "log_histogram_snapshot.h" #include #include #include #include #include namespace NMonitoring { class TLogHistogramCollector { public: static constexpr int DEFAULT_START_POWER = -1; explicit TLogHistogramCollector(int startPower = DEFAULT_START_POWER) : StartPower_(startPower) , CountZero_(0u) {} void Collect(TLogHistogramSnapshot* logHist) { std::lock_guard guard(Mutex_); Merge(logHist); } bool Collect(double value) { std::lock_guard guard(Mutex_); return CollectDouble(value); } TLogHistogramSnapshotPtr Snapshot() const { std::lock_guard guard(Mutex_); return MakeIntrusive(BASE, CountZero_, StartPower_, Buckets_); } void AddZeros(ui64 zerosCount) noexcept { std::lock_guard guard(Mutex_); CountZero_ += zerosCount; } private: int StartPower_; ui64 CountZero_; TVector Buckets_; mutable std::mutex Mutex_; static constexpr size_t MAX_BUCKETS = LOG_HIST_MAX_BUCKETS; static constexpr double BASE = 1.5; private: int EstimateBucketIndex(double value) const { return (int) (std::floor(std::log(value) / std::log(BASE)) - StartPower_); } void CollectPositiveDouble(double value) { ssize_t idx = std::floor(std::log(value) / std::log(BASE)) - StartPower_; if (idx >= Buckets_.ysize()) { idx = ExtendUp(idx); } else if (idx <= 0) { idx = Max(0, ExtendDown(idx, 1)); } ++Buckets_[idx]; } bool CollectDouble(double value) { if (Y_UNLIKELY(std::isnan(value) || std::isinf(value))) { return false; } if (value <= 0.0) { ++CountZero_; } else { CollectPositiveDouble(value); } return true; } void Merge(TLogHistogramSnapshot* logHist) { CountZero_ += logHist->ZerosCount(); const i32 firstIdxBeforeExtend = logHist->StartPower() - StartPower_; const i32 lastIdxBeforeExtend = firstIdxBeforeExtend + logHist->Count() - 1; if (firstIdxBeforeExtend > Max() || firstIdxBeforeExtend < Min()) { ythrow yexception() << "i16 overflow on first index"; } if (lastIdxBeforeExtend > Max() || lastIdxBeforeExtend < Min()) { ythrow yexception() << "i16 overflow on last index"; } i64 firstIdx = ExtendBounds(firstIdxBeforeExtend, lastIdxBeforeExtend, 0).first; size_t toMerge = std::min(std::max(-firstIdx, (i64) 0), logHist->Count()); if (toMerge) { for (size_t i = 0; i < toMerge; ++i) { Buckets_[0] += logHist->Bucket(i); } firstIdx = 0; } for (size_t i = toMerge; i != logHist->Count(); ++i) { Buckets_[firstIdx] += logHist->Bucket(i); ++firstIdx; } } int ExtendUp(int expectedIndex) { Y_DEBUG_ABORT_UNLESS(expectedIndex >= (int) Buckets_.size()); const size_t toAdd = expectedIndex - Buckets_.size() + 1; const size_t newSize = Buckets_.size() + toAdd; if (newSize <= MAX_BUCKETS) { Buckets_.resize(newSize, 0.0); return expectedIndex; } const size_t toRemove = newSize - MAX_BUCKETS; const size_t actualToRemove = std::min(toRemove, Buckets_.size()); if (actualToRemove > 0) { const double firstWeight = std::accumulate(Buckets_.cbegin(), Buckets_.cbegin() + actualToRemove, 0.0); Buckets_.erase(Buckets_.cbegin(), Buckets_.cbegin() + actualToRemove); if (Buckets_.empty()) { Buckets_.push_back(firstWeight); } else { Buckets_[0] = firstWeight; } } Buckets_.resize(MAX_BUCKETS, 0.0); StartPower_ += toRemove; return expectedIndex - toRemove; } int ExtendDown(int expectedIndex, int margin) { Y_DEBUG_ABORT_UNLESS(expectedIndex <= 0); int toAdd = std::min(MAX_BUCKETS - Buckets_.size(), margin - expectedIndex); if (toAdd > 0) { Buckets_.insert(Buckets_.begin(), toAdd, 0.0); StartPower_ -= toAdd; } return expectedIndex + toAdd; } std::pair ExtendBounds(ssize_t startIdx, ssize_t endIdx, ui8 margin) { ssize_t realEndIdx; ssize_t realStartIdx; if (endIdx >= Buckets_.ysize()) { Buckets_.reserve(std::max(std::min(endIdx - startIdx + 1ul, MAX_BUCKETS), 0ul)); realEndIdx = ExtendUp(endIdx); startIdx += realEndIdx - endIdx; } else { realEndIdx = endIdx; } if (startIdx < 1) { realStartIdx = ExtendDown(startIdx, margin); realEndIdx += realStartIdx - startIdx; } else { realStartIdx = startIdx; } return std::make_pair(realStartIdx, realEndIdx); } }; } // namespace NMonitoring