Browse Source

YDB Import 566
96265cd0cc64e1b9bb31fe97b915ed2a09caf1cb

robot-ydb-importer 1 year ago
parent
commit
d6ee605467

+ 23 - 0
contrib/python/ydb/ya.make

@@ -0,0 +1,23 @@
+PY23_LIBRARY()
+
+LICENSE(Service-Py23-Proxy)
+
+IF (PYTHON2)
+    PEERDIR(contrib/python/ydb/py2)
+ELSE()
+    PEERDIR(contrib/python/ydb/py3)
+ENDIF()
+
+PEERDIR(
+  contrib/ydb/public/api/grpc
+  contrib/ydb/public/api/grpc/draft
+)
+
+NO_LINT()
+
+END()
+
+RECURSE(
+    py2
+    py3
+)

+ 13 - 0
library/cpp/monlib/encode/unistat/unistat.h

@@ -0,0 +1,13 @@
+#pragma once
+
+#include <util/generic/fwd.h>
+#include <util/datetime/base.h>
+
+namespace NMonitoring {
+    /// Decodes unistat-style metrics
+    /// https://wiki.yandex-team.ru/golovan/stat-handle
+    void DecodeUnistat(TStringBuf data, class IMetricConsumer* c, TStringBuf metricNameLabel = "sensor", TInstant ts = TInstant::Zero());
+
+    /// Assumes consumer's stream is open by the caller
+    void DecodeUnistatToStream(TStringBuf data, class IMetricConsumer* c, TStringBuf metricNameLabel = "sensor", TInstant ts = TInstant::Zero());
+}

+ 312 - 0
library/cpp/monlib/encode/unistat/unistat_decoder.cpp

@@ -0,0 +1,312 @@
+#include "unistat.h"
+
+#include <library/cpp/monlib/metrics/histogram_collector.h>
+#include <library/cpp/monlib/metrics/labels.h>
+#include <library/cpp/monlib/metrics/metric_type.h>
+#include <library/cpp/monlib/metrics/metric_value.h>
+#include <library/cpp/monlib/metrics/metric_consumer.h>
+
+#include <library/cpp/json/json_reader.h>
+
+#include <util/datetime/base.h>
+#include <util/string/split.h>
+
+#include <contrib/libs/re2/re2/re2.h>
+
+using namespace NJson;
+
+const re2::RE2 NAME_RE{R"((?:[a-zA-Z0-9\.\-/@_]+_)+(?:[ad][vehmntx]{3}|summ|hgram|max))"};
+
+namespace NMonitoring {
+    namespace {
+        bool IsNumber(const NJson::TJsonValue& j) {
+            switch (j.GetType()) {
+                case EJsonValueType::JSON_INTEGER:
+                case EJsonValueType::JSON_UINTEGER:
+                case EJsonValueType::JSON_DOUBLE:
+                    return true;
+
+                default:
+                    return false;
+            }
+        }
+
+        template <typename T>
+        T ExtractNumber(const TJsonValue& val) {
+            switch (val.GetType()) {
+                case EJsonValueType::JSON_INTEGER:
+                    return static_cast<T>(val.GetInteger());
+                case EJsonValueType::JSON_UINTEGER:
+                    return static_cast<T>(val.GetUInteger());
+                case EJsonValueType::JSON_DOUBLE:
+                    return static_cast<T>(val.GetDouble());
+
+                default:
+                    ythrow yexception() << "Expected number, but found " << val.GetType();
+            }
+        }
+
+        auto ExtractDouble = ExtractNumber<double>;
+        auto ExtractUi64 = ExtractNumber<ui64>;
+
+        class THistogramBuilder {
+        public:
+            void Add(TBucketBound bound, TBucketValue value) {
+                /// XXX: yasm uses left-closed intervals, while in monlib we use right-closed ones,
+                /// so (-inf; 0) [0, 100) [100; +inf)
+                /// becomes (-inf; 0] (0, 100] (100; +inf)
+                /// but since we've already lost some information these no way to avoid this kind of error here
+                Bounds_.push_back(bound);
+
+                /// this will always be 0 for the first bucket,
+                /// since there's no way to make (-inf; N) bucket in yasm
+                Values_.push_back(NextValue_);
+
+                /// we will write this value into the next bucket so that [[0, 10], [100, 20], [200, 50]]
+                /// becomes (-inf; 0] -> 0; (0; 100] -> 10; (100; 200] -> 20; (200; +inf) -> 50
+                NextValue_ = value;
+            }
+
+            IHistogramSnapshotPtr Finalize() {
+                Bounds_.push_back(std::numeric_limits<TBucketBound>::max());
+                Values_.push_back(NextValue_);
+
+                return ExplicitHistogramSnapshot(Bounds_, Values_, true);
+            }
+
+        public:
+            TBucketValue NextValue_ {0};
+            TBucketBounds Bounds_;
+            TBucketValues Values_;
+        };
+
+        class LogHistogramBuilder {
+        public:
+            void Add(double value) {
+                while (Values_.size() < HISTOGRAM_MAX_BUCKETS_COUNT && value > MaxValueWithBickets(Values_.size())) {
+                    Values_.push_back(0);
+                }
+
+                ui32 index = 0;
+                if (value > MaxValueWithBickets(HISTOGRAM_MAX_BUCKETS_COUNT)) {
+                    index = Values_.size() - 1;
+                } else if (value > MIN_VALUE) {
+                    double logBase = std::log(value) / std::log(BASE);
+                    index = static_cast<ui32>(std::ceil(logBase));
+                }
+                ++Values_[index];
+            }
+
+            IHistogramSnapshotPtr Finalize() && {
+                return new TExponentialHistogramSnapshot(BASE, MIN_VALUE, std::move(Values_));
+            }
+
+        private:
+            static constexpr double BASE = 2;
+            static constexpr double MIN_VALUE = 1;
+
+            static double MaxValueWithBickets(ui64 buckets) {
+                return std::pow(BASE, buckets - 2);
+            }
+
+        private:
+            TBucketValues Values_ = TBucketValues(2, 0);
+        };
+
+        class TDecoderUnistat {
+        private:
+        public:
+            explicit TDecoderUnistat(IMetricConsumer* consumer, IInputStream* is, TStringBuf metricNameLabel, TInstant ts)
+                : Consumer_{consumer},
+                MetricNameLabel(metricNameLabel),
+                Timestamp_{ts} {
+                ReadJsonTree(is, &Json_, /* throw */ true);
+            }
+
+            void Decode() {
+                Y_ENSURE(Json_.IsArray(), "Expected array at the top level, but found " << Json_.GetType());
+
+                for (auto&& metric : Json_.GetArray()) {
+                    Y_ENSURE(metric.IsArray(), "Metric must be an array");
+                    auto&& arr = metric.GetArray();
+                    Y_ENSURE(arr.size() == 2, "Metric must be an array of 2 elements");
+                    auto&& name = arr[0];
+                    auto&& value = arr[1];
+                    MetricContext_ = {};
+
+                    ParseName(name.GetString());
+
+                    if (value.IsArray()) {
+                        const auto& array = value.GetArray();
+                        if (!array.empty() && IsNumber(array[0])) {
+                            OnLogHistogram(value);
+                        } else {
+                            OnHistogram(value);
+                        }
+                    } else if (IsNumber(value)) {
+                        if (MetricContext_.Name.EndsWith("_ahhh")) {
+                            OnLogHistogram(value);
+                        } else {
+                            OnScalar(value);
+                        }
+                    } else {
+                        ythrow yexception() << "Expected list or number, but found " << value.GetType();
+                    }
+
+                    WriteValue();
+                }
+            }
+
+        private:
+            void OnScalar(const TJsonValue& jsonValue) {
+                if (MetricContext_.IsDeriv) {
+                    MetricContext_.Type = EMetricType::RATE;
+                    MetricContext_.Value = TMetricValue{ExtractUi64(jsonValue)};
+                } else {
+                    MetricContext_.Type = EMetricType::GAUGE;
+                    MetricContext_.Value = TMetricValue{ExtractDouble(jsonValue)};
+                }
+            }
+
+            void OnLogHistogram(const TJsonValue& value) {
+                Y_ENSURE(MetricContext_.Name.EndsWith("_ahhh"), "Values list is supported only for _ahhh metrics");
+                MetricContext_.Type = EMetricType::HIST;
+
+                LogHistogramBuilder histogramBuilder;
+                if (IsNumber(value)) {
+                    histogramBuilder.Add(value.GetDouble());
+                } else {
+                    for (auto&& item: value.GetArray()) {
+                        Y_ENSURE(IsNumber(item), "Expected a number, but found " << item.GetType());
+                        histogramBuilder.Add(item.GetDouble());
+                    }
+                }
+
+                MetricContext_.Histogram = std::move(histogramBuilder).Finalize();
+                MetricContext_.Value = TMetricValue{MetricContext_.Histogram.Get()};
+            }
+
+            void OnHistogram(const TJsonValue& jsonHist) {
+                if (MetricContext_.IsDeriv) {
+                    MetricContext_.Type = EMetricType::HIST_RATE;
+                } else {
+                    MetricContext_.Type = EMetricType::HIST;
+                }
+
+                auto histogramBuilder = THistogramBuilder();
+
+                for (auto&& bucket : jsonHist.GetArray()) {
+                    Y_ENSURE(bucket.IsArray(), "Expected an array, but found " << bucket.GetType());
+                    auto&& arr = bucket.GetArray();
+                    Y_ENSURE(arr.size() == 2, "Histogram bucket must be an array of 2 elements");
+                    const auto bound = ExtractDouble(arr[0]);
+                    const auto weight = ExtractUi64(arr[1]);
+                    histogramBuilder.Add(bound, weight);
+                }
+
+                MetricContext_.Histogram = histogramBuilder.Finalize();
+                MetricContext_.Value = TMetricValue{MetricContext_.Histogram.Get()};
+            }
+
+            bool IsDeriv(TStringBuf name) {
+                TStringBuf ignore, suffix;
+                name.RSplit('_', ignore, suffix);
+
+                Y_ENSURE(suffix.size() >= 3 && suffix.size() <= 5, "Disallowed suffix value: " << suffix);
+
+                if (suffix == TStringBuf("summ") || suffix == TStringBuf("hgram")) {
+                    return true;
+                } else if (suffix == TStringBuf("max")) {
+                    return false;
+                }
+
+                return suffix[0] == 'd';
+            }
+
+            void ParseName(TStringBuf value) {
+                TVector<TStringBuf> parts;
+                StringSplitter(value).Split(';').SkipEmpty().Collect(&parts);
+
+                Y_ENSURE(parts.size() >= 1 && parts.size() <= 16);
+
+                TStringBuf name = parts.back();
+                parts.pop_back();
+
+                Y_ENSURE(RE2::FullMatch(re2::StringPiece{name.data(), name.size()}, NAME_RE),
+                         "Metric name " << name << " doesn't match regex " << NAME_RE.pattern());
+
+                MetricContext_.Name = name;
+                MetricContext_.IsDeriv = IsDeriv(MetricContext_.Name);
+
+                for (auto tag : parts) {
+                    TStringBuf n, v;
+                    tag.Split('=', n, v);
+                    Y_ENSURE(n && v, "Unexpected tag format in " << tag);
+                    MetricContext_.Labels.Add(n, v);
+                }
+            }
+
+        private:
+            void WriteValue() {
+                Consumer_->OnMetricBegin(MetricContext_.Type);
+
+                Consumer_->OnLabelsBegin();
+                Consumer_->OnLabel(MetricNameLabel, TString{MetricContext_.Name});
+                for (auto&& l : MetricContext_.Labels) {
+                    Consumer_->OnLabel(l.Name(), l.Value());
+                }
+
+                Consumer_->OnLabelsEnd();
+
+                switch (MetricContext_.Type) {
+                    case EMetricType::GAUGE:
+                        Consumer_->OnDouble(Timestamp_, MetricContext_.Value.AsDouble());
+                        break;
+                    case EMetricType::RATE:
+                        Consumer_->OnUint64(Timestamp_, MetricContext_.Value.AsUint64());
+                        break;
+                    case EMetricType::HIST:
+                    case EMetricType::HIST_RATE:
+                        Consumer_->OnHistogram(Timestamp_, MetricContext_.Value.AsHistogram());
+                        break;
+                    case EMetricType::LOGHIST:
+                    case EMetricType::DSUMMARY:
+                    case EMetricType::IGAUGE:
+                    case EMetricType::COUNTER:
+                    case EMetricType::UNKNOWN:
+                        ythrow yexception() << "Unexpected metric type: " << MetricContext_.Type;
+                }
+
+                Consumer_->OnMetricEnd();
+            }
+
+        private:
+            IMetricConsumer* Consumer_;
+            NJson::TJsonValue Json_;
+            TStringBuf MetricNameLabel;
+            TInstant Timestamp_;
+
+            struct {
+                TStringBuf Name;
+                EMetricType Type{EMetricType::UNKNOWN};
+                TMetricValue Value;
+                bool IsDeriv{false};
+                TLabels Labels;
+                IHistogramSnapshotPtr Histogram;
+            } MetricContext_;
+        };
+
+    }
+
+    void DecodeUnistat(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel, TInstant ts) {
+        c->OnStreamBegin();
+        DecodeUnistatToStream(data, c, metricNameLabel, ts);
+        c->OnStreamEnd();
+    }
+
+    void DecodeUnistatToStream(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel, TInstant ts) {
+        TMemoryInput in{data.data(), data.size()};
+        TDecoderUnistat decoder(c, &in, metricNameLabel, ts);
+        decoder.Decode();
+    }
+}

+ 341 - 0
library/cpp/monlib/encode/unistat/unistat_ut.cpp

@@ -0,0 +1,341 @@
+#include "unistat.h"
+
+#include <library/cpp/monlib/encode/protobuf/protobuf.h>
+#include <library/cpp/monlib/metrics/labels.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NMonitoring;
+
+Y_UNIT_TEST_SUITE(TUnistatDecoderTest) {
+    Y_UNIT_TEST(MetricNameLabel) {
+        constexpr auto input = TStringBuf(R"([["something_axxx", 42]])");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        DecodeUnistat(input, encoder.Get(), "metric_name_label");
+
+        UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 1);
+        auto sample = samples.GetSamples(0);
+
+        auto label = sample.GetLabels(0);
+        UNIT_ASSERT_VALUES_EQUAL(label.GetName(), "metric_name_label");
+    }
+
+    Y_UNIT_TEST(ScalarMetric) {
+        constexpr auto input = TStringBuf(R"([["something_axxx", 42]])");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        DecodeUnistat(input, encoder.Get());
+
+        UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 1);
+        auto sample = samples.GetSamples(0);
+        UNIT_ASSERT_EQUAL(sample.GetMetricType(), NProto::GAUGE);
+        UNIT_ASSERT_VALUES_EQUAL(sample.PointsSize(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(sample.LabelsSize(), 1);
+
+        auto label = sample.GetLabels(0);
+        auto point = sample.GetPoints(0);
+        UNIT_ASSERT_VALUES_EQUAL(point.GetFloat64(), 42.);
+        UNIT_ASSERT_VALUES_EQUAL(label.GetName(), "sensor");
+        UNIT_ASSERT_VALUES_EQUAL(label.GetValue(), "something_axxx");
+    }
+
+    Y_UNIT_TEST(OverriddenTags) {
+        constexpr auto input = TStringBuf(R"([["ctype=foo;prj=bar;custom_tag=qwe;something_axxx", 42]])");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        DecodeUnistat(input, encoder.Get());
+
+        UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 1);
+        auto sample = samples.GetSamples(0);
+        UNIT_ASSERT_VALUES_EQUAL(sample.PointsSize(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(sample.LabelsSize(), 4);
+
+        const auto& labels = sample.GetLabels();
+        TLabels actual;
+        for (auto&& l : labels) {
+            actual.Add(l.GetName(), l.GetValue());
+        }
+
+        TLabels expected{{"ctype", "foo"}, {"prj", "bar"}, {"custom_tag", "qwe"}, {"sensor", "something_axxx"}};
+
+        UNIT_ASSERT_VALUES_EQUAL(actual.size(), expected.size());
+        for (auto&& l : actual) {
+            UNIT_ASSERT(expected.Extract(l.Name())->Value() == l.Value());
+        }
+    }
+
+    Y_UNIT_TEST(ThrowsOnTopLevelObject) {
+        constexpr auto input = TStringBuf(R"({["something_axxx", 42]})");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        UNIT_ASSERT_EXCEPTION(DecodeUnistat(input, encoder.Get()), yexception);
+    }
+
+    Y_UNIT_TEST(ThrowsOnUnwrappedMetric) {
+        constexpr auto input = TStringBuf(R"(["something_axxx", 42])");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        UNIT_ASSERT_EXCEPTION(DecodeUnistat(input, encoder.Get()), yexception);
+    }
+
+    Y_UNIT_TEST(HistogramMetric) {
+        constexpr auto input = TStringBuf(R"([["something_hgram", [[0, 1], [200, 2], [500, 3]] ]])");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        DecodeUnistat(input, encoder.Get());
+
+        auto sample = samples.GetSamples(0);
+        UNIT_ASSERT_EQUAL(sample.GetMetricType(), NProto::HIST_RATE);
+        UNIT_ASSERT_VALUES_EQUAL(sample.PointsSize(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(sample.LabelsSize(), 1);
+
+        auto label = sample.GetLabels(0);
+        const auto point = sample.GetPoints(0);
+        const auto histogram = point.GetHistogram();
+        const auto size = histogram.BoundsSize();
+        UNIT_ASSERT_VALUES_EQUAL(size, 4);
+
+        const TVector<double> expectedBounds {0, 200, 500, std::numeric_limits<double>::max()};
+        const TVector<ui64> expectedValues {0, 1, 2, 3};
+
+        for (auto i = 0; i < 4; ++i) {
+            UNIT_ASSERT_VALUES_EQUAL(histogram.GetBounds(i), expectedBounds[i]);
+            UNIT_ASSERT_VALUES_EQUAL(histogram.GetValues(i), expectedValues[i]);
+        }
+
+        UNIT_ASSERT_VALUES_EQUAL(label.GetName(), "sensor");
+        UNIT_ASSERT_VALUES_EQUAL(label.GetValue(), "something_hgram");
+    }
+
+    Y_UNIT_TEST(AbsoluteHistogram) {
+        constexpr auto input = TStringBuf(R"([["something_ahhh", [[0, 1], [200, 2], [500, 3]] ]])");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        DecodeUnistat(input, encoder.Get());
+
+        auto sample = samples.GetSamples(0);
+        UNIT_ASSERT_EQUAL(sample.GetMetricType(), NProto::HISTOGRAM);
+        UNIT_ASSERT_VALUES_EQUAL(sample.PointsSize(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(sample.LabelsSize(), 1);
+    }
+
+    Y_UNIT_TEST(LogHistogram) {
+        constexpr auto input = TStringBuf(R"([["something_ahhh", [1, 2, 3] ]])");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        DecodeUnistat(input, encoder.Get());
+
+        auto sample = samples.GetSamples(0);
+        UNIT_ASSERT_EQUAL(sample.GetMetricType(), NProto::HISTOGRAM);
+        UNIT_ASSERT_VALUES_EQUAL(sample.PointsSize(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(sample.LabelsSize(), 1);
+
+        auto histogram = sample.GetPoints(0).GetHistogram();
+        UNIT_ASSERT_VALUES_EQUAL(histogram.BoundsSize(), 4);
+        TVector<double> expectedBounds = {1, 2, 4, std::numeric_limits<double>::max()};
+        TVector<ui64> expectedValues = {1, 1, 1, 0};
+
+        for (auto i = 0; i < 4; ++i) {
+            UNIT_ASSERT_VALUES_EQUAL(histogram.GetBounds(i), expectedBounds[i]);
+            UNIT_ASSERT_VALUES_EQUAL(histogram.GetValues(i), expectedValues[i]);
+        }
+    }
+
+    Y_UNIT_TEST(LogHistogramOverflow) {
+        constexpr auto input = TStringBuf(R"([["something_ahhh", [1125899906842624, 2251799813685248] ]])");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        DecodeUnistat(input, encoder.Get());
+
+        auto sample = samples.GetSamples(0);
+        UNIT_ASSERT_EQUAL(sample.GetMetricType(), NProto::HISTOGRAM);
+        UNIT_ASSERT_VALUES_EQUAL(sample.PointsSize(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(sample.LabelsSize(), 1);
+
+        auto histogram = sample.GetPoints(0).GetHistogram();
+        UNIT_ASSERT_VALUES_EQUAL(histogram.BoundsSize(), HISTOGRAM_MAX_BUCKETS_COUNT);
+
+        TVector<double> expectedBounds;
+        for (ui64 i = 0; i < 50; ++i) {
+            expectedBounds.push_back(std::pow(2, i));
+        }
+        expectedBounds.push_back(std::numeric_limits<double>::max());
+
+        TVector<ui64> expectedValues;
+        for (ui64 i = 0; i < 50; ++i) {
+            expectedValues.push_back(0);
+        }
+        expectedValues.push_back(2);
+
+        for (auto i = 0; i < 4; ++i) {
+            UNIT_ASSERT_VALUES_EQUAL(histogram.GetBounds(i), expectedBounds[i]);
+            UNIT_ASSERT_VALUES_EQUAL(histogram.GetValues(i), expectedValues[i]);
+        }
+    }
+
+    Y_UNIT_TEST(LogHistogramSingle) {
+        constexpr auto input = TStringBuf(R"([["something_ahhh", 4 ]])");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        DecodeUnistat(input, encoder.Get());
+
+        auto sample = samples.GetSamples(0);
+        UNIT_ASSERT_EQUAL(sample.GetMetricType(), NProto::HISTOGRAM);
+        UNIT_ASSERT_VALUES_EQUAL(sample.PointsSize(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(sample.LabelsSize(), 1);
+
+        auto histogram = sample.GetPoints(0).GetHistogram();
+        UNIT_ASSERT_VALUES_EQUAL(histogram.BoundsSize(), 4);
+        TVector<double> expectedBounds = {1, 2, 4, std::numeric_limits<double>::max()};
+        TVector<ui64> expectedValues = {0, 0, 1, 0};
+
+        for (auto i = 0; i < 4; ++i) {
+            UNIT_ASSERT_VALUES_EQUAL(histogram.GetBounds(i), expectedBounds[i]);
+            UNIT_ASSERT_VALUES_EQUAL(histogram.GetValues(i), expectedValues[i]);
+        }
+    }
+
+    Y_UNIT_TEST(LogHistogramInvalid) {
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        {
+            constexpr auto input = TStringBuf(R"([["something_ahhh", [1, 2, [3, 4]] ]])");
+            UNIT_ASSERT_EXCEPTION(DecodeUnistat(input, encoder.Get()), yexception);
+        }
+
+        {
+            constexpr auto input = TStringBuf(R"([["something_ahhh", [[3, 4], 1, 2] ]])");
+            UNIT_ASSERT_EXCEPTION(DecodeUnistat(input, encoder.Get()), yexception);
+        }
+
+        {
+            constexpr auto input = TStringBuf(R"([["something_hgram", 1, 2, 3, 4 ]])");
+            UNIT_ASSERT_EXCEPTION(DecodeUnistat(input, encoder.Get()), yexception);
+        }
+    }
+
+
+    Y_UNIT_TEST(AllowedMetricNames) {
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        {
+            constexpr auto input = TStringBuf(R"([["a/A-b/c_D/__G_dmmm", [[0, 1], [200, 2], [500, 3]] ]])");
+            UNIT_ASSERT_NO_EXCEPTION(DecodeUnistat(input, encoder.Get()));
+        }
+    }
+
+    Y_UNIT_TEST(DisallowedMetricNames) {
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        {
+            constexpr auto input = TStringBuf(R"([["someth!ng_ahhh", [[0, 1], [200, 2], [500, 3]] ]])");
+            UNIT_ASSERT_EXCEPTION(DecodeUnistat(input, encoder.Get()), yexception);
+        }
+
+        {
+            constexpr auto input = TStringBuf(R"([["foo_a", [[0, 1], [200, 2], [500, 3]] ]])");
+            UNIT_ASSERT_EXCEPTION(DecodeUnistat(input, encoder.Get()), yexception);
+        }
+
+        {
+            constexpr auto input = TStringBuf(R"([["foo_ahhh;tag=value", [[0, 1], [200, 2], [500, 3]] ]])");
+            UNIT_ASSERT_EXCEPTION(DecodeUnistat(input, encoder.Get()), yexception);
+        }
+    }
+
+    Y_UNIT_TEST(MultipleMetrics) {
+        constexpr auto input = TStringBuf(R"([["something_axxx", 42], ["some-other_dhhh", 53]])");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+
+        DecodeUnistat(input, encoder.Get());
+
+        UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 2);
+        auto sample = samples.GetSamples(0);
+        UNIT_ASSERT_EQUAL(sample.GetMetricType(), NProto::GAUGE);
+        UNIT_ASSERT_VALUES_EQUAL(sample.PointsSize(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(sample.LabelsSize(), 1);
+
+        auto label = sample.GetLabels(0);
+        auto point = sample.GetPoints(0);
+        UNIT_ASSERT_VALUES_EQUAL(point.GetFloat64(), 42.);
+        UNIT_ASSERT_VALUES_EQUAL(label.GetName(), "sensor");
+        UNIT_ASSERT_VALUES_EQUAL(label.GetValue(), "something_axxx");
+
+        sample = samples.GetSamples(1);
+        UNIT_ASSERT_EQUAL(sample.GetMetricType(), NProto::RATE);
+        UNIT_ASSERT_VALUES_EQUAL(sample.PointsSize(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(sample.LabelsSize(), 1);
+
+        label = sample.GetLabels(0);
+        point = sample.GetPoints(0);
+        UNIT_ASSERT_VALUES_EQUAL(point.GetUint64(), 53);
+        UNIT_ASSERT_VALUES_EQUAL(label.GetName(), "sensor");
+        UNIT_ASSERT_VALUES_EQUAL(label.GetValue(), "some-other_dhhh");
+    }
+
+    Y_UNIT_TEST(UnderscoreName) {
+        constexpr auto input = TStringBuf(R"([["something_anything_dmmm", 42]])");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+        DecodeUnistat(input, encoder.Get());
+
+        UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 1);
+        auto sample = samples.GetSamples(0);
+        UNIT_ASSERT_EQUAL(sample.GetMetricType(), NProto::RATE);
+        UNIT_ASSERT_VALUES_EQUAL(sample.PointsSize(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(sample.LabelsSize(), 1);
+
+        auto label = sample.GetLabels(0);
+        auto point = sample.GetPoints(0);
+        UNIT_ASSERT_VALUES_EQUAL(point.GetUint64(), 42);
+        UNIT_ASSERT_VALUES_EQUAL(label.GetName(), "sensor");
+        UNIT_ASSERT_VALUES_EQUAL(label.GetValue(), "something_anything_dmmm");
+    }
+
+    Y_UNIT_TEST(MaxAggr) {
+        constexpr auto input = TStringBuf(R"([["something_anything_max", 42]])");
+
+        NProto::TMultiSamplesList samples;
+        auto encoder = EncoderProtobuf(&samples);
+        DecodeUnistat(input, encoder.Get());
+
+        UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 1);
+        auto sample = samples.GetSamples(0);
+        UNIT_ASSERT_EQUAL(sample.GetMetricType(), NProto::GAUGE);
+        UNIT_ASSERT_VALUES_EQUAL(sample.PointsSize(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(sample.LabelsSize(), 1);
+
+        auto label = sample.GetLabels(0);
+        auto point = sample.GetPoints(0);
+        UNIT_ASSERT_VALUES_EQUAL(point.GetFloat64(), 42.);
+        UNIT_ASSERT_VALUES_EQUAL(label.GetName(), "sensor");
+        UNIT_ASSERT_VALUES_EQUAL(label.GetValue(), "something_anything_max");
+    }
+}

+ 11 - 0
library/cpp/monlib/encode/unistat/ut/ya.make

@@ -0,0 +1,11 @@
+UNITTEST_FOR(library/cpp/monlib/encode/unistat)
+
+SRCS(
+    unistat_ut.cpp
+)
+
+PEERDIR(
+    library/cpp/monlib/encode/protobuf
+)
+
+END()

+ 17 - 0
library/cpp/monlib/encode/unistat/ya.make

@@ -0,0 +1,17 @@
+LIBRARY()
+
+PEERDIR(
+    contrib/libs/re2
+    library/cpp/json
+    library/cpp/monlib/metrics
+)
+
+SRCS(
+    unistat_decoder.cpp
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+    ut
+)

+ 79 - 0
library/python/monlib/encoder.pxd

@@ -0,0 +1,79 @@
+from util.generic.string cimport TStringBuf, TString
+from util.generic.ptr cimport THolder
+from util.stream.output cimport IOutputStream
+
+from library.python.monlib.metric_consumer cimport IMetricConsumer
+
+
+cdef extern from "util/stream/input.h" nogil:
+    cdef cppclass IInputStream:
+        pass
+
+
+cdef extern from "util/system/file.h" nogil:
+    cdef cppclass TFile:
+        TFile()
+        TFile(TFile)
+        pass
+
+    cdef TFile Duplicate(int)
+
+
+cdef extern from "library/cpp/monlib/encode/encoder.h" namespace "NMonitoring" nogil:
+    cdef cppclass IMetricEncoder:
+        void Close()
+
+    cdef cppclass ECompression:
+        pass
+
+    ctypedef THolder[IMetricEncoder] IMetricEncoderPtr
+
+
+cdef extern from "library/cpp/monlib/encode/unistat/unistat.h" namespace "NMonitoring" nogil:
+    cdef void DecodeUnistat(TStringBuf data, IMetricConsumer* c)
+
+
+cdef extern from "library/cpp/monlib/encode/json/json.h" namespace "NMonitoring" nogil:
+    cdef IMetricEncoderPtr EncoderJson(IOutputStream* out, int indentation)
+    cdef IMetricEncoderPtr BufferedEncoderJson(IOutputStream* out, int indentation)
+
+    cdef void DecodeJson(TStringBuf data, IMetricConsumer* c)
+
+
+cdef extern from "library/cpp/monlib/encode/spack/spack_v1.h" namespace "NMonitoring" nogil:
+    cdef IMetricEncoderPtr EncoderSpackV1(IOutputStream* out, ETimePrecision, ECompression)
+
+    cdef void DecodeSpackV1(IInputStream* input, IMetricConsumer* c) except +
+    cdef cppclass ETimePrecision:
+        pass
+
+    cdef cppclass EValueType:
+        pass
+
+
+cdef extern from "library/cpp/monlib/encode/spack/spack_v1.h" namespace "NMonitoring::ETimePrecision" nogil:
+    cdef ETimePrecision SECONDS "NMonitoring::ETimePrecision::SECONDS"
+    cdef ETimePrecision MILLIS "NMonitoring::ETimePrecision::MILLIS"
+
+
+cdef extern from "library/cpp/monlib/encode/encoder.h" namespace "NMonitoring::ECompression" nogil:
+    cdef ECompression UNKNOWN "NMonitoring::ECompression::UNKNOWN"
+    cdef ECompression IDENTITY "NMonitoring::ECompression::IDENTITY"
+    cdef ECompression ZLIB "NMonitoring::ECompression::ZLIB"
+    cdef ECompression LZ4 "NMonitoring::ECompression::LZ4"
+    cdef ECompression ZSTD "NMonitoring::ECompression::ZSTD"
+
+
+cdef class Encoder:
+    cdef IMetricEncoderPtr __wrapped
+    cdef THolder[TFile] __file
+    cdef THolder[IOutputStream] __stream
+
+    cdef IMetricEncoder* native(self)
+
+    cdef _make_stream(self, py_stream)
+
+    @staticmethod
+    cdef Encoder create_spack(object stream, ETimePrecision timePrecision, ECompression compression)
+    @staticmethod
+    cdef Encoder create_json(object stream, int indent)

+ 260 - 0
library/python/monlib/encoder.pyx

@@ -0,0 +1,260 @@
+from util.generic.string cimport TString, TStringBuf
+from util.generic.ptr cimport THolder
+
+from cython.operator cimport dereference as deref
+
+import sys
+
+from datetime import datetime
+from os import dup
+
+
+cdef extern from "util/stream/fwd.h" nogil:
+    cdef cppclass TAdaptivelyBuffered[T]:
+        TAdaptivelyBuffered(TFile) except +
+
+    ctypedef TAdaptivelyBuffered[TUnbufferedFileOutput] TFileOutput
+
+cdef extern from "util/stream/mem.h" nogil:
+    cdef cppclass TMemoryInput:
+        TMemoryInput(const TStringBuf buf)
+
+
+cdef extern from "util/stream/file.h" nogil:
+    cdef cppclass TUnbufferedFileOutput:
+        TUnbufferedFileOutput(TFile)
+
+    cdef cppclass TFileInput:
+        TFileInput(TFile) except +
+
+
+cdef extern from "util/stream/str.h" nogil:
+    cdef cppclass TStringStream:
+        const TString& Str() const
+
+
+cdef class Encoder:
+    cdef IMetricEncoder* native(self):
+        return self.__wrapped.Get()
+
+    def close(self):
+        deref(self.__wrapped.Get()).Close()
+
+    def dumps(self):
+        return (<TStringStream&?>deref(self.__stream.Get())).Str()
+
+    cdef _make_stream(self, py_stream):
+        if py_stream is not None:
+            fd = Duplicate(py_stream.fileno())
+
+            self.__file.Reset(new TFile(fd))
+            f = self.__file.Get()
+            self.__stream.Reset(<IOutputStream*>(new TFileOutput(deref(f))))
+        else:
+            self.__stream.Reset(<IOutputStream*>(new TStringStream()))
+
+    @staticmethod
+    cdef Encoder create_spack(object stream, ETimePrecision precision, ECompression compression):
+        cdef Encoder wrapper = Encoder.__new__(Encoder)
+        wrapper._make_stream(stream)
+
+        wrapper.__wrapped = EncoderSpackV1(wrapper.__stream.Get(),
+            precision,
+            compression)
+
+        return wrapper
+
+    @staticmethod
+    cdef Encoder create_json(object stream, int indent):
+        cdef Encoder wrapper = Encoder.__new__(Encoder)
+        wrapper._make_stream(stream)
+
+        wrapper.__wrapped = EncoderJson(wrapper.__stream.Get(), indent)
+
+        return wrapper
+
+
+cpdef Encoder create_json_encoder(object stream, int indent):
+    return Encoder.create_json(stream, indent)
+
+
+cdef class TimePrecision:
+    Millis = <int>MILLIS
+    Seconds = <int>SECONDS
+
+    @staticmethod
+    cdef ETimePrecision to_native(int p) except *:
+        if p == TimePrecision.Millis:
+            return MILLIS
+        elif p == TimePrecision.Seconds:
+            return SECONDS
+
+        raise ValueError('Unsupported TimePrecision value')
+
+cdef class Compression:
+    Identity = <int>IDENTITY
+    Lz4 = <int>LZ4
+    Zlib = <int>ZLIB
+    Zstd = <int>ZSTD
+
+    @staticmethod
+    cdef ECompression to_native(int p) except *:
+        if p == Compression.Identity:
+            return IDENTITY
+        elif p == Compression.Lz4:
+            return LZ4
+        elif p == Compression.Zlib:
+            return ZLIB
+        elif p == Compression.Zstd:
+            return ZSTD
+
+        raise ValueError('Unsupported Compression value')
+
+
+# XXX: timestamps
+def dump(registry, fp, format='spack', **kwargs):
+    """
+    Dumps metrics held by the metric registry to a file. Output can be additionally
+    adjusted using kwargs, which may differ depending on the selected format.
+
+    :param registry: Metric registry object
+    :param fp: File descriptor to serialize to
+    :param format: Format to serialize to (allowed values: spack). Default: json
+
+    Keyword arguments:
+    :param time_precision: Time precision (spack)
+    :param compression: Compression codec (spack)
+    :param indent: Pretty-print indentation for object members and arrays (json)
+    :param timestamp: Metric timestamp datetime
+    :returns: Nothing
+    """
+    if not hasattr(fp, 'fileno'):
+        raise TypeError('Expected a file-like object, but got ' + str(type(fp)))
+
+    if format == 'spack':
+        time_precision = TimePrecision.to_native(kwargs.get('time_precision', TimePrecision.Seconds))
+        compression = Compression.to_native(kwargs.get('compression', Compression.Identity))
+        encoder = Encoder.create_spack(fp, time_precision, compression)
+    elif format == 'json':
+        indent = int(kwargs.get('indent', 0))
+        encoder = Encoder.create_json(fp, indent)
+    timestamp = kwargs.get('timestamp', datetime.utcfromtimestamp(0))
+
+    registry.accept(timestamp, encoder)
+    encoder.close()
+
+
+def dumps(registry, format='spack', **kwargs):
+    """
+    Dumps metrics held by the metric registry to a string. Output can be additionally
+    adjusted using kwargs, which may differ depending on the selected format.
+
+    :param registry: Metric registry object
+    :param format: Format to serialize to (allowed values: spack). Default: json
+
+    Keyword arguments:
+    :param time_precision: Time precision (spack)
+    :param compression: Compression codec (spack)
+    :param indent: Pretty-print indentation for object members and arrays (json)
+    :param timestamp: Metric timestamp datetime
+    :returns: A string of the specified format
+    """
+    if format == 'spack':
+        time_precision = TimePrecision.to_native(kwargs.get('time_precision', TimePrecision.Seconds))
+        compression = Compression.to_native(kwargs.get('compression', Compression.Identity))
+        encoder = Encoder.create_spack(None, time_precision, compression)
+    elif format == 'json':
+        indent = int(kwargs.get('indent', 0))
+        encoder = Encoder.create_json(None, indent)
+    timestamp = kwargs.get('timestamp', datetime.utcfromtimestamp(0))
+
+    registry.accept(timestamp, encoder)
+    encoder.close()
+
+    s = encoder.dumps()
+
+    return s
+
+
+def load(fp, from_format='spack', to_format='json'):
+    """
+    Converts metrics from one format to another.
+
+    :param fp: File to load data from
+    :param from_format: Source string format (allowed values: json, spack, unistat). Default: spack
+    :param to_format: Target format (allowed values: json, spack). Default: json
+    :returns: a string containing metrics in the specified format
+    """
+    if from_format == to_format:
+        return fp.read()
+
+    cdef THolder[TFile] file
+    file.Reset(new TFile(Duplicate(fp.fileno())))
+
+    cdef THolder[TFileInput] input
+    input.Reset(new TFileInput(deref(file.Get())))
+
+    if to_format == 'json':
+        encoder = Encoder.create_json(None, 0)
+    elif to_format == 'spack':
+        encoder = Encoder.create_spack(None, SECONDS, IDENTITY)
+    else:
+        raise ValueError('Unsupported format ' + to_format)
+
+    if from_format == 'spack':
+        DecodeSpackV1(<IInputStream*>(input.Get()), <IMetricConsumer*?>encoder.native())
+    elif from_format == 'json':
+        s = open(fp, 'r').read()
+        DecodeJson(TStringBuf(s), <IMetricConsumer*?>encoder.native())
+    elif from_format == 'unistat':
+        s = open(fp, 'r').read()
+        DecodeJson(TStringBuf(s), <IMetricConsumer*?>encoder.native())
+
+    else:
+        raise ValueError('Unsupported format ' + from_format)
+
+    encoder.close()
+    s = encoder.dumps()
+
+    return s
+
+
+def loads(s, from_format='spack', to_format='json', compression=Compression.Identity):
+    """
+    Converts metrics from one format to another.
+
+    :param s: String to load from
+    :param from_format: Source string format (allowed values: json, spack, unistat). Default: spack
+    :param to_format: Target format (allowed values: json, spack). Default: json
+    :returns: a string containing metrics in the specified format
+    """
+    if from_format == to_format:
+        return s
+
+    if sys.version_info[0] >= 3 and not isinstance(s, bytes):
+        s = s.encode('iso-8859-15')
+
+    cdef THolder[TMemoryInput] input
+
+    if to_format == 'json':
+        encoder = Encoder.create_json(None, 0)
+    elif to_format == 'spack':
+        comp = Compression.to_native(compression)
+        encoder = Encoder.create_spack(None, SECONDS, comp)
+    else:
+        raise ValueError('Unsupported format ' + to_format)
+
+    if from_format == 'spack':
+        input.Reset(new TMemoryInput(s))
+        DecodeSpackV1(<IInputStream*>(input.Get()), <IMetricConsumer*?>encoder.native())
+    elif from_format == 'json':
+        DecodeJson(TStringBuf(s), <IMetricConsumer*?>encoder.native())
+    elif from_format == 'unistat':
+        DecodeUnistat(TStringBuf(s), <IMetricConsumer*?>encoder.native())
+    else:
+        raise ValueError('Unsupported format ' + from_format)
+
+    encoder.close()
+    s = encoder.dumps()
+
+    return s

+ 47 - 0
library/python/monlib/labels.pxd

@@ -0,0 +1,47 @@
+from libcpp cimport bool
+
+from util.generic.maybe cimport TMaybe
+from util.generic.string cimport TStringBuf, TString
+
+
+cdef extern from "library/cpp/monlib/metrics/labels.h" namespace "NMonitoring" nogil:
+    cdef cppclass ILabel:
+        const TStringBuf Name() const
+        const TStringBuf Value() const
+
+    cdef cppclass ILabels:
+        bool Add(TStringBuf name, TStringBuf value)
+        bool Add(const TString& name, const TString& value)
+
+        size_t Size() const
+
+    cdef cppclass TLabel:
+        TLabel() except +
+        TLabel(TStringBuf name, TStringBuf value) except +
+        const TString& Name() const
+        const TString& Value() const
+
+        TString ToString() const
+        bool operator!=(const TLabel&) const
+        bool operator==(const TLabel&) const
+
+    cdef cppclass TLabels:
+        cppclass const_iterator:
+            const TLabel& operator*() const
+            bool operator!=(const_iterator) const
+            bool operator==(const_iterator) const
+
+        TLabels() except +
+
+        bool Add(const TLabel&) except +
+        bool Add(TStringBuf name, TStringBuf value) except +
+        bool Add(const TString& name, const TString& value) except +
+        bool operator==(const TLabels&) const
+
+        TMaybe[TLabel] Find(TStringBuf name) const
+        TMaybe[TLabel] Extract(TStringBuf name) except +
+
+        size_t Size() const
+
+        const_iterator begin() const
+        const_iterator end() const

+ 103 - 0
library/python/monlib/metric.pxd

@@ -0,0 +1,103 @@
+from libcpp cimport bool
+
+from util.system.types cimport ui64, ui32, i64
+from util.generic.ptr cimport THolder, TIntrusivePtr
+from util.generic.vector cimport TVector
+
+
+cdef extern from "library/cpp/monlib/metrics/histogram_collector.h" namespace "NMonitoring" nogil:
+    ctypedef double TBucketBound
+    ctypedef ui64 TBucketValue
+
+    cdef cppclass IHistogramSnapshot:
+        ui32 Count() const
+        TBucketBound UpperBound(ui32 index) const
+        TBucketValue Value(ui32 index) const
+
+    ctypedef TIntrusivePtr[IHistogramSnapshot] IHistogramSnapshotPtr
+
+    cdef cppclass IHistogramCollector:
+        void Collect(i64 value)
+        void Collect(i64 value, ui32 count)
+        IHistogramSnapshotPtr Snapshot() const
+
+    ctypedef THolder[IHistogramCollector] IHistogramCollectorPtr
+
+    IHistogramCollectorPtr ExponentialHistogram(ui32 bucketsCount, double base, double scale) except +
+    IHistogramCollectorPtr ExplicitHistogram(const TVector[double]& buckets) except +
+    IHistogramCollectorPtr LinearHistogram(ui32 bucketsCount, i64 startValue, i64 bucketWidth) except +
+
+
+cdef extern from "library/cpp/monlib/metrics/metric.h" namespace "NMonitoring" nogil:
+    cdef cppclass TGauge:
+        TGauge(double value) except +
+
+        void Set(double)
+        double Get() const
+        double Add(double)
+
+    cdef cppclass TIntGauge:
+        TIntGauge(ui64 value) except +
+
+        void Set(ui64)
+        ui64 Get() const
+        ui64 Add(double)
+        ui64 Inc()
+        ui64 Dec()
+
+    cdef cppclass TCounter:
+        TCounter(ui64 value) except +
+
+        void Set(ui64)
+        ui64 Get() const
+        void Inc()
+        void Reset()
+
+    cdef cppclass TRate:
+        TRate(ui64 value) except +
+
+        void Add(ui64)
+        ui64 Get() const
+        void Inc()
+
+    cdef cppclass THistogram:
+        THistogram(IHistogramCollectorPtr collector, bool isRate) except +
+
+        void Record(double value)
+        void Record(double value, ui32 count)
+
+
+cdef class Gauge:
+    cdef TGauge* __wrapped
+
+    @staticmethod
+    cdef Gauge from_ptr(TGauge* native)
+
+
+cdef class Counter:
+    cdef TCounter* __wrapped
+
+    @staticmethod
+    cdef Counter from_ptr(TCounter* native)
+
+
+cdef class Rate:
+    cdef TRate* __wrapped
+
+    @staticmethod
+    cdef Rate from_ptr(TRate* native)
+
+
+cdef class IntGauge:
+    cdef TIntGauge* __wrapped
+
+    @staticmethod
+    cdef IntGauge from_ptr(TIntGauge* native)
+
+
+cdef class Histogram:
+    cdef THistogram* __wrapped
+    cdef bool __is_owner
+
+    @staticmethod
+    cdef Histogram from_ptr(THistogram* native)

Some files were not shown because too many files changed in this diff