#include "legacy_protobuf.h" #include #include #include #include #include #include #include #include #include #ifdef LEGACY_PB_TRACE #define TRACE(msg) \ Cerr << msg << Endl #else #define TRACE(...) ; #endif namespace NMonitoring { namespace { using TMaybeMeta = TMaybe; TString ReadLabelValue(const NProtoBuf::Message& msg, const NProtoBuf::FieldDescriptor* d, const NProtoBuf::Reflection& r) { using namespace NProtoBuf; switch (d->type()) { case FieldDescriptor::TYPE_UINT32: return ::ToString(r.GetUInt32(msg, d)); case FieldDescriptor::TYPE_UINT64: return ::ToString(r.GetUInt64(msg, d)); case FieldDescriptor::TYPE_STRING: return r.GetString(msg, d); case FieldDescriptor::TYPE_ENUM: { auto val = r.GetEnumValue(msg, d); auto* valDesc = d->enum_type()->FindValueByNumber(val); return valDesc->name(); } default: ythrow yexception() << "type " << d->type_name() << " cannot be used as a field value"; } return {}; } double ReadFieldAsDouble(const NProtoBuf::Message& msg, const NProtoBuf::FieldDescriptor* d, const NProtoBuf::Reflection& r) { using namespace NProtoBuf; switch (d->type()) { case FieldDescriptor::TYPE_DOUBLE: return r.GetDouble(msg, d); case FieldDescriptor::TYPE_BOOL: return r.GetBool(msg, d) ? 1 : 0; case FieldDescriptor::TYPE_INT32: return r.GetInt32(msg, d); case FieldDescriptor::TYPE_INT64: return r.GetInt64(msg, d); case FieldDescriptor::TYPE_UINT32: return r.GetUInt32(msg, d); case FieldDescriptor::TYPE_UINT64: return r.GetUInt64(msg, d); case FieldDescriptor::TYPE_SINT32: return r.GetInt32(msg, d); case FieldDescriptor::TYPE_SINT64: return r.GetInt64(msg, d); case FieldDescriptor::TYPE_FIXED32: return r.GetUInt32(msg, d); case FieldDescriptor::TYPE_FIXED64: return r.GetUInt64(msg, d); case FieldDescriptor::TYPE_SFIXED32: return r.GetInt32(msg, d); case FieldDescriptor::TYPE_SFIXED64: return r.GetInt64(msg, d); case FieldDescriptor::TYPE_FLOAT: return r.GetFloat(msg, d); case FieldDescriptor::TYPE_ENUM: return r.GetEnumValue(msg, d); default: ythrow yexception() << "type " << d->type_name() << " cannot be used as a field value"; } return std::numeric_limits::quiet_NaN(); } double ReadRepeatedAsDouble(const NProtoBuf::Message& msg, const NProtoBuf::FieldDescriptor* d, const NProtoBuf::Reflection& r, size_t i) { using namespace NProtoBuf; switch (d->type()) { case FieldDescriptor::TYPE_DOUBLE: return r.GetRepeatedDouble(msg, d, i); case FieldDescriptor::TYPE_BOOL: return r.GetRepeatedBool(msg, d, i) ? 1 : 0; case FieldDescriptor::TYPE_INT32: return r.GetRepeatedInt32(msg, d, i); case FieldDescriptor::TYPE_INT64: return r.GetRepeatedInt64(msg, d, i); case FieldDescriptor::TYPE_UINT32: return r.GetRepeatedUInt32(msg, d, i); case FieldDescriptor::TYPE_UINT64: return r.GetRepeatedUInt64(msg, d, i); case FieldDescriptor::TYPE_SINT32: return r.GetRepeatedInt32(msg, d, i); case FieldDescriptor::TYPE_SINT64: return r.GetRepeatedInt64(msg, d, i); case FieldDescriptor::TYPE_FIXED32: return r.GetRepeatedUInt32(msg, d, i); case FieldDescriptor::TYPE_FIXED64: return r.GetRepeatedUInt64(msg, d, i); case FieldDescriptor::TYPE_SFIXED32: return r.GetRepeatedInt32(msg, d, i); case FieldDescriptor::TYPE_SFIXED64: return r.GetRepeatedInt64(msg, d, i); case FieldDescriptor::TYPE_FLOAT: return r.GetRepeatedFloat(msg, d, i); case FieldDescriptor::TYPE_ENUM: return r.GetRepeatedEnumValue(msg, d, i); default: ythrow yexception() << "type " << d->type_name() << " cannot be used as a field value"; } return std::numeric_limits::quiet_NaN(); } TString LabelFromField(const NProtoBuf::Message& msg, const TString& name) { const auto* fieldDesc = msg.GetDescriptor()->FindFieldByName(name); const auto* reflection = msg.GetReflection(); Y_ENSURE(fieldDesc && reflection, "Unable to get meta for field " << name); auto s = ReadLabelValue(msg, fieldDesc, *reflection); std::replace(std::begin(s), s.vend(), ' ', '_'); return s; } TMaybeMeta MaybeGetMeta(const NProtoBuf::FieldOptions& opts) { if (opts.HasExtension(NMonProto::Metric)) { return opts.GetExtension(NMonProto::Metric); } return Nothing(); } class ILabelGetter: public TThrRefBase { public: enum class EType { Fixed = 1, Lazy = 2, }; virtual TLabel Get(const NProtoBuf::Message&) = 0; virtual EType Type() const = 0; }; class TFixedLabel: public ILabelGetter { public: explicit TFixedLabel(TLabel&& l) : Label_{std::move(l)} { TRACE("found fixed label " << l); } EType Type() const override { return EType::Fixed; } TLabel Get(const NProtoBuf::Message&) override { return Label_; } private: TLabel Label_; }; using TFunction = std::function; class TLazyLabel: public ILabelGetter { public: TLazyLabel(TFunction&& fn) : Fn_{std::move(fn)} { TRACE("found lazy label"); } EType Type() const override { return EType::Lazy; } TLabel Get(const NProtoBuf::Message& msg) override { return Fn_(msg); } private: TFunction Fn_; }; class TDecoderContext { public: void Init(const NProtoBuf::Message* msg) { Message_ = msg; Y_ENSURE(Message_); Reflection_ = msg->GetReflection(); Y_ENSURE(Reflection_); for (auto it = Labels_.begin(); it != Labels_.end(); ++it) { if ((*it)->Type() == ILabelGetter::EType::Lazy) { auto l = (*it)->Get(Message()); *it = ::MakeIntrusive(std::move(l)); } else { auto l = (*it)->Get(Message()); } } } void Clear() noexcept { Message_ = nullptr; Reflection_ = nullptr; } TDecoderContext CreateChildFromMeta(const NMonProto::TMetricMeta& metricMeta, const TString& name, i64 repeatedIdx = -1) { TDecoderContext child{*this}; child.Clear(); if (metricMeta.HasCustomPath()) { if (const auto& nodePath = metricMeta.GetCustomPath()) { child.AppendPath(nodePath); } } else if (metricMeta.GetPath()) { child.AppendPath(name); } if (metricMeta.HasKeys()) { child.ParseKeys(metricMeta.GetKeys(), repeatedIdx); } return child; } TDecoderContext CreateChildFromRepeatedScalar(const NMonProto::TMetricMeta& metricMeta, i64 repeatedIdx = -1) { TDecoderContext child{*this}; child.Clear(); if (metricMeta.HasKeys()) { child.ParseKeys(metricMeta.GetKeys(), repeatedIdx); } return child; } TDecoderContext CreateChildFromEls(const TString& name, const NMonProto::TExtraLabelMetrics& metrics, size_t idx, TMaybeMeta maybeMeta) { TDecoderContext child{*this}; child.Clear(); auto usePath = [&maybeMeta] { return !maybeMeta->HasPath() || maybeMeta->GetPath(); }; if (!name.empty() && (!maybeMeta || usePath())) { child.AppendPath(name); } child.Labels_.push_back(::MakeIntrusive( [ labelName = metrics.GetlabelName(), idx, &metrics ](const auto&) { const auto& val = metrics.Getvalues(idx); TString labelVal; const auto uintLabel = val.GetlabelValueUint(); if (uintLabel) { labelVal = ::ToString(uintLabel); } else { labelVal = val.GetlabelValue(); } return TLabel{labelName, labelVal}; })); return child; } void ParseKeys(TStringBuf keys, i64 repeatedIdx = -1) { auto parts = StringSplitter(keys) .Split(' ') .SkipEmpty(); for (auto part : parts) { auto str = part.Token(); TStringBuf lhs, rhs; const bool isDynamic = str.TrySplit(':', lhs, rhs); const bool isIndexing = isDynamic && rhs == TStringBuf("#"); if (isIndexing) { TRACE("parsed index labels"); // :# means that we should use index of the repeated // field as label value Y_ENSURE(repeatedIdx != -1); Labels_.push_back(::MakeIntrusive([=](const auto&) { return TLabel{lhs, ::ToString(repeatedIdx)}; })); } else if (isDynamic) { TRACE("parsed dynamic labels"); // : means that we need to take label value // later from message's field Labels_.push_back(::MakeIntrusive([=](const auto& msg) { return TLabel{lhs, LabelFromField(msg, TString{rhs})}; })); } else if (str.TrySplit('=', lhs, rhs)) { TRACE("parsed static labels"); // = stands for constant label Labels_.push_back(::MakeIntrusive(TLabel{lhs, rhs})); } else { ythrow yexception() << "Incorrect Keys format"; } } } void AppendPath(TStringBuf fieldName) { Path_ += '/'; Path_ += fieldName; } const TString& Path() const { return Path_; } TLabels Labels() const { TLabels result; for (auto&& l : Labels_) { result.Add(l->Get(Message())); } return result; } const NProtoBuf::Message& Message() const { Y_VERIFY_DEBUG(Message_); return *Message_; } const NProtoBuf::Reflection& Reflection() const { return *Reflection_; } private: const NProtoBuf::Message* Message_{nullptr}; const NProtoBuf::Reflection* Reflection_{nullptr}; TString Path_; TVector> Labels_; }; class TDecoder { public: TDecoder(IMetricConsumer* consumer, const NProtoBuf::Message& message, TInstant timestamp) : Consumer_{consumer} , Message_{message} , Timestamp_{timestamp} { } void Decode() const { Consumer_->OnStreamBegin(); DecodeToStream(); Consumer_->OnStreamEnd(); } void DecodeToStream() const { DecodeImpl(Message_, {}); } private: static const NMonProto::TExtraLabelMetrics& ExtractExtraMetrics(TDecoderContext& ctx, const NProtoBuf::FieldDescriptor& f) { const auto& parent = ctx.Message(); const auto& reflection = ctx.Reflection(); auto& subMessage = reflection.GetMessage(parent, &f); return dynamic_cast(subMessage); } void DecodeImpl(const NProtoBuf::Message& msg, TDecoderContext ctx) const { std::vector fields; ctx.Init(&msg); ctx.Reflection().ListFields(msg, &fields); for (const auto* f : fields) { Y_ENSURE(f); const auto& opts = f->options(); const auto isMessage = f->type() == NProtoBuf::FieldDescriptor::TYPE_MESSAGE; const auto isExtraLabelMetrics = isMessage && f->message_type()->full_name() == "NMonProto.TExtraLabelMetrics"; const auto maybeMeta = MaybeGetMeta(opts); if (!(maybeMeta || isExtraLabelMetrics)) { continue; } if (isExtraLabelMetrics) { const auto& extra = ExtractExtraMetrics(ctx, *f); RecurseExtraLabelMetrics(ctx, extra, f->name(), maybeMeta); } else if (isMessage) { RecurseMessage(ctx, *maybeMeta, *f); } else if (f->is_repeated()) { RecurseRepeatedScalar(ctx, *maybeMeta, *f); } else if (maybeMeta->HasType()) { const auto val = ReadFieldAsDouble(msg, f, ctx.Reflection()); const bool isRate = maybeMeta->GetType() == NMonProto::EMetricType::RATE; WriteMetric(val, ctx, f->name(), isRate); } } } void RecurseRepeatedScalar(TDecoderContext ctx, const NMonProto::TMetricMeta& meta, const NProtoBuf::FieldDescriptor& f) const { auto&& msg = ctx.Message(); auto&& reflection = ctx.Reflection(); const bool isRate = meta.GetType() == NMonProto::EMetricType::RATE; // this is a repeated scalar field, which makes metric only if it's indexing for (auto i = 0; i < reflection.FieldSize(msg, &f); ++i) { auto subCtx = ctx.CreateChildFromRepeatedScalar(meta, i); subCtx.Init(&msg); auto val = ReadRepeatedAsDouble(msg, &f, reflection, i); WriteMetric(val, subCtx, f.name(), isRate); } } void RecurseExtraLabelMetrics(TDecoderContext ctx, const NMonProto::TExtraLabelMetrics& msg, const TString& name, const TMaybeMeta& meta) const { auto i = 0; for (const auto& val : msg.Getvalues()) { auto subCtx = ctx.CreateChildFromEls(name, msg, i++, meta); subCtx.Init(&val); const bool isRate = val.Hastype() ? val.Gettype() == NMonProto::EMetricType::RATE : meta->GetType() == NMonProto::EMetricType::RATE; double metricVal{0}; if (isRate) { metricVal = val.GetlongValue(); } else { metricVal = val.GetdoubleValue(); } WriteMetric(metricVal, subCtx, "", isRate); for (const auto& child : val.Getchildren()) { RecurseExtraLabelMetrics(subCtx, child, "", meta); } } } void RecurseMessage(TDecoderContext ctx, const NMonProto::TMetricMeta& metricMeta, const NProtoBuf::FieldDescriptor& f) const { const auto& msg = ctx.Message(); const auto& reflection = ctx.Reflection(); if (f.is_repeated()) { TRACE("recurse into repeated message " << f.name()); for (auto i = 0; i < reflection.FieldSize(msg, &f); ++i) { auto& subMessage = reflection.GetRepeatedMessage(msg, &f, i); DecodeImpl(subMessage, ctx.CreateChildFromMeta(metricMeta, f.name(), i)); } } else { TRACE("recurse into message " << f.name()); auto& subMessage = reflection.GetMessage(msg, &f); DecodeImpl(subMessage, ctx.CreateChildFromMeta(metricMeta, f.name())); } } inline void WriteValue(ui64 value) const { Consumer_->OnUint64(Timestamp_, value); } inline void WriteValue(double value) const { Consumer_->OnDouble(Timestamp_, value); } void WriteMetric(double value, const TDecoderContext& ctx, const TString& name, bool isRate) const { if (isRate) { Consumer_->OnMetricBegin(EMetricType::RATE); WriteValue(static_cast(value)); } else { Consumer_->OnMetricBegin(EMetricType::GAUGE); WriteValue(static_cast(value)); } Consumer_->OnLabelsBegin(); for (const auto& label : ctx.Labels()) { Consumer_->OnLabel(label.Name(), label.Value()); } const auto fullPath = name.empty() ? ctx.Path() : ctx.Path() + '/' + name; if (fullPath) { Consumer_->OnLabel("path", fullPath); } Consumer_->OnLabelsEnd(); Consumer_->OnMetricEnd(); } private: IMetricConsumer* Consumer_{nullptr}; const NProtoBuf::Message& Message_; TInstant Timestamp_; }; } void DecodeLegacyProto(const NProtoBuf::Message& data, IMetricConsumer* consumer, TInstant ts) { Y_ENSURE(consumer); TDecoder(consumer, data, ts).Decode(); } void DecodeLegacyProtoToStream(const NProtoBuf::Message& data, IMetricConsumer* consumer, TInstant ts) { Y_ENSURE(consumer); TDecoder(consumer, data, ts).DecodeToStream(); } }