buffered_encoder_base.cpp 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. #include "buffered_encoder_base.h"
  2. #include <util/string/join.h>
  3. #include <util/string/builder.h>
  4. namespace NMonitoring {
  5. void TBufferedEncoderBase::OnStreamBegin() {
  6. State_.Expect(TEncoderState::EState::ROOT);
  7. }
  8. void TBufferedEncoderBase::OnStreamEnd() {
  9. State_.Expect(TEncoderState::EState::ROOT);
  10. }
  11. void TBufferedEncoderBase::OnCommonTime(TInstant time) {
  12. State_.Expect(TEncoderState::EState::ROOT);
  13. CommonTime_ = time;
  14. }
  15. void TBufferedEncoderBase::OnMetricBegin(EMetricType type) {
  16. State_.Switch(TEncoderState::EState::ROOT, TEncoderState::EState::METRIC);
  17. Metrics_.emplace_back();
  18. Metrics_.back().MetricType = type;
  19. }
  20. void TBufferedEncoderBase::OnMetricEnd() {
  21. State_.Switch(TEncoderState::EState::METRIC, TEncoderState::EState::ROOT);
  22. switch (MetricsMergingMode_) {
  23. case EMetricsMergingMode::MERGE_METRICS: {
  24. auto& metric = Metrics_.back();
  25. Sort(metric.Labels, [] (const TPooledLabel& lhs, const TPooledLabel& rhs) {
  26. return std::tie(lhs.Key, lhs.Value) < std::tie(rhs.Key, rhs.Value);
  27. });
  28. auto it = MetricMap_.find(metric.Labels);
  29. if (it == std::end(MetricMap_)) {
  30. MetricMap_.emplace(metric.Labels, Metrics_.size() - 1);
  31. } else {
  32. auto& existing = Metrics_[it->second].TimeSeries;
  33. Y_ENSURE(existing.GetValueType() == metric.TimeSeries.GetValueType(),
  34. "Time series point type mismatch: expected " << existing.GetValueType()
  35. << " but found " << metric.TimeSeries.GetValueType()
  36. << ", labels '" << FormatLabels(metric.Labels) << "'");
  37. existing.CopyFrom(metric.TimeSeries);
  38. Metrics_.pop_back();
  39. }
  40. break;
  41. }
  42. case EMetricsMergingMode::DEFAULT:
  43. break;
  44. }
  45. }
  46. void TBufferedEncoderBase::OnLabelsBegin() {
  47. if (State_ == TEncoderState::EState::METRIC) {
  48. State_ = TEncoderState::EState::METRIC_LABELS;
  49. } else if (State_ == TEncoderState::EState::ROOT) {
  50. State_ = TEncoderState::EState::COMMON_LABELS;
  51. } else {
  52. State_.ThrowInvalid("expected METRIC or ROOT");
  53. }
  54. }
  55. void TBufferedEncoderBase::OnLabelsEnd() {
  56. if (State_ == TEncoderState::EState::METRIC_LABELS) {
  57. State_ = TEncoderState::EState::METRIC;
  58. } else if (State_ == TEncoderState::EState::COMMON_LABELS) {
  59. State_ = TEncoderState::EState::ROOT;
  60. } else {
  61. State_.ThrowInvalid("expected LABELS or COMMON_LABELS");
  62. }
  63. }
  64. void TBufferedEncoderBase::OnLabel(TStringBuf name, TStringBuf value) {
  65. TPooledLabels* labels;
  66. if (State_ == TEncoderState::EState::METRIC_LABELS) {
  67. labels = &Metrics_.back().Labels;
  68. } else if (State_ == TEncoderState::EState::COMMON_LABELS) {
  69. labels = &CommonLabels_;
  70. } else {
  71. State_.ThrowInvalid("expected LABELS or COMMON_LABELS");
  72. }
  73. labels->emplace_back(LabelNamesPool_.PutIfAbsent(name), LabelValuesPool_.PutIfAbsent(value));
  74. }
  75. void TBufferedEncoderBase::OnLabel(ui32 name, ui32 value) {
  76. TPooledLabels* labels;
  77. if (State_ == TEncoderState::EState::METRIC_LABELS) {
  78. labels = &Metrics_.back().Labels;
  79. } else if (State_ == TEncoderState::EState::COMMON_LABELS) {
  80. labels = &CommonLabels_;
  81. } else {
  82. State_.ThrowInvalid("expected LABELS or COMMON_LABELS");
  83. }
  84. labels->emplace_back(LabelNamesPool_.GetByIndex(name), LabelValuesPool_.GetByIndex(value));
  85. }
  86. std::pair<ui32, ui32> TBufferedEncoderBase::PrepareLabel(TStringBuf name, TStringBuf value) {
  87. auto nameLabel = LabelNamesPool_.PutIfAbsent(name);
  88. auto valueLabel = LabelValuesPool_.PutIfAbsent(value);
  89. return std::make_pair(nameLabel->Index, valueLabel->Index);
  90. }
  91. void TBufferedEncoderBase::OnDouble(TInstant time, double value) {
  92. State_.Expect(TEncoderState::EState::METRIC);
  93. TMetric& metric = Metrics_.back();
  94. metric.TimeSeries.Add(time, value);
  95. }
  96. void TBufferedEncoderBase::OnInt64(TInstant time, i64 value) {
  97. State_.Expect(TEncoderState::EState::METRIC);
  98. TMetric& metric = Metrics_.back();
  99. metric.TimeSeries.Add(time, value);
  100. }
  101. void TBufferedEncoderBase::OnUint64(TInstant time, ui64 value) {
  102. State_.Expect(TEncoderState::EState::METRIC);
  103. TMetric& metric = Metrics_.back();
  104. metric.TimeSeries.Add(time, value);
  105. }
  106. void TBufferedEncoderBase::OnHistogram(TInstant time, IHistogramSnapshotPtr s) {
  107. State_.Expect(TEncoderState::EState::METRIC);
  108. TMetric& metric = Metrics_.back();
  109. metric.TimeSeries.Add(time, s.Get());
  110. }
  111. void TBufferedEncoderBase::OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr s) {
  112. State_.Expect(TEncoderState::EState::METRIC);
  113. TMetric& metric = Metrics_.back();
  114. metric.TimeSeries.Add(time, s.Get());
  115. }
  116. void TBufferedEncoderBase::OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr s) {
  117. State_.Expect(TEncoderState::EState::METRIC);
  118. TMetric& metric = Metrics_.back();
  119. metric.TimeSeries.Add(time, s.Get());
  120. }
  121. TString TBufferedEncoderBase::FormatLabels(const TPooledLabels& labels) const {
  122. auto formattedLabels = TVector<TString>(Reserve(labels.size() + CommonLabels_.size()));
  123. auto addLabel = [&](const TPooledLabel& l) {
  124. auto formattedLabel = TStringBuilder() << LabelNamesPool_.Get(l.Key) << '=' << LabelValuesPool_.Get(l.Value);
  125. formattedLabels.push_back(std::move(formattedLabel));
  126. };
  127. for (const auto& l: labels) {
  128. addLabel(l);
  129. }
  130. for (const auto& l: CommonLabels_) {
  131. const auto it = FindIf(labels, [&](const TPooledLabel& label) {
  132. return label.Key == l.Key;
  133. });
  134. if (it == labels.end()) {
  135. addLabel(l);
  136. }
  137. }
  138. Sort(formattedLabels);
  139. return TStringBuilder() << "{" << JoinSeq(", ", formattedLabels) << "}";
  140. }
  141. } // namespace NMonitoring