unistat_decoder.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  #include "unistat.h"

  #include <library/cpp/monlib/metrics/histogram_collector.h>
  #include <library/cpp/monlib/metrics/labels.h>
  #include <library/cpp/monlib/metrics/metric_type.h>
  #include <library/cpp/monlib/metrics/metric_value.h>
  #include <library/cpp/monlib/metrics/metric_consumer.h>

  #include <library/cpp/json/json_reader.h>

  #include <util/datetime/base.h>
  #include <util/string/split.h>

  #include <contrib/libs/re2/re2/re2.h>

  #include <cmath>
  #include <limits>
  #include <type_traits>
  11. using namespace NJson;
  12. const re2::RE2 NAME_RE{R"((?:[a-zA-Z0-9\.\-/@_]+_)+(?:[advehmntx][vehmntx]{3}|summ|hgram|max))"};
  13. namespace NMonitoring {
  14. namespace {
  15. bool IsNumber(const NJson::TJsonValue& j) {
  16. switch (j.GetType()) {
  17. case EJsonValueType::JSON_INTEGER:
  18. case EJsonValueType::JSON_UINTEGER:
  19. case EJsonValueType::JSON_DOUBLE:
  20. return true;
  21. default:
  22. return false;
  23. }
  24. }
  25. template <typename T>
  26. T ExtractNumber(const TJsonValue& val) {
  27. switch (val.GetType()) {
  28. case EJsonValueType::JSON_INTEGER:
  29. return static_cast<T>(val.GetInteger());
  30. case EJsonValueType::JSON_UINTEGER:
  31. return static_cast<T>(val.GetUInteger());
  32. case EJsonValueType::JSON_DOUBLE:
  33. return static_cast<T>(val.GetDouble());
  34. default:
  35. ythrow yexception() << "Expected number, but found " << val.GetType();
  36. }
  37. }
  38. auto ExtractDouble = ExtractNumber<double>;
  39. auto ExtractUi64 = ExtractNumber<ui64>;
  40. class THistogramBuilder {
  41. public:
  42. void Add(TBucketBound bound, TBucketValue value) {
  43. /// XXX: yasm uses left-closed intervals, while in monlib we use right-closed ones,
  44. /// so (-inf; 0) [0, 100) [100; +inf)
  45. /// becomes (-inf; 0] (0, 100] (100; +inf)
  46. /// but since we've already lost some information these no way to avoid this kind of error here
  47. Bounds_.push_back(bound);
  48. /// this will always be 0 for the first bucket,
  49. /// since there's no way to make (-inf; N) bucket in yasm
  50. Values_.push_back(NextValue_);
  51. /// we will write this value into the next bucket so that [[0, 10], [100, 20], [200, 50]]
  52. /// becomes (-inf; 0] -> 0; (0; 100] -> 10; (100; 200] -> 20; (200; +inf) -> 50
  53. NextValue_ = value;
  54. }
  55. IHistogramSnapshotPtr Finalize() {
  56. Bounds_.push_back(std::numeric_limits<TBucketBound>::max());
  57. Values_.push_back(NextValue_);
  58. return ExplicitHistogramSnapshot(Bounds_, Values_, true);
  59. }
  60. public:
  61. TBucketValue NextValue_ {0};
  62. TBucketBounds Bounds_;
  63. TBucketValues Values_;
  64. };
  65. class LogHistogramBuilder {
  66. public:
  67. void Add(double value) {
  68. while (Values_.size() < HISTOGRAM_MAX_BUCKETS_COUNT && value > MaxValueWithBickets(Values_.size())) {
  69. Values_.push_back(0);
  70. }
  71. ui32 index = 0;
  72. if (value > MaxValueWithBickets(HISTOGRAM_MAX_BUCKETS_COUNT)) {
  73. index = Values_.size() - 1;
  74. } else if (value > MIN_VALUE) {
  75. double logBase = std::log(value) / std::log(BASE);
  76. index = static_cast<ui32>(std::ceil(logBase));
  77. }
  78. ++Values_[index];
  79. }
  80. IHistogramSnapshotPtr Finalize() && {
  81. return new TExponentialHistogramSnapshot(BASE, MIN_VALUE, std::move(Values_));
  82. }
  83. private:
  84. static constexpr double BASE = 2;
  85. static constexpr double MIN_VALUE = 1;
  86. static double MaxValueWithBickets(ui64 buckets) {
  87. return std::pow(BASE, buckets - 2);
  88. }
  89. private:
  90. TBucketValues Values_ = TBucketValues(2, 0);
  91. };
  92. class TDecoderUnistat {
  93. private:
  94. public:
  95. explicit TDecoderUnistat(
  96. IMetricConsumer* consumer,
  97. IInputStream* is,
  98. TStringBuf metricNameLabel,
  99. TStringBuf metricNamePrefix,
  100. TInstant ts)
  101. : Consumer_{consumer},
  102. MetricNameLabel_(metricNameLabel),
  103. MetricNamePrefix_(metricNamePrefix),
  104. Timestamp_{ts} {
  105. ReadJsonTree(is, &Json_, /* throw */ true);
  106. }
  107. void Decode() {
  108. Y_ENSURE(Json_.IsArray(), "Expected array at the top level, but found " << Json_.GetType());
  109. for (auto&& metric : Json_.GetArray()) {
  110. Y_ENSURE(metric.IsArray(), "Metric must be an array");
  111. auto&& arr = metric.GetArray();
  112. Y_ENSURE(arr.size() == 2, "Metric must be an array of 2 elements");
  113. auto&& name = arr[0];
  114. auto&& value = arr[1];
  115. MetricContext_ = {};
  116. ParseName(name.GetString());
  117. if (value.IsArray()) {
  118. const auto& array = value.GetArray();
  119. if (!array.empty() && IsNumber(array[0])) {
  120. OnLogHistogram(value);
  121. } else {
  122. OnHistogram(value);
  123. }
  124. } else if (IsNumber(value)) {
  125. if (MetricContext_.Name.EndsWith("hhh") && !MetricContext_.IsDeriv) {
  126. OnLogHistogram(value);
  127. } else {
  128. OnScalar(value);
  129. }
  130. } else {
  131. ythrow yexception() << "Expected list or number, but found " << value.GetType();
  132. }
  133. WriteValue();
  134. }
  135. }
  136. private:
  137. void OnScalar(const TJsonValue& jsonValue) {
  138. if (MetricContext_.IsDeriv) {
  139. MetricContext_.Type = EMetricType::RATE;
  140. MetricContext_.Value = TMetricValue{ExtractUi64(jsonValue)};
  141. } else {
  142. MetricContext_.Type = EMetricType::GAUGE;
  143. MetricContext_.Value = TMetricValue{ExtractDouble(jsonValue)};
  144. }
  145. }
  146. void OnLogHistogram(const TJsonValue& value) {
  147. Y_ENSURE(MetricContext_.Name.EndsWith("hhh") && !MetricContext_.IsDeriv,
  148. "Values list is supported only for histogram metrics");
  149. MetricContext_.Type = EMetricType::HIST;
  150. LogHistogramBuilder histogramBuilder;
  151. if (IsNumber(value)) {
  152. histogramBuilder.Add(value.GetDouble());
  153. } else {
  154. for (auto&& item: value.GetArray()) {
  155. Y_ENSURE(IsNumber(item), "Expected a number, but found " << item.GetType());
  156. histogramBuilder.Add(item.GetDouble());
  157. }
  158. }
  159. MetricContext_.Histogram = std::move(histogramBuilder).Finalize();
  160. MetricContext_.Value = TMetricValue{MetricContext_.Histogram.Get()};
  161. }
  162. void OnHistogram(const TJsonValue& jsonHist) {
  163. if (MetricContext_.IsDeriv) {
  164. MetricContext_.Type = EMetricType::HIST_RATE;
  165. } else {
  166. MetricContext_.Type = EMetricType::HIST;
  167. }
  168. auto histogramBuilder = THistogramBuilder();
  169. for (auto&& bucket : jsonHist.GetArray()) {
  170. Y_ENSURE(bucket.IsArray(), "Expected an array, but found " << bucket.GetType());
  171. auto&& arr = bucket.GetArray();
  172. Y_ENSURE(arr.size() == 2, "Histogram bucket must be an array of 2 elements");
  173. const auto bound = ExtractDouble(arr[0]);
  174. const auto weight = ExtractUi64(arr[1]);
  175. histogramBuilder.Add(bound, weight);
  176. }
  177. MetricContext_.Histogram = histogramBuilder.Finalize();
  178. MetricContext_.Value = TMetricValue{MetricContext_.Histogram.Get()};
  179. }
  180. bool IsDeriv(TStringBuf name) {
  181. TStringBuf ignore, suffix;
  182. name.RSplit('_', ignore, suffix);
  183. Y_ENSURE(suffix.size() >= 3 && suffix.size() <= 5, "Disallowed suffix value: " << suffix);
  184. if (suffix == TStringBuf("summ") || suffix == TStringBuf("hgram")) {
  185. return true;
  186. } else if (suffix == TStringBuf("max")) {
  187. return false;
  188. }
  189. return suffix[0] == 'd';
  190. }
  191. void ParseName(TStringBuf value) {
  192. TVector<TStringBuf> parts;
  193. StringSplitter(value).Split(';').SkipEmpty().Collect(&parts);
  194. Y_ENSURE(parts.size() >= 1 && parts.size() <= 16);
  195. TStringBuf name = parts.back();
  196. parts.pop_back();
  197. Y_ENSURE(RE2::FullMatch(re2::StringPiece{name.data(), name.size()}, NAME_RE),
  198. "Metric name " << name << " doesn't match regex " << NAME_RE.pattern());
  199. MetricContext_.Name = name;
  200. MetricContext_.IsDeriv = IsDeriv(MetricContext_.Name);
  201. for (auto tag : parts) {
  202. TStringBuf n, v;
  203. tag.Split('=', n, v);
  204. Y_ENSURE(n && v, "Unexpected tag format in " << tag);
  205. MetricContext_.Labels.Add(n, v);
  206. }
  207. }
  208. private:
  209. void WriteValue() {
  210. Consumer_->OnMetricBegin(MetricContext_.Type);
  211. Consumer_->OnLabelsBegin();
  212. Consumer_->OnLabel(MetricNameLabel_, TStringBuilder{} << MetricNamePrefix_ << MetricContext_.Name);
  213. for (auto&& l : MetricContext_.Labels) {
  214. Consumer_->OnLabel(l.Name(), l.Value());
  215. }
  216. Consumer_->OnLabelsEnd();
  217. switch (MetricContext_.Type) {
  218. case EMetricType::GAUGE:
  219. Consumer_->OnDouble(Timestamp_, MetricContext_.Value.AsDouble());
  220. break;
  221. case EMetricType::RATE:
  222. Consumer_->OnUint64(Timestamp_, MetricContext_.Value.AsUint64());
  223. break;
  224. case EMetricType::HIST:
  225. case EMetricType::HIST_RATE:
  226. Consumer_->OnHistogram(Timestamp_, MetricContext_.Value.AsHistogram());
  227. break;
  228. case EMetricType::LOGHIST:
  229. case EMetricType::DSUMMARY:
  230. case EMetricType::IGAUGE:
  231. case EMetricType::COUNTER:
  232. case EMetricType::UNKNOWN:
  233. ythrow yexception() << "Unexpected metric type: " << MetricContext_.Type;
  234. }
  235. Consumer_->OnMetricEnd();
  236. }
  237. private:
  238. IMetricConsumer* Consumer_;
  239. NJson::TJsonValue Json_;
  240. TStringBuf MetricNameLabel_;
  241. TStringBuf MetricNamePrefix_;
  242. TInstant Timestamp_;
  243. struct {
  244. TStringBuf Name;
  245. EMetricType Type{EMetricType::UNKNOWN};
  246. TMetricValue Value;
  247. bool IsDeriv{false};
  248. TLabels Labels;
  249. IHistogramSnapshotPtr Histogram;
  250. } MetricContext_;
  251. };
  252. }
  253. void DecodeUnistat(
  254. TStringBuf data,
  255. IMetricConsumer* c,
  256. TStringBuf metricNameLabel,
  257. TStringBuf metricNamePrefix,
  258. TInstant ts)
  259. {
  260. c->OnStreamBegin();
  261. DecodeUnistatToStream(data, c, metricNameLabel, metricNamePrefix, ts);
  262. c->OnStreamEnd();
  263. }
  264. void DecodeUnistatToStream(
  265. TStringBuf data,
  266. IMetricConsumer* c,
  267. TStringBuf metricNameLabel,
  268. TStringBuf metricNamePrefix,
  269. TInstant ts)
  270. {
  271. TMemoryInput in{data.data(), data.size()};
  272. TDecoderUnistat decoder(c, &in, metricNameLabel, metricNamePrefix, ts);
  273. decoder.Decode();
  274. }
  275. }