collecting_consumer.h 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. #pragma once
  2. #include <library/cpp/monlib/metrics/labels.h>
  3. #include <library/cpp/monlib/metrics/metric_value.h>
  4. #include <library/cpp/monlib/metrics/metric_consumer.h>
  5. #include <util/datetime/base.h>
  6. namespace NMonitoring {
  7. // TODO(ivanzhukov@): very similar to https://nda.ya.ru/t/ST-KDJAH3W3cfn. Merge them later
  8. struct TMetricData {
  9. TMetricData()
  10. : Values{new NMonitoring::TMetricTimeSeries}
  11. {
  12. }
  13. TMetricData(NMonitoring::TLabels labels, NMonitoring::EMetricType type, THolder<NMonitoring::TMetricTimeSeries> s)
  14. : Labels{std::move(labels)}
  15. , Kind{type}
  16. , Values{std::move(s)}
  17. {
  18. }
  19. NMonitoring::TLabels Labels;
  20. // TODO(ivanzhukov@): rename to Type
  21. NMonitoring::EMetricType Kind{NMonitoring::EMetricType::UNKNOWN};
  22. THolder<NMonitoring::TMetricTimeSeries> Values;
  23. };
  24. template <typename TLabelsImpl>
  25. struct TCollectingConsumerImpl: NMonitoring::IMetricConsumer {
  26. TCollectingConsumerImpl() = default;
  27. explicit TCollectingConsumerImpl(bool doMergeCommonLabels)
  28. : DoMergeCommonLabels{doMergeCommonLabels}
  29. {}
  30. void OnStreamBegin() override {}
  31. void OnStreamEnd() override {}
  32. void OnCommonTime(TInstant time) override {
  33. CommonTime = time;
  34. }
  35. void OnMetricBegin(NMonitoring::EMetricType kind) override {
  36. auto& metric = Metrics.emplace_back();
  37. metric.Kind = kind;
  38. InsideSensor = true;
  39. }
  40. void OnMetricEnd() override {
  41. InsideSensor = false;
  42. }
  43. void OnLabelsBegin() override {}
  44. void OnLabelsEnd() override {
  45. if (DoMergeCommonLabels) {
  46. for (auto& cl : CommonLabels) {
  47. Metrics.back().Labels.Add(cl);
  48. }
  49. }
  50. }
  51. void OnLabel(TStringBuf key, TStringBuf value) override {
  52. if (InsideSensor) {
  53. Metrics.back().Labels.Add(key, value);
  54. } else {
  55. CommonLabels.Add(key, value);
  56. }
  57. }
  58. void OnDouble(TInstant time, double value) override {
  59. Metrics.back().Values->Add(time, value);
  60. }
  61. void OnInt64(TInstant time, i64 value) override {
  62. Metrics.back().Values->Add(time, value);
  63. }
  64. void OnUint64(TInstant time, ui64 value) override {
  65. Metrics.back().Values->Add(time, value);
  66. }
  67. void OnHistogram(TInstant time, NMonitoring::IHistogramSnapshotPtr snapshot) override {
  68. auto& val = Metrics.back().Values;
  69. val->Add(time, snapshot.Get());
  70. }
  71. void OnSummaryDouble(TInstant time, NMonitoring::ISummaryDoubleSnapshotPtr snapshot) override {
  72. auto& val = Metrics.back().Values;
  73. val->Add(time, snapshot.Get());
  74. }
  75. void OnLogHistogram(TInstant time, NMonitoring::TLogHistogramSnapshotPtr snapshot) override {
  76. auto& val = Metrics.back().Values;
  77. val->Add(time, snapshot.Get());
  78. }
  79. bool DoMergeCommonLabels{false};
  80. TVector<TMetricData> Metrics;
  81. TLabelsImpl CommonLabels;
  82. TInstant CommonTime;
  83. bool InsideSensor{false};
  84. };
  85. using TCollectingConsumer = TCollectingConsumerImpl<NMonitoring::TLabels>;
  86. } // namespace NMonitoring