spack_v1_decoder.cpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. #include "spack_v1.h"
  2. #include "varint.h"
  3. #include "compression.h"
  4. #include <library/cpp/monlib/encode/buffered/string_pool.h>
  5. #include <library/cpp/monlib/exception/exception.h>
  6. #include <library/cpp/monlib/metrics/histogram_collector.h>
  7. #include <library/cpp/monlib/metrics/metric.h>
  8. #include <util/generic/yexception.h>
  9. #include <util/generic/buffer.h>
  10. #include <util/generic/size_literals.h>
  11. #include <util/stream/format.h>
  12. #ifndef _little_endian_
  13. #error Unsupported platform
  14. #endif
  15. namespace NMonitoring {
  16. namespace {
  // Checks COND via MONLIB_ENSURE_EX, supplying a TSpackDecodeError carrying the
  // streamed message parts as the error object. Presumably MONLIB_ENSURE_EX throws
  // that object when COND is false -- confirm against monlib/exception/exception.h.
  // The macro is #undef'ed right after the decoder class.
  17. #define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TSpackDecodeError() << __VA_ARGS__)
  // Upper bound on the combined size of the label-names and label-values pools
  // declared in the header; Decode() rejects input that declares more than this
  // before allocating the pool buffers.
  18. constexpr ui64 LABEL_SIZE_LIMIT = 128_MB;
  19. ///////////////////////////////////////////////////////////////////////
  20. // TDecoderSpackV1
  21. ///////////////////////////////////////////////////////////////////////
  22. class TDecoderSpackV1 {
  23. public:
  24. TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel)
  25. : In_(in)
  26. , MetricNameLabel_(metricNameLabel)
  27. {
  28. }
  29. void Decode(IMetricConsumer* c) {
  30. c->OnStreamBegin();
  31. // (1) read header
  32. size_t readBytes = In_->Read(&Header_, sizeof(Header_));
  33. DECODE_ENSURE(readBytes == sizeof(Header_), "not enough data in input stream to read header");
  34. ui8 version = ((Header_.Version >> 8) & 0xff);
  35. DECODE_ENSURE(version == 1, "versions mismatch (expected: 1, got: " << +version << ')');
  36. DECODE_ENSURE(Header_.HeaderSize >= sizeof(Header_), "invalid header size");
  37. if (size_t skipBytes = Header_.HeaderSize - sizeof(Header_)) {
  38. DECODE_ENSURE(In_->Skip(skipBytes) == skipBytes, "input stream unexpectedly ended");
  39. }
  40. if (Header_.MetricCount == 0) {
  41. // emulate empty stream
  42. c->OnStreamEnd();
  43. return;
  44. }
  45. // if compression enabled all below reads must go throught decompressor
  46. auto compressedIn = CompressedInput(In_, DecodeCompression(Header_.Compression));
  47. if (compressedIn) {
  48. In_ = compressedIn.Get();
  49. }
  50. TimePrecision_ = DecodeTimePrecision(Header_.TimePrecision);
  51. const ui64 labelSizeTotal = ui64(Header_.LabelNamesSize) + Header_.LabelValuesSize;
  52. DECODE_ENSURE(labelSizeTotal <= LABEL_SIZE_LIMIT, "Label names & values size of " << HumanReadableSize(labelSizeTotal, SF_BYTES)
  53. << " exceeds the limit which is " << HumanReadableSize(LABEL_SIZE_LIMIT, SF_BYTES));
  54. // (2) read string pools
  55. TVector<char> namesBuf(Header_.LabelNamesSize);
  56. readBytes = In_->Load(namesBuf.data(), namesBuf.size());
  57. DECODE_ENSURE(readBytes == Header_.LabelNamesSize, "not enough data to read label names pool");
  58. TStringPool labelNames(namesBuf.data(), namesBuf.size());
  59. TVector<char> valuesBuf(Header_.LabelValuesSize);
  60. readBytes = In_->Load(valuesBuf.data(), valuesBuf.size());
  61. DECODE_ENSURE(readBytes == Header_.LabelValuesSize, "not enough data to read label values pool");
  62. TStringPool labelValues(valuesBuf.data(), valuesBuf.size());
  63. // (3) read common time
  64. c->OnCommonTime(ReadTime());
  65. // (4) read common labels
  66. if (ui32 commonLabelsCount = ReadVarint()) {
  67. c->OnLabelsBegin();
  68. ReadLabels(labelNames, labelValues, commonLabelsCount, c);
  69. c->OnLabelsEnd();
  70. }
  71. // (5) read metrics
  72. ReadMetrics(labelNames, labelValues, c);
  73. c->OnStreamEnd();
  74. }
  75. private:
  76. void ReadMetrics(
  77. const TStringPool& labelNames,
  78. const TStringPool& labelValues,
  79. IMetricConsumer* c)
  80. {
  81. for (ui32 i = 0; i < Header_.MetricCount; i++) {
  82. // (5.1) types byte
  83. ui8 typesByte = ReadFixed<ui8>();
  84. EMetricType metricType = DecodeMetricType(typesByte >> 2);
  85. EValueType valueType = DecodeValueType(typesByte & 0x03);
  86. c->OnMetricBegin(metricType);
  87. // TODO: use it
  88. ReadFixed<ui8>(); // skip flags byte
  89. auto metricNameValueIndex = std::numeric_limits<ui32>::max();
  90. if (Header_.Version >= SV1_02) {
  91. metricNameValueIndex = ReadVarint();
  92. }
  93. // (5.2) labels
  94. ui32 labelsCount = ReadVarint();
  95. DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels");
  96. c->OnLabelsBegin();
  97. if (Header_.Version >= SV1_02) {
  98. c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex));
  99. }
  100. ReadLabels(labelNames, labelValues, labelsCount, c);
  101. c->OnLabelsEnd();
  102. // (5.3) values
  103. switch (valueType) {
  104. case EValueType::NONE:
  105. break;
  106. case EValueType::ONE_WITHOUT_TS:
  107. ReadValue(metricType, TInstant::Zero(), c);
  108. break;
  109. case EValueType::ONE_WITH_TS: {
  110. TInstant time = ReadTime();
  111. ReadValue(metricType, time, c);
  112. break;
  113. }
  114. case EValueType::MANY_WITH_TS: {
  115. ui32 pointsCount = ReadVarint();
  116. for (ui32 i = 0; i < pointsCount; i++) {
  117. TInstant time = ReadTime();
  118. ReadValue(metricType, time, c);
  119. }
  120. break;
  121. }
  122. }
  123. c->OnMetricEnd();
  124. }
  125. }
  126. void ReadValue(EMetricType metricType, TInstant time, IMetricConsumer* c) {
  127. switch (metricType) {
  128. case EMetricType::GAUGE:
  129. c->OnDouble(time, ReadFixed<double>());
  130. break;
  131. case EMetricType::IGAUGE:
  132. c->OnInt64(time, ReadFixed<i64>());
  133. break;
  134. case EMetricType::COUNTER:
  135. case EMetricType::RATE:
  136. c->OnUint64(time, ReadFixed<ui64>());
  137. break;
  138. case EMetricType::DSUMMARY:
  139. c->OnSummaryDouble(time, ReadSummaryDouble());
  140. break;
  141. case EMetricType::HIST:
  142. case EMetricType::HIST_RATE:
  143. c->OnHistogram(time, ReadHistogram());
  144. break;
  145. case EMetricType::LOGHIST:
  146. c->OnLogHistogram(time, ReadLogHistogram());
  147. break;
  148. default:
  149. throw TSpackDecodeError() << "Unsupported metric type: " << metricType;
  150. }
  151. }
  152. ISummaryDoubleSnapshotPtr ReadSummaryDouble() {
  153. ui64 count = ReadFixed<ui64>();
  154. double sum = ReadFixed<double>();
  155. double min = ReadFixed<double>();
  156. double max = ReadFixed<double>();
  157. double last = ReadFixed<double>();
  158. return MakeIntrusive<TSummaryDoubleSnapshot>(sum, min, max, last, count);
  159. }
  160. TLogHistogramSnapshotPtr ReadLogHistogram() {
  161. double base = ReadFixed<double>();
  162. ui64 zerosCount = ReadFixed<ui64>();
  163. int startPower = static_cast<int>(ReadVarint());
  164. ui32 count = ReadVarint();
  165. // see https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/stockpile_client/points.cpp?rev=r8593154#L31
  166. // and https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/common/points/hgram/normal/normal.h?rev=r8268697#L9
  167. // TODO: share this constant value
  168. Y_ENSURE(count <= 100u, "more than 100 buckets in log histogram: " << count);
  169. TVector<double> buckets;
  170. buckets.reserve(count);
  171. for (ui32 i = 0; i < count; ++i) {
  172. buckets.emplace_back(ReadFixed<double>());
  173. }
  174. return MakeIntrusive<TLogHistogramSnapshot>(base, zerosCount, startPower, std::move(buckets));
  175. }
  176. IHistogramSnapshotPtr ReadHistogram() {
  177. ui32 bucketsCount = ReadVarint();
  178. auto s = TExplicitHistogramSnapshot::New(bucketsCount);
  179. if (SV1_00 == Header_.Version) { // v1.0
  180. for (ui32 i = 0; i < bucketsCount; i++) {
  181. i64 bound = ReadFixed<i64>();
  182. double doubleBound = (bound != Max<i64>())
  183. ? static_cast<double>(bound)
  184. : Max<double>();
  185. (*s)[i].first = doubleBound;
  186. }
  187. } else {
  188. for (ui32 i = 0; i < bucketsCount; i++) {
  189. double doubleBound = ReadFixed<double>();
  190. (*s)[i].first = doubleBound;
  191. }
  192. }
  193. // values
  194. for (ui32 i = 0; i < bucketsCount; i++) {
  195. (*s)[i].second = ReadFixed<ui64>();
  196. }
  197. return s;
  198. }
  199. void ReadLabels(
  200. const TStringPool& labelNames,
  201. const TStringPool& labelValues,
  202. ui32 count,
  203. IMetricConsumer* c)
  204. {
  205. for (ui32 i = 0; i < count; i++) {
  206. auto nameIdx = ReadVarint();
  207. auto valueIdx = ReadVarint();
  208. c->OnLabel(labelNames.Get(nameIdx), labelValues.Get(valueIdx));
  209. }
  210. }
  211. TInstant ReadTime() {
  212. switch (TimePrecision_) {
  213. case ETimePrecision::SECONDS:
  214. return TInstant::Seconds(ReadFixed<ui32>());
  215. case ETimePrecision::MILLIS:
  216. return TInstant::MilliSeconds(ReadFixed<ui64>());
  217. }
  218. Y_ABORT("invalid time precision");
  219. }
  220. template <typename T>
  221. inline T ReadFixed() {
  222. T value;
  223. size_t readBytes = In_->Load(&value, sizeof(T));
  224. DECODE_ENSURE(readBytes == sizeof(T), "no enough data to read " << TypeName<T>());
  225. return value;
  226. }
  227. inline ui32 ReadVarint() {
  228. return ReadVarUInt32(In_);
  229. }
  230. private:
  231. IInputStream* In_;
  232. TString MetricNameLabel_;
  233. ETimePrecision TimePrecision_;
  234. TSpackHeader Header_;
  235. }; // class TDecoderSpackV1
  236. #undef DECODE_ENSURE
  237. } // namespace
  238. EValueType DecodeValueType(ui8 byte) {
  239. EValueType result;
  240. if (!TryDecodeValueType(byte, &result)) {
  241. throw TSpackDecodeError() << "unknown value type: " << byte;
  242. }
  243. return result;
  244. }
  245. bool TryDecodeValueType(ui8 byte, EValueType* result) {
  246. if (byte == EncodeValueType(EValueType::NONE)) {
  247. if (result) {
  248. *result = EValueType::NONE;
  249. }
  250. return true;
  251. } else if (byte == EncodeValueType(EValueType::ONE_WITHOUT_TS)) {
  252. if (result) {
  253. *result = EValueType::ONE_WITHOUT_TS;
  254. }
  255. return true;
  256. } else if (byte == EncodeValueType(EValueType::ONE_WITH_TS)) {
  257. if (result) {
  258. *result = EValueType::ONE_WITH_TS;
  259. }
  260. return true;
  261. } else if (byte == EncodeValueType(EValueType::MANY_WITH_TS)) {
  262. if (result) {
  263. *result = EValueType::MANY_WITH_TS;
  264. }
  265. return true;
  266. } else {
  267. return false;
  268. }
  269. }
  270. ETimePrecision DecodeTimePrecision(ui8 byte) {
  271. ETimePrecision result;
  272. if (!TryDecodeTimePrecision(byte, &result)) {
  273. throw TSpackDecodeError() << "unknown time precision: " << byte;
  274. }
  275. return result;
  276. }
  277. bool TryDecodeTimePrecision(ui8 byte, ETimePrecision* result) {
  278. if (byte == EncodeTimePrecision(ETimePrecision::SECONDS)) {
  279. if (result) {
  280. *result = ETimePrecision::SECONDS;
  281. }
  282. return true;
  283. } else if (byte == EncodeTimePrecision(ETimePrecision::MILLIS)) {
  284. if (result) {
  285. *result = ETimePrecision::MILLIS;
  286. }
  287. return true;
  288. } else {
  289. return false;
  290. }
  291. }
  292. EMetricType DecodeMetricType(ui8 byte) {
  293. EMetricType result;
  294. if (!TryDecodeMetricType(byte, &result)) {
  295. throw TSpackDecodeError() << "unknown metric type: " << byte;
  296. }
  297. return result;
  298. }
  299. bool TryDecodeMetricType(ui8 byte, EMetricType* result) {
  300. if (byte == EncodeMetricType(EMetricType::GAUGE)) {
  301. if (result) {
  302. *result = EMetricType::GAUGE;
  303. }
  304. return true;
  305. } else if (byte == EncodeMetricType(EMetricType::COUNTER)) {
  306. if (result) {
  307. *result = EMetricType::COUNTER;
  308. }
  309. return true;
  310. } else if (byte == EncodeMetricType(EMetricType::RATE)) {
  311. if (result) {
  312. *result = EMetricType::RATE;
  313. }
  314. return true;
  315. } else if (byte == EncodeMetricType(EMetricType::IGAUGE)) {
  316. if (result) {
  317. *result = EMetricType::IGAUGE;
  318. }
  319. return true;
  320. } else if (byte == EncodeMetricType(EMetricType::HIST)) {
  321. if (result) {
  322. *result = EMetricType::HIST;
  323. }
  324. return true;
  325. } else if (byte == EncodeMetricType(EMetricType::HIST_RATE)) {
  326. if (result) {
  327. *result = EMetricType::HIST_RATE;
  328. }
  329. return true;
  330. } else if (byte == EncodeMetricType(EMetricType::DSUMMARY)) {
  331. if (result) {
  332. *result = EMetricType::DSUMMARY;
  333. }
  334. return true;
  335. } else if (byte == EncodeMetricType(EMetricType::LOGHIST)) {
  336. if (result) {
  337. *result = EMetricType::LOGHIST;
  338. }
  339. return true;
  340. } else if (byte == EncodeMetricType(EMetricType::UNKNOWN)) {
  341. if (result) {
  342. *result = EMetricType::UNKNOWN;
  343. }
  344. return true;
  345. } else {
  346. return false;
  347. }
  348. }
  349. ui8 EncodeCompression(ECompression c) noexcept {
  350. switch (c) {
  351. case ECompression::IDENTITY:
  352. return 0x00;
  353. case ECompression::ZLIB:
  354. return 0x01;
  355. case ECompression::ZSTD:
  356. return 0x02;
  357. case ECompression::LZ4:
  358. return 0x03;
  359. case ECompression::UNKNOWN:
  360. return Max<ui8>();
  361. }
  362. Y_ABORT(); // for GCC
  363. }
  364. ECompression DecodeCompression(ui8 byte) {
  365. ECompression result;
  366. if (!TryDecodeCompression(byte, &result)) {
  367. throw TSpackDecodeError() << "unknown compression alg: " << byte;
  368. }
  369. return result;
  370. }
  371. bool TryDecodeCompression(ui8 byte, ECompression* result) {
  372. if (byte == EncodeCompression(ECompression::IDENTITY)) {
  373. if (result) {
  374. *result = ECompression::IDENTITY;
  375. }
  376. return true;
  377. } else if (byte == EncodeCompression(ECompression::ZLIB)) {
  378. if (result) {
  379. *result = ECompression::ZLIB;
  380. }
  381. return true;
  382. } else if (byte == EncodeCompression(ECompression::ZSTD)) {
  383. if (result) {
  384. *result = ECompression::ZSTD;
  385. }
  386. return true;
  387. } else if (byte == EncodeCompression(ECompression::LZ4)) {
  388. if (result) {
  389. *result = ECompression::LZ4;
  390. }
  391. return true;
  392. } else {
  393. return false;
  394. }
  395. }
  396. void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) {
  397. TDecoderSpackV1 decoder(in, metricNameLabel);
  398. decoder.Decode(c);
  399. }
  400. }