1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162 |
- #include "json.h"
- #include "typed_point.h"
- #include <library/cpp/monlib/exception/exception.h>
- #include <library/cpp/monlib/metrics/labels.h>
- #include <library/cpp/monlib/metrics/metric_value.h>
- #include <library/cpp/json/json_reader.h>
- #include <util/datetime/base.h>
- #include <util/string/cast.h>
- #include <limits>
- namespace NMonitoring {
- #define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TJsonDecodeError() << __VA_ARGS__)
- namespace {
- ///////////////////////////////////////////////////////////////////////
- // THistogramBuilder
- ///////////////////////////////////////////////////////////////////////
- class THistogramBuilder {
- public:
- void AddBound(TBucketBound bound) {
- if (!Bounds_.empty()) {
- DECODE_ENSURE(Bounds_.back() < bound,
- "non sorted bounds, " << Bounds_.back() <<
- " >= " << bound);
- }
- Bounds_.push_back(bound);
- }
- void AddValue(TBucketValue value) {
- Values_.push_back(value);
- }
- void AddInf(TBucketValue value) {
- InfPresented_ = true;
- InfValue_ = value;
- }
- IHistogramSnapshotPtr Build() {
- if (InfPresented_) {
- Bounds_.push_back(Max<TBucketBound>());
- Values_.push_back(InfValue_);
- }
- auto snapshot = ExplicitHistogramSnapshot(Bounds_, Values_);
- Bounds_.clear();
- Values_.clear();
- InfPresented_ = false;
- return snapshot;
- }
- bool Empty() const noexcept {
- return Bounds_.empty() && Values_.empty();
- }
- void Clear() {
- Bounds_.clear();
- Values_.clear();
- }
- private:
- TBucketBounds Bounds_;
- TBucketValues Values_;
- bool InfPresented_ = false;
- TBucketValue InfValue_;
- };
- class TSummaryDoubleBuilder {
- public:
- ISummaryDoubleSnapshotPtr Build() const {
- return MakeIntrusive<TSummaryDoubleSnapshot>(Sum_, Min_, Max_, Last_, Count_);
- }
- void SetSum(double sum) {
- Empty_ = false;
- Sum_ = sum;
- }
- void SetMin(double min) {
- Empty_ = false;
- Min_ = min;
- }
- void SetMax(double max) {
- Empty_ = false;
- Max_ = max;
- }
- void SetLast(double last) {
- Empty_ = false;
- Last_ = last;
- }
- void SetCount(ui64 count) {
- Empty_ = false;
- Count_ = count;
- }
- void Clear() {
- Empty_ = true;
- Sum_ = 0;
- Min_ = 0;
- Max_ = 0;
- Last_ = 0;
- Count_ = 0;
- }
- bool Empty() const {
- return Empty_;
- }
- private:
- double Sum_ = 0;
- double Min_ = 0;
- double Max_ = 0;
- double Last_ = 0;
- ui64 Count_ = 0;
- bool Empty_ = true;
- };
- class TLogHistogramBuilder {
- public:
- void SetBase(double base) {
- DECODE_ENSURE(base > 0, "base must be positive");
- Base_ = base;
- }
- void SetZerosCount(ui64 zerosCount) {
- DECODE_ENSURE(zerosCount >= 0, "zeros count must be positive");
- ZerosCount_ = zerosCount;
- }
- void SetStartPower(int startPower) {
- StartPower_ = startPower;
- }
- void AddBucketValue(double value) {
- DECODE_ENSURE(value > 0.0, "bucket values must be positive");
- DECODE_ENSURE(value < std::numeric_limits<double>::max(), "bucket values must be finite");
- Buckets_.push_back(value);
- }
- void Clear() {
- Buckets_.clear();
- Base_ = 1.5;
- ZerosCount_ = 0;
- StartPower_ = 0;
- }
- bool Empty() const {
- return Buckets_.empty() && ZerosCount_ == 0;
- }
- TLogHistogramSnapshotPtr Build() {
- return MakeIntrusive<TLogHistogramSnapshot>(Base_, ZerosCount_, StartPower_, std::move(Buckets_));
- }
- private:
- double Base_ = 1.5;
- ui64 ZerosCount_ = 0;
- int StartPower_ = 0;
- TVector<double> Buckets_;
- };
- std::pair<double, bool> ParseSpecDouble(TStringBuf string) {
- if (string == TStringBuf("nan") || string == TStringBuf("NaN")) {
- return {std::numeric_limits<double>::quiet_NaN(), true};
- } else if (string == TStringBuf("inf") || string == TStringBuf("Infinity")) {
- return {std::numeric_limits<double>::infinity(), true};
- } else if (string == TStringBuf("-inf") || string == TStringBuf("-Infinity")) {
- return {-std::numeric_limits<double>::infinity(), true};
- } else {
- return {0, false};
- }
- }
- ///////////////////////////////////////////////////////////////////////
- // TMetricCollector
- ///////////////////////////////////////////////////////////////////////
- struct TMetricCollector {
- EMetricType Type = EMetricType::UNKNOWN;
- TLabels Labels;
- THistogramBuilder HistogramBuilder;
- TSummaryDoubleBuilder SummaryBuilder;
- TLogHistogramBuilder LogHistBuilder;
- TTypedPoint LastPoint;
- TVector<TTypedPoint> TimeSeries;
- bool SeenTsOrValue = false;
- bool SeenTimeseries = false;
- void Clear() {
- Type = EMetricType::UNKNOWN;
- Labels.Clear();
- SeenTsOrValue = false;
- SeenTimeseries = false;
- TimeSeries.clear();
- LastPoint = {};
- HistogramBuilder.Clear();
- SummaryBuilder.Clear();
- LogHistBuilder.Clear();
- }
- void AddLabel(const TLabel& label) {
- Labels.Add(label.Name(), label.Value());
- }
- void SetLastTime(TInstant time) {
- LastPoint.SetTime(time);
- }
- template <typename T>
- void SetLastValue(T value) {
- LastPoint.SetValue(value);
- }
- void SaveLastPoint() {
- DECODE_ENSURE(LastPoint.GetTime() != TInstant::Zero(),
- "cannot add point without or zero timestamp");
- if (!HistogramBuilder.Empty()) {
- auto histogram = HistogramBuilder.Build();
- TimeSeries.emplace_back(LastPoint.GetTime(), histogram.Get());
- } else if (!SummaryBuilder.Empty()) {
- auto summary = SummaryBuilder.Build();
- TimeSeries.emplace_back(LastPoint.GetTime(), summary.Get());
- } else if (!LogHistBuilder.Empty()) {
- auto logHist = LogHistBuilder.Build();
- TimeSeries.emplace_back(LastPoint.GetTime(), logHist.Get());
- } else {
- TimeSeries.push_back(std::move(LastPoint));
- }
- }
- template <typename TConsumer>
- void Consume(TConsumer&& consumer) {
- if (TimeSeries.empty()) {
- const auto& p = LastPoint;
- consumer(p.GetTime(), p.GetValueType(), p.GetValue());
- } else {
- for (const auto& p: TimeSeries) {
- consumer(p.GetTime(), p.GetValueType(), p.GetValue());
- }
- }
- }
- };
- struct TCommonParts {
- TInstant CommonTime;
- TLabels CommonLabels;
- };
- class IHaltableMetricConsumer: public IMetricConsumer {
- public:
- virtual bool NeedToStop() const = 0;
- };
- // TODO(ivanzhukov@): check all states for cases when a json document is invalid
- // e.g. "metrics" or "commonLabels" keys are specified multiple times
- class TCommonPartsCollector: public IHaltableMetricConsumer {
- public:
- TCommonParts&& CommonParts() {
- return std::move(CommonParts_);
- }
- private:
- bool NeedToStop() const override {
- return TInstant::Zero() != CommonParts_.CommonTime && !CommonParts_.CommonLabels.Empty();
- }
- void OnStreamBegin() override {
- }
- void OnStreamEnd() override {
- }
- void OnCommonTime(TInstant time) override {
- CommonParts_.CommonTime = time;
- }
- void OnMetricBegin(EMetricType) override {
- IsMetric_ = true;
- }
- void OnMetricEnd() override {
- IsMetric_ = false;
- }
- void OnLabelsBegin() override {
- }
- void OnLabelsEnd() override {
- }
- void OnLabel(TStringBuf name, TStringBuf value) override {
- if (!IsMetric_) {
- CommonParts_.CommonLabels.Add(std::move(name), std::move(value));
- }
- }
- void OnDouble(TInstant, double) override {
- }
- void OnInt64(TInstant, i64) override {
- }
- void OnUint64(TInstant, ui64) override {
- }
- void OnHistogram(TInstant, IHistogramSnapshotPtr) override {
- }
- void OnLogHistogram(TInstant, TLogHistogramSnapshotPtr) override {
- }
- void OnSummaryDouble(TInstant, ISummaryDoubleSnapshotPtr) override {
- }
- private:
- TCommonParts CommonParts_;
- bool IsMetric_{false};
- };
- class TCommonPartsProxy: public IHaltableMetricConsumer {
- public:
- TCommonPartsProxy(TCommonParts&& commonParts, IMetricConsumer* c)
- : CommonParts_{std::move(commonParts)}
- , Consumer_{c}
- {}
- private:
- bool NeedToStop() const override {
- return false;
- }
- void OnStreamBegin() override {
- Consumer_->OnStreamBegin();
- if (!CommonParts_.CommonLabels.Empty()) {
- Consumer_->OnLabelsBegin();
- for (auto&& label : CommonParts_.CommonLabels) {
- Consumer_->OnLabel(label.Name(), label.Value());
- }
- Consumer_->OnLabelsEnd();
- }
- if (TInstant::Zero() != CommonParts_.CommonTime) {
- Consumer_->OnCommonTime(CommonParts_.CommonTime);
- }
- }
- void OnStreamEnd() override {
- Consumer_->OnStreamEnd();
- }
- void OnCommonTime(TInstant) override {
- }
- void OnMetricBegin(EMetricType type) override {
- IsMetric_ = true;
- Consumer_->OnMetricBegin(type);
- }
- void OnMetricEnd() override {
- IsMetric_ = false;
- Consumer_->OnMetricEnd();
- }
- void OnLabelsBegin() override {
- if (IsMetric_) {
- Consumer_->OnLabelsBegin();
- }
- }
- void OnLabelsEnd() override {
- if (IsMetric_) {
- Consumer_->OnLabelsEnd();
- }
- }
- void OnLabel(TStringBuf name, TStringBuf value) override {
- if (IsMetric_) {
- Consumer_->OnLabel(std::move(name), std::move(value));
- }
- }
- void OnDouble(TInstant time, double value) override {
- Consumer_->OnDouble(time, value);
- }
- void OnInt64(TInstant time, i64 value) override {
- Consumer_->OnInt64(time, value);
- }
- void OnUint64(TInstant time, ui64 value) override {
- Consumer_->OnUint64(time, value);
- }
- void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override {
- Consumer_->OnHistogram(time, std::move(snapshot));
- }
- void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) override {
- Consumer_->OnLogHistogram(time, std::move(snapshot));
- }
- void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override {
- Consumer_->OnSummaryDouble(time, std::move(snapshot));
- }
- private:
- const TCommonParts CommonParts_;
- IMetricConsumer* Consumer_;
- bool IsMetric_{false};
- };
- ///////////////////////////////////////////////////////////////////////
- // TDecoderJson
- ///////////////////////////////////////////////////////////////////////
- class TDecoderJson final: public NJson::TJsonCallbacks {
- struct TState {
- enum EState {
- ROOT_OBJECT = 0x01,
- COMMON_LABELS,
- COMMON_TS,
- METRICS_ARRAY,
- METRIC_OBJECT,
- METRIC_NAME,
- METRIC_LABELS,
- METRIC_TYPE,
- METRIC_MODE, // TODO: must be deleted
- METRIC_TIMESERIES,
- METRIC_TS,
- METRIC_VALUE,
- METRIC_HIST,
- METRIC_HIST_BOUNDS,
- METRIC_HIST_BUCKETS,
- METRIC_HIST_INF,
- METRIC_DSUMMARY,
- METRIC_DSUMMARY_SUM,
- METRIC_DSUMMARY_MIN,
- METRIC_DSUMMARY_MAX,
- METRIC_DSUMMARY_LAST,
- METRIC_DSUMMARY_COUNT,
- METRIC_LOG_HIST,
- METRIC_LOG_HIST_BASE,
- METRIC_LOG_HIST_ZEROS,
- METRIC_LOG_HIST_START_POWER,
- METRIC_LOG_HIST_BUCKETS,
- };
- constexpr EState Current() const noexcept {
- return static_cast<EState>(State_ & 0xFF);
- }
- void ToNext(EState state) noexcept {
- constexpr auto bitSize = 8 * sizeof(ui8);
- State_ = (State_ << bitSize) | static_cast<ui8>(state);
- }
- void ToPrev() noexcept {
- constexpr auto bitSize = 8 * sizeof(ui8);
- State_ = State_ >> bitSize;
- }
- private:
- ui64 State_ = static_cast<ui64>(ROOT_OBJECT);
- };
- public:
- TDecoderJson(TStringBuf data, IHaltableMetricConsumer* metricConsumer, TStringBuf metricNameLabel)
- : Data_(data)
- , MetricConsumer_(metricConsumer)
- , MetricNameLabel_(metricNameLabel)
- {
- }
- private:
- #define PARSE_ENSURE(CONDITION, ...) \
- do { \
- if (Y_UNLIKELY(!(CONDITION))) { \
- ErrorMsg_ = TStringBuilder() << __VA_ARGS__; \
- return false; \
- } \
- } while (false)
- bool OnInteger(long long value) override {
- switch (State_.Current()) {
- case TState::COMMON_TS:
- PARSE_ENSURE(value >= 0, "unexpected negative number in a common timestamp: " << value);
- MetricConsumer_->OnCommonTime(TInstant::Seconds(value));
- State_.ToPrev();
- if (MetricConsumer_->NeedToStop()) {
- IsIntentionallyHalted_ = true;
- return false;
- }
- break;
- case TState::METRIC_TS:
- PARSE_ENSURE(value >= 0, "unexpected negative number in a metric timestamp: " << value);
- LastMetric_.SetLastTime(TInstant::Seconds(value));
- State_.ToPrev();
- break;
- case TState::METRIC_VALUE:
- LastMetric_.SetLastValue(static_cast<i64>(value));
- State_.ToPrev();
- break;
- case TState::METRIC_HIST_BOUNDS:
- LastMetric_.HistogramBuilder.AddBound(static_cast<double>(value));
- break;
- case TState::METRIC_HIST_BUCKETS:
- PARSE_ENSURE(value >= 0 && static_cast<ui64>(value) <= Max<TBucketValues::value_type>(), "value is out of bounds " << value);
- LastMetric_.HistogramBuilder.AddValue(value);
- break;
- case TState::METRIC_HIST_INF:
- PARSE_ENSURE(value >= 0, "unexpected negative number in histogram inf: " << value);
- LastMetric_.HistogramBuilder.AddInf(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_COUNT:
- LastMetric_.SummaryBuilder.SetCount(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_SUM:
- LastMetric_.SummaryBuilder.SetSum(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_MIN:
- LastMetric_.SummaryBuilder.SetMin(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_MAX:
- LastMetric_.SummaryBuilder.SetMax(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_LAST:
- LastMetric_.SummaryBuilder.SetLast(value);
- State_.ToPrev();
- break;
- case TState::METRIC_LOG_HIST_BASE:
- LastMetric_.LogHistBuilder.SetBase(value);
- State_.ToPrev();
- break;
- case TState::METRIC_LOG_HIST_ZEROS:
- LastMetric_.LogHistBuilder.SetZerosCount(value);
- State_.ToPrev();
- break;
- case TState::METRIC_LOG_HIST_START_POWER:
- LastMetric_.LogHistBuilder.SetStartPower(value);
- State_.ToPrev();
- break;
- case TState::METRIC_LOG_HIST_BUCKETS:
- LastMetric_.LogHistBuilder.AddBucketValue(value);
- break;
- default:
- return false;
- }
- return true;
- }
- bool OnUInteger(unsigned long long value) override {
- switch (State_.Current()) {
- case TState::COMMON_TS:
- MetricConsumer_->OnCommonTime(TInstant::Seconds(value));
- State_.ToPrev();
- if (MetricConsumer_->NeedToStop()) {
- IsIntentionallyHalted_ = true;
- return false;
- }
- break;
- case TState::METRIC_TS:
- LastMetric_.SetLastTime(TInstant::Seconds(value));
- State_.ToPrev();
- break;
- case TState::METRIC_VALUE:
- PARSE_ENSURE(value <= Max<ui64>(), "Metric value is out of bounds: " << value);
- LastMetric_.SetLastValue(static_cast<ui64>(value));
- State_.ToPrev();
- break;
- case TState::METRIC_HIST_BOUNDS:
- LastMetric_.HistogramBuilder.AddBound(static_cast<double>(value));
- break;
- case TState::METRIC_HIST_BUCKETS:
- PARSE_ENSURE(value <= Max<TBucketValues::value_type>(), "Histogram bucket value is out of bounds: " << value);
- LastMetric_.HistogramBuilder.AddValue(value);
- break;
- case TState::METRIC_HIST_INF:
- LastMetric_.HistogramBuilder.AddInf(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_COUNT:
- LastMetric_.SummaryBuilder.SetCount(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_SUM:
- LastMetric_.SummaryBuilder.SetSum(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_MIN:
- LastMetric_.SummaryBuilder.SetMin(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_MAX:
- LastMetric_.SummaryBuilder.SetMax(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_LAST:
- LastMetric_.SummaryBuilder.SetLast(value);
- State_.ToPrev();
- break;
- case TState::METRIC_LOG_HIST_BASE:
- LastMetric_.LogHistBuilder.SetBase(value);
- State_.ToPrev();
- break;
- case TState::METRIC_LOG_HIST_ZEROS:
- LastMetric_.LogHistBuilder.SetZerosCount(value);
- State_.ToPrev();
- break;
- case TState::METRIC_LOG_HIST_START_POWER:
- LastMetric_.LogHistBuilder.SetStartPower(value);
- State_.ToPrev();
- break;
- case TState::METRIC_LOG_HIST_BUCKETS:
- LastMetric_.LogHistBuilder.AddBucketValue(value);
- break;
- default:
- return false;
- }
- return true;
- }
- bool OnDouble(double value) override {
- switch (State_.Current()) {
- case TState::METRIC_VALUE:
- LastMetric_.SetLastValue(value);
- State_.ToPrev();
- break;
- case TState::METRIC_HIST_BOUNDS:
- LastMetric_.HistogramBuilder.AddBound(value);
- break;
- case TState::METRIC_DSUMMARY_SUM:
- LastMetric_.SummaryBuilder.SetSum(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_MIN:
- LastMetric_.SummaryBuilder.SetMin(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_MAX:
- LastMetric_.SummaryBuilder.SetMax(value);
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_LAST:
- LastMetric_.SummaryBuilder.SetLast(value);
- State_.ToPrev();
- break;
- case TState::METRIC_LOG_HIST_BASE:
- LastMetric_.LogHistBuilder.SetBase(value);
- State_.ToPrev();
- break;
- case TState::METRIC_LOG_HIST_BUCKETS:
- LastMetric_.LogHistBuilder.AddBucketValue(value);
- break;
- default:
- return false;
- }
- return true;
- }
- bool OnString(const TStringBuf& value) override {
- switch (State_.Current()) {
- case TState::COMMON_LABELS:
- PARSE_ENSURE(!LastLabelName_.empty(), "empty label name in common labels");
- MetricConsumer_->OnLabel(LastLabelName_, TString{value});
- break;
- case TState::METRIC_LABELS:
- PARSE_ENSURE(!LastLabelName_.empty(), "empty label name in metric labels");
- LastMetric_.Labels.Add(LastLabelName_, TString{value});
- break;
- case TState::METRIC_NAME:
- PARSE_ENSURE(!value.empty(), "empty metric name");
- LastMetric_.Labels.Add(MetricNameLabel_, TString{value});
- State_.ToPrev();
- break;
- case TState::COMMON_TS:
- MetricConsumer_->OnCommonTime(TInstant::ParseIso8601(value));
- State_.ToPrev();
- if (MetricConsumer_->NeedToStop()) {
- IsIntentionallyHalted_ = true;
- return false;
- }
- break;
- case TState::METRIC_TS:
- LastMetric_.SetLastTime(TInstant::ParseIso8601(value));
- State_.ToPrev();
- break;
- case TState::METRIC_VALUE:
- if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
- LastMetric_.SetLastValue(doubleValue);
- } else {
- return false;
- }
- State_.ToPrev();
- break;
- case TState::METRIC_TYPE:
- LastMetric_.Type = MetricTypeFromStr(value);
- State_.ToPrev();
- break;
- case TState::METRIC_MODE:
- if (value == TStringBuf("deriv")) {
- LastMetric_.Type = EMetricType::RATE;
- }
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_SUM:
- if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
- LastMetric_.SummaryBuilder.SetSum(doubleValue);
- } else {
- return false;
- }
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_MIN:
- if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
- LastMetric_.SummaryBuilder.SetMin(doubleValue);
- } else {
- return false;
- }
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_MAX:
- if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
- LastMetric_.SummaryBuilder.SetMax(doubleValue);
- } else {
- return false;
- }
- State_.ToPrev();
- break;
- case TState::METRIC_DSUMMARY_LAST:
- if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
- LastMetric_.SummaryBuilder.SetLast(doubleValue);
- } else {
- return false;
- }
- State_.ToPrev();
- break;
- default:
- return false;
- }
- return true;
- }
- bool OnMapKey(const TStringBuf& key) override {
- switch (State_.Current()) {
- case TState::ROOT_OBJECT:
- if (key == TStringBuf("commonLabels") || key == TStringBuf("labels")) {
- State_.ToNext(TState::COMMON_LABELS);
- } else if (key == TStringBuf("ts")) {
- State_.ToNext(TState::COMMON_TS);
- } else if (key == TStringBuf("sensors") || key == TStringBuf("metrics")) {
- State_.ToNext(TState::METRICS_ARRAY);
- }
- break;
- case TState::COMMON_LABELS:
- case TState::METRIC_LABELS:
- LastLabelName_ = key;
- break;
- case TState::METRIC_OBJECT:
- if (key == TStringBuf("labels")) {
- State_.ToNext(TState::METRIC_LABELS);
- } else if (key == TStringBuf("name")) {
- State_.ToNext(TState::METRIC_NAME);
- } else if (key == TStringBuf("ts")) {
- PARSE_ENSURE(!LastMetric_.SeenTimeseries,
- "mixed timeseries and ts attributes");
- LastMetric_.SeenTsOrValue = true;
- State_.ToNext(TState::METRIC_TS);
- } else if (key == TStringBuf("value")) {
- PARSE_ENSURE(!LastMetric_.SeenTimeseries,
- "mixed timeseries and value attributes");
- LastMetric_.SeenTsOrValue = true;
- State_.ToNext(TState::METRIC_VALUE);
- } else if (key == TStringBuf("timeseries")) {
- PARSE_ENSURE(!LastMetric_.SeenTsOrValue,
- "mixed timeseries and ts/value attributes");
- LastMetric_.SeenTimeseries = true;
- State_.ToNext(TState::METRIC_TIMESERIES);
- } else if (key == TStringBuf("mode")) {
- State_.ToNext(TState::METRIC_MODE);
- } else if (key == TStringBuf("kind") || key == TStringBuf("type")) {
- State_.ToNext(TState::METRIC_TYPE);
- } else if (key == TStringBuf("hist")) {
- State_.ToNext(TState::METRIC_HIST);
- } else if (key == TStringBuf("summary")) {
- State_.ToNext(TState::METRIC_DSUMMARY);
- } else if (key == TStringBuf("log_hist")) {
- State_.ToNext(TState::METRIC_LOG_HIST);
- } else if (key == TStringBuf("memOnly")) {
- // deprecated. Skip it without errors for backward compatibility
- } else {
- ErrorMsg_ = TStringBuilder() << "unexpected key \"" << key << "\" in a metric schema";
- return false;
- }
- break;
- case TState::METRIC_TIMESERIES:
- if (key == TStringBuf("ts")) {
- State_.ToNext(TState::METRIC_TS);
- } else if (key == TStringBuf("value")) {
- State_.ToNext(TState::METRIC_VALUE);
- } else if (key == TStringBuf("hist")) {
- State_.ToNext(TState::METRIC_HIST);
- } else if (key == TStringBuf("summary")) {
- State_.ToNext(TState::METRIC_DSUMMARY);
- } else if (key == TStringBuf("log_hist")) {
- State_.ToNext(TState::METRIC_LOG_HIST);
- }
- break;
- case TState::METRIC_HIST:
- if (key == TStringBuf("bounds")) {
- State_.ToNext(TState::METRIC_HIST_BOUNDS);
- } else if (key == TStringBuf("buckets")) {
- State_.ToNext(TState::METRIC_HIST_BUCKETS);
- } else if (key == TStringBuf("inf")) {
- State_.ToNext(TState::METRIC_HIST_INF);
- }
- break;
- case TState::METRIC_LOG_HIST:
- if (key == TStringBuf("base")) {
- State_.ToNext(TState::METRIC_LOG_HIST_BASE);
- } else if (key == TStringBuf("zeros_count")) {
- State_.ToNext(TState::METRIC_LOG_HIST_ZEROS);
- } else if (key == TStringBuf("start_power")) {
- State_.ToNext(TState::METRIC_LOG_HIST_START_POWER);
- } else if (key == TStringBuf("buckets")) {
- State_.ToNext(TState::METRIC_LOG_HIST_BUCKETS);
- }
- break;
- case TState::METRIC_DSUMMARY:
- if (key == TStringBuf("sum")) {
- State_.ToNext(TState::METRIC_DSUMMARY_SUM);
- } else if (key == TStringBuf("min")) {
- State_.ToNext(TState::METRIC_DSUMMARY_MIN);
- } else if (key == TStringBuf("max")) {
- State_.ToNext(TState::METRIC_DSUMMARY_MAX);
- } else if (key == TStringBuf("last")) {
- State_.ToNext(TState::METRIC_DSUMMARY_LAST);
- } else if (key == TStringBuf("count")) {
- State_.ToNext(TState::METRIC_DSUMMARY_COUNT);
- }
- break;
- default:
- return false;
- }
- return true;
- }
- bool OnOpenMap() override {
- switch (State_.Current()) {
- case TState::ROOT_OBJECT:
- MetricConsumer_->OnStreamBegin();
- break;
- case TState::COMMON_LABELS:
- MetricConsumer_->OnLabelsBegin();
- break;
- case TState::METRICS_ARRAY:
- State_.ToNext(TState::METRIC_OBJECT);
- LastMetric_.Clear();
- break;
- default:
- break;
- }
- return true;
- }
- bool OnCloseMap() override {
- switch (State_.Current()) {
- case TState::ROOT_OBJECT:
- MetricConsumer_->OnStreamEnd();
- break;
- case TState::METRIC_LABELS:
- State_.ToPrev();
- break;
- case TState::COMMON_LABELS:
- MetricConsumer_->OnLabelsEnd();
- State_.ToPrev();
- if (MetricConsumer_->NeedToStop()) {
- IsIntentionallyHalted_ = true;
- return false;
- }
- break;
- case TState::METRIC_OBJECT:
- ConsumeMetric();
- State_.ToPrev();
- break;
- case TState::METRIC_TIMESERIES:
- LastMetric_.SaveLastPoint();
- break;
- case TState::METRIC_HIST:
- case TState::METRIC_DSUMMARY:
- case TState::METRIC_LOG_HIST:
- State_.ToPrev();
- break;
- default:
- break;
- }
- return true;
- }
- bool OnOpenArray() override {
- auto currentState = State_.Current();
- PARSE_ENSURE(
- currentState == TState::METRICS_ARRAY ||
- currentState == TState::METRIC_TIMESERIES ||
- currentState == TState::METRIC_HIST_BOUNDS ||
- currentState == TState::METRIC_HIST_BUCKETS ||
- currentState == TState::METRIC_LOG_HIST_BUCKETS,
- "unexpected array begin");
- return true;
- }
- bool OnCloseArray() override {
- switch (State_.Current()) {
- case TState::METRICS_ARRAY:
- case TState::METRIC_TIMESERIES:
- case TState::METRIC_HIST_BOUNDS:
- case TState::METRIC_HIST_BUCKETS:
- case TState::METRIC_LOG_HIST_BUCKETS:
- State_.ToPrev();
- break;
- default:
- return false;
- }
- return true;
- }
- void OnError(size_t off, TStringBuf reason) override {
- if (IsIntentionallyHalted_) {
- return;
- }
- size_t snippetBeg = (off < 20) ? 0 : (off - 20);
- TStringBuf snippet = Data_.SubStr(snippetBeg, 40);
- throw TJsonDecodeError()
- << "cannot parse JSON, error at: " << off
- << ", reason: " << (ErrorMsg_.empty() ? reason : TStringBuf{ErrorMsg_})
- << "\nsnippet: ..." << snippet << "...";
- }
- bool OnEnd() override {
- return true;
- }
- void ConsumeMetric() {
- // for backwad compatibility all unknown metrics treated as gauges
- if (LastMetric_.Type == EMetricType::UNKNOWN) {
- if (LastMetric_.HistogramBuilder.Empty()) {
- LastMetric_.Type = EMetricType::GAUGE;
- } else {
- LastMetric_.Type = EMetricType::HIST;
- }
- }
- // (1) begin metric
- MetricConsumer_->OnMetricBegin(LastMetric_.Type);
- // (2) labels
- if (!LastMetric_.Labels.empty()) {
- MetricConsumer_->OnLabelsBegin();
- for (auto&& label : LastMetric_.Labels) {
- MetricConsumer_->OnLabel(label.Name(), label.Value());
- }
- MetricConsumer_->OnLabelsEnd();
- }
- // (3) values
- switch (LastMetric_.Type) {
- case EMetricType::GAUGE:
- LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) {
- MetricConsumer_->OnDouble(time, value.AsDouble(valueType));
- });
- break;
- case EMetricType::IGAUGE:
- LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) {
- MetricConsumer_->OnInt64(time, value.AsInt64(valueType));
- });
- break;
- case EMetricType::COUNTER:
- case EMetricType::RATE:
- LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) {
- MetricConsumer_->OnUint64(time, value.AsUint64(valueType));
- });
- break;
- case EMetricType::HIST:
- case EMetricType::HIST_RATE:
- if (LastMetric_.TimeSeries.empty()) {
- auto time = LastMetric_.LastPoint.GetTime();
- auto histogram = LastMetric_.HistogramBuilder.Build();
- MetricConsumer_->OnHistogram(time, histogram);
- } else {
- for (const auto& p : LastMetric_.TimeSeries) {
- DECODE_ENSURE(p.GetValueType() == EMetricValueType::HISTOGRAM, "Value is not a histogram");
- MetricConsumer_->OnHistogram(p.GetTime(), p.GetValue().AsHistogram());
- }
- }
- break;
- case EMetricType::DSUMMARY:
- if (LastMetric_.TimeSeries.empty()) {
- auto time = LastMetric_.LastPoint.GetTime();
- auto summary = LastMetric_.SummaryBuilder.Build();
- MetricConsumer_->OnSummaryDouble(time, summary);
- } else {
- for (const auto& p : LastMetric_.TimeSeries) {
- DECODE_ENSURE(p.GetValueType() == EMetricValueType::SUMMARY, "Value is not a summary");
- MetricConsumer_->OnSummaryDouble(p.GetTime(), p.GetValue().AsSummaryDouble());
- }
- }
- break;
- case EMetricType::LOGHIST:
- if (LastMetric_.TimeSeries.empty()) {
- auto time = LastMetric_.LastPoint.GetTime();
- auto logHist = LastMetric_.LogHistBuilder.Build();
- MetricConsumer_->OnLogHistogram(time, logHist);
- } else {
- for (const auto& p : LastMetric_.TimeSeries) {
- DECODE_ENSURE(p.GetValueType() == EMetricValueType::LOGHISTOGRAM, "Value is not a log_histogram");
- MetricConsumer_->OnLogHistogram(p.GetTime(), p.GetValue().AsLogHistogram());
- }
- }
- break;
- case EMetricType::UNKNOWN:
- // TODO: output metric labels
- ythrow yexception() << "unknown metric type";
- }
- // (4) end metric
- MetricConsumer_->OnMetricEnd();
- }
- private:
- TStringBuf Data_;
- IHaltableMetricConsumer* MetricConsumer_;
- TString MetricNameLabel_;
- TState State_;
- TString LastLabelName_;
- TMetricCollector LastMetric_;
- TString ErrorMsg_;
- bool IsIntentionallyHalted_{false};
- };
- } // namespace
- void DecodeJson(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) {
- TCommonPartsCollector commonPartsCollector;
- {
- TMemoryInput memIn(data);
- TDecoderJson decoder(data, &commonPartsCollector, metricNameLabel);
- // no need to check a return value. If there is an error, a TJsonDecodeError is thrown
- NJson::ReadJson(&memIn, &decoder);
- }
- TCommonPartsProxy commonPartsProxy(std::move(commonPartsCollector.CommonParts()), c);
- {
- TMemoryInput memIn(data);
- TDecoderJson decoder(data, &commonPartsProxy, metricNameLabel);
- // no need to check a return value. If there is an error, a TJsonDecodeError is thrown
- NJson::ReadJson(&memIn, &decoder);
- }
- }
- #undef DECODE_ENSURE
- }
|