123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- #include "spack_v1.h"
- #include "compression.h"
- #include "varint.h"
- #include <library/cpp/monlib/encode/buffered/buffered_encoder_base.h>
- #include <util/generic/cast.h>
- #include <util/datetime/base.h>
- #include <util/string/builder.h>
- #ifndef _little_endian_
- #error Unsupported platform
- #endif
- namespace NMonitoring {
- namespace {
- ///////////////////////////////////////////////////////////////////////
- // TEncoderSpackV1
- ///////////////////////////////////////////////////////////////////////
- class TEncoderSpackV1 final: public TBufferedEncoderBase {
- public:
- TEncoderSpackV1(
- IOutputStream* out,
- ETimePrecision timePrecision,
- ECompression compression,
- EMetricsMergingMode mergingMode,
- ESpackV1Version version,
- TStringBuf metricNameLabel
- )
- : Out_(out)
- , TimePrecision_(timePrecision)
- , Compression_(compression)
- , Version_(version)
- , MetricName_(Version_ >= SV1_02 ? LabelNamesPool_.PutIfAbsent(metricNameLabel) : nullptr)
- {
- MetricsMergingMode_ = mergingMode;
- LabelNamesPool_.SetSorted(true);
- LabelValuesPool_.SetSorted(true);
- }
- ~TEncoderSpackV1() override {
- Close();
- }
- private:
- void OnDouble(TInstant time, double value) override {
- TBufferedEncoderBase::OnDouble(time, value);
- }
- void OnInt64(TInstant time, i64 value) override {
- TBufferedEncoderBase::OnInt64(time, value);
- }
- void OnUint64(TInstant time, ui64 value) override {
- TBufferedEncoderBase::OnUint64(time, value);
- }
- void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override {
- TBufferedEncoderBase::OnHistogram(time, snapshot);
- }
- void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override {
- TBufferedEncoderBase::OnSummaryDouble(time, snapshot);
- }
- void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) override {
- TBufferedEncoderBase::OnLogHistogram(time, snapshot);
- }
- void Close() override {
- if (Closed_) {
- return;
- }
- Closed_ = true;
- LabelNamesPool_.Build();
- LabelValuesPool_.Build();
- // Sort all points uniquely by ts -- the size can decrease
- ui64 pointsCount = 0;
- for (TMetric& metric : Metrics_) {
- if (metric.TimeSeries.Size() > 1) {
- metric.TimeSeries.SortByTs();
- }
- pointsCount += metric.TimeSeries.Size();
- }
- // (1) write header
- TSpackHeader header;
- header.Version = Version_;
- header.TimePrecision = EncodeTimePrecision(TimePrecision_);
- header.Compression = EncodeCompression(Compression_);
- header.LabelNamesSize = static_cast<ui32>(
- LabelNamesPool_.BytesSize() + LabelNamesPool_.Count());
- header.LabelValuesSize = static_cast<ui32>(
- LabelValuesPool_.BytesSize() + LabelValuesPool_.Count());
- header.MetricCount = Metrics_.size();
- header.PointsCount = pointsCount;
- Out_->Write(&header, sizeof(header));
- // if compression enabled all below writes must go throught compressor
- auto compressedOut = CompressedOutput(Out_, Compression_);
- if (compressedOut) {
- Out_ = compressedOut.Get();
- }
- // (2) write string pools
- auto strPoolWrite = [this](TStringBuf str, ui32, ui32) {
- Out_->Write(str);
- Out_->Write('\0');
- };
- LabelNamesPool_.ForEach(strPoolWrite);
- LabelValuesPool_.ForEach(strPoolWrite);
- // (3) write common time
- WriteTime(CommonTime_);
- // (4) write common labels' indexes
- WriteLabels(CommonLabels_, nullptr);
- // (5) write metrics
- // metrics count already written in header
- for (TMetric& metric : Metrics_) {
- // (5.1) types byte
- ui8 typesByte = PackTypes(metric);
- Out_->Write(&typesByte, sizeof(typesByte));
- // TODO: implement
- ui8 flagsByte = 0x00;
- Out_->Write(&flagsByte, sizeof(flagsByte));
- // v1.2 format addition — metric name
- if (Version_ >= SV1_02) {
- const auto it = FindIf(metric.Labels, [&](const auto& l) {
- return l.Key == MetricName_;
- });
- Y_ENSURE(it != metric.Labels.end(),
- "metric name label '" << LabelNamesPool_.Get(MetricName_->Index) << "' not found, "
- << "all metric labels '" << FormatLabels(metric.Labels) << "'");
- WriteVarUInt32(Out_, it->Value->Index);
- }
- // (5.2) labels
- WriteLabels(metric.Labels, MetricName_);
- // (5.3) values
- switch (metric.TimeSeries.Size()) {
- case 0:
- break;
- case 1: {
- const auto& point = metric.TimeSeries[0];
- if (point.GetTime() != TInstant::Zero()) {
- WriteTime(point.GetTime());
- }
- EMetricValueType valueType = metric.TimeSeries.GetValueType();
- WriteValue(metric.MetricType, valueType, point.GetValue());
- break;
- }
- default:
- WriteVarUInt32(Out_, static_cast<ui32>(metric.TimeSeries.Size()));
- const TMetricTimeSeries& ts = metric.TimeSeries;
- EMetricType metricType = metric.MetricType;
- ts.ForEach([this, metricType](TInstant time, EMetricValueType valueType, TMetricValue value) {
- // workaround for GCC bug
- // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61636
- this->WriteTime(time);
- this->WriteValue(metricType, valueType, value);
- });
- break;
- }
- }
- }
- // store metric type and values type in one byte
- ui8 PackTypes(const TMetric& metric) {
- EValueType valueType;
- if (metric.TimeSeries.Empty()) {
- valueType = EValueType::NONE;
- } else if (metric.TimeSeries.Size() == 1) {
- TInstant time = metric.TimeSeries[0].GetTime();
- valueType = (time == TInstant::Zero())
- ? EValueType::ONE_WITHOUT_TS
- : EValueType::ONE_WITH_TS;
- } else {
- valueType = EValueType::MANY_WITH_TS;
- }
- return (static_cast<ui8>(metric.MetricType) << 2) | static_cast<ui8>(valueType);
- }
- void WriteLabels(const TPooledLabels& labels, const TPooledStr* skipKey) {
- WriteVarUInt32(Out_, static_cast<ui32>(skipKey ? labels.size() - 1 : labels.size()));
- for (auto&& label : labels) {
- if (label.Key == skipKey) {
- continue;
- }
- WriteVarUInt32(Out_, label.Key->Index);
- WriteVarUInt32(Out_, label.Value->Index);
- }
- }
- void WriteValue(EMetricType metricType, EMetricValueType valueType, TMetricValue value) {
- switch (metricType) {
- case EMetricType::GAUGE:
- WriteFixed(value.AsDouble(valueType));
- break;
- case EMetricType::IGAUGE:
- WriteFixed(value.AsInt64(valueType));
- break;
- case EMetricType::COUNTER:
- case EMetricType::RATE:
- WriteFixed(value.AsUint64(valueType));
- break;
- case EMetricType::HIST:
- case EMetricType::HIST_RATE:
- WriteHistogram(*value.AsHistogram());
- break;
- case EMetricType::DSUMMARY:
- WriteSummaryDouble(*value.AsSummaryDouble());
- break;
- case EMetricType::LOGHIST:
- WriteLogHistogram(*value.AsLogHistogram());
- break;
- default:
- ythrow yexception() << "unsupported metric type: " << metricType;
- }
- }
- void WriteTime(TInstant instant) {
- switch (TimePrecision_) {
- case ETimePrecision::SECONDS: {
- ui32 time = static_cast<ui32>(instant.Seconds());
- Out_->Write(&time, sizeof(time));
- break;
- }
- case ETimePrecision::MILLIS: {
- ui64 time = static_cast<ui64>(instant.MilliSeconds());
- Out_->Write(&time, sizeof(time));
- }
- }
- }
- template <typename T>
- void WriteFixed(T value) {
- Out_->Write(&value, sizeof(value));
- }
- void WriteHistogram(const IHistogramSnapshot& histogram) {
- ui32 count = histogram.Count();
- WriteVarUInt32(Out_, count);
- for (ui32 i = 0; i < count; i++) {
- double bound = histogram.UpperBound(i);
- Out_->Write(&bound, sizeof(bound));
- }
- for (ui32 i = 0; i < count; i++) {
- ui64 value = histogram.Value(i);
- Out_->Write(&value, sizeof(value));
- }
- }
- void WriteLogHistogram(const TLogHistogramSnapshot& logHist) {
- WriteFixed(logHist.Base());
- WriteFixed(logHist.ZerosCount());
- WriteVarUInt32(Out_, static_cast<ui32>(logHist.StartPower()));
- WriteVarUInt32(Out_, logHist.Count());
- for (ui32 i = 0; i < logHist.Count(); ++i) {
- WriteFixed(logHist.Bucket(i));
- }
- }
- void WriteSummaryDouble(const ISummaryDoubleSnapshot& summary) {
- WriteFixed(summary.GetCount());
- WriteFixed(summary.GetSum());
- WriteFixed(summary.GetMin());
- WriteFixed(summary.GetMax());
- WriteFixed(summary.GetLast());
- }
- private:
- IOutputStream* Out_;
- ETimePrecision TimePrecision_;
- ECompression Compression_;
- ESpackV1Version Version_;
- const TPooledStr* MetricName_;
- bool Closed_ = false;
- };
- }
- IMetricEncoderPtr EncoderSpackV1(
- IOutputStream* out,
- ETimePrecision timePrecision,
- ECompression compression,
- EMetricsMergingMode mergingMode
- ) {
- return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_01, "");
- }
- IMetricEncoderPtr EncoderSpackV12(
- IOutputStream* out,
- ETimePrecision timePrecision,
- ECompression compression,
- EMetricsMergingMode mergingMode,
- TStringBuf metricNameLabel
- ) {
- Y_ENSURE(!metricNameLabel.Empty(), "metricNameLabel can't be empty");
- return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_02, metricNameLabel);
- }
- }
|