123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458 |
- #include "spack_v1.h"
- #include "varint.h"
- #include "compression.h"
- #include <library/cpp/monlib/encode/buffered/string_pool.h>
- #include <library/cpp/monlib/exception/exception.h>
- #include <library/cpp/monlib/metrics/histogram_collector.h>
- #include <library/cpp/monlib/metrics/metric.h>
- #include <util/generic/yexception.h>
- #include <util/generic/buffer.h>
- #include <util/generic/size_literals.h>
- #include <util/stream/format.h>
- #ifndef _little_endian_
- #error Unsupported platform
- #endif
- namespace NMonitoring {
- namespace {
- #define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TSpackDecodeError() << __VA_ARGS__)
- constexpr ui64 LABEL_SIZE_LIMIT = 128_MB;
- ///////////////////////////////////////////////////////////////////////
- // TDecoderSpackV1
- ///////////////////////////////////////////////////////////////////////
- class TDecoderSpackV1 {
- public:
- TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel)
- : In_(in)
- , MetricNameLabel_(metricNameLabel)
- {
- }
- void Decode(IMetricConsumer* c) {
- c->OnStreamBegin();
- // (1) read header
- size_t readBytes = In_->Read(&Header_, sizeof(Header_));
- DECODE_ENSURE(readBytes == sizeof(Header_), "not enough data in input stream to read header");
- ui8 version = ((Header_.Version >> 8) & 0xff);
- DECODE_ENSURE(version == 1, "versions mismatch (expected: 1, got: " << +version << ')');
- DECODE_ENSURE(Header_.HeaderSize >= sizeof(Header_), "invalid header size");
- if (size_t skipBytes = Header_.HeaderSize - sizeof(Header_)) {
- DECODE_ENSURE(In_->Skip(skipBytes) == skipBytes, "input stream unexpectedly ended");
- }
- if (Header_.MetricCount == 0) {
- // emulate empty stream
- c->OnStreamEnd();
- return;
- }
- // if compression enabled all below reads must go throught decompressor
- auto compressedIn = CompressedInput(In_, DecodeCompression(Header_.Compression));
- if (compressedIn) {
- In_ = compressedIn.Get();
- }
- TimePrecision_ = DecodeTimePrecision(Header_.TimePrecision);
- const ui64 labelSizeTotal = ui64(Header_.LabelNamesSize) + Header_.LabelValuesSize;
- DECODE_ENSURE(labelSizeTotal <= LABEL_SIZE_LIMIT, "Label names & values size of " << HumanReadableSize(labelSizeTotal, SF_BYTES)
- << " exceeds the limit which is " << HumanReadableSize(LABEL_SIZE_LIMIT, SF_BYTES));
- // (2) read string pools
- TVector<char> namesBuf(Header_.LabelNamesSize);
- readBytes = In_->Load(namesBuf.data(), namesBuf.size());
- DECODE_ENSURE(readBytes == Header_.LabelNamesSize, "not enough data to read label names pool");
- TStringPool labelNames(namesBuf.data(), namesBuf.size());
- TVector<char> valuesBuf(Header_.LabelValuesSize);
- readBytes = In_->Load(valuesBuf.data(), valuesBuf.size());
- DECODE_ENSURE(readBytes == Header_.LabelValuesSize, "not enough data to read label values pool");
- TStringPool labelValues(valuesBuf.data(), valuesBuf.size());
- // (3) read common time
- c->OnCommonTime(ReadTime());
- // (4) read common labels
- if (ui32 commonLabelsCount = ReadVarint()) {
- c->OnLabelsBegin();
- ReadLabels(labelNames, labelValues, commonLabelsCount, c);
- c->OnLabelsEnd();
- }
- // (5) read metrics
- ReadMetrics(labelNames, labelValues, c);
- c->OnStreamEnd();
- }
- private:
- void ReadMetrics(
- const TStringPool& labelNames,
- const TStringPool& labelValues,
- IMetricConsumer* c)
- {
- for (ui32 i = 0; i < Header_.MetricCount; i++) {
- // (5.1) types byte
- ui8 typesByte = ReadFixed<ui8>();
- EMetricType metricType = DecodeMetricType(typesByte >> 2);
- EValueType valueType = DecodeValueType(typesByte & 0x03);
- c->OnMetricBegin(metricType);
- // TODO: use it
- ReadFixed<ui8>(); // skip flags byte
- auto metricNameValueIndex = std::numeric_limits<ui32>::max();
- if (Header_.Version >= SV1_02) {
- metricNameValueIndex = ReadVarint();
- }
- // (5.2) labels
- ui32 labelsCount = ReadVarint();
- DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels");
- c->OnLabelsBegin();
- if (Header_.Version >= SV1_02) {
- c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex));
- }
- ReadLabels(labelNames, labelValues, labelsCount, c);
- c->OnLabelsEnd();
- // (5.3) values
- switch (valueType) {
- case EValueType::NONE:
- break;
- case EValueType::ONE_WITHOUT_TS:
- ReadValue(metricType, TInstant::Zero(), c);
- break;
- case EValueType::ONE_WITH_TS: {
- TInstant time = ReadTime();
- ReadValue(metricType, time, c);
- break;
- }
- case EValueType::MANY_WITH_TS: {
- ui32 pointsCount = ReadVarint();
- for (ui32 i = 0; i < pointsCount; i++) {
- TInstant time = ReadTime();
- ReadValue(metricType, time, c);
- }
- break;
- }
- }
- c->OnMetricEnd();
- }
- }
- void ReadValue(EMetricType metricType, TInstant time, IMetricConsumer* c) {
- switch (metricType) {
- case EMetricType::GAUGE:
- c->OnDouble(time, ReadFixed<double>());
- break;
- case EMetricType::IGAUGE:
- c->OnInt64(time, ReadFixed<i64>());
- break;
- case EMetricType::COUNTER:
- case EMetricType::RATE:
- c->OnUint64(time, ReadFixed<ui64>());
- break;
- case EMetricType::DSUMMARY:
- c->OnSummaryDouble(time, ReadSummaryDouble());
- break;
- case EMetricType::HIST:
- case EMetricType::HIST_RATE:
- c->OnHistogram(time, ReadHistogram());
- break;
- case EMetricType::LOGHIST:
- c->OnLogHistogram(time, ReadLogHistogram());
- break;
- default:
- throw TSpackDecodeError() << "Unsupported metric type: " << metricType;
- }
- }
- ISummaryDoubleSnapshotPtr ReadSummaryDouble() {
- ui64 count = ReadFixed<ui64>();
- double sum = ReadFixed<double>();
- double min = ReadFixed<double>();
- double max = ReadFixed<double>();
- double last = ReadFixed<double>();
- return MakeIntrusive<TSummaryDoubleSnapshot>(sum, min, max, last, count);
- }
- TLogHistogramSnapshotPtr ReadLogHistogram() {
- double base = ReadFixed<double>();
- ui64 zerosCount = ReadFixed<ui64>();
- int startPower = static_cast<int>(ReadVarint());
- ui32 count = ReadVarint();
- // see https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/stockpile_client/points.cpp?rev=r8593154#L31
- // and https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/common/points/hgram/normal/normal.h?rev=r8268697#L9
- // TODO: share this constant value
- Y_ENSURE(count <= 100u, "more than 100 buckets in log histogram: " << count);
- TVector<double> buckets;
- buckets.reserve(count);
- for (ui32 i = 0; i < count; ++i) {
- buckets.emplace_back(ReadFixed<double>());
- }
- return MakeIntrusive<TLogHistogramSnapshot>(base, zerosCount, startPower, std::move(buckets));
- }
- IHistogramSnapshotPtr ReadHistogram() {
- ui32 bucketsCount = ReadVarint();
- auto s = TExplicitHistogramSnapshot::New(bucketsCount);
- if (SV1_00 == Header_.Version) { // v1.0
- for (ui32 i = 0; i < bucketsCount; i++) {
- i64 bound = ReadFixed<i64>();
- double doubleBound = (bound != Max<i64>())
- ? static_cast<double>(bound)
- : Max<double>();
- (*s)[i].first = doubleBound;
- }
- } else {
- for (ui32 i = 0; i < bucketsCount; i++) {
- double doubleBound = ReadFixed<double>();
- (*s)[i].first = doubleBound;
- }
- }
- // values
- for (ui32 i = 0; i < bucketsCount; i++) {
- (*s)[i].second = ReadFixed<ui64>();
- }
- return s;
- }
- void ReadLabels(
- const TStringPool& labelNames,
- const TStringPool& labelValues,
- ui32 count,
- IMetricConsumer* c)
- {
- for (ui32 i = 0; i < count; i++) {
- auto nameIdx = ReadVarint();
- auto valueIdx = ReadVarint();
- c->OnLabel(labelNames.Get(nameIdx), labelValues.Get(valueIdx));
- }
- }
- TInstant ReadTime() {
- switch (TimePrecision_) {
- case ETimePrecision::SECONDS:
- return TInstant::Seconds(ReadFixed<ui32>());
- case ETimePrecision::MILLIS:
- return TInstant::MilliSeconds(ReadFixed<ui64>());
- }
- Y_ABORT("invalid time precision");
- }
- template <typename T>
- inline T ReadFixed() {
- T value;
- size_t readBytes = In_->Load(&value, sizeof(T));
- DECODE_ENSURE(readBytes == sizeof(T), "no enough data to read " << TypeName<T>());
- return value;
- }
- inline ui32 ReadVarint() {
- return ReadVarUInt32(In_);
- }
- private:
- IInputStream* In_;
- TString MetricNameLabel_;
- ETimePrecision TimePrecision_;
- TSpackHeader Header_;
- }; // class TDecoderSpackV1
- #undef DECODE_ENSURE
- } // namespace
- EValueType DecodeValueType(ui8 byte) {
- EValueType result;
- if (!TryDecodeValueType(byte, &result)) {
- throw TSpackDecodeError() << "unknown value type: " << byte;
- }
- return result;
- }
- bool TryDecodeValueType(ui8 byte, EValueType* result) {
- if (byte == EncodeValueType(EValueType::NONE)) {
- if (result) {
- *result = EValueType::NONE;
- }
- return true;
- } else if (byte == EncodeValueType(EValueType::ONE_WITHOUT_TS)) {
- if (result) {
- *result = EValueType::ONE_WITHOUT_TS;
- }
- return true;
- } else if (byte == EncodeValueType(EValueType::ONE_WITH_TS)) {
- if (result) {
- *result = EValueType::ONE_WITH_TS;
- }
- return true;
- } else if (byte == EncodeValueType(EValueType::MANY_WITH_TS)) {
- if (result) {
- *result = EValueType::MANY_WITH_TS;
- }
- return true;
- } else {
- return false;
- }
- }
- ETimePrecision DecodeTimePrecision(ui8 byte) {
- ETimePrecision result;
- if (!TryDecodeTimePrecision(byte, &result)) {
- throw TSpackDecodeError() << "unknown time precision: " << byte;
- }
- return result;
- }
- bool TryDecodeTimePrecision(ui8 byte, ETimePrecision* result) {
- if (byte == EncodeTimePrecision(ETimePrecision::SECONDS)) {
- if (result) {
- *result = ETimePrecision::SECONDS;
- }
- return true;
- } else if (byte == EncodeTimePrecision(ETimePrecision::MILLIS)) {
- if (result) {
- *result = ETimePrecision::MILLIS;
- }
- return true;
- } else {
- return false;
- }
- }
- EMetricType DecodeMetricType(ui8 byte) {
- EMetricType result;
- if (!TryDecodeMetricType(byte, &result)) {
- throw TSpackDecodeError() << "unknown metric type: " << byte;
- }
- return result;
- }
- bool TryDecodeMetricType(ui8 byte, EMetricType* result) {
- if (byte == EncodeMetricType(EMetricType::GAUGE)) {
- if (result) {
- *result = EMetricType::GAUGE;
- }
- return true;
- } else if (byte == EncodeMetricType(EMetricType::COUNTER)) {
- if (result) {
- *result = EMetricType::COUNTER;
- }
- return true;
- } else if (byte == EncodeMetricType(EMetricType::RATE)) {
- if (result) {
- *result = EMetricType::RATE;
- }
- return true;
- } else if (byte == EncodeMetricType(EMetricType::IGAUGE)) {
- if (result) {
- *result = EMetricType::IGAUGE;
- }
- return true;
- } else if (byte == EncodeMetricType(EMetricType::HIST)) {
- if (result) {
- *result = EMetricType::HIST;
- }
- return true;
- } else if (byte == EncodeMetricType(EMetricType::HIST_RATE)) {
- if (result) {
- *result = EMetricType::HIST_RATE;
- }
- return true;
- } else if (byte == EncodeMetricType(EMetricType::DSUMMARY)) {
- if (result) {
- *result = EMetricType::DSUMMARY;
- }
- return true;
- } else if (byte == EncodeMetricType(EMetricType::LOGHIST)) {
- if (result) {
- *result = EMetricType::LOGHIST;
- }
- return true;
- } else if (byte == EncodeMetricType(EMetricType::UNKNOWN)) {
- if (result) {
- *result = EMetricType::UNKNOWN;
- }
- return true;
- } else {
- return false;
- }
- }
- ui8 EncodeCompression(ECompression c) noexcept {
- switch (c) {
- case ECompression::IDENTITY:
- return 0x00;
- case ECompression::ZLIB:
- return 0x01;
- case ECompression::ZSTD:
- return 0x02;
- case ECompression::LZ4:
- return 0x03;
- case ECompression::UNKNOWN:
- return Max<ui8>();
- }
- Y_ABORT(); // for GCC
- }
- ECompression DecodeCompression(ui8 byte) {
- ECompression result;
- if (!TryDecodeCompression(byte, &result)) {
- throw TSpackDecodeError() << "unknown compression alg: " << byte;
- }
- return result;
- }
- bool TryDecodeCompression(ui8 byte, ECompression* result) {
- if (byte == EncodeCompression(ECompression::IDENTITY)) {
- if (result) {
- *result = ECompression::IDENTITY;
- }
- return true;
- } else if (byte == EncodeCompression(ECompression::ZLIB)) {
- if (result) {
- *result = ECompression::ZLIB;
- }
- return true;
- } else if (byte == EncodeCompression(ECompression::ZSTD)) {
- if (result) {
- *result = ECompression::ZSTD;
- }
- return true;
- } else if (byte == EncodeCompression(ECompression::LZ4)) {
- if (result) {
- *result = ECompression::LZ4;
- }
- return true;
- } else {
- return false;
- }
- }
- void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) {
- TDecoderSpackV1 decoder(in, metricNameLabel);
- decoder.Decode(c);
- }
- }
|