metrics_registry.cpp 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. #include "metrics_registry.h"
  2. #include <yql/essentials/providers/common/metrics/protos/metrics_registry.pb.h>
  3. #include <util/generic/hash.h>
  4. #include <util/generic/maybe.h>
  5. #include <util/generic/stack.h>
  6. namespace NYql {
  7. namespace {
  8. //////////////////////////////////////////////////////////////////////////////
  9. // TCountersPhotographer
  10. //////////////////////////////////////////////////////////////////////////////
  11. class TCountersPhotographer: public NMonitoring::ICountableConsumer {
  12. public:
  13. TCountersPhotographer(NProto::TCounterGroup* groupProto, bool invalidate)
  14. : HasAnyCounters_(false)
  15. , Invalidate_(invalidate)
  16. {
  17. GroupsProto_.push(groupProto);
  18. }
  19. bool HasAnyCounters() const {
  20. return HasAnyCounters_;
  21. }
  22. private:
  23. void OnCounter(
  24. const TString& labelName, const TString& labelValue,
  25. const TSensorCounter* counter) override
  26. {
  27. HasAnyCounters_ = true;
  28. auto* counterProto = GroupsProto_.top()->AddCounters();
  29. counterProto->SetDerivative(counter->ForDerivative());
  30. auto counterVal = counter->Val();
  31. counterProto->SetValue(counterVal);
  32. if (Invalidate_) {
  33. const_cast<TSensorCounter*>(counter)->Sub(counterVal);
  34. }
  35. auto* label = counterProto->MutableLabel();
  36. label->SetName(labelName);
  37. label->SetValue(labelValue);
  38. }
  39. void OnHistogram(const TString& labelName, const TString& labelValue, NMonitoring::IHistogramSnapshotPtr snapshot, bool) override {
  40. if (Invalidate_) {
  41. return;
  42. }
  43. auto* counterProto = GroupsProto_.top()->AddCounters();
  44. auto* label = counterProto->MutableLabel();
  45. label->SetName(labelName);
  46. label->SetValue(labelValue);
  47. auto bucketsCount = snapshot->Count();
  48. for (ui32 i = 0; i < bucketsCount; i += 1) {
  49. auto upperBound = snapshot->UpperBound(i);
  50. auto value = snapshot->Value(i);
  51. auto* bucket = counterProto->AddBucket();
  52. bucket->SetUpperBound(upperBound);
  53. bucket->SetValue(value);
  54. }
  55. }
  56. void OnGroupBegin(
  57. const TString& labelName, const TString& labelValue,
  58. const TSensorsGroup*) override
  59. {
  60. if (labelName.empty() && labelValue.empty()) {
  61. // root group is alrady present
  62. return;
  63. }
  64. auto* groupProto = GroupsProto_.top()->AddGroups();
  65. auto* label = groupProto->MutableLabel();
  66. label->SetName(labelName);
  67. label->SetValue(labelValue);
  68. GroupsProto_.push(groupProto);
  69. }
  70. void OnGroupEnd(const TString&, const TString&, const TSensorsGroup*) override {
  71. GroupsProto_.pop();
  72. }
  73. private:
  74. TStack<NProto::TCounterGroup*> GroupsProto_;
  75. bool HasAnyCounters_;
  76. bool Invalidate_;
  77. };
  78. //////////////////////////////////////////////////////////////////////////////
  79. // TMetricsRegistryImpl
  80. //////////////////////////////////////////////////////////////////////////////
  81. class TMetricsRegistryImpl final: public IMetricsRegistry {
  82. public:
  83. TMetricsRegistryImpl(
  84. TSensorsGroupPtr sensors,
  85. TMaybe<TString> userName)
  86. : Sensors_(std::move(sensors))
  87. , UserName_(std::move(userName))
  88. {
  89. }
  90. void SetCounter(
  91. const TString& labelName,
  92. const TString& labelValue,
  93. i64 value,
  94. bool derivative) override
  95. {
  96. if (UserName_) {
  97. // per user counter
  98. auto userCnt = GetCounter(labelName, labelValue, UserName_.Get(),
  99. derivative);
  100. if (userCnt) {
  101. *userCnt = value;
  102. }
  103. return;
  104. }
  105. auto totalCnt = GetCounter(labelName, labelValue, nullptr, derivative);
  106. if (totalCnt) {
  107. *totalCnt = value;
  108. }
  109. }
  110. void IncCounter(
  111. const TString& labelName,
  112. const TString& labelValue,
  113. bool derivative) override
  114. {
  115. // total aggregate counter
  116. auto totalCnt = GetCounter(labelName, labelValue, nullptr, derivative);
  117. if (totalCnt) {
  118. totalCnt->Inc();
  119. }
  120. if (UserName_) {
  121. // per user counter
  122. auto userCnt = GetCounter(labelName, labelValue, UserName_.Get(),
  123. derivative);
  124. if (userCnt) {
  125. userCnt->Inc();
  126. }
  127. }
  128. }
  129. void AddCounter(
  130. const TString& labelName,
  131. const TString& labelValue,
  132. i64 value,
  133. bool derivative) override
  134. {
  135. // total aggregate counter
  136. auto totalCnt = GetCounter(labelName, labelValue, nullptr, derivative);
  137. if (totalCnt) {
  138. totalCnt->Add(value);
  139. }
  140. if (UserName_) {
  141. // per user counter
  142. auto userCnt = GetCounter(labelName, labelValue, UserName_.Get(),
  143. derivative);
  144. if (userCnt) {
  145. userCnt->Add(value);
  146. }
  147. }
  148. }
  149. bool TakeSnapshot(NProto::TMetricsRegistrySnapshot* snapshot) const override {
  150. bool hasRootGroupBefore = snapshot->HasRootGroup();
  151. TCountersPhotographer photographer(snapshot->MutableRootGroup(), snapshot->GetDontIncrement() == false);
  152. Sensors_->Accept(TString(), TString(), photographer);
  153. if (!photographer.HasAnyCounters() && !hasRootGroupBefore) {
  154. // remove prematurely allocated group
  155. snapshot->ClearRootGroup();
  156. return false;
  157. }
  158. return true;
  159. }
  160. void MergeSnapshot(const NProto::TMetricsRegistrySnapshot& snapshot) override {
  161. MergeFromGroupProto(
  162. snapshot.HasMergeToRoot()
  163. ? GetSensorsRootGroup().Get()
  164. : Sensors_.Get(),
  165. snapshot.GetRootGroup(),
  166. snapshot.HasDontIncrement()
  167. ? snapshot.GetDontIncrement()
  168. : false);
  169. }
  170. IMetricsRegistryPtr Personalized(const TString& userName) const override {
  171. return new TMetricsRegistryImpl(Sensors_, MakeMaybe(userName));
  172. }
  173. void Flush() override {
  174. // do nothing
  175. }
  176. TSensorsGroupPtr GetSensors() override {
  177. return Sensors_.Get();
  178. }
  179. private:
  180. TSensorCounterPtr GetCounter(
  181. const TString& labelName,
  182. const TString& labelValue,
  183. const TString* userName,
  184. bool derivative)
  185. {
  186. static const TString USER("user");
  187. static const TString USER_ABSOLUTE("user_absolute");
  188. static const TString TOTAL("total");
  189. const TString& userGroup = derivative ? USER : USER_ABSOLUTE;
  190. return Sensors_
  191. ->GetSubgroup(userGroup, userName ? *userName : TOTAL)
  192. ->GetNamedCounter(labelName, labelValue, derivative);
  193. }
  194. void MergeFromGroupProto(
  195. TSensorsGroup* group, const NProto::TCounterGroup& groupProto, bool asIs)
  196. {
  197. for (const auto& counterProto: groupProto.GetCounters()) {
  198. const auto& label = counterProto.GetLabel();
  199. if (!counterProto.GetBucket().empty()) {
  200. NMonitoring::TBucketBounds bounds;
  201. auto histSnapshot = NMonitoring::TExplicitHistogramSnapshot::New(counterProto.GetBucket().size());
  202. bounds.reserve(counterProto.GetBucket().size());
  203. int i = 0;
  204. for (const auto& b : counterProto.GetBucket()) {
  205. if (i < counterProto.GetBucket().size() - 1) {
  206. // skip inf
  207. bounds.push_back(b.GetUpperBound());
  208. }
  209. (*histSnapshot)[i].first = b.GetUpperBound();
  210. (*histSnapshot)[i].second = b.GetValue();
  211. i += 1;
  212. }
  213. auto collector = NMonitoring::ExplicitHistogram(bounds).Release();
  214. auto histogram = group->GetNamedHistogram(
  215. label.GetName(), label.GetValue(),
  216. THolder(collector));
  217. Histograms.insert(std::make_pair(histogram, collector));
  218. Histograms[histogram]->Collect(*histSnapshot);
  219. } else {
  220. auto counter = group->GetNamedCounter(
  221. label.GetName(), label.GetValue(),
  222. counterProto.GetDerivative());
  223. if (asIs) {
  224. *counter = counterProto.GetValue();
  225. } else {
  226. *counter += counterProto.GetValue();
  227. }
  228. }
  229. }
  230. for (const auto& subGroupProto: groupProto.GetGroups()) {
  231. const auto& label = subGroupProto.GetLabel();
  232. auto subGroup = group->GetSubgroup(
  233. label.GetName(), label.GetValue());
  234. MergeFromGroupProto(subGroup.Get(), subGroupProto, asIs);
  235. }
  236. }
  237. private:
  238. TSensorsGroupPtr Sensors_;
  239. const TMaybe<TString> UserName_;
  240. THashMap<NMonitoring::THistogramPtr, NMonitoring::IHistogramCollector*> Histograms;
  241. };
  242. } // namespace
  243. IMetricsRegistryPtr CreateMetricsRegistry(TSensorsGroupPtr sensors) {
  244. return new TMetricsRegistryImpl(std::move(sensors), Nothing());
  245. }
  246. } // namespace NYql