#include "spack_v1.h" #include "compression.h" #include "varint.h" #include #include #include #include #ifndef _little_endian_ #error Unsupported platform #endif namespace NMonitoring { namespace { /////////////////////////////////////////////////////////////////////// // TEncoderSpackV1 /////////////////////////////////////////////////////////////////////// class TEncoderSpackV1 final: public TBufferedEncoderBase { public: TEncoderSpackV1( IOutputStream* out, ETimePrecision timePrecision, ECompression compression, EMetricsMergingMode mergingMode, ESpackV1Version version, TStringBuf metricNameLabel ) : Out_(out) , TimePrecision_(timePrecision) , Compression_(compression) , Version_(version) , MetricName_(Version_ >= SV1_02 ? LabelNamesPool_.PutIfAbsent(metricNameLabel) : nullptr) { MetricsMergingMode_ = mergingMode; LabelNamesPool_.SetSorted(true); LabelValuesPool_.SetSorted(true); } ~TEncoderSpackV1() override { Close(); } private: void OnDouble(TInstant time, double value) override { TBufferedEncoderBase::OnDouble(time, value); } void OnInt64(TInstant time, i64 value) override { TBufferedEncoderBase::OnInt64(time, value); } void OnUint64(TInstant time, ui64 value) override { TBufferedEncoderBase::OnUint64(time, value); } void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override { TBufferedEncoderBase::OnHistogram(time, snapshot); } void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override { TBufferedEncoderBase::OnSummaryDouble(time, snapshot); } void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) override { TBufferedEncoderBase::OnLogHistogram(time, snapshot); } void Close() override { if (Closed_) { return; } Closed_ = true; LabelNamesPool_.Build(); LabelValuesPool_.Build(); // Sort all points uniquely by ts -- the size can decrease ui64 pointsCount = 0; for (TMetric& metric : Metrics_) { if (metric.TimeSeries.Size() > 1) { metric.TimeSeries.SortByTs(); } pointsCount += metric.TimeSeries.Size(); } // (1) write header TSpackHeader header; header.Version = Version_; header.TimePrecision = EncodeTimePrecision(TimePrecision_); header.Compression = EncodeCompression(Compression_); header.LabelNamesSize = static_cast( LabelNamesPool_.BytesSize() + LabelNamesPool_.Count()); header.LabelValuesSize = static_cast( LabelValuesPool_.BytesSize() + LabelValuesPool_.Count()); header.MetricCount = Metrics_.size(); header.PointsCount = pointsCount; Out_->Write(&header, sizeof(header)); // if compression enabled all below writes must go throught compressor auto compressedOut = CompressedOutput(Out_, Compression_); if (compressedOut) { Out_ = compressedOut.Get(); } // (2) write string pools auto strPoolWrite = [this](TStringBuf str, ui32, ui32) { Out_->Write(str); Out_->Write('\0'); }; LabelNamesPool_.ForEach(strPoolWrite); LabelValuesPool_.ForEach(strPoolWrite); // (3) write common time WriteTime(CommonTime_); // (4) write common labels' indexes WriteLabels(CommonLabels_, nullptr); // (5) write metrics // metrics count already written in header for (TMetric& metric : Metrics_) { // (5.1) types byte ui8 typesByte = PackTypes(metric); Out_->Write(&typesByte, sizeof(typesByte)); // TODO: implement ui8 flagsByte = 0x00; Out_->Write(&flagsByte, sizeof(flagsByte)); // v1.2 format addition — metric name if (Version_ >= SV1_02) { const auto it = FindIf(metric.Labels, [&](const auto& l) { return l.Key == MetricName_; }); Y_ENSURE(it != metric.Labels.end(), "metric name label '" << LabelNamesPool_.Get(MetricName_->Index) << "' not found, " << "all metric labels '" << FormatLabels(metric.Labels) << "'"); WriteVarUInt32(Out_, it->Value->Index); } // (5.2) labels WriteLabels(metric.Labels, MetricName_); // (5.3) values switch (metric.TimeSeries.Size()) { case 0: break; case 1: { const auto& point = metric.TimeSeries[0]; if (point.GetTime() != TInstant::Zero()) { WriteTime(point.GetTime()); } EMetricValueType valueType = metric.TimeSeries.GetValueType(); WriteValue(metric.MetricType, valueType, point.GetValue()); break; } default: WriteVarUInt32(Out_, static_cast(metric.TimeSeries.Size())); const TMetricTimeSeries& ts = metric.TimeSeries; EMetricType metricType = metric.MetricType; ts.ForEach([this, metricType](TInstant time, EMetricValueType valueType, TMetricValue value) { // workaround for GCC bug // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61636 this->WriteTime(time); this->WriteValue(metricType, valueType, value); }); break; } } } // store metric type and values type in one byte ui8 PackTypes(const TMetric& metric) { EValueType valueType; if (metric.TimeSeries.Empty()) { valueType = EValueType::NONE; } else if (metric.TimeSeries.Size() == 1) { TInstant time = metric.TimeSeries[0].GetTime(); valueType = (time == TInstant::Zero()) ? EValueType::ONE_WITHOUT_TS : EValueType::ONE_WITH_TS; } else { valueType = EValueType::MANY_WITH_TS; } return (static_cast(metric.MetricType) << 2) | static_cast(valueType); } void WriteLabels(const TPooledLabels& labels, const TPooledStr* skipKey) { WriteVarUInt32(Out_, static_cast(skipKey ? labels.size() - 1 : labels.size())); for (auto&& label : labels) { if (label.Key == skipKey) { continue; } WriteVarUInt32(Out_, label.Key->Index); WriteVarUInt32(Out_, label.Value->Index); } } void WriteValue(EMetricType metricType, EMetricValueType valueType, TMetricValue value) { switch (metricType) { case EMetricType::GAUGE: WriteFixed(value.AsDouble(valueType)); break; case EMetricType::IGAUGE: WriteFixed(value.AsInt64(valueType)); break; case EMetricType::COUNTER: case EMetricType::RATE: WriteFixed(value.AsUint64(valueType)); break; case EMetricType::HIST: case EMetricType::HIST_RATE: WriteHistogram(*value.AsHistogram()); break; case EMetricType::DSUMMARY: WriteSummaryDouble(*value.AsSummaryDouble()); break; case EMetricType::LOGHIST: WriteLogHistogram(*value.AsLogHistogram()); break; default: ythrow yexception() << "unsupported metric type: " << metricType; } } void WriteTime(TInstant instant) { switch (TimePrecision_) { case ETimePrecision::SECONDS: { ui32 time = static_cast(instant.Seconds()); Out_->Write(&time, sizeof(time)); break; } case ETimePrecision::MILLIS: { ui64 time = static_cast(instant.MilliSeconds()); Out_->Write(&time, sizeof(time)); } } } template void WriteFixed(T value) { Out_->Write(&value, sizeof(value)); } void WriteHistogram(const IHistogramSnapshot& histogram) { ui32 count = histogram.Count(); WriteVarUInt32(Out_, count); for (ui32 i = 0; i < count; i++) { double bound = histogram.UpperBound(i); Out_->Write(&bound, sizeof(bound)); } for (ui32 i = 0; i < count; i++) { ui64 value = histogram.Value(i); Out_->Write(&value, sizeof(value)); } } void WriteLogHistogram(const TLogHistogramSnapshot& logHist) { WriteFixed(logHist.Base()); WriteFixed(logHist.ZerosCount()); WriteVarUInt32(Out_, static_cast(logHist.StartPower())); WriteVarUInt32(Out_, logHist.Count()); for (ui32 i = 0; i < logHist.Count(); ++i) { WriteFixed(logHist.Bucket(i)); } } void WriteSummaryDouble(const ISummaryDoubleSnapshot& summary) { WriteFixed(summary.GetCount()); WriteFixed(summary.GetSum()); WriteFixed(summary.GetMin()); WriteFixed(summary.GetMax()); WriteFixed(summary.GetLast()); } private: IOutputStream* Out_; ETimePrecision TimePrecision_; ECompression Compression_; ESpackV1Version Version_; const TPooledStr* MetricName_; bool Closed_ = false; }; } IMetricEncoderPtr EncoderSpackV1( IOutputStream* out, ETimePrecision timePrecision, ECompression compression, EMetricsMergingMode mergingMode ) { return MakeHolder(out, timePrecision, compression, mergingMode, SV1_01, ""); } IMetricEncoderPtr EncoderSpackV12( IOutputStream* out, ETimePrecision timePrecision, ECompression compression, EMetricsMergingMode mergingMode, TStringBuf metricNameLabel ) { Y_ENSURE(!metricNameLabel.Empty(), "metricNameLabel can't be empty"); return MakeHolder(out, timePrecision, compression, mergingMode, SV1_02, metricNameLabel); } }