summary_collector.h 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. #pragma once
  2. #include "summary_snapshot.h"
  3. #include <atomic>
  4. #include <limits>
  5. #include <cmath>
  6. namespace NMonitoring {
  7. class ISummaryDoubleCollector {
  8. public:
  9. virtual ~ISummaryDoubleCollector() = default;
  10. virtual void Collect(double value) = 0;
  11. virtual ISummaryDoubleSnapshotPtr Snapshot() const = 0;
  12. virtual size_t SizeBytes() const = 0;
  13. };
  14. using ISummaryDoubleCollectorPtr = THolder<ISummaryDoubleCollector>;
  15. class TSummaryDoubleCollector final: public ISummaryDoubleCollector {
  16. public:
  17. TSummaryDoubleCollector() {
  18. Sum_.store(0, std::memory_order_relaxed);
  19. Min_.store(std::numeric_limits<double>::max(), std::memory_order_relaxed);
  20. Max_.store(std::numeric_limits<double>::lowest(), std::memory_order_relaxed);
  21. Count_.store(0, std::memory_order_relaxed);
  22. }
  23. void Collect(double value) noexcept override {
  24. if (std::isnan(value)) {
  25. return;
  26. }
  27. UpdateSum(value);
  28. UpdateMin(value);
  29. UpdateMax(value);
  30. Last_.store(value, std::memory_order_relaxed);
  31. Count_.fetch_add(1ul, std::memory_order_relaxed);
  32. }
  33. ISummaryDoubleSnapshotPtr Snapshot() const override {
  34. return new TSummaryDoubleSnapshot(
  35. Sum_.load(std::memory_order_relaxed),
  36. Min_.load(std::memory_order_relaxed),
  37. Max_.load(std::memory_order_relaxed),
  38. Last_.load(std::memory_order_relaxed),
  39. Count_.load(std::memory_order_relaxed));
  40. }
  41. size_t SizeBytes() const override {
  42. return sizeof(*this);
  43. }
  44. private:
  45. std::atomic<double> Sum_;
  46. std::atomic<double> Min_;
  47. std::atomic<double> Max_;
  48. std::atomic<double> Last_;
  49. std::atomic_uint64_t Count_;
  50. void UpdateSum(double add) noexcept {
  51. double newValue;
  52. double oldValue = Sum_.load(std::memory_order_relaxed);
  53. do {
  54. newValue = oldValue + add;
  55. } while (!Sum_.compare_exchange_weak(
  56. oldValue,
  57. newValue,
  58. std::memory_order_release,
  59. std::memory_order_consume));
  60. }
  61. void UpdateMin(double candidate) noexcept {
  62. double oldValue = Min_.load(std::memory_order_relaxed);
  63. do {
  64. if (oldValue <= candidate) {
  65. break;
  66. }
  67. } while (!Min_.compare_exchange_weak(
  68. oldValue,
  69. candidate,
  70. std::memory_order_release,
  71. std::memory_order_consume));
  72. }
  73. void UpdateMax(double candidate) noexcept {
  74. double oldValue = Max_.load(std::memory_order_relaxed);
  75. do {
  76. if (oldValue >= candidate) {
  77. break;
  78. }
  79. } while (!Max_.compare_exchange_weak(
  80. oldValue,
  81. candidate,
  82. std::memory_order_release,
  83. std::memory_order_consume));
  84. }
  85. };
  86. }