#include "spack_v1.h" #include "varint.h" #include "compression.h" #include #include #include #include #include #include #include #include #ifndef _little_endian_ #error Unsupported platform #endif namespace NMonitoring { namespace { #define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TSpackDecodeError() << __VA_ARGS__) constexpr ui64 LABEL_SIZE_LIMIT = 128_MB; /////////////////////////////////////////////////////////////////////// // TDecoderSpackV1 /////////////////////////////////////////////////////////////////////// class TDecoderSpackV1 { public: TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel) : In_(in) , MetricNameLabel_(metricNameLabel) { } void Decode(IMetricConsumer* c) { c->OnStreamBegin(); // (1) read header size_t readBytes = In_->Read(&Header_, sizeof(Header_)); DECODE_ENSURE(readBytes == sizeof(Header_), "not enough data in input stream to read header"); ui8 version = ((Header_.Version >> 8) & 0xff); DECODE_ENSURE(version == 1, "versions mismatch (expected: 1, got: " << +version << ')'); DECODE_ENSURE(Header_.HeaderSize >= sizeof(Header_), "invalid header size"); if (size_t skipBytes = Header_.HeaderSize - sizeof(Header_)) { DECODE_ENSURE(In_->Skip(skipBytes) == skipBytes, "input stream unexpectedly ended"); } if (Header_.MetricCount == 0) { // emulate empty stream c->OnStreamEnd(); return; } // if compression enabled all below reads must go throught decompressor auto compressedIn = CompressedInput(In_, DecodeCompression(Header_.Compression)); if (compressedIn) { In_ = compressedIn.Get(); } TimePrecision_ = DecodeTimePrecision(Header_.TimePrecision); const ui64 labelSizeTotal = ui64(Header_.LabelNamesSize) + Header_.LabelValuesSize; DECODE_ENSURE(labelSizeTotal <= LABEL_SIZE_LIMIT, "Label names & values size of " << HumanReadableSize(labelSizeTotal, SF_BYTES) << " exceeds the limit which is " << HumanReadableSize(LABEL_SIZE_LIMIT, SF_BYTES)); // (2) read string pools TVector namesBuf(Header_.LabelNamesSize); readBytes = In_->Load(namesBuf.data(), namesBuf.size()); DECODE_ENSURE(readBytes == Header_.LabelNamesSize, "not enough data to read label names pool"); TStringPool labelNames(namesBuf.data(), namesBuf.size()); TVector valuesBuf(Header_.LabelValuesSize); readBytes = In_->Load(valuesBuf.data(), valuesBuf.size()); DECODE_ENSURE(readBytes == Header_.LabelValuesSize, "not enough data to read label values pool"); TStringPool labelValues(valuesBuf.data(), valuesBuf.size()); // (3) read common time c->OnCommonTime(ReadTime()); // (4) read common labels if (ui32 commonLabelsCount = ReadVarint()) { c->OnLabelsBegin(); ReadLabels(labelNames, labelValues, commonLabelsCount, c); c->OnLabelsEnd(); } // (5) read metrics ReadMetrics(labelNames, labelValues, c); c->OnStreamEnd(); } private: void ReadMetrics( const TStringPool& labelNames, const TStringPool& labelValues, IMetricConsumer* c) { for (ui32 i = 0; i < Header_.MetricCount; i++) { // (5.1) types byte ui8 typesByte = ReadFixed(); EMetricType metricType = DecodeMetricType(typesByte >> 2); EValueType valueType = DecodeValueType(typesByte & 0x03); c->OnMetricBegin(metricType); // TODO: use it ReadFixed(); // skip flags byte auto metricNameValueIndex = std::numeric_limits::max(); if (Header_.Version >= SV1_02) { metricNameValueIndex = ReadVarint(); } // (5.2) labels ui32 labelsCount = ReadVarint(); DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels"); c->OnLabelsBegin(); if (Header_.Version >= SV1_02) { c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex)); } ReadLabels(labelNames, labelValues, labelsCount, c); c->OnLabelsEnd(); // (5.3) values switch (valueType) { case EValueType::NONE: break; case EValueType::ONE_WITHOUT_TS: ReadValue(metricType, TInstant::Zero(), c); break; case EValueType::ONE_WITH_TS: { TInstant time = ReadTime(); ReadValue(metricType, time, c); break; } case EValueType::MANY_WITH_TS: { ui32 pointsCount = ReadVarint(); for (ui32 i = 0; i < pointsCount; i++) { TInstant time = ReadTime(); ReadValue(metricType, time, c); } break; } } c->OnMetricEnd(); } } void ReadValue(EMetricType metricType, TInstant time, IMetricConsumer* c) { switch (metricType) { case EMetricType::GAUGE: c->OnDouble(time, ReadFixed()); break; case EMetricType::IGAUGE: c->OnInt64(time, ReadFixed()); break; case EMetricType::COUNTER: case EMetricType::RATE: c->OnUint64(time, ReadFixed()); break; case EMetricType::DSUMMARY: c->OnSummaryDouble(time, ReadSummaryDouble()); break; case EMetricType::HIST: case EMetricType::HIST_RATE: c->OnHistogram(time, ReadHistogram()); break; case EMetricType::LOGHIST: c->OnLogHistogram(time, ReadLogHistogram()); break; default: throw TSpackDecodeError() << "Unsupported metric type: " << metricType; } } ISummaryDoubleSnapshotPtr ReadSummaryDouble() { ui64 count = ReadFixed(); double sum = ReadFixed(); double min = ReadFixed(); double max = ReadFixed(); double last = ReadFixed(); return MakeIntrusive(sum, min, max, last, count); } TLogHistogramSnapshotPtr ReadLogHistogram() { double base = ReadFixed(); ui64 zerosCount = ReadFixed(); int startPower = static_cast(ReadVarint()); ui32 count = ReadVarint(); // see https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/stockpile_client/points.cpp?rev=r8593154#L31 // and https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/common/points/hgram/normal/normal.h?rev=r8268697#L9 // TODO: share this constant value Y_ENSURE(count <= 100u, "more than 100 buckets in log histogram: " << count); TVector buckets; buckets.reserve(count); for (ui32 i = 0; i < count; ++i) { buckets.emplace_back(ReadFixed()); } return MakeIntrusive(base, zerosCount, startPower, std::move(buckets)); } IHistogramSnapshotPtr ReadHistogram() { ui32 bucketsCount = ReadVarint(); auto s = TExplicitHistogramSnapshot::New(bucketsCount); if (SV1_00 == Header_.Version) { // v1.0 for (ui32 i = 0; i < bucketsCount; i++) { i64 bound = ReadFixed(); double doubleBound = (bound != Max()) ? static_cast(bound) : Max(); (*s)[i].first = doubleBound; } } else { for (ui32 i = 0; i < bucketsCount; i++) { double doubleBound = ReadFixed(); (*s)[i].first = doubleBound; } } // values for (ui32 i = 0; i < bucketsCount; i++) { (*s)[i].second = ReadFixed(); } return s; } void ReadLabels( const TStringPool& labelNames, const TStringPool& labelValues, ui32 count, IMetricConsumer* c) { for (ui32 i = 0; i < count; i++) { auto nameIdx = ReadVarint(); auto valueIdx = ReadVarint(); c->OnLabel(labelNames.Get(nameIdx), labelValues.Get(valueIdx)); } } TInstant ReadTime() { switch (TimePrecision_) { case ETimePrecision::SECONDS: return TInstant::Seconds(ReadFixed()); case ETimePrecision::MILLIS: return TInstant::MilliSeconds(ReadFixed()); } Y_ABORT("invalid time precision"); } template inline T ReadFixed() { T value; size_t readBytes = In_->Load(&value, sizeof(T)); DECODE_ENSURE(readBytes == sizeof(T), "no enough data to read " << TypeName()); return value; } inline ui32 ReadVarint() { return ReadVarUInt32(In_); } private: IInputStream* In_; TString MetricNameLabel_; ETimePrecision TimePrecision_; TSpackHeader Header_; }; // class TDecoderSpackV1 #undef DECODE_ENSURE } // namespace EValueType DecodeValueType(ui8 byte) { EValueType result; if (!TryDecodeValueType(byte, &result)) { throw TSpackDecodeError() << "unknown value type: " << byte; } return result; } bool TryDecodeValueType(ui8 byte, EValueType* result) { if (byte == EncodeValueType(EValueType::NONE)) { if (result) { *result = EValueType::NONE; } return true; } else if (byte == EncodeValueType(EValueType::ONE_WITHOUT_TS)) { if (result) { *result = EValueType::ONE_WITHOUT_TS; } return true; } else if (byte == EncodeValueType(EValueType::ONE_WITH_TS)) { if (result) { *result = EValueType::ONE_WITH_TS; } return true; } else if (byte == EncodeValueType(EValueType::MANY_WITH_TS)) { if (result) { *result = EValueType::MANY_WITH_TS; } return true; } else { return false; } } ETimePrecision DecodeTimePrecision(ui8 byte) { ETimePrecision result; if (!TryDecodeTimePrecision(byte, &result)) { throw TSpackDecodeError() << "unknown time precision: " << byte; } return result; } bool TryDecodeTimePrecision(ui8 byte, ETimePrecision* result) { if (byte == EncodeTimePrecision(ETimePrecision::SECONDS)) { if (result) { *result = ETimePrecision::SECONDS; } return true; } else if (byte == EncodeTimePrecision(ETimePrecision::MILLIS)) { if (result) { *result = ETimePrecision::MILLIS; } return true; } else { return false; } } EMetricType DecodeMetricType(ui8 byte) { EMetricType result; if (!TryDecodeMetricType(byte, &result)) { throw TSpackDecodeError() << "unknown metric type: " << byte; } return result; } bool TryDecodeMetricType(ui8 byte, EMetricType* result) { if (byte == EncodeMetricType(EMetricType::GAUGE)) { if (result) { *result = EMetricType::GAUGE; } return true; } else if (byte == EncodeMetricType(EMetricType::COUNTER)) { if (result) { *result = EMetricType::COUNTER; } return true; } else if (byte == EncodeMetricType(EMetricType::RATE)) { if (result) { *result = EMetricType::RATE; } return true; } else if (byte == EncodeMetricType(EMetricType::IGAUGE)) { if (result) { *result = EMetricType::IGAUGE; } return true; } else if (byte == EncodeMetricType(EMetricType::HIST)) { if (result) { *result = EMetricType::HIST; } return true; } else if (byte == EncodeMetricType(EMetricType::HIST_RATE)) { if (result) { *result = EMetricType::HIST_RATE; } return true; } else if (byte == EncodeMetricType(EMetricType::DSUMMARY)) { if (result) { *result = EMetricType::DSUMMARY; } return true; } else if (byte == EncodeMetricType(EMetricType::LOGHIST)) { if (result) { *result = EMetricType::LOGHIST; } return true; } else if (byte == EncodeMetricType(EMetricType::UNKNOWN)) { if (result) { *result = EMetricType::UNKNOWN; } return true; } else { return false; } } ui8 EncodeCompression(ECompression c) noexcept { switch (c) { case ECompression::IDENTITY: return 0x00; case ECompression::ZLIB: return 0x01; case ECompression::ZSTD: return 0x02; case ECompression::LZ4: return 0x03; case ECompression::UNKNOWN: return Max(); } Y_ABORT(); // for GCC } ECompression DecodeCompression(ui8 byte) { ECompression result; if (!TryDecodeCompression(byte, &result)) { throw TSpackDecodeError() << "unknown compression alg: " << byte; } return result; } bool TryDecodeCompression(ui8 byte, ECompression* result) { if (byte == EncodeCompression(ECompression::IDENTITY)) { if (result) { *result = ECompression::IDENTITY; } return true; } else if (byte == EncodeCompression(ECompression::ZLIB)) { if (result) { *result = ECompression::ZLIB; } return true; } else if (byte == EncodeCompression(ECompression::ZSTD)) { if (result) { *result = ECompression::ZSTD; } return true; } else if (byte == EncodeCompression(ECompression::LZ4)) { if (result) { *result = ECompression::LZ4; } return true; } else { return false; } } void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) { TDecoderSpackV1 decoder(in, metricNameLabel); decoder.Decode(c); } }