json_decoder.cpp 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161
  1. #include "json.h"
  2. #include "typed_point.h"
  3. #include <library/cpp/monlib/exception/exception.h>
  4. #include <library/cpp/monlib/metrics/labels.h>
  5. #include <library/cpp/monlib/metrics/metric_value.h>
  6. #include <library/cpp/json/json_reader.h>
  7. #include <util/datetime/base.h>
  8. #include <util/string/cast.h>
  9. #include <limits>
  10. namespace NMonitoring {
  11. #define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TJsonDecodeError() << __VA_ARGS__)
  12. namespace {
  13. ///////////////////////////////////////////////////////////////////////
  14. // THistogramBuilder
  15. ///////////////////////////////////////////////////////////////////////
  16. class THistogramBuilder {
  17. public:
  18. void AddBound(TBucketBound bound) {
  19. if (!Bounds_.empty()) {
  20. DECODE_ENSURE(Bounds_.back() < bound,
  21. "non sorted bounds, " << Bounds_.back() <<
  22. " >= " << bound);
  23. }
  24. Bounds_.push_back(bound);
  25. }
  26. void AddValue(TBucketValue value) {
  27. Values_.push_back(value);
  28. }
  29. void AddInf(TBucketValue value) {
  30. InfPresented_ = true;
  31. InfValue_ = value;
  32. }
  33. IHistogramSnapshotPtr Build() {
  34. if (InfPresented_) {
  35. Bounds_.push_back(Max<TBucketBound>());
  36. Values_.push_back(InfValue_);
  37. }
  38. auto snapshot = ExplicitHistogramSnapshot(Bounds_, Values_, true);
  39. Bounds_.clear();
  40. Values_.clear();
  41. InfPresented_ = false;
  42. return snapshot;
  43. }
  44. bool Empty() const noexcept {
  45. return Bounds_.empty() && Values_.empty();
  46. }
  47. void Clear() {
  48. Bounds_.clear();
  49. Values_.clear();
  50. }
  51. private:
  52. TBucketBounds Bounds_;
  53. TBucketValues Values_;
  54. bool InfPresented_ = false;
  55. TBucketValue InfValue_;
  56. };
  57. class TSummaryDoubleBuilder {
  58. public:
  59. ISummaryDoubleSnapshotPtr Build() const {
  60. return MakeIntrusive<TSummaryDoubleSnapshot>(Sum_, Min_, Max_, Last_, Count_);
  61. }
  62. void SetSum(double sum) {
  63. Empty_ = false;
  64. Sum_ = sum;
  65. }
  66. void SetMin(double min) {
  67. Empty_ = false;
  68. Min_ = min;
  69. }
  70. void SetMax(double max) {
  71. Empty_ = false;
  72. Max_ = max;
  73. }
  74. void SetLast(double last) {
  75. Empty_ = false;
  76. Last_ = last;
  77. }
  78. void SetCount(ui64 count) {
  79. Empty_ = false;
  80. Count_ = count;
  81. }
  82. void Clear() {
  83. Empty_ = true;
  84. Sum_ = 0;
  85. Min_ = 0;
  86. Max_ = 0;
  87. Last_ = 0;
  88. Count_ = 0;
  89. }
  90. bool Empty() const {
  91. return Empty_;
  92. }
  93. private:
  94. double Sum_ = 0;
  95. double Min_ = 0;
  96. double Max_ = 0;
  97. double Last_ = 0;
  98. ui64 Count_ = 0;
  99. bool Empty_ = true;
  100. };
  101. class TLogHistogramBuilder {
  102. public:
  103. void SetBase(double base) {
  104. DECODE_ENSURE(base > 0, "base must be positive");
  105. Base_ = base;
  106. }
  107. void SetZerosCount(ui64 zerosCount) {
  108. ZerosCount_ = zerosCount;
  109. }
  110. void SetStartPower(int startPower) {
  111. StartPower_ = startPower;
  112. }
  113. void AddBucketValue(double value) {
  114. DECODE_ENSURE(value > 0.0, "bucket values must be positive");
  115. DECODE_ENSURE(value < std::numeric_limits<double>::max(), "bucket values must be finite");
  116. Buckets_.push_back(value);
  117. }
  118. void Clear() {
  119. Buckets_.clear();
  120. Base_ = 1.5;
  121. ZerosCount_ = 0;
  122. StartPower_ = 0;
  123. }
  124. bool Empty() const {
  125. return Buckets_.empty() && ZerosCount_ == 0;
  126. }
  127. TLogHistogramSnapshotPtr Build() {
  128. return MakeIntrusive<TLogHistogramSnapshot>(Base_, ZerosCount_, StartPower_, std::move(Buckets_));
  129. }
  130. private:
  131. double Base_ = 1.5;
  132. ui64 ZerosCount_ = 0;
  133. int StartPower_ = 0;
  134. TVector<double> Buckets_;
  135. };
  136. std::pair<double, bool> ParseSpecDouble(TStringBuf string) {
  137. if (string == TStringBuf("nan") || string == TStringBuf("NaN")) {
  138. return {std::numeric_limits<double>::quiet_NaN(), true};
  139. } else if (string == TStringBuf("inf") || string == TStringBuf("Infinity")) {
  140. return {std::numeric_limits<double>::infinity(), true};
  141. } else if (string == TStringBuf("-inf") || string == TStringBuf("-Infinity")) {
  142. return {-std::numeric_limits<double>::infinity(), true};
  143. } else {
  144. return {0, false};
  145. }
  146. }
  147. ///////////////////////////////////////////////////////////////////////
  148. // TMetricCollector
  149. ///////////////////////////////////////////////////////////////////////
  150. struct TMetricCollector {
  151. EMetricType Type = EMetricType::UNKNOWN;
  152. TLabels Labels;
  153. THistogramBuilder HistogramBuilder;
  154. TSummaryDoubleBuilder SummaryBuilder;
  155. TLogHistogramBuilder LogHistBuilder;
  156. TTypedPoint LastPoint;
  157. TVector<TTypedPoint> TimeSeries;
  158. bool SeenTsOrValue = false;
  159. bool SeenTimeseries = false;
  160. void Clear() {
  161. Type = EMetricType::UNKNOWN;
  162. Labels.Clear();
  163. SeenTsOrValue = false;
  164. SeenTimeseries = false;
  165. TimeSeries.clear();
  166. LastPoint = {};
  167. HistogramBuilder.Clear();
  168. SummaryBuilder.Clear();
  169. LogHistBuilder.Clear();
  170. }
  171. void AddLabel(const TLabel& label) {
  172. Labels.Add(label.Name(), label.Value());
  173. }
  174. void SetLastTime(TInstant time) {
  175. LastPoint.SetTime(time);
  176. }
  177. template <typename T>
  178. void SetLastValue(T value) {
  179. LastPoint.SetValue(value);
  180. }
  181. void SaveLastPoint() {
  182. DECODE_ENSURE(LastPoint.GetTime() != TInstant::Zero(),
  183. "cannot add point without or zero timestamp");
  184. if (!HistogramBuilder.Empty()) {
  185. auto histogram = HistogramBuilder.Build();
  186. TimeSeries.emplace_back(LastPoint.GetTime(), histogram.Get());
  187. } else if (!SummaryBuilder.Empty()) {
  188. auto summary = SummaryBuilder.Build();
  189. TimeSeries.emplace_back(LastPoint.GetTime(), summary.Get());
  190. } else if (!LogHistBuilder.Empty()) {
  191. auto logHist = LogHistBuilder.Build();
  192. TimeSeries.emplace_back(LastPoint.GetTime(), logHist.Get());
  193. } else {
  194. TimeSeries.push_back(std::move(LastPoint));
  195. }
  196. }
  197. template <typename TConsumer>
  198. void Consume(TConsumer&& consumer) {
  199. if (TimeSeries.empty()) {
  200. const auto& p = LastPoint;
  201. consumer(p.GetTime(), p.GetValueType(), p.GetValue());
  202. } else {
  203. for (const auto& p: TimeSeries) {
  204. consumer(p.GetTime(), p.GetValueType(), p.GetValue());
  205. }
  206. }
  207. }
  208. };
  209. struct TCommonParts {
  210. TInstant CommonTime;
  211. TLabels CommonLabels;
  212. };
  213. class IHaltableMetricConsumer: public IMetricConsumer {
  214. public:
  215. virtual bool NeedToStop() const = 0;
  216. };
  217. // TODO(ivanzhukov@): check all states for cases when a json document is invalid
  218. // e.g. "metrics" or "commonLabels" keys are specified multiple times
  219. class TCommonPartsCollector: public IHaltableMetricConsumer {
  220. public:
  221. TCommonParts&& CommonParts() {
  222. return std::move(CommonParts_);
  223. }
  224. private:
  225. bool NeedToStop() const override {
  226. return TInstant::Zero() != CommonParts_.CommonTime && !CommonParts_.CommonLabels.Empty();
  227. }
  228. void OnStreamBegin() override {
  229. }
  230. void OnStreamEnd() override {
  231. }
  232. void OnCommonTime(TInstant time) override {
  233. CommonParts_.CommonTime = time;
  234. }
  235. void OnMetricBegin(EMetricType) override {
  236. IsMetric_ = true;
  237. }
  238. void OnMetricEnd() override {
  239. IsMetric_ = false;
  240. }
  241. void OnLabelsBegin() override {
  242. }
  243. void OnLabelsEnd() override {
  244. }
  245. void OnLabel(TStringBuf name, TStringBuf value) override {
  246. if (!IsMetric_) {
  247. CommonParts_.CommonLabels.Add(std::move(name), std::move(value));
  248. }
  249. }
  250. void OnDouble(TInstant, double) override {
  251. }
  252. void OnInt64(TInstant, i64) override {
  253. }
  254. void OnUint64(TInstant, ui64) override {
  255. }
  256. void OnHistogram(TInstant, IHistogramSnapshotPtr) override {
  257. }
  258. void OnLogHistogram(TInstant, TLogHistogramSnapshotPtr) override {
  259. }
  260. void OnSummaryDouble(TInstant, ISummaryDoubleSnapshotPtr) override {
  261. }
  262. private:
  263. TCommonParts CommonParts_;
  264. bool IsMetric_{false};
  265. };
  266. class TCommonPartsProxy: public IHaltableMetricConsumer {
  267. public:
  268. TCommonPartsProxy(TCommonParts&& commonParts, IMetricConsumer* c)
  269. : CommonParts_{std::move(commonParts)}
  270. , Consumer_{c}
  271. {}
  272. private:
  273. bool NeedToStop() const override {
  274. return false;
  275. }
  276. void OnStreamBegin() override {
  277. Consumer_->OnStreamBegin();
  278. if (!CommonParts_.CommonLabels.Empty()) {
  279. Consumer_->OnLabelsBegin();
  280. for (auto&& label : CommonParts_.CommonLabels) {
  281. Consumer_->OnLabel(label.Name(), label.Value());
  282. }
  283. Consumer_->OnLabelsEnd();
  284. }
  285. if (TInstant::Zero() != CommonParts_.CommonTime) {
  286. Consumer_->OnCommonTime(CommonParts_.CommonTime);
  287. }
  288. }
  289. void OnStreamEnd() override {
  290. Consumer_->OnStreamEnd();
  291. }
  292. void OnCommonTime(TInstant) override {
  293. }
  294. void OnMetricBegin(EMetricType type) override {
  295. IsMetric_ = true;
  296. Consumer_->OnMetricBegin(type);
  297. }
  298. void OnMetricEnd() override {
  299. IsMetric_ = false;
  300. Consumer_->OnMetricEnd();
  301. }
  302. void OnLabelsBegin() override {
  303. if (IsMetric_) {
  304. Consumer_->OnLabelsBegin();
  305. }
  306. }
  307. void OnLabelsEnd() override {
  308. if (IsMetric_) {
  309. Consumer_->OnLabelsEnd();
  310. }
  311. }
  312. void OnLabel(TStringBuf name, TStringBuf value) override {
  313. if (IsMetric_) {
  314. Consumer_->OnLabel(std::move(name), std::move(value));
  315. }
  316. }
  317. void OnDouble(TInstant time, double value) override {
  318. Consumer_->OnDouble(time, value);
  319. }
  320. void OnInt64(TInstant time, i64 value) override {
  321. Consumer_->OnInt64(time, value);
  322. }
  323. void OnUint64(TInstant time, ui64 value) override {
  324. Consumer_->OnUint64(time, value);
  325. }
  326. void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override {
  327. Consumer_->OnHistogram(time, std::move(snapshot));
  328. }
  329. void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) override {
  330. Consumer_->OnLogHistogram(time, std::move(snapshot));
  331. }
  332. void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override {
  333. Consumer_->OnSummaryDouble(time, std::move(snapshot));
  334. }
  335. private:
  336. const TCommonParts CommonParts_;
  337. IMetricConsumer* Consumer_;
  338. bool IsMetric_{false};
  339. };
  340. ///////////////////////////////////////////////////////////////////////
  341. // TDecoderJson
  342. ///////////////////////////////////////////////////////////////////////
  343. class TDecoderJson final: public NJson::TJsonCallbacks {
  344. struct TState {
  345. enum EState {
  346. ROOT_OBJECT = 0x01,
  347. COMMON_LABELS,
  348. COMMON_TS,
  349. METRICS_ARRAY,
  350. METRIC_OBJECT,
  351. METRIC_NAME,
  352. METRIC_LABELS,
  353. METRIC_TYPE,
  354. METRIC_MODE, // TODO: must be deleted
  355. METRIC_TIMESERIES,
  356. METRIC_TS,
  357. METRIC_VALUE,
  358. METRIC_HIST,
  359. METRIC_HIST_BOUNDS,
  360. METRIC_HIST_BUCKETS,
  361. METRIC_HIST_INF,
  362. METRIC_DSUMMARY,
  363. METRIC_DSUMMARY_SUM,
  364. METRIC_DSUMMARY_MIN,
  365. METRIC_DSUMMARY_MAX,
  366. METRIC_DSUMMARY_LAST,
  367. METRIC_DSUMMARY_COUNT,
  368. METRIC_LOG_HIST,
  369. METRIC_LOG_HIST_BASE,
  370. METRIC_LOG_HIST_ZEROS,
  371. METRIC_LOG_HIST_START_POWER,
  372. METRIC_LOG_HIST_BUCKETS,
  373. };
  374. constexpr EState Current() const noexcept {
  375. return static_cast<EState>(State_ & 0xFF);
  376. }
  377. void ToNext(EState state) noexcept {
  378. constexpr auto bitSize = 8 * sizeof(ui8);
  379. State_ = (State_ << bitSize) | static_cast<ui8>(state);
  380. }
  381. void ToPrev() noexcept {
  382. constexpr auto bitSize = 8 * sizeof(ui8);
  383. State_ = State_ >> bitSize;
  384. }
  385. private:
  386. ui64 State_ = static_cast<ui64>(ROOT_OBJECT);
  387. };
  388. public:
  389. TDecoderJson(TStringBuf data, IHaltableMetricConsumer* metricConsumer, TStringBuf metricNameLabel)
  390. : Data_(data)
  391. , MetricConsumer_(metricConsumer)
  392. , MetricNameLabel_(metricNameLabel)
  393. {
  394. }
  395. private:
  396. #define PARSE_ENSURE(CONDITION, ...) \
  397. do { \
  398. if (Y_UNLIKELY(!(CONDITION))) { \
  399. ErrorMsg_ = TStringBuilder() << __VA_ARGS__; \
  400. return false; \
  401. } \
  402. } while (false)
  403. bool OnInteger(long long value) override {
  404. switch (State_.Current()) {
  405. case TState::COMMON_TS:
  406. PARSE_ENSURE(value >= 0, "unexpected negative number in a common timestamp: " << value);
  407. MetricConsumer_->OnCommonTime(TInstant::Seconds(value));
  408. State_.ToPrev();
  409. if (MetricConsumer_->NeedToStop()) {
  410. IsIntentionallyHalted_ = true;
  411. return false;
  412. }
  413. break;
  414. case TState::METRIC_TS:
  415. PARSE_ENSURE(value >= 0, "unexpected negative number in a metric timestamp: " << value);
  416. LastMetric_.SetLastTime(TInstant::Seconds(value));
  417. State_.ToPrev();
  418. break;
  419. case TState::METRIC_VALUE:
  420. LastMetric_.SetLastValue(static_cast<i64>(value));
  421. State_.ToPrev();
  422. break;
  423. case TState::METRIC_HIST_BOUNDS:
  424. LastMetric_.HistogramBuilder.AddBound(static_cast<double>(value));
  425. break;
  426. case TState::METRIC_HIST_BUCKETS:
  427. PARSE_ENSURE(value >= 0 && static_cast<ui64>(value) <= Max<TBucketValues::value_type>(), "value is out of bounds " << value);
  428. LastMetric_.HistogramBuilder.AddValue(value);
  429. break;
  430. case TState::METRIC_HIST_INF:
  431. PARSE_ENSURE(value >= 0, "unexpected negative number in histogram inf: " << value);
  432. LastMetric_.HistogramBuilder.AddInf(value);
  433. State_.ToPrev();
  434. break;
  435. case TState::METRIC_DSUMMARY_COUNT:
  436. LastMetric_.SummaryBuilder.SetCount(value);
  437. State_.ToPrev();
  438. break;
  439. case TState::METRIC_DSUMMARY_SUM:
  440. LastMetric_.SummaryBuilder.SetSum(value);
  441. State_.ToPrev();
  442. break;
  443. case TState::METRIC_DSUMMARY_MIN:
  444. LastMetric_.SummaryBuilder.SetMin(value);
  445. State_.ToPrev();
  446. break;
  447. case TState::METRIC_DSUMMARY_MAX:
  448. LastMetric_.SummaryBuilder.SetMax(value);
  449. State_.ToPrev();
  450. break;
  451. case TState::METRIC_DSUMMARY_LAST:
  452. LastMetric_.SummaryBuilder.SetLast(value);
  453. State_.ToPrev();
  454. break;
  455. case TState::METRIC_LOG_HIST_BASE:
  456. LastMetric_.LogHistBuilder.SetBase(value);
  457. State_.ToPrev();
  458. break;
  459. case TState::METRIC_LOG_HIST_ZEROS:
  460. LastMetric_.LogHistBuilder.SetZerosCount(value);
  461. State_.ToPrev();
  462. break;
  463. case TState::METRIC_LOG_HIST_START_POWER:
  464. LastMetric_.LogHistBuilder.SetStartPower(value);
  465. State_.ToPrev();
  466. break;
  467. case TState::METRIC_LOG_HIST_BUCKETS:
  468. LastMetric_.LogHistBuilder.AddBucketValue(value);
  469. break;
  470. default:
  471. return false;
  472. }
  473. return true;
  474. }
  475. bool OnUInteger(unsigned long long value) override {
  476. switch (State_.Current()) {
  477. case TState::COMMON_TS:
  478. MetricConsumer_->OnCommonTime(TInstant::Seconds(value));
  479. State_.ToPrev();
  480. if (MetricConsumer_->NeedToStop()) {
  481. IsIntentionallyHalted_ = true;
  482. return false;
  483. }
  484. break;
  485. case TState::METRIC_TS:
  486. LastMetric_.SetLastTime(TInstant::Seconds(value));
  487. State_.ToPrev();
  488. break;
  489. case TState::METRIC_VALUE:
  490. PARSE_ENSURE(value <= Max<ui64>(), "Metric value is out of bounds: " << value);
  491. LastMetric_.SetLastValue(static_cast<ui64>(value));
  492. State_.ToPrev();
  493. break;
  494. case TState::METRIC_HIST_BOUNDS:
  495. LastMetric_.HistogramBuilder.AddBound(static_cast<double>(value));
  496. break;
  497. case TState::METRIC_HIST_BUCKETS:
  498. PARSE_ENSURE(value <= Max<TBucketValues::value_type>(), "Histogram bucket value is out of bounds: " << value);
  499. LastMetric_.HistogramBuilder.AddValue(value);
  500. break;
  501. case TState::METRIC_HIST_INF:
  502. LastMetric_.HistogramBuilder.AddInf(value);
  503. State_.ToPrev();
  504. break;
  505. case TState::METRIC_DSUMMARY_COUNT:
  506. LastMetric_.SummaryBuilder.SetCount(value);
  507. State_.ToPrev();
  508. break;
  509. case TState::METRIC_DSUMMARY_SUM:
  510. LastMetric_.SummaryBuilder.SetSum(value);
  511. State_.ToPrev();
  512. break;
  513. case TState::METRIC_DSUMMARY_MIN:
  514. LastMetric_.SummaryBuilder.SetMin(value);
  515. State_.ToPrev();
  516. break;
  517. case TState::METRIC_DSUMMARY_MAX:
  518. LastMetric_.SummaryBuilder.SetMax(value);
  519. State_.ToPrev();
  520. break;
  521. case TState::METRIC_DSUMMARY_LAST:
  522. LastMetric_.SummaryBuilder.SetLast(value);
  523. State_.ToPrev();
  524. break;
  525. case TState::METRIC_LOG_HIST_BASE:
  526. LastMetric_.LogHistBuilder.SetBase(value);
  527. State_.ToPrev();
  528. break;
  529. case TState::METRIC_LOG_HIST_ZEROS:
  530. LastMetric_.LogHistBuilder.SetZerosCount(value);
  531. State_.ToPrev();
  532. break;
  533. case TState::METRIC_LOG_HIST_START_POWER:
  534. LastMetric_.LogHistBuilder.SetStartPower(value);
  535. State_.ToPrev();
  536. break;
  537. case TState::METRIC_LOG_HIST_BUCKETS:
  538. LastMetric_.LogHistBuilder.AddBucketValue(value);
  539. break;
  540. default:
  541. return false;
  542. }
  543. return true;
  544. }
  545. bool OnDouble(double value) override {
  546. switch (State_.Current()) {
  547. case TState::METRIC_VALUE:
  548. LastMetric_.SetLastValue(value);
  549. State_.ToPrev();
  550. break;
  551. case TState::METRIC_HIST_BOUNDS:
  552. LastMetric_.HistogramBuilder.AddBound(value);
  553. break;
  554. case TState::METRIC_DSUMMARY_SUM:
  555. LastMetric_.SummaryBuilder.SetSum(value);
  556. State_.ToPrev();
  557. break;
  558. case TState::METRIC_DSUMMARY_MIN:
  559. LastMetric_.SummaryBuilder.SetMin(value);
  560. State_.ToPrev();
  561. break;
  562. case TState::METRIC_DSUMMARY_MAX:
  563. LastMetric_.SummaryBuilder.SetMax(value);
  564. State_.ToPrev();
  565. break;
  566. case TState::METRIC_DSUMMARY_LAST:
  567. LastMetric_.SummaryBuilder.SetLast(value);
  568. State_.ToPrev();
  569. break;
  570. case TState::METRIC_LOG_HIST_BASE:
  571. LastMetric_.LogHistBuilder.SetBase(value);
  572. State_.ToPrev();
  573. break;
  574. case TState::METRIC_LOG_HIST_BUCKETS:
  575. LastMetric_.LogHistBuilder.AddBucketValue(value);
  576. break;
  577. default:
  578. return false;
  579. }
  580. return true;
  581. }
  582. bool OnString(const TStringBuf& value) override {
  583. switch (State_.Current()) {
  584. case TState::COMMON_LABELS:
  585. PARSE_ENSURE(!LastLabelName_.empty(), "empty label name in common labels");
  586. MetricConsumer_->OnLabel(LastLabelName_, TString{value});
  587. break;
  588. case TState::METRIC_LABELS:
  589. PARSE_ENSURE(!LastLabelName_.empty(), "empty label name in metric labels");
  590. LastMetric_.Labels.Add(LastLabelName_, TString{value});
  591. break;
  592. case TState::METRIC_NAME:
  593. PARSE_ENSURE(!value.empty(), "empty metric name");
  594. LastMetric_.Labels.Add(MetricNameLabel_, TString{value});
  595. State_.ToPrev();
  596. break;
  597. case TState::COMMON_TS:
  598. MetricConsumer_->OnCommonTime(TInstant::ParseIso8601(value));
  599. State_.ToPrev();
  600. if (MetricConsumer_->NeedToStop()) {
  601. IsIntentionallyHalted_ = true;
  602. return false;
  603. }
  604. break;
  605. case TState::METRIC_TS:
  606. LastMetric_.SetLastTime(TInstant::ParseIso8601(value));
  607. State_.ToPrev();
  608. break;
  609. case TState::METRIC_VALUE:
  610. if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
  611. LastMetric_.SetLastValue(doubleValue);
  612. } else {
  613. return false;
  614. }
  615. State_.ToPrev();
  616. break;
  617. case TState::METRIC_TYPE:
  618. LastMetric_.Type = MetricTypeFromStr(value);
  619. State_.ToPrev();
  620. break;
  621. case TState::METRIC_MODE:
  622. if (value == TStringBuf("deriv")) {
  623. LastMetric_.Type = EMetricType::RATE;
  624. }
  625. State_.ToPrev();
  626. break;
  627. case TState::METRIC_DSUMMARY_SUM:
  628. if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
  629. LastMetric_.SummaryBuilder.SetSum(doubleValue);
  630. } else {
  631. return false;
  632. }
  633. State_.ToPrev();
  634. break;
  635. case TState::METRIC_DSUMMARY_MIN:
  636. if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
  637. LastMetric_.SummaryBuilder.SetMin(doubleValue);
  638. } else {
  639. return false;
  640. }
  641. State_.ToPrev();
  642. break;
  643. case TState::METRIC_DSUMMARY_MAX:
  644. if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
  645. LastMetric_.SummaryBuilder.SetMax(doubleValue);
  646. } else {
  647. return false;
  648. }
  649. State_.ToPrev();
  650. break;
  651. case TState::METRIC_DSUMMARY_LAST:
  652. if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
  653. LastMetric_.SummaryBuilder.SetLast(doubleValue);
  654. } else {
  655. return false;
  656. }
  657. State_.ToPrev();
  658. break;
  659. default:
  660. return false;
  661. }
  662. return true;
  663. }
  664. bool OnMapKey(const TStringBuf& key) override {
  665. switch (State_.Current()) {
  666. case TState::ROOT_OBJECT:
  667. if (key == TStringBuf("commonLabels") || key == TStringBuf("labels")) {
  668. State_.ToNext(TState::COMMON_LABELS);
  669. } else if (key == TStringBuf("ts")) {
  670. State_.ToNext(TState::COMMON_TS);
  671. } else if (key == TStringBuf("sensors") || key == TStringBuf("metrics")) {
  672. State_.ToNext(TState::METRICS_ARRAY);
  673. }
  674. break;
  675. case TState::COMMON_LABELS:
  676. case TState::METRIC_LABELS:
  677. LastLabelName_ = key;
  678. break;
  679. case TState::METRIC_OBJECT:
  680. if (key == TStringBuf("labels")) {
  681. State_.ToNext(TState::METRIC_LABELS);
  682. } else if (key == TStringBuf("name")) {
  683. State_.ToNext(TState::METRIC_NAME);
  684. } else if (key == TStringBuf("ts")) {
  685. PARSE_ENSURE(!LastMetric_.SeenTimeseries,
  686. "mixed timeseries and ts attributes");
  687. LastMetric_.SeenTsOrValue = true;
  688. State_.ToNext(TState::METRIC_TS);
  689. } else if (key == TStringBuf("value")) {
  690. PARSE_ENSURE(!LastMetric_.SeenTimeseries,
  691. "mixed timeseries and value attributes");
  692. LastMetric_.SeenTsOrValue = true;
  693. State_.ToNext(TState::METRIC_VALUE);
  694. } else if (key == TStringBuf("timeseries")) {
  695. PARSE_ENSURE(!LastMetric_.SeenTsOrValue,
  696. "mixed timeseries and ts/value attributes");
  697. LastMetric_.SeenTimeseries = true;
  698. State_.ToNext(TState::METRIC_TIMESERIES);
  699. } else if (key == TStringBuf("mode")) {
  700. State_.ToNext(TState::METRIC_MODE);
  701. } else if (key == TStringBuf("kind") || key == TStringBuf("type")) {
  702. State_.ToNext(TState::METRIC_TYPE);
  703. } else if (key == TStringBuf("hist")) {
  704. State_.ToNext(TState::METRIC_HIST);
  705. } else if (key == TStringBuf("summary")) {
  706. State_.ToNext(TState::METRIC_DSUMMARY);
  707. } else if (key == TStringBuf("log_hist")) {
  708. State_.ToNext(TState::METRIC_LOG_HIST);
  709. } else if (key == TStringBuf("memOnly")) {
  710. // deprecated. Skip it without errors for backward compatibility
  711. } else {
  712. ErrorMsg_ = TStringBuilder() << "unexpected key \"" << key << "\" in a metric schema";
  713. return false;
  714. }
  715. break;
  716. case TState::METRIC_TIMESERIES:
  717. if (key == TStringBuf("ts")) {
  718. State_.ToNext(TState::METRIC_TS);
  719. } else if (key == TStringBuf("value")) {
  720. State_.ToNext(TState::METRIC_VALUE);
  721. } else if (key == TStringBuf("hist")) {
  722. State_.ToNext(TState::METRIC_HIST);
  723. } else if (key == TStringBuf("summary")) {
  724. State_.ToNext(TState::METRIC_DSUMMARY);
  725. } else if (key == TStringBuf("log_hist")) {
  726. State_.ToNext(TState::METRIC_LOG_HIST);
  727. }
  728. break;
  729. case TState::METRIC_HIST:
  730. if (key == TStringBuf("bounds")) {
  731. State_.ToNext(TState::METRIC_HIST_BOUNDS);
  732. } else if (key == TStringBuf("buckets")) {
  733. State_.ToNext(TState::METRIC_HIST_BUCKETS);
  734. } else if (key == TStringBuf("inf")) {
  735. State_.ToNext(TState::METRIC_HIST_INF);
  736. }
  737. break;
  738. case TState::METRIC_LOG_HIST:
  739. if (key == TStringBuf("base")) {
  740. State_.ToNext(TState::METRIC_LOG_HIST_BASE);
  741. } else if (key == TStringBuf("zeros_count")) {
  742. State_.ToNext(TState::METRIC_LOG_HIST_ZEROS);
  743. } else if (key == TStringBuf("start_power")) {
  744. State_.ToNext(TState::METRIC_LOG_HIST_START_POWER);
  745. } else if (key == TStringBuf("buckets")) {
  746. State_.ToNext(TState::METRIC_LOG_HIST_BUCKETS);
  747. }
  748. break;
  749. case TState::METRIC_DSUMMARY:
  750. if (key == TStringBuf("sum")) {
  751. State_.ToNext(TState::METRIC_DSUMMARY_SUM);
  752. } else if (key == TStringBuf("min")) {
  753. State_.ToNext(TState::METRIC_DSUMMARY_MIN);
  754. } else if (key == TStringBuf("max")) {
  755. State_.ToNext(TState::METRIC_DSUMMARY_MAX);
  756. } else if (key == TStringBuf("last")) {
  757. State_.ToNext(TState::METRIC_DSUMMARY_LAST);
  758. } else if (key == TStringBuf("count")) {
  759. State_.ToNext(TState::METRIC_DSUMMARY_COUNT);
  760. }
  761. break;
  762. default:
  763. return false;
  764. }
  765. return true;
  766. }
  767. bool OnOpenMap() override {
  768. switch (State_.Current()) {
  769. case TState::ROOT_OBJECT:
  770. MetricConsumer_->OnStreamBegin();
  771. break;
  772. case TState::COMMON_LABELS:
  773. MetricConsumer_->OnLabelsBegin();
  774. break;
  775. case TState::METRICS_ARRAY:
  776. State_.ToNext(TState::METRIC_OBJECT);
  777. LastMetric_.Clear();
  778. break;
  779. default:
  780. break;
  781. }
  782. return true;
  783. }
  784. bool OnCloseMap() override {
  785. switch (State_.Current()) {
  786. case TState::ROOT_OBJECT:
  787. MetricConsumer_->OnStreamEnd();
  788. break;
  789. case TState::METRIC_LABELS:
  790. State_.ToPrev();
  791. break;
  792. case TState::COMMON_LABELS:
  793. MetricConsumer_->OnLabelsEnd();
  794. State_.ToPrev();
  795. if (MetricConsumer_->NeedToStop()) {
  796. IsIntentionallyHalted_ = true;
  797. return false;
  798. }
  799. break;
  800. case TState::METRIC_OBJECT:
  801. ConsumeMetric();
  802. State_.ToPrev();
  803. break;
  804. case TState::METRIC_TIMESERIES:
  805. LastMetric_.SaveLastPoint();
  806. break;
  807. case TState::METRIC_HIST:
  808. case TState::METRIC_DSUMMARY:
  809. case TState::METRIC_LOG_HIST:
  810. State_.ToPrev();
  811. break;
  812. default:
  813. break;
  814. }
  815. return true;
  816. }
  817. bool OnOpenArray() override {
  818. auto currentState = State_.Current();
  819. PARSE_ENSURE(
  820. currentState == TState::METRICS_ARRAY ||
  821. currentState == TState::METRIC_TIMESERIES ||
  822. currentState == TState::METRIC_HIST_BOUNDS ||
  823. currentState == TState::METRIC_HIST_BUCKETS ||
  824. currentState == TState::METRIC_LOG_HIST_BUCKETS,
  825. "unexpected array begin");
  826. return true;
  827. }
  828. bool OnCloseArray() override {
  829. switch (State_.Current()) {
  830. case TState::METRICS_ARRAY:
  831. case TState::METRIC_TIMESERIES:
  832. case TState::METRIC_HIST_BOUNDS:
  833. case TState::METRIC_HIST_BUCKETS:
  834. case TState::METRIC_LOG_HIST_BUCKETS:
  835. State_.ToPrev();
  836. break;
  837. default:
  838. return false;
  839. }
  840. return true;
  841. }
  842. void OnError(size_t off, TStringBuf reason) override {
  843. if (IsIntentionallyHalted_) {
  844. return;
  845. }
  846. size_t snippetBeg = (off < 20) ? 0 : (off - 20);
  847. TStringBuf snippet = Data_.SubStr(snippetBeg, 40);
  848. throw TJsonDecodeError()
  849. << "cannot parse JSON, error at: " << off
  850. << ", reason: " << (ErrorMsg_.empty() ? reason : TStringBuf{ErrorMsg_})
  851. << "\nsnippet: ..." << snippet << "...";
  852. }
  853. bool OnEnd() override {
  854. return true;
  855. }
  856. void ConsumeMetric() {
  857. // for backwad compatibility all unknown metrics treated as gauges
  858. if (LastMetric_.Type == EMetricType::UNKNOWN) {
  859. if (LastMetric_.HistogramBuilder.Empty()) {
  860. LastMetric_.Type = EMetricType::GAUGE;
  861. } else {
  862. LastMetric_.Type = EMetricType::HIST;
  863. }
  864. }
  865. // (1) begin metric
  866. MetricConsumer_->OnMetricBegin(LastMetric_.Type);
  867. // (2) labels
  868. if (!LastMetric_.Labels.empty()) {
  869. MetricConsumer_->OnLabelsBegin();
  870. for (auto&& label : LastMetric_.Labels) {
  871. MetricConsumer_->OnLabel(label.Name(), label.Value());
  872. }
  873. MetricConsumer_->OnLabelsEnd();
  874. }
  875. // (3) values
  876. switch (LastMetric_.Type) {
  877. case EMetricType::GAUGE:
  878. LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) {
  879. MetricConsumer_->OnDouble(time, value.AsDouble(valueType));
  880. });
  881. break;
  882. case EMetricType::IGAUGE:
  883. LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) {
  884. MetricConsumer_->OnInt64(time, value.AsInt64(valueType));
  885. });
  886. break;
  887. case EMetricType::COUNTER:
  888. case EMetricType::RATE:
  889. LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) {
  890. MetricConsumer_->OnUint64(time, value.AsUint64(valueType));
  891. });
  892. break;
  893. case EMetricType::HIST:
  894. case EMetricType::HIST_RATE:
  895. if (LastMetric_.TimeSeries.empty()) {
  896. auto time = LastMetric_.LastPoint.GetTime();
  897. auto histogram = LastMetric_.HistogramBuilder.Build();
  898. MetricConsumer_->OnHistogram(time, histogram);
  899. } else {
  900. for (const auto& p : LastMetric_.TimeSeries) {
  901. DECODE_ENSURE(p.GetValueType() == EMetricValueType::HISTOGRAM, "Value is not a histogram");
  902. MetricConsumer_->OnHistogram(p.GetTime(), p.GetValue().AsHistogram());
  903. }
  904. }
  905. break;
  906. case EMetricType::DSUMMARY:
  907. if (LastMetric_.TimeSeries.empty()) {
  908. auto time = LastMetric_.LastPoint.GetTime();
  909. auto summary = LastMetric_.SummaryBuilder.Build();
  910. MetricConsumer_->OnSummaryDouble(time, summary);
  911. } else {
  912. for (const auto& p : LastMetric_.TimeSeries) {
  913. DECODE_ENSURE(p.GetValueType() == EMetricValueType::SUMMARY, "Value is not a summary");
  914. MetricConsumer_->OnSummaryDouble(p.GetTime(), p.GetValue().AsSummaryDouble());
  915. }
  916. }
  917. break;
  918. case EMetricType::LOGHIST:
  919. if (LastMetric_.TimeSeries.empty()) {
  920. auto time = LastMetric_.LastPoint.GetTime();
  921. auto logHist = LastMetric_.LogHistBuilder.Build();
  922. MetricConsumer_->OnLogHistogram(time, logHist);
  923. } else {
  924. for (const auto& p : LastMetric_.TimeSeries) {
  925. DECODE_ENSURE(p.GetValueType() == EMetricValueType::LOGHISTOGRAM, "Value is not a log_histogram");
  926. MetricConsumer_->OnLogHistogram(p.GetTime(), p.GetValue().AsLogHistogram());
  927. }
  928. }
  929. break;
  930. case EMetricType::UNKNOWN:
  931. // TODO: output metric labels
  932. ythrow yexception() << "unknown metric type";
  933. }
  934. // (4) end metric
  935. MetricConsumer_->OnMetricEnd();
  936. }
  937. private:
  938. TStringBuf Data_;
  939. IHaltableMetricConsumer* MetricConsumer_;
  940. TString MetricNameLabel_;
  941. TState State_;
  942. TString LastLabelName_;
  943. TMetricCollector LastMetric_;
  944. TString ErrorMsg_;
  945. bool IsIntentionallyHalted_{false};
  946. };
  947. } // namespace
  948. void DecodeJson(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) {
  949. TCommonPartsCollector commonPartsCollector;
  950. {
  951. TMemoryInput memIn(data);
  952. TDecoderJson decoder(data, &commonPartsCollector, metricNameLabel);
  953. // no need to check a return value. If there is an error, a TJsonDecodeError is thrown
  954. NJson::ReadJson(&memIn, &decoder);
  955. }
  956. TCommonPartsProxy commonPartsProxy(std::move(commonPartsCollector.CommonParts()), c);
  957. {
  958. TMemoryInput memIn(data);
  959. TDecoderJson decoder(data, &commonPartsProxy, metricNameLabel);
  960. // no need to check a return value. If there is an error, a TJsonDecodeError is thrown
  961. NJson::ReadJson(&memIn, &decoder);
  962. }
  963. }
  964. #undef DECODE_ENSURE
  965. }