unistat_decoder.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. #include "unistat.h"
  2. #include <library/cpp/monlib/metrics/histogram_collector.h>
  3. #include <library/cpp/monlib/metrics/labels.h>
  4. #include <library/cpp/monlib/metrics/metric_type.h>
  5. #include <library/cpp/monlib/metrics/metric_value.h>
  6. #include <library/cpp/monlib/metrics/metric_consumer.h>
  7. #include <library/cpp/json/json_reader.h>
  8. #include <util/datetime/base.h>
  9. #include <util/string/split.h>
  10. #include <contrib/libs/re2/re2/re2.h>
  11. using namespace NJson;
  12. const re2::RE2 NAME_RE{R"((?:[a-zA-Z0-9\.\-/@_]+_)+(?:[ad][vehmntx]{3}|summ|hgram|max))"};
  13. namespace NMonitoring {
  14. namespace {
  15. bool IsNumber(const NJson::TJsonValue& j) {
  16. switch (j.GetType()) {
  17. case EJsonValueType::JSON_INTEGER:
  18. case EJsonValueType::JSON_UINTEGER:
  19. case EJsonValueType::JSON_DOUBLE:
  20. return true;
  21. default:
  22. return false;
  23. }
  24. }
  25. template <typename T>
  26. T ExtractNumber(const TJsonValue& val) {
  27. switch (val.GetType()) {
  28. case EJsonValueType::JSON_INTEGER:
  29. return static_cast<T>(val.GetInteger());
  30. case EJsonValueType::JSON_UINTEGER:
  31. return static_cast<T>(val.GetUInteger());
  32. case EJsonValueType::JSON_DOUBLE:
  33. return static_cast<T>(val.GetDouble());
  34. default:
  35. ythrow yexception() << "Expected number, but found " << val.GetType();
  36. }
  37. }
  38. auto ExtractDouble = ExtractNumber<double>;
  39. auto ExtractUi64 = ExtractNumber<ui64>;
  40. class THistogramBuilder {
  41. public:
  42. void Add(TBucketBound bound, TBucketValue value) {
  43. /// XXX: yasm uses left-closed intervals, while in monlib we use right-closed ones,
  44. /// so (-inf; 0) [0, 100) [100; +inf)
  45. /// becomes (-inf; 0] (0, 100] (100; +inf)
  46. /// but since we've already lost some information these no way to avoid this kind of error here
  47. Bounds_.push_back(bound);
  48. /// this will always be 0 for the first bucket,
  49. /// since there's no way to make (-inf; N) bucket in yasm
  50. Values_.push_back(NextValue_);
  51. /// we will write this value into the next bucket so that [[0, 10], [100, 20], [200, 50]]
  52. /// becomes (-inf; 0] -> 0; (0; 100] -> 10; (100; 200] -> 20; (200; +inf) -> 50
  53. NextValue_ = value;
  54. }
  55. IHistogramSnapshotPtr Finalize() {
  56. Bounds_.push_back(std::numeric_limits<TBucketBound>::max());
  57. Values_.push_back(NextValue_);
  58. return ExplicitHistogramSnapshot(Bounds_, Values_, true);
  59. }
  60. public:
  61. TBucketValue NextValue_ {0};
  62. TBucketBounds Bounds_;
  63. TBucketValues Values_;
  64. };
  65. class LogHistogramBuilder {
  66. public:
  67. void Add(double value) {
  68. while (Values_.size() < HISTOGRAM_MAX_BUCKETS_COUNT && value > MaxValueWithBickets(Values_.size())) {
  69. Values_.push_back(0);
  70. }
  71. ui32 index = 0;
  72. if (value > MaxValueWithBickets(HISTOGRAM_MAX_BUCKETS_COUNT)) {
  73. index = Values_.size() - 1;
  74. } else if (value > MIN_VALUE) {
  75. double logBase = std::log(value) / std::log(BASE);
  76. index = static_cast<ui32>(std::ceil(logBase));
  77. }
  78. ++Values_[index];
  79. }
  80. IHistogramSnapshotPtr Finalize() && {
  81. return new TExponentialHistogramSnapshot(BASE, MIN_VALUE, std::move(Values_));
  82. }
  83. private:
  84. static constexpr double BASE = 2;
  85. static constexpr double MIN_VALUE = 1;
  86. static double MaxValueWithBickets(ui64 buckets) {
  87. return std::pow(BASE, buckets - 2);
  88. }
  89. private:
  90. TBucketValues Values_ = TBucketValues(2, 0);
  91. };
  92. class TDecoderUnistat {
  93. private:
  94. public:
  95. explicit TDecoderUnistat(IMetricConsumer* consumer, IInputStream* is, TStringBuf metricNameLabel, TInstant ts)
  96. : Consumer_{consumer},
  97. MetricNameLabel(metricNameLabel),
  98. Timestamp_{ts} {
  99. ReadJsonTree(is, &Json_, /* throw */ true);
  100. }
  101. void Decode() {
  102. Y_ENSURE(Json_.IsArray(), "Expected array at the top level, but found " << Json_.GetType());
  103. for (auto&& metric : Json_.GetArray()) {
  104. Y_ENSURE(metric.IsArray(), "Metric must be an array");
  105. auto&& arr = metric.GetArray();
  106. Y_ENSURE(arr.size() == 2, "Metric must be an array of 2 elements");
  107. auto&& name = arr[0];
  108. auto&& value = arr[1];
  109. MetricContext_ = {};
  110. ParseName(name.GetString());
  111. if (value.IsArray()) {
  112. const auto& array = value.GetArray();
  113. if (!array.empty() && IsNumber(array[0])) {
  114. OnLogHistogram(value);
  115. } else {
  116. OnHistogram(value);
  117. }
  118. } else if (IsNumber(value)) {
  119. if (MetricContext_.Name.EndsWith("_ahhh")) {
  120. OnLogHistogram(value);
  121. } else {
  122. OnScalar(value);
  123. }
  124. } else {
  125. ythrow yexception() << "Expected list or number, but found " << value.GetType();
  126. }
  127. WriteValue();
  128. }
  129. }
  130. private:
  131. void OnScalar(const TJsonValue& jsonValue) {
  132. if (MetricContext_.IsDeriv) {
  133. MetricContext_.Type = EMetricType::RATE;
  134. MetricContext_.Value = TMetricValue{ExtractUi64(jsonValue)};
  135. } else {
  136. MetricContext_.Type = EMetricType::GAUGE;
  137. MetricContext_.Value = TMetricValue{ExtractDouble(jsonValue)};
  138. }
  139. }
  140. void OnLogHistogram(const TJsonValue& value) {
  141. Y_ENSURE(MetricContext_.Name.EndsWith("_ahhh"), "Values list is supported only for _ahhh metrics");
  142. MetricContext_.Type = EMetricType::HIST;
  143. LogHistogramBuilder histogramBuilder;
  144. if (IsNumber(value)) {
  145. histogramBuilder.Add(value.GetDouble());
  146. } else {
  147. for (auto&& item: value.GetArray()) {
  148. Y_ENSURE(IsNumber(item), "Expected a number, but found " << item.GetType());
  149. histogramBuilder.Add(item.GetDouble());
  150. }
  151. }
  152. MetricContext_.Histogram = std::move(histogramBuilder).Finalize();
  153. MetricContext_.Value = TMetricValue{MetricContext_.Histogram.Get()};
  154. }
  155. void OnHistogram(const TJsonValue& jsonHist) {
  156. if (MetricContext_.IsDeriv) {
  157. MetricContext_.Type = EMetricType::HIST_RATE;
  158. } else {
  159. MetricContext_.Type = EMetricType::HIST;
  160. }
  161. auto histogramBuilder = THistogramBuilder();
  162. for (auto&& bucket : jsonHist.GetArray()) {
  163. Y_ENSURE(bucket.IsArray(), "Expected an array, but found " << bucket.GetType());
  164. auto&& arr = bucket.GetArray();
  165. Y_ENSURE(arr.size() == 2, "Histogram bucket must be an array of 2 elements");
  166. const auto bound = ExtractDouble(arr[0]);
  167. const auto weight = ExtractUi64(arr[1]);
  168. histogramBuilder.Add(bound, weight);
  169. }
  170. MetricContext_.Histogram = histogramBuilder.Finalize();
  171. MetricContext_.Value = TMetricValue{MetricContext_.Histogram.Get()};
  172. }
  173. bool IsDeriv(TStringBuf name) {
  174. TStringBuf ignore, suffix;
  175. name.RSplit('_', ignore, suffix);
  176. Y_ENSURE(suffix.size() >= 3 && suffix.size() <= 5, "Disallowed suffix value: " << suffix);
  177. if (suffix == TStringBuf("summ") || suffix == TStringBuf("hgram")) {
  178. return true;
  179. } else if (suffix == TStringBuf("max")) {
  180. return false;
  181. }
  182. return suffix[0] == 'd';
  183. }
  184. void ParseName(TStringBuf value) {
  185. TVector<TStringBuf> parts;
  186. StringSplitter(value).Split(';').SkipEmpty().Collect(&parts);
  187. Y_ENSURE(parts.size() >= 1 && parts.size() <= 16);
  188. TStringBuf name = parts.back();
  189. parts.pop_back();
  190. Y_ENSURE(RE2::FullMatch(re2::StringPiece{name.data(), name.size()}, NAME_RE),
  191. "Metric name " << name << " doesn't match regex " << NAME_RE.pattern());
  192. MetricContext_.Name = name;
  193. MetricContext_.IsDeriv = IsDeriv(MetricContext_.Name);
  194. for (auto tag : parts) {
  195. TStringBuf n, v;
  196. tag.Split('=', n, v);
  197. Y_ENSURE(n && v, "Unexpected tag format in " << tag);
  198. MetricContext_.Labels.Add(n, v);
  199. }
  200. }
  201. private:
  202. void WriteValue() {
  203. Consumer_->OnMetricBegin(MetricContext_.Type);
  204. Consumer_->OnLabelsBegin();
  205. Consumer_->OnLabel(MetricNameLabel, TString{MetricContext_.Name});
  206. for (auto&& l : MetricContext_.Labels) {
  207. Consumer_->OnLabel(l.Name(), l.Value());
  208. }
  209. Consumer_->OnLabelsEnd();
  210. switch (MetricContext_.Type) {
  211. case EMetricType::GAUGE:
  212. Consumer_->OnDouble(Timestamp_, MetricContext_.Value.AsDouble());
  213. break;
  214. case EMetricType::RATE:
  215. Consumer_->OnUint64(Timestamp_, MetricContext_.Value.AsUint64());
  216. break;
  217. case EMetricType::HIST:
  218. case EMetricType::HIST_RATE:
  219. Consumer_->OnHistogram(Timestamp_, MetricContext_.Value.AsHistogram());
  220. break;
  221. case EMetricType::LOGHIST:
  222. case EMetricType::DSUMMARY:
  223. case EMetricType::IGAUGE:
  224. case EMetricType::COUNTER:
  225. case EMetricType::UNKNOWN:
  226. ythrow yexception() << "Unexpected metric type: " << MetricContext_.Type;
  227. }
  228. Consumer_->OnMetricEnd();
  229. }
  230. private:
  231. IMetricConsumer* Consumer_;
  232. NJson::TJsonValue Json_;
  233. TStringBuf MetricNameLabel;
  234. TInstant Timestamp_;
  235. struct {
  236. TStringBuf Name;
  237. EMetricType Type{EMetricType::UNKNOWN};
  238. TMetricValue Value;
  239. bool IsDeriv{false};
  240. TLabels Labels;
  241. IHistogramSnapshotPtr Histogram;
  242. } MetricContext_;
  243. };
  244. }
  245. void DecodeUnistat(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel, TInstant ts) {
  246. c->OnStreamBegin();
  247. DecodeUnistatToStream(data, c, metricNameLabel, ts);
  248. c->OnStreamEnd();
  249. }
  250. void DecodeUnistatToStream(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel, TInstant ts) {
  251. TMemoryInput in{data.data(), data.size()};
  252. TDecoderUnistat decoder(c, &in, metricNameLabel, ts);
  253. decoder.Decode();
  254. }
  255. }