json_decoder.cpp 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162
  1. #include "json.h"
  2. #include "typed_point.h"
  3. #include <library/cpp/monlib/exception/exception.h>
  4. #include <library/cpp/monlib/metrics/labels.h>
  5. #include <library/cpp/monlib/metrics/metric_value.h>
  6. #include <library/cpp/json/json_reader.h>
  7. #include <util/datetime/base.h>
  8. #include <util/string/cast.h>
  9. #include <limits>
  10. namespace NMonitoring {
  // Decoder-level assertion: forwards COND to MONLIB_ENSURE_EX together with a
  // TJsonDecodeError that carries the streamed message (presumably raised when
  // COND is false -- see MONLIB_ENSURE_EX for the exact semantics).
  #define DECODE_ENSURE(COND, ...) \
      MONLIB_ENSURE_EX(COND, TJsonDecodeError() << __VA_ARGS__)
  12. namespace {
  13. ///////////////////////////////////////////////////////////////////////
  14. // THistogramBuilder
  15. ///////////////////////////////////////////////////////////////////////
  16. class THistogramBuilder {
  17. public:
  18. void AddBound(TBucketBound bound) {
  19. if (!Bounds_.empty()) {
  20. DECODE_ENSURE(Bounds_.back() < bound,
  21. "non sorted bounds, " << Bounds_.back() <<
  22. " >= " << bound);
  23. }
  24. Bounds_.push_back(bound);
  25. }
  26. void AddValue(TBucketValue value) {
  27. Values_.push_back(value);
  28. }
  29. void AddInf(TBucketValue value) {
  30. InfPresented_ = true;
  31. InfValue_ = value;
  32. }
  33. IHistogramSnapshotPtr Build() {
  34. if (InfPresented_) {
  35. Bounds_.push_back(Max<TBucketBound>());
  36. Values_.push_back(InfValue_);
  37. }
  38. auto snapshot = ExplicitHistogramSnapshot(Bounds_, Values_);
  39. Bounds_.clear();
  40. Values_.clear();
  41. InfPresented_ = false;
  42. return snapshot;
  43. }
  44. bool Empty() const noexcept {
  45. return Bounds_.empty() && Values_.empty();
  46. }
  47. void Clear() {
  48. Bounds_.clear();
  49. Values_.clear();
  50. }
  51. private:
  52. TBucketBounds Bounds_;
  53. TBucketValues Values_;
  54. bool InfPresented_ = false;
  55. TBucketValue InfValue_;
  56. };
  57. class TSummaryDoubleBuilder {
  58. public:
  59. ISummaryDoubleSnapshotPtr Build() const {
  60. return MakeIntrusive<TSummaryDoubleSnapshot>(Sum_, Min_, Max_, Last_, Count_);
  61. }
  62. void SetSum(double sum) {
  63. Empty_ = false;
  64. Sum_ = sum;
  65. }
  66. void SetMin(double min) {
  67. Empty_ = false;
  68. Min_ = min;
  69. }
  70. void SetMax(double max) {
  71. Empty_ = false;
  72. Max_ = max;
  73. }
  74. void SetLast(double last) {
  75. Empty_ = false;
  76. Last_ = last;
  77. }
  78. void SetCount(ui64 count) {
  79. Empty_ = false;
  80. Count_ = count;
  81. }
  82. void Clear() {
  83. Empty_ = true;
  84. Sum_ = 0;
  85. Min_ = 0;
  86. Max_ = 0;
  87. Last_ = 0;
  88. Count_ = 0;
  89. }
  90. bool Empty() const {
  91. return Empty_;
  92. }
  93. private:
  94. double Sum_ = 0;
  95. double Min_ = 0;
  96. double Max_ = 0;
  97. double Last_ = 0;
  98. ui64 Count_ = 0;
  99. bool Empty_ = true;
  100. };
  101. class TLogHistogramBuilder {
  102. public:
  103. void SetBase(double base) {
  104. DECODE_ENSURE(base > 0, "base must be positive");
  105. Base_ = base;
  106. }
  107. void SetZerosCount(ui64 zerosCount) {
  108. DECODE_ENSURE(zerosCount >= 0, "zeros count must be positive");
  109. ZerosCount_ = zerosCount;
  110. }
  111. void SetStartPower(int startPower) {
  112. StartPower_ = startPower;
  113. }
  114. void AddBucketValue(double value) {
  115. DECODE_ENSURE(value > 0.0, "bucket values must be positive");
  116. DECODE_ENSURE(value < std::numeric_limits<double>::max(), "bucket values must be finite");
  117. Buckets_.push_back(value);
  118. }
  119. void Clear() {
  120. Buckets_.clear();
  121. Base_ = 1.5;
  122. ZerosCount_ = 0;
  123. StartPower_ = 0;
  124. }
  125. bool Empty() const {
  126. return Buckets_.empty() && ZerosCount_ == 0;
  127. }
  128. TLogHistogramSnapshotPtr Build() {
  129. return MakeIntrusive<TLogHistogramSnapshot>(Base_, ZerosCount_, StartPower_, std::move(Buckets_));
  130. }
  131. private:
  132. double Base_ = 1.5;
  133. ui64 ZerosCount_ = 0;
  134. int StartPower_ = 0;
  135. TVector<double> Buckets_;
  136. };
  137. std::pair<double, bool> ParseSpecDouble(TStringBuf string) {
  138. if (string == TStringBuf("nan") || string == TStringBuf("NaN")) {
  139. return {std::numeric_limits<double>::quiet_NaN(), true};
  140. } else if (string == TStringBuf("inf") || string == TStringBuf("Infinity")) {
  141. return {std::numeric_limits<double>::infinity(), true};
  142. } else if (string == TStringBuf("-inf") || string == TStringBuf("-Infinity")) {
  143. return {-std::numeric_limits<double>::infinity(), true};
  144. } else {
  145. return {0, false};
  146. }
  147. }
  148. ///////////////////////////////////////////////////////////////////////
  149. // TMetricCollector
  150. ///////////////////////////////////////////////////////////////////////
  151. struct TMetricCollector {
  152. EMetricType Type = EMetricType::UNKNOWN;
  153. TLabels Labels;
  154. THistogramBuilder HistogramBuilder;
  155. TSummaryDoubleBuilder SummaryBuilder;
  156. TLogHistogramBuilder LogHistBuilder;
  157. TTypedPoint LastPoint;
  158. TVector<TTypedPoint> TimeSeries;
  159. bool SeenTsOrValue = false;
  160. bool SeenTimeseries = false;
  161. void Clear() {
  162. Type = EMetricType::UNKNOWN;
  163. Labels.Clear();
  164. SeenTsOrValue = false;
  165. SeenTimeseries = false;
  166. TimeSeries.clear();
  167. LastPoint = {};
  168. HistogramBuilder.Clear();
  169. SummaryBuilder.Clear();
  170. LogHistBuilder.Clear();
  171. }
  172. void AddLabel(const TLabel& label) {
  173. Labels.Add(label.Name(), label.Value());
  174. }
  175. void SetLastTime(TInstant time) {
  176. LastPoint.SetTime(time);
  177. }
  178. template <typename T>
  179. void SetLastValue(T value) {
  180. LastPoint.SetValue(value);
  181. }
  182. void SaveLastPoint() {
  183. DECODE_ENSURE(LastPoint.GetTime() != TInstant::Zero(),
  184. "cannot add point without or zero timestamp");
  185. if (!HistogramBuilder.Empty()) {
  186. auto histogram = HistogramBuilder.Build();
  187. TimeSeries.emplace_back(LastPoint.GetTime(), histogram.Get());
  188. } else if (!SummaryBuilder.Empty()) {
  189. auto summary = SummaryBuilder.Build();
  190. TimeSeries.emplace_back(LastPoint.GetTime(), summary.Get());
  191. } else if (!LogHistBuilder.Empty()) {
  192. auto logHist = LogHistBuilder.Build();
  193. TimeSeries.emplace_back(LastPoint.GetTime(), logHist.Get());
  194. } else {
  195. TimeSeries.push_back(std::move(LastPoint));
  196. }
  197. }
  198. template <typename TConsumer>
  199. void Consume(TConsumer&& consumer) {
  200. if (TimeSeries.empty()) {
  201. const auto& p = LastPoint;
  202. consumer(p.GetTime(), p.GetValueType(), p.GetValue());
  203. } else {
  204. for (const auto& p: TimeSeries) {
  205. consumer(p.GetTime(), p.GetValueType(), p.GetValue());
  206. }
  207. }
  208. }
  209. };
  210. struct TCommonParts {
  211. TInstant CommonTime;
  212. TLabels CommonLabels;
  213. };
  214. class IHaltableMetricConsumer: public IMetricConsumer {
  215. public:
  216. virtual bool NeedToStop() const = 0;
  217. };
  218. // TODO(ivanzhukov@): check all states for cases when a json document is invalid
  219. // e.g. "metrics" or "commonLabels" keys are specified multiple times
  220. class TCommonPartsCollector: public IHaltableMetricConsumer {
  221. public:
  222. TCommonParts&& CommonParts() {
  223. return std::move(CommonParts_);
  224. }
  225. private:
  226. bool NeedToStop() const override {
  227. return TInstant::Zero() != CommonParts_.CommonTime && !CommonParts_.CommonLabels.Empty();
  228. }
  229. void OnStreamBegin() override {
  230. }
  231. void OnStreamEnd() override {
  232. }
  233. void OnCommonTime(TInstant time) override {
  234. CommonParts_.CommonTime = time;
  235. }
  236. void OnMetricBegin(EMetricType) override {
  237. IsMetric_ = true;
  238. }
  239. void OnMetricEnd() override {
  240. IsMetric_ = false;
  241. }
  242. void OnLabelsBegin() override {
  243. }
  244. void OnLabelsEnd() override {
  245. }
  246. void OnLabel(TStringBuf name, TStringBuf value) override {
  247. if (!IsMetric_) {
  248. CommonParts_.CommonLabels.Add(std::move(name), std::move(value));
  249. }
  250. }
  251. void OnDouble(TInstant, double) override {
  252. }
  253. void OnInt64(TInstant, i64) override {
  254. }
  255. void OnUint64(TInstant, ui64) override {
  256. }
  257. void OnHistogram(TInstant, IHistogramSnapshotPtr) override {
  258. }
  259. void OnLogHistogram(TInstant, TLogHistogramSnapshotPtr) override {
  260. }
  261. void OnSummaryDouble(TInstant, ISummaryDoubleSnapshotPtr) override {
  262. }
  263. private:
  264. TCommonParts CommonParts_;
  265. bool IsMetric_{false};
  266. };
  267. class TCommonPartsProxy: public IHaltableMetricConsumer {
  268. public:
  269. TCommonPartsProxy(TCommonParts&& commonParts, IMetricConsumer* c)
  270. : CommonParts_{std::move(commonParts)}
  271. , Consumer_{c}
  272. {}
  273. private:
  274. bool NeedToStop() const override {
  275. return false;
  276. }
  277. void OnStreamBegin() override {
  278. Consumer_->OnStreamBegin();
  279. if (!CommonParts_.CommonLabels.Empty()) {
  280. Consumer_->OnLabelsBegin();
  281. for (auto&& label : CommonParts_.CommonLabels) {
  282. Consumer_->OnLabel(label.Name(), label.Value());
  283. }
  284. Consumer_->OnLabelsEnd();
  285. }
  286. if (TInstant::Zero() != CommonParts_.CommonTime) {
  287. Consumer_->OnCommonTime(CommonParts_.CommonTime);
  288. }
  289. }
  290. void OnStreamEnd() override {
  291. Consumer_->OnStreamEnd();
  292. }
  293. void OnCommonTime(TInstant) override {
  294. }
  295. void OnMetricBegin(EMetricType type) override {
  296. IsMetric_ = true;
  297. Consumer_->OnMetricBegin(type);
  298. }
  299. void OnMetricEnd() override {
  300. IsMetric_ = false;
  301. Consumer_->OnMetricEnd();
  302. }
  303. void OnLabelsBegin() override {
  304. if (IsMetric_) {
  305. Consumer_->OnLabelsBegin();
  306. }
  307. }
  308. void OnLabelsEnd() override {
  309. if (IsMetric_) {
  310. Consumer_->OnLabelsEnd();
  311. }
  312. }
  313. void OnLabel(TStringBuf name, TStringBuf value) override {
  314. if (IsMetric_) {
  315. Consumer_->OnLabel(std::move(name), std::move(value));
  316. }
  317. }
  318. void OnDouble(TInstant time, double value) override {
  319. Consumer_->OnDouble(time, value);
  320. }
  321. void OnInt64(TInstant time, i64 value) override {
  322. Consumer_->OnInt64(time, value);
  323. }
  324. void OnUint64(TInstant time, ui64 value) override {
  325. Consumer_->OnUint64(time, value);
  326. }
  327. void OnHistogram(TInstant time, IHistogramSnapshotPtr snapshot) override {
  328. Consumer_->OnHistogram(time, std::move(snapshot));
  329. }
  330. void OnLogHistogram(TInstant time, TLogHistogramSnapshotPtr snapshot) override {
  331. Consumer_->OnLogHistogram(time, std::move(snapshot));
  332. }
  333. void OnSummaryDouble(TInstant time, ISummaryDoubleSnapshotPtr snapshot) override {
  334. Consumer_->OnSummaryDouble(time, std::move(snapshot));
  335. }
  336. private:
  337. const TCommonParts CommonParts_;
  338. IMetricConsumer* Consumer_;
  339. bool IsMetric_{false};
  340. };
  341. ///////////////////////////////////////////////////////////////////////
  342. // TDecoderJson
  343. ///////////////////////////////////////////////////////////////////////
  344. class TDecoderJson final: public NJson::TJsonCallbacks {
  345. struct TState {
  346. enum EState {
  347. ROOT_OBJECT = 0x01,
  348. COMMON_LABELS,
  349. COMMON_TS,
  350. METRICS_ARRAY,
  351. METRIC_OBJECT,
  352. METRIC_NAME,
  353. METRIC_LABELS,
  354. METRIC_TYPE,
  355. METRIC_MODE, // TODO: must be deleted
  356. METRIC_TIMESERIES,
  357. METRIC_TS,
  358. METRIC_VALUE,
  359. METRIC_HIST,
  360. METRIC_HIST_BOUNDS,
  361. METRIC_HIST_BUCKETS,
  362. METRIC_HIST_INF,
  363. METRIC_DSUMMARY,
  364. METRIC_DSUMMARY_SUM,
  365. METRIC_DSUMMARY_MIN,
  366. METRIC_DSUMMARY_MAX,
  367. METRIC_DSUMMARY_LAST,
  368. METRIC_DSUMMARY_COUNT,
  369. METRIC_LOG_HIST,
  370. METRIC_LOG_HIST_BASE,
  371. METRIC_LOG_HIST_ZEROS,
  372. METRIC_LOG_HIST_START_POWER,
  373. METRIC_LOG_HIST_BUCKETS,
  374. };
  375. constexpr EState Current() const noexcept {
  376. return static_cast<EState>(State_ & 0xFF);
  377. }
  378. void ToNext(EState state) noexcept {
  379. constexpr auto bitSize = 8 * sizeof(ui8);
  380. State_ = (State_ << bitSize) | static_cast<ui8>(state);
  381. }
  382. void ToPrev() noexcept {
  383. constexpr auto bitSize = 8 * sizeof(ui8);
  384. State_ = State_ >> bitSize;
  385. }
  386. private:
  387. ui64 State_ = static_cast<ui64>(ROOT_OBJECT);
  388. };
  389. public:
  390. TDecoderJson(TStringBuf data, IHaltableMetricConsumer* metricConsumer, TStringBuf metricNameLabel)
  391. : Data_(data)
  392. , MetricConsumer_(metricConsumer)
  393. , MetricNameLabel_(metricNameLabel)
  394. {
  395. }
  396. private:
  // Callback-level check: when CONDITION is false, stores the streamed message
  // in ErrorMsg_ and makes the enclosing callback return false. Usable only
  // inside member functions that return bool.
  #define PARSE_ENSURE(CONDITION, ...)                     \
      do {                                                 \
          if (Y_UNLIKELY(!(CONDITION))) {                  \
              ErrorMsg_ = TStringBuilder() << __VA_ARGS__; \
              return false;                                \
          }                                                \
      } while (false)
  404. bool OnInteger(long long value) override {
  405. switch (State_.Current()) {
  406. case TState::COMMON_TS:
  407. PARSE_ENSURE(value >= 0, "unexpected negative number in a common timestamp: " << value);
  408. MetricConsumer_->OnCommonTime(TInstant::Seconds(value));
  409. State_.ToPrev();
  410. if (MetricConsumer_->NeedToStop()) {
  411. IsIntentionallyHalted_ = true;
  412. return false;
  413. }
  414. break;
  415. case TState::METRIC_TS:
  416. PARSE_ENSURE(value >= 0, "unexpected negative number in a metric timestamp: " << value);
  417. LastMetric_.SetLastTime(TInstant::Seconds(value));
  418. State_.ToPrev();
  419. break;
  420. case TState::METRIC_VALUE:
  421. LastMetric_.SetLastValue(static_cast<i64>(value));
  422. State_.ToPrev();
  423. break;
  424. case TState::METRIC_HIST_BOUNDS:
  425. LastMetric_.HistogramBuilder.AddBound(static_cast<double>(value));
  426. break;
  427. case TState::METRIC_HIST_BUCKETS:
  428. PARSE_ENSURE(value >= 0 && static_cast<ui64>(value) <= Max<TBucketValues::value_type>(), "value is out of bounds " << value);
  429. LastMetric_.HistogramBuilder.AddValue(value);
  430. break;
  431. case TState::METRIC_HIST_INF:
  432. PARSE_ENSURE(value >= 0, "unexpected negative number in histogram inf: " << value);
  433. LastMetric_.HistogramBuilder.AddInf(value);
  434. State_.ToPrev();
  435. break;
  436. case TState::METRIC_DSUMMARY_COUNT:
  437. LastMetric_.SummaryBuilder.SetCount(value);
  438. State_.ToPrev();
  439. break;
  440. case TState::METRIC_DSUMMARY_SUM:
  441. LastMetric_.SummaryBuilder.SetSum(value);
  442. State_.ToPrev();
  443. break;
  444. case TState::METRIC_DSUMMARY_MIN:
  445. LastMetric_.SummaryBuilder.SetMin(value);
  446. State_.ToPrev();
  447. break;
  448. case TState::METRIC_DSUMMARY_MAX:
  449. LastMetric_.SummaryBuilder.SetMax(value);
  450. State_.ToPrev();
  451. break;
  452. case TState::METRIC_DSUMMARY_LAST:
  453. LastMetric_.SummaryBuilder.SetLast(value);
  454. State_.ToPrev();
  455. break;
  456. case TState::METRIC_LOG_HIST_BASE:
  457. LastMetric_.LogHistBuilder.SetBase(value);
  458. State_.ToPrev();
  459. break;
  460. case TState::METRIC_LOG_HIST_ZEROS:
  461. LastMetric_.LogHistBuilder.SetZerosCount(value);
  462. State_.ToPrev();
  463. break;
  464. case TState::METRIC_LOG_HIST_START_POWER:
  465. LastMetric_.LogHistBuilder.SetStartPower(value);
  466. State_.ToPrev();
  467. break;
  468. case TState::METRIC_LOG_HIST_BUCKETS:
  469. LastMetric_.LogHistBuilder.AddBucketValue(value);
  470. break;
  471. default:
  472. return false;
  473. }
  474. return true;
  475. }
  476. bool OnUInteger(unsigned long long value) override {
  477. switch (State_.Current()) {
  478. case TState::COMMON_TS:
  479. MetricConsumer_->OnCommonTime(TInstant::Seconds(value));
  480. State_.ToPrev();
  481. if (MetricConsumer_->NeedToStop()) {
  482. IsIntentionallyHalted_ = true;
  483. return false;
  484. }
  485. break;
  486. case TState::METRIC_TS:
  487. LastMetric_.SetLastTime(TInstant::Seconds(value));
  488. State_.ToPrev();
  489. break;
  490. case TState::METRIC_VALUE:
  491. PARSE_ENSURE(value <= Max<ui64>(), "Metric value is out of bounds: " << value);
  492. LastMetric_.SetLastValue(static_cast<ui64>(value));
  493. State_.ToPrev();
  494. break;
  495. case TState::METRIC_HIST_BOUNDS:
  496. LastMetric_.HistogramBuilder.AddBound(static_cast<double>(value));
  497. break;
  498. case TState::METRIC_HIST_BUCKETS:
  499. PARSE_ENSURE(value <= Max<TBucketValues::value_type>(), "Histogram bucket value is out of bounds: " << value);
  500. LastMetric_.HistogramBuilder.AddValue(value);
  501. break;
  502. case TState::METRIC_HIST_INF:
  503. LastMetric_.HistogramBuilder.AddInf(value);
  504. State_.ToPrev();
  505. break;
  506. case TState::METRIC_DSUMMARY_COUNT:
  507. LastMetric_.SummaryBuilder.SetCount(value);
  508. State_.ToPrev();
  509. break;
  510. case TState::METRIC_DSUMMARY_SUM:
  511. LastMetric_.SummaryBuilder.SetSum(value);
  512. State_.ToPrev();
  513. break;
  514. case TState::METRIC_DSUMMARY_MIN:
  515. LastMetric_.SummaryBuilder.SetMin(value);
  516. State_.ToPrev();
  517. break;
  518. case TState::METRIC_DSUMMARY_MAX:
  519. LastMetric_.SummaryBuilder.SetMax(value);
  520. State_.ToPrev();
  521. break;
  522. case TState::METRIC_DSUMMARY_LAST:
  523. LastMetric_.SummaryBuilder.SetLast(value);
  524. State_.ToPrev();
  525. break;
  526. case TState::METRIC_LOG_HIST_BASE:
  527. LastMetric_.LogHistBuilder.SetBase(value);
  528. State_.ToPrev();
  529. break;
  530. case TState::METRIC_LOG_HIST_ZEROS:
  531. LastMetric_.LogHistBuilder.SetZerosCount(value);
  532. State_.ToPrev();
  533. break;
  534. case TState::METRIC_LOG_HIST_START_POWER:
  535. LastMetric_.LogHistBuilder.SetStartPower(value);
  536. State_.ToPrev();
  537. break;
  538. case TState::METRIC_LOG_HIST_BUCKETS:
  539. LastMetric_.LogHistBuilder.AddBucketValue(value);
  540. break;
  541. default:
  542. return false;
  543. }
  544. return true;
  545. }
  546. bool OnDouble(double value) override {
  547. switch (State_.Current()) {
  548. case TState::METRIC_VALUE:
  549. LastMetric_.SetLastValue(value);
  550. State_.ToPrev();
  551. break;
  552. case TState::METRIC_HIST_BOUNDS:
  553. LastMetric_.HistogramBuilder.AddBound(value);
  554. break;
  555. case TState::METRIC_DSUMMARY_SUM:
  556. LastMetric_.SummaryBuilder.SetSum(value);
  557. State_.ToPrev();
  558. break;
  559. case TState::METRIC_DSUMMARY_MIN:
  560. LastMetric_.SummaryBuilder.SetMin(value);
  561. State_.ToPrev();
  562. break;
  563. case TState::METRIC_DSUMMARY_MAX:
  564. LastMetric_.SummaryBuilder.SetMax(value);
  565. State_.ToPrev();
  566. break;
  567. case TState::METRIC_DSUMMARY_LAST:
  568. LastMetric_.SummaryBuilder.SetLast(value);
  569. State_.ToPrev();
  570. break;
  571. case TState::METRIC_LOG_HIST_BASE:
  572. LastMetric_.LogHistBuilder.SetBase(value);
  573. State_.ToPrev();
  574. break;
  575. case TState::METRIC_LOG_HIST_BUCKETS:
  576. LastMetric_.LogHistBuilder.AddBucketValue(value);
  577. break;
  578. default:
  579. return false;
  580. }
  581. return true;
  582. }
  583. bool OnString(const TStringBuf& value) override {
  584. switch (State_.Current()) {
  585. case TState::COMMON_LABELS:
  586. PARSE_ENSURE(!LastLabelName_.empty(), "empty label name in common labels");
  587. MetricConsumer_->OnLabel(LastLabelName_, TString{value});
  588. break;
  589. case TState::METRIC_LABELS:
  590. PARSE_ENSURE(!LastLabelName_.empty(), "empty label name in metric labels");
  591. LastMetric_.Labels.Add(LastLabelName_, TString{value});
  592. break;
  593. case TState::METRIC_NAME:
  594. PARSE_ENSURE(!value.empty(), "empty metric name");
  595. LastMetric_.Labels.Add(MetricNameLabel_, TString{value});
  596. State_.ToPrev();
  597. break;
  598. case TState::COMMON_TS:
  599. MetricConsumer_->OnCommonTime(TInstant::ParseIso8601(value));
  600. State_.ToPrev();
  601. if (MetricConsumer_->NeedToStop()) {
  602. IsIntentionallyHalted_ = true;
  603. return false;
  604. }
  605. break;
  606. case TState::METRIC_TS:
  607. LastMetric_.SetLastTime(TInstant::ParseIso8601(value));
  608. State_.ToPrev();
  609. break;
  610. case TState::METRIC_VALUE:
  611. if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
  612. LastMetric_.SetLastValue(doubleValue);
  613. } else {
  614. return false;
  615. }
  616. State_.ToPrev();
  617. break;
  618. case TState::METRIC_TYPE:
  619. LastMetric_.Type = MetricTypeFromStr(value);
  620. State_.ToPrev();
  621. break;
  622. case TState::METRIC_MODE:
  623. if (value == TStringBuf("deriv")) {
  624. LastMetric_.Type = EMetricType::RATE;
  625. }
  626. State_.ToPrev();
  627. break;
  628. case TState::METRIC_DSUMMARY_SUM:
  629. if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
  630. LastMetric_.SummaryBuilder.SetSum(doubleValue);
  631. } else {
  632. return false;
  633. }
  634. State_.ToPrev();
  635. break;
  636. case TState::METRIC_DSUMMARY_MIN:
  637. if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
  638. LastMetric_.SummaryBuilder.SetMin(doubleValue);
  639. } else {
  640. return false;
  641. }
  642. State_.ToPrev();
  643. break;
  644. case TState::METRIC_DSUMMARY_MAX:
  645. if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
  646. LastMetric_.SummaryBuilder.SetMax(doubleValue);
  647. } else {
  648. return false;
  649. }
  650. State_.ToPrev();
  651. break;
  652. case TState::METRIC_DSUMMARY_LAST:
  653. if (auto [doubleValue, ok] = ParseSpecDouble(value); ok) {
  654. LastMetric_.SummaryBuilder.SetLast(doubleValue);
  655. } else {
  656. return false;
  657. }
  658. State_.ToPrev();
  659. break;
  660. default:
  661. return false;
  662. }
  663. return true;
  664. }
  665. bool OnMapKey(const TStringBuf& key) override {
  666. switch (State_.Current()) {
  667. case TState::ROOT_OBJECT:
  668. if (key == TStringBuf("commonLabels") || key == TStringBuf("labels")) {
  669. State_.ToNext(TState::COMMON_LABELS);
  670. } else if (key == TStringBuf("ts")) {
  671. State_.ToNext(TState::COMMON_TS);
  672. } else if (key == TStringBuf("sensors") || key == TStringBuf("metrics")) {
  673. State_.ToNext(TState::METRICS_ARRAY);
  674. }
  675. break;
  676. case TState::COMMON_LABELS:
  677. case TState::METRIC_LABELS:
  678. LastLabelName_ = key;
  679. break;
  680. case TState::METRIC_OBJECT:
  681. if (key == TStringBuf("labels")) {
  682. State_.ToNext(TState::METRIC_LABELS);
  683. } else if (key == TStringBuf("name")) {
  684. State_.ToNext(TState::METRIC_NAME);
  685. } else if (key == TStringBuf("ts")) {
  686. PARSE_ENSURE(!LastMetric_.SeenTimeseries,
  687. "mixed timeseries and ts attributes");
  688. LastMetric_.SeenTsOrValue = true;
  689. State_.ToNext(TState::METRIC_TS);
  690. } else if (key == TStringBuf("value")) {
  691. PARSE_ENSURE(!LastMetric_.SeenTimeseries,
  692. "mixed timeseries and value attributes");
  693. LastMetric_.SeenTsOrValue = true;
  694. State_.ToNext(TState::METRIC_VALUE);
  695. } else if (key == TStringBuf("timeseries")) {
  696. PARSE_ENSURE(!LastMetric_.SeenTsOrValue,
  697. "mixed timeseries and ts/value attributes");
  698. LastMetric_.SeenTimeseries = true;
  699. State_.ToNext(TState::METRIC_TIMESERIES);
  700. } else if (key == TStringBuf("mode")) {
  701. State_.ToNext(TState::METRIC_MODE);
  702. } else if (key == TStringBuf("kind") || key == TStringBuf("type")) {
  703. State_.ToNext(TState::METRIC_TYPE);
  704. } else if (key == TStringBuf("hist")) {
  705. State_.ToNext(TState::METRIC_HIST);
  706. } else if (key == TStringBuf("summary")) {
  707. State_.ToNext(TState::METRIC_DSUMMARY);
  708. } else if (key == TStringBuf("log_hist")) {
  709. State_.ToNext(TState::METRIC_LOG_HIST);
  710. } else if (key == TStringBuf("memOnly")) {
  711. // deprecated. Skip it without errors for backward compatibility
  712. } else {
  713. ErrorMsg_ = TStringBuilder() << "unexpected key \"" << key << "\" in a metric schema";
  714. return false;
  715. }
  716. break;
  717. case TState::METRIC_TIMESERIES:
  718. if (key == TStringBuf("ts")) {
  719. State_.ToNext(TState::METRIC_TS);
  720. } else if (key == TStringBuf("value")) {
  721. State_.ToNext(TState::METRIC_VALUE);
  722. } else if (key == TStringBuf("hist")) {
  723. State_.ToNext(TState::METRIC_HIST);
  724. } else if (key == TStringBuf("summary")) {
  725. State_.ToNext(TState::METRIC_DSUMMARY);
  726. } else if (key == TStringBuf("log_hist")) {
  727. State_.ToNext(TState::METRIC_LOG_HIST);
  728. }
  729. break;
  730. case TState::METRIC_HIST:
  731. if (key == TStringBuf("bounds")) {
  732. State_.ToNext(TState::METRIC_HIST_BOUNDS);
  733. } else if (key == TStringBuf("buckets")) {
  734. State_.ToNext(TState::METRIC_HIST_BUCKETS);
  735. } else if (key == TStringBuf("inf")) {
  736. State_.ToNext(TState::METRIC_HIST_INF);
  737. }
  738. break;
  739. case TState::METRIC_LOG_HIST:
  740. if (key == TStringBuf("base")) {
  741. State_.ToNext(TState::METRIC_LOG_HIST_BASE);
  742. } else if (key == TStringBuf("zeros_count")) {
  743. State_.ToNext(TState::METRIC_LOG_HIST_ZEROS);
  744. } else if (key == TStringBuf("start_power")) {
  745. State_.ToNext(TState::METRIC_LOG_HIST_START_POWER);
  746. } else if (key == TStringBuf("buckets")) {
  747. State_.ToNext(TState::METRIC_LOG_HIST_BUCKETS);
  748. }
  749. break;
  750. case TState::METRIC_DSUMMARY:
  751. if (key == TStringBuf("sum")) {
  752. State_.ToNext(TState::METRIC_DSUMMARY_SUM);
  753. } else if (key == TStringBuf("min")) {
  754. State_.ToNext(TState::METRIC_DSUMMARY_MIN);
  755. } else if (key == TStringBuf("max")) {
  756. State_.ToNext(TState::METRIC_DSUMMARY_MAX);
  757. } else if (key == TStringBuf("last")) {
  758. State_.ToNext(TState::METRIC_DSUMMARY_LAST);
  759. } else if (key == TStringBuf("count")) {
  760. State_.ToNext(TState::METRIC_DSUMMARY_COUNT);
  761. }
  762. break;
  763. default:
  764. return false;
  765. }
  766. return true;
  767. }
  768. bool OnOpenMap() override {
  769. switch (State_.Current()) {
  770. case TState::ROOT_OBJECT:
  771. MetricConsumer_->OnStreamBegin();
  772. break;
  773. case TState::COMMON_LABELS:
  774. MetricConsumer_->OnLabelsBegin();
  775. break;
  776. case TState::METRICS_ARRAY:
  777. State_.ToNext(TState::METRIC_OBJECT);
  778. LastMetric_.Clear();
  779. break;
  780. default:
  781. break;
  782. }
  783. return true;
  784. }
  785. bool OnCloseMap() override {
  786. switch (State_.Current()) {
  787. case TState::ROOT_OBJECT:
  788. MetricConsumer_->OnStreamEnd();
  789. break;
  790. case TState::METRIC_LABELS:
  791. State_.ToPrev();
  792. break;
  793. case TState::COMMON_LABELS:
  794. MetricConsumer_->OnLabelsEnd();
  795. State_.ToPrev();
  796. if (MetricConsumer_->NeedToStop()) {
  797. IsIntentionallyHalted_ = true;
  798. return false;
  799. }
  800. break;
  801. case TState::METRIC_OBJECT:
  802. ConsumeMetric();
  803. State_.ToPrev();
  804. break;
  805. case TState::METRIC_TIMESERIES:
  806. LastMetric_.SaveLastPoint();
  807. break;
  808. case TState::METRIC_HIST:
  809. case TState::METRIC_DSUMMARY:
  810. case TState::METRIC_LOG_HIST:
  811. State_.ToPrev();
  812. break;
  813. default:
  814. break;
  815. }
  816. return true;
  817. }
  818. bool OnOpenArray() override {
  819. auto currentState = State_.Current();
  820. PARSE_ENSURE(
  821. currentState == TState::METRICS_ARRAY ||
  822. currentState == TState::METRIC_TIMESERIES ||
  823. currentState == TState::METRIC_HIST_BOUNDS ||
  824. currentState == TState::METRIC_HIST_BUCKETS ||
  825. currentState == TState::METRIC_LOG_HIST_BUCKETS,
  826. "unexpected array begin");
  827. return true;
  828. }
  829. bool OnCloseArray() override {
  830. switch (State_.Current()) {
  831. case TState::METRICS_ARRAY:
  832. case TState::METRIC_TIMESERIES:
  833. case TState::METRIC_HIST_BOUNDS:
  834. case TState::METRIC_HIST_BUCKETS:
  835. case TState::METRIC_LOG_HIST_BUCKETS:
  836. State_.ToPrev();
  837. break;
  838. default:
  839. return false;
  840. }
  841. return true;
  842. }
  843. void OnError(size_t off, TStringBuf reason) override {
  844. if (IsIntentionallyHalted_) {
  845. return;
  846. }
  847. size_t snippetBeg = (off < 20) ? 0 : (off - 20);
  848. TStringBuf snippet = Data_.SubStr(snippetBeg, 40);
  849. throw TJsonDecodeError()
  850. << "cannot parse JSON, error at: " << off
  851. << ", reason: " << (ErrorMsg_.empty() ? reason : TStringBuf{ErrorMsg_})
  852. << "\nsnippet: ..." << snippet << "...";
  853. }
  854. bool OnEnd() override {
  855. return true;
  856. }
  857. void ConsumeMetric() {
  858. // for backwad compatibility all unknown metrics treated as gauges
  859. if (LastMetric_.Type == EMetricType::UNKNOWN) {
  860. if (LastMetric_.HistogramBuilder.Empty()) {
  861. LastMetric_.Type = EMetricType::GAUGE;
  862. } else {
  863. LastMetric_.Type = EMetricType::HIST;
  864. }
  865. }
  866. // (1) begin metric
  867. MetricConsumer_->OnMetricBegin(LastMetric_.Type);
  868. // (2) labels
  869. if (!LastMetric_.Labels.empty()) {
  870. MetricConsumer_->OnLabelsBegin();
  871. for (auto&& label : LastMetric_.Labels) {
  872. MetricConsumer_->OnLabel(label.Name(), label.Value());
  873. }
  874. MetricConsumer_->OnLabelsEnd();
  875. }
  876. // (3) values
  877. switch (LastMetric_.Type) {
  878. case EMetricType::GAUGE:
  879. LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) {
  880. MetricConsumer_->OnDouble(time, value.AsDouble(valueType));
  881. });
  882. break;
  883. case EMetricType::IGAUGE:
  884. LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) {
  885. MetricConsumer_->OnInt64(time, value.AsInt64(valueType));
  886. });
  887. break;
  888. case EMetricType::COUNTER:
  889. case EMetricType::RATE:
  890. LastMetric_.Consume([this](TInstant time, EMetricValueType valueType, TMetricValue value) {
  891. MetricConsumer_->OnUint64(time, value.AsUint64(valueType));
  892. });
  893. break;
  894. case EMetricType::HIST:
  895. case EMetricType::HIST_RATE:
  896. if (LastMetric_.TimeSeries.empty()) {
  897. auto time = LastMetric_.LastPoint.GetTime();
  898. auto histogram = LastMetric_.HistogramBuilder.Build();
  899. MetricConsumer_->OnHistogram(time, histogram);
  900. } else {
  901. for (const auto& p : LastMetric_.TimeSeries) {
  902. DECODE_ENSURE(p.GetValueType() == EMetricValueType::HISTOGRAM, "Value is not a histogram");
  903. MetricConsumer_->OnHistogram(p.GetTime(), p.GetValue().AsHistogram());
  904. }
  905. }
  906. break;
  907. case EMetricType::DSUMMARY:
  908. if (LastMetric_.TimeSeries.empty()) {
  909. auto time = LastMetric_.LastPoint.GetTime();
  910. auto summary = LastMetric_.SummaryBuilder.Build();
  911. MetricConsumer_->OnSummaryDouble(time, summary);
  912. } else {
  913. for (const auto& p : LastMetric_.TimeSeries) {
  914. DECODE_ENSURE(p.GetValueType() == EMetricValueType::SUMMARY, "Value is not a summary");
  915. MetricConsumer_->OnSummaryDouble(p.GetTime(), p.GetValue().AsSummaryDouble());
  916. }
  917. }
  918. break;
  919. case EMetricType::LOGHIST:
  920. if (LastMetric_.TimeSeries.empty()) {
  921. auto time = LastMetric_.LastPoint.GetTime();
  922. auto logHist = LastMetric_.LogHistBuilder.Build();
  923. MetricConsumer_->OnLogHistogram(time, logHist);
  924. } else {
  925. for (const auto& p : LastMetric_.TimeSeries) {
  926. DECODE_ENSURE(p.GetValueType() == EMetricValueType::LOGHISTOGRAM, "Value is not a log_histogram");
  927. MetricConsumer_->OnLogHistogram(p.GetTime(), p.GetValue().AsLogHistogram());
  928. }
  929. }
  930. break;
  931. case EMetricType::UNKNOWN:
  932. // TODO: output metric labels
  933. ythrow yexception() << "unknown metric type";
  934. }
  935. // (4) end metric
  936. MetricConsumer_->OnMetricEnd();
  937. }
  938. private:
  939. TStringBuf Data_;
  940. IHaltableMetricConsumer* MetricConsumer_;
  941. TString MetricNameLabel_;
  942. TState State_;
  943. TString LastLabelName_;
  944. TMetricCollector LastMetric_;
  945. TString ErrorMsg_;
  946. bool IsIntentionallyHalted_{false};
  947. };
  948. } // namespace
  949. void DecodeJson(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) {
  950. TCommonPartsCollector commonPartsCollector;
  951. {
  952. TMemoryInput memIn(data);
  953. TDecoderJson decoder(data, &commonPartsCollector, metricNameLabel);
  954. // no need to check a return value. If there is an error, a TJsonDecodeError is thrown
  955. NJson::ReadJson(&memIn, &decoder);
  956. }
  957. TCommonPartsProxy commonPartsProxy(std::move(commonPartsCollector.CommonParts()), c);
  958. {
  959. TMemoryInput memIn(data);
  960. TDecoderJson decoder(data, &commonPartsProxy, metricNameLabel);
  961. // no need to check a return value. If there is an error, a TJsonDecodeError is thrown
  962. NJson::ReadJson(&memIn, &decoder);
  963. }
  964. }
  965. #undef DECODE_ENSURE
  966. }