spack_v1_encoder.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. #include "spack_v1.h"
  2. #include "compression.h"
  3. #include "varint.h"
  4. #include <library/cpp/monlib/encode/buffered/buffered_encoder_base.h>
  5. #include <util/generic/cast.h>
  6. #include <util/datetime/base.h>
  7. #include <util/string/builder.h>
  8. #ifndef _little_endian_
  9. #error Unsupported platform
  10. #endif
  11. namespace NMonitoring {
  12. namespace {
  13. ///////////////////////////////////////////////////////////////////////
  14. // TEncoderSpackV1
  15. ///////////////////////////////////////////////////////////////////////
  16. class TEncoderSpackV1 final: public TBufferedEncoderBase {
  17. public:
  18. TEncoderSpackV1(
  19. IOutputStream* out,
  20. ETimePrecision timePrecision,
  21. ECompression compression,
  22. EMetricsMergingMode mergingMode,
  23. ESpackV1Version version,
  24. TStringBuf metricNameLabel
  25. )
  26. : Out_(out)
  27. , TimePrecision_(timePrecision)
  28. , Compression_(compression)
  29. , Version_(version)
  30. , MetricName_(Version_ >= SV1_02 ? LabelNamesPool_.PutIfAbsent(metricNameLabel) : nullptr)
  31. {
  32. MetricsMergingMode_ = mergingMode;
  33. LabelNamesPool_.SetSorted(true);
  34. LabelValuesPool_.SetSorted(true);
  35. }
  36. ~TEncoderSpackV1() override {
  37. Close();
  38. }
  39. private:
  40. void OnDouble(TInstant time, double value) override {
  41. TBufferedEncoderBase::OnDouble(time, value);
  42. }
  43. void OnInt64(TInstant time, i64 value) override {
  44. TBufferedEncoderBase::OnInt64(time, value);
  45. }
  46. void OnUint64(TInstant time, ui64 value) override {
  47. TBufferedEncoderBase::OnUint64(time, value);
  48. }
  49. void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override {
  50. TBufferedEncoderBase::OnHistogram(time, snapshot);
  51. }
  52. void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override {
  53. TBufferedEncoderBase::OnSummaryDouble(time, snapshot);
  54. }
  55. void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) override {
  56. TBufferedEncoderBase::OnLogHistogram(time, snapshot);
  57. }
  58. void Close() override {
  59. if (Closed_) {
  60. return;
  61. }
  62. Closed_ = true;
  63. LabelNamesPool_.Build();
  64. LabelValuesPool_.Build();
  65. // Sort all points uniquely by ts -- the size can decrease
  66. ui64 pointsCount = 0;
  67. for (TMetric& metric : Metrics_) {
  68. if (metric.TimeSeries.Size() > 1) {
  69. metric.TimeSeries.SortByTs();
  70. }
  71. pointsCount += metric.TimeSeries.Size();
  72. }
  73. // (1) write header
  74. TSpackHeader header;
  75. header.Version = Version_;
  76. header.TimePrecision = EncodeTimePrecision(TimePrecision_);
  77. header.Compression = EncodeCompression(Compression_);
  78. header.LabelNamesSize = static_cast<ui32>(
  79. LabelNamesPool_.BytesSize() + LabelNamesPool_.Count());
  80. header.LabelValuesSize = static_cast<ui32>(
  81. LabelValuesPool_.BytesSize() + LabelValuesPool_.Count());
  82. header.MetricCount = Metrics_.size();
  83. header.PointsCount = pointsCount;
  84. Out_->Write(&header, sizeof(header));
  85. // if compression enabled all below writes must go throught compressor
  86. auto compressedOut = CompressedOutput(Out_, Compression_);
  87. if (compressedOut) {
  88. Out_ = compressedOut.Get();
  89. }
  90. // (2) write string pools
  91. auto strPoolWrite = [this](TStringBuf str, ui32, ui32) {
  92. Out_->Write(str);
  93. Out_->Write('\0');
  94. };
  95. LabelNamesPool_.ForEach(strPoolWrite);
  96. LabelValuesPool_.ForEach(strPoolWrite);
  97. // (3) write common time
  98. WriteTime(CommonTime_);
  99. // (4) write common labels' indexes
  100. WriteLabels(CommonLabels_, nullptr);
  101. // (5) write metrics
  102. // metrics count already written in header
  103. for (TMetric& metric : Metrics_) {
  104. // (5.1) types byte
  105. ui8 typesByte = PackTypes(metric);
  106. Out_->Write(&typesByte, sizeof(typesByte));
  107. // TODO: implement
  108. ui8 flagsByte = 0x00;
  109. Out_->Write(&flagsByte, sizeof(flagsByte));
  110. // v1.2 format addition — metric name
  111. if (Version_ >= SV1_02) {
  112. const auto it = FindIf(metric.Labels, [&](const auto& l) {
  113. return l.Key == MetricName_;
  114. });
  115. Y_ENSURE(it != metric.Labels.end(),
  116. "metric name label '" << LabelNamesPool_.Get(MetricName_->Index) << "' not found, "
  117. << "all metric labels '" << FormatLabels(metric.Labels) << "'");
  118. WriteVarUInt32(Out_, it->Value->Index);
  119. }
  120. // (5.2) labels
  121. WriteLabels(metric.Labels, MetricName_);
  122. // (5.3) values
  123. switch (metric.TimeSeries.Size()) {
  124. case 0:
  125. break;
  126. case 1: {
  127. const auto& point = metric.TimeSeries[0];
  128. if (point.GetTime() != TInstant::Zero()) {
  129. WriteTime(point.GetTime());
  130. }
  131. EMetricValueType valueType = metric.TimeSeries.GetValueType();
  132. WriteValue(metric.MetricType, valueType, point.GetValue());
  133. break;
  134. }
  135. default:
  136. WriteVarUInt32(Out_, static_cast<ui32>(metric.TimeSeries.Size()));
  137. const TMetricTimeSeries& ts = metric.TimeSeries;
  138. EMetricType metricType = metric.MetricType;
  139. ts.ForEach([this, metricType](TInstant time, EMetricValueType valueType, TMetricValue value) {
  140. // workaround for GCC bug
  141. // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61636
  142. this->WriteTime(time);
  143. this->WriteValue(metricType, valueType, value);
  144. });
  145. break;
  146. }
  147. }
  148. }
  149. // store metric type and values type in one byte
  150. ui8 PackTypes(const TMetric& metric) {
  151. EValueType valueType;
  152. if (metric.TimeSeries.Empty()) {
  153. valueType = EValueType::NONE;
  154. } else if (metric.TimeSeries.Size() == 1) {
  155. TInstant time = metric.TimeSeries[0].GetTime();
  156. valueType = (time == TInstant::Zero())
  157. ? EValueType::ONE_WITHOUT_TS
  158. : EValueType::ONE_WITH_TS;
  159. } else {
  160. valueType = EValueType::MANY_WITH_TS;
  161. }
  162. return (static_cast<ui8>(metric.MetricType) << 2) | static_cast<ui8>(valueType);
  163. }
  164. void WriteLabels(const TPooledLabels& labels, const TPooledStr* skipKey) {
  165. WriteVarUInt32(Out_, static_cast<ui32>(skipKey ? labels.size() - 1 : labels.size()));
  166. for (auto&& label : labels) {
  167. if (label.Key == skipKey) {
  168. continue;
  169. }
  170. WriteVarUInt32(Out_, label.Key->Index);
  171. WriteVarUInt32(Out_, label.Value->Index);
  172. }
  173. }
  174. void WriteValue(EMetricType metricType, EMetricValueType valueType, TMetricValue value) {
  175. switch (metricType) {
  176. case EMetricType::GAUGE:
  177. WriteFixed(value.AsDouble(valueType));
  178. break;
  179. case EMetricType::IGAUGE:
  180. WriteFixed(value.AsInt64(valueType));
  181. break;
  182. case EMetricType::COUNTER:
  183. case EMetricType::RATE:
  184. WriteFixed(value.AsUint64(valueType));
  185. break;
  186. case EMetricType::HIST:
  187. case EMetricType::HIST_RATE:
  188. WriteHistogram(*value.AsHistogram());
  189. break;
  190. case EMetricType::DSUMMARY:
  191. WriteSummaryDouble(*value.AsSummaryDouble());
  192. break;
  193. case EMetricType::LOGHIST:
  194. WriteLogHistogram(*value.AsLogHistogram());
  195. break;
  196. default:
  197. ythrow yexception() << "unsupported metric type: " << metricType;
  198. }
  199. }
  200. void WriteTime(TInstant instant) {
  201. switch (TimePrecision_) {
  202. case ETimePrecision::SECONDS: {
  203. ui32 time = static_cast<ui32>(instant.Seconds());
  204. Out_->Write(&time, sizeof(time));
  205. break;
  206. }
  207. case ETimePrecision::MILLIS: {
  208. ui64 time = static_cast<ui64>(instant.MilliSeconds());
  209. Out_->Write(&time, sizeof(time));
  210. }
  211. }
  212. }
  213. template <typename T>
  214. void WriteFixed(T value) {
  215. Out_->Write(&value, sizeof(value));
  216. }
  217. void WriteHistogram(const IHistogramSnapshot& histogram) {
  218. ui32 count = histogram.Count();
  219. WriteVarUInt32(Out_, count);
  220. for (ui32 i = 0; i < count; i++) {
  221. double bound = histogram.UpperBound(i);
  222. Out_->Write(&bound, sizeof(bound));
  223. }
  224. for (ui32 i = 0; i < count; i++) {
  225. ui64 value = histogram.Value(i);
  226. Out_->Write(&value, sizeof(value));
  227. }
  228. }
  229. void WriteLogHistogram(const TLogHistogramSnapshot& logHist) {
  230. WriteFixed(logHist.Base());
  231. WriteFixed(logHist.ZerosCount());
  232. WriteVarUInt32(Out_, static_cast<ui32>(logHist.StartPower()));
  233. WriteVarUInt32(Out_, logHist.Count());
  234. for (ui32 i = 0; i < logHist.Count(); ++i) {
  235. WriteFixed(logHist.Bucket(i));
  236. }
  237. }
  238. void WriteSummaryDouble(const ISummaryDoubleSnapshot& summary) {
  239. WriteFixed(summary.GetCount());
  240. WriteFixed(summary.GetSum());
  241. WriteFixed(summary.GetMin());
  242. WriteFixed(summary.GetMax());
  243. WriteFixed(summary.GetLast());
  244. }
  245. private:
  246. IOutputStream* Out_;
  247. ETimePrecision TimePrecision_;
  248. ECompression Compression_;
  249. ESpackV1Version Version_;
  250. const TPooledStr* MetricName_;
  251. bool Closed_ = false;
  252. };
  253. }
  254. IMetricEncoderPtr EncoderSpackV1(
  255. IOutputStream* out,
  256. ETimePrecision timePrecision,
  257. ECompression compression,
  258. EMetricsMergingMode mergingMode
  259. ) {
  260. return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_01, "");
  261. }
  262. IMetricEncoderPtr EncoderSpackV12(
  263. IOutputStream* out,
  264. ETimePrecision timePrecision,
  265. ECompression compression,
  266. EMetricsMergingMode mergingMode,
  267. TStringBuf metricNameLabel
  268. ) {
  269. Y_ENSURE(!metricNameLabel.Empty(), "metricNameLabel can't be empty");
  270. return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_02, metricNameLabel);
  271. }
  272. }