#include "unistat.h" #include #include #include #include #include #include #include #include #include using namespace NJson; const re2::RE2 NAME_RE{R"((?:[a-zA-Z0-9\.\-/@_]+_)+(?:[ad][vehmntx]{3}|summ|hgram|max))"}; namespace NMonitoring { namespace { bool IsNumber(const NJson::TJsonValue& j) { switch (j.GetType()) { case EJsonValueType::JSON_INTEGER: case EJsonValueType::JSON_UINTEGER: case EJsonValueType::JSON_DOUBLE: return true; default: return false; } } template T ExtractNumber(const TJsonValue& val) { switch (val.GetType()) { case EJsonValueType::JSON_INTEGER: return static_cast(val.GetInteger()); case EJsonValueType::JSON_UINTEGER: return static_cast(val.GetUInteger()); case EJsonValueType::JSON_DOUBLE: return static_cast(val.GetDouble()); default: ythrow yexception() << "Expected number, but found " << val.GetType(); } } auto ExtractDouble = ExtractNumber; auto ExtractUi64 = ExtractNumber; class THistogramBuilder { public: void Add(TBucketBound bound, TBucketValue value) { /// XXX: yasm uses left-closed intervals, while in monlib we use right-closed ones, /// so (-inf; 0) [0, 100) [100; +inf) /// becomes (-inf; 0] (0, 100] (100; +inf) /// but since we've already lost some information these no way to avoid this kind of error here Bounds_.push_back(bound); /// this will always be 0 for the first bucket, /// since there's no way to make (-inf; N) bucket in yasm Values_.push_back(NextValue_); /// we will write this value into the next bucket so that [[0, 10], [100, 20], [200, 50]] /// becomes (-inf; 0] -> 0; (0; 100] -> 10; (100; 200] -> 20; (200; +inf) -> 50 NextValue_ = value; } IHistogramSnapshotPtr Finalize() { Bounds_.push_back(std::numeric_limits::max()); Values_.push_back(NextValue_); return ExplicitHistogramSnapshot(Bounds_, Values_, true); } public: TBucketValue NextValue_ {0}; TBucketBounds Bounds_; TBucketValues Values_; }; class LogHistogramBuilder { public: void Add(double value) { while (Values_.size() < HISTOGRAM_MAX_BUCKETS_COUNT && value > MaxValueWithBickets(Values_.size())) { Values_.push_back(0); } ui32 index = 0; if (value > MaxValueWithBickets(HISTOGRAM_MAX_BUCKETS_COUNT)) { index = Values_.size() - 1; } else if (value > MIN_VALUE) { double logBase = std::log(value) / std::log(BASE); index = static_cast(std::ceil(logBase)); } ++Values_[index]; } IHistogramSnapshotPtr Finalize() && { return new TExponentialHistogramSnapshot(BASE, MIN_VALUE, std::move(Values_)); } private: static constexpr double BASE = 2; static constexpr double MIN_VALUE = 1; static double MaxValueWithBickets(ui64 buckets) { return std::pow(BASE, buckets - 2); } private: TBucketValues Values_ = TBucketValues(2, 0); }; class TDecoderUnistat { private: public: explicit TDecoderUnistat(IMetricConsumer* consumer, IInputStream* is, TStringBuf metricNameLabel, TInstant ts) : Consumer_{consumer}, MetricNameLabel(metricNameLabel), Timestamp_{ts} { ReadJsonTree(is, &Json_, /* throw */ true); } void Decode() { Y_ENSURE(Json_.IsArray(), "Expected array at the top level, but found " << Json_.GetType()); for (auto&& metric : Json_.GetArray()) { Y_ENSURE(metric.IsArray(), "Metric must be an array"); auto&& arr = metric.GetArray(); Y_ENSURE(arr.size() == 2, "Metric must be an array of 2 elements"); auto&& name = arr[0]; auto&& value = arr[1]; MetricContext_ = {}; ParseName(name.GetString()); if (value.IsArray()) { const auto& array = value.GetArray(); if (!array.empty() && IsNumber(array[0])) { OnLogHistogram(value); } else { OnHistogram(value); } } else if (IsNumber(value)) { if (MetricContext_.Name.EndsWith("_ahhh")) { OnLogHistogram(value); } else { OnScalar(value); } } else { ythrow yexception() << "Expected list or number, but found " << value.GetType(); } WriteValue(); } } private: void OnScalar(const TJsonValue& jsonValue) { if (MetricContext_.IsDeriv) { MetricContext_.Type = EMetricType::RATE; MetricContext_.Value = TMetricValue{ExtractUi64(jsonValue)}; } else { MetricContext_.Type = EMetricType::GAUGE; MetricContext_.Value = TMetricValue{ExtractDouble(jsonValue)}; } } void OnLogHistogram(const TJsonValue& value) { Y_ENSURE(MetricContext_.Name.EndsWith("_ahhh"), "Values list is supported only for _ahhh metrics"); MetricContext_.Type = EMetricType::HIST; LogHistogramBuilder histogramBuilder; if (IsNumber(value)) { histogramBuilder.Add(value.GetDouble()); } else { for (auto&& item: value.GetArray()) { Y_ENSURE(IsNumber(item), "Expected a number, but found " << item.GetType()); histogramBuilder.Add(item.GetDouble()); } } MetricContext_.Histogram = std::move(histogramBuilder).Finalize(); MetricContext_.Value = TMetricValue{MetricContext_.Histogram.Get()}; } void OnHistogram(const TJsonValue& jsonHist) { if (MetricContext_.IsDeriv) { MetricContext_.Type = EMetricType::HIST_RATE; } else { MetricContext_.Type = EMetricType::HIST; } auto histogramBuilder = THistogramBuilder(); for (auto&& bucket : jsonHist.GetArray()) { Y_ENSURE(bucket.IsArray(), "Expected an array, but found " << bucket.GetType()); auto&& arr = bucket.GetArray(); Y_ENSURE(arr.size() == 2, "Histogram bucket must be an array of 2 elements"); const auto bound = ExtractDouble(arr[0]); const auto weight = ExtractUi64(arr[1]); histogramBuilder.Add(bound, weight); } MetricContext_.Histogram = histogramBuilder.Finalize(); MetricContext_.Value = TMetricValue{MetricContext_.Histogram.Get()}; } bool IsDeriv(TStringBuf name) { TStringBuf ignore, suffix; name.RSplit('_', ignore, suffix); Y_ENSURE(suffix.size() >= 3 && suffix.size() <= 5, "Disallowed suffix value: " << suffix); if (suffix == TStringBuf("summ") || suffix == TStringBuf("hgram")) { return true; } else if (suffix == TStringBuf("max")) { return false; } return suffix[0] == 'd'; } void ParseName(TStringBuf value) { TVector parts; StringSplitter(value).Split(';').SkipEmpty().Collect(&parts); Y_ENSURE(parts.size() >= 1 && parts.size() <= 16); TStringBuf name = parts.back(); parts.pop_back(); Y_ENSURE(RE2::FullMatch(re2::StringPiece{name.data(), name.size()}, NAME_RE), "Metric name " << name << " doesn't match regex " << NAME_RE.pattern()); MetricContext_.Name = name; MetricContext_.IsDeriv = IsDeriv(MetricContext_.Name); for (auto tag : parts) { TStringBuf n, v; tag.Split('=', n, v); Y_ENSURE(n && v, "Unexpected tag format in " << tag); MetricContext_.Labels.Add(n, v); } } private: void WriteValue() { Consumer_->OnMetricBegin(MetricContext_.Type); Consumer_->OnLabelsBegin(); Consumer_->OnLabel(MetricNameLabel, TString{MetricContext_.Name}); for (auto&& l : MetricContext_.Labels) { Consumer_->OnLabel(l.Name(), l.Value()); } Consumer_->OnLabelsEnd(); switch (MetricContext_.Type) { case EMetricType::GAUGE: Consumer_->OnDouble(Timestamp_, MetricContext_.Value.AsDouble()); break; case EMetricType::RATE: Consumer_->OnUint64(Timestamp_, MetricContext_.Value.AsUint64()); break; case EMetricType::HIST: case EMetricType::HIST_RATE: Consumer_->OnHistogram(Timestamp_, MetricContext_.Value.AsHistogram()); break; case EMetricType::LOGHIST: case EMetricType::DSUMMARY: case EMetricType::IGAUGE: case EMetricType::COUNTER: case EMetricType::UNKNOWN: ythrow yexception() << "Unexpected metric type: " << MetricContext_.Type; } Consumer_->OnMetricEnd(); } private: IMetricConsumer* Consumer_; NJson::TJsonValue Json_; TStringBuf MetricNameLabel; TInstant Timestamp_; struct { TStringBuf Name; EMetricType Type{EMetricType::UNKNOWN}; TMetricValue Value; bool IsDeriv{false}; TLabels Labels; IHistogramSnapshotPtr Histogram; } MetricContext_; }; } void DecodeUnistat(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel, TInstant ts) { c->OnStreamBegin(); DecodeUnistatToStream(data, c, metricNameLabel, ts); c->OnStreamEnd(); } void DecodeUnistatToStream(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel, TInstant ts) { TMemoryInput in{data.data(), data.size()}; TDecoderUnistat decoder(c, &in, metricNameLabel, ts); decoder.Decode(); } }