plugin_profile.cc 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #ifdef __cplusplus
  3. extern "C" {
  4. #endif
  5. #include "daemon/common.h"
  6. #ifdef __cplusplus
  7. }
  8. #endif
  9. #include <random>
  10. #include <thread>
  11. #include <vector>
  12. #define CONFIG_SECTION_PROFILE "plugin:profile"
  13. static void foo(void) {
  14. for (size_t i = 0; i != 22; i++) {
  15. sleep(1);
  16. }
  17. abort();
  18. }
  19. class Generator {
  20. public:
  21. Generator(size_t N) : Offset(0) {
  22. std::random_device RandDev;
  23. std::mt19937 Gen(RandDev());
  24. std::uniform_int_distribution<int> D(-16, 16);
  25. V.reserve(N);
  26. for (size_t Idx = 0; Idx != N; Idx++)
  27. V.push_back(D(Gen));
  28. }
  29. double getRandValue() {
  30. return V[Offset++ % V.size()];
  31. }
  32. private:
  33. size_t Offset;
  34. std::vector<double> V;
  35. };
  36. class Profiler {
  37. public:
  38. Profiler(size_t ID, size_t NumCharts, size_t NumDimsPerChart, time_t SecondsToBackfill, int UpdateEvery) :
  39. ID(ID),
  40. NumCharts(NumCharts),
  41. NumDimsPerChart(NumDimsPerChart),
  42. SecondsToBackfill(SecondsToBackfill),
  43. UpdateEvery(UpdateEvery),
  44. Gen(1024 * 1024)
  45. {}
  46. void create() {
  47. char ChartId[1024];
  48. char DimId[1024];
  49. Charts.reserve(NumCharts);
  50. for (size_t I = 0; I != NumCharts; I++) {
  51. size_t CID = ID + Charts.size() + 1;
  52. snprintfz(ChartId, 1024 - 1, "chart_%zu", CID);
  53. RRDSET *RS = rrdset_create_localhost(
  54. "profile", // type
  55. ChartId, // id
  56. nullptr, // name,
  57. "profile_family", // family
  58. "profile_context", // context
  59. "profile_title", // title
  60. "profile_units", // units
  61. "profile_plugin", // plugin
  62. "profile_module", // module
  63. 12345678 + CID, // priority
  64. UpdateEvery, // update_every
  65. RRDSET_TYPE_LINE // chart_type
  66. );
  67. if (I != 0)
  68. rrdset_flag_set(RS, RRDSET_FLAG_HIDDEN);
  69. Charts.push_back(RS);
  70. Dimensions.reserve(NumDimsPerChart);
  71. for (size_t J = 0; J != NumDimsPerChart; J++) {
  72. snprintfz(DimId, 1024 - 1, "dim_%zu", J);
  73. RRDDIM *RD = rrddim_add(
  74. RS, // st
  75. DimId, // id
  76. nullptr, // name
  77. 1, // multiplier
  78. 1, // divisor
  79. RRD_ALGORITHM_ABSOLUTE // algorithm
  80. );
  81. Dimensions.push_back(RD);
  82. }
  83. }
  84. }
  85. void update(const struct timeval &Now) {
  86. for (RRDSET *RS: Charts) {
  87. for (RRDDIM *RD : Dimensions) {
  88. rrddim_timed_set_by_pointer(RS, RD, Now, Gen.getRandValue());
  89. }
  90. rrdset_timed_done(RS, Now, RS->counter_done != 0);
  91. }
  92. }
  93. void run() {
  94. #define WORKER_JOB_CREATE_CHARTS 0
  95. #define WORKER_JOB_UPDATE_CHARTS 1
  96. #define WORKER_JOB_METRIC_DURATION_TO_BACKFILL 2
  97. #define WORKER_JOB_METRIC_POINTS_BACKFILLED 3
  98. worker_register("PROFILER");
  99. worker_register_job_name(WORKER_JOB_CREATE_CHARTS, "create charts");
  100. worker_register_job_name(WORKER_JOB_UPDATE_CHARTS, "update charts");
  101. worker_register_job_custom_metric(WORKER_JOB_METRIC_DURATION_TO_BACKFILL, "duration to backfill", "seconds", WORKER_METRIC_ABSOLUTE);
  102. worker_register_job_custom_metric(WORKER_JOB_METRIC_POINTS_BACKFILLED, "points backfilled", "points", WORKER_METRIC_ABSOLUTE);
  103. heartbeat_t HB;
  104. heartbeat_init(&HB);
  105. worker_is_busy(WORKER_JOB_CREATE_CHARTS);
  106. create();
  107. struct timeval CollectionTV;
  108. now_realtime_timeval(&CollectionTV);
  109. if (SecondsToBackfill) {
  110. CollectionTV.tv_sec -= SecondsToBackfill;
  111. CollectionTV.tv_sec -= (CollectionTV.tv_sec % UpdateEvery);
  112. CollectionTV.tv_usec = 0;
  113. }
  114. size_t BackfilledPoints = 0;
  115. struct timeval NowTV, PrevTV;
  116. now_realtime_timeval(&NowTV);
  117. PrevTV = NowTV;
  118. while (service_running(SERVICE_COLLECTORS)) {
  119. worker_is_busy(WORKER_JOB_UPDATE_CHARTS);
  120. update(CollectionTV);
  121. CollectionTV.tv_sec += UpdateEvery;
  122. now_realtime_timeval(&NowTV);
  123. ++BackfilledPoints;
  124. if (NowTV.tv_sec > PrevTV.tv_sec) {
  125. PrevTV = NowTV;
  126. worker_set_metric(WORKER_JOB_METRIC_POINTS_BACKFILLED, BackfilledPoints * NumCharts * NumDimsPerChart);
  127. BackfilledPoints = 0;
  128. }
  129. size_t RemainingSeconds = (CollectionTV.tv_sec >= NowTV.tv_sec) ? 0 : (NowTV.tv_sec - CollectionTV.tv_sec);
  130. worker_set_metric(WORKER_JOB_METRIC_DURATION_TO_BACKFILL, RemainingSeconds);
  131. if (CollectionTV.tv_sec >= NowTV.tv_sec) {
  132. worker_is_idle();
  133. heartbeat_next(&HB, UpdateEvery * USEC_PER_SEC);
  134. foo();
  135. }
  136. }
  137. }
  138. private:
  139. size_t ID;
  140. size_t NumCharts;
  141. size_t NumDimsPerChart;
  142. size_t SecondsToBackfill;
  143. int UpdateEvery;
  144. Generator Gen;
  145. std::vector<RRDSET *> Charts;
  146. std::vector<RRDDIM *> Dimensions;
  147. };
  148. static void *subprofile_main(void* Arg) {
  149. Profiler *P = reinterpret_cast<Profiler *>(Arg);
  150. P->run();
  151. return nullptr;
  152. }
  153. static void profile_main_cleanup(void *ptr) {
  154. struct netdata_static_thread *static_thread = (struct netdata_static_thread *) ptr;
  155. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  156. netdata_log_info("cleaning up...");
  157. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  158. }
  159. extern "C" void *profile_main(void *ptr) {
  160. netdata_thread_cleanup_push(profile_main_cleanup, ptr);
  161. int UpdateEvery = (int) config_get_number(CONFIG_SECTION_PROFILE, "update every", 1);
  162. if (UpdateEvery < localhost->rrd_update_every)
  163. UpdateEvery = localhost->rrd_update_every;
  164. // pick low-default values, in case this plugin is ever enabled accidentaly.
  165. size_t NumThreads = config_get_number(CONFIG_SECTION_PROFILE, "number of threads", 2);
  166. size_t NumCharts = config_get_number(CONFIG_SECTION_PROFILE, "number of charts", 2);
  167. size_t NumDimsPerChart = config_get_number(CONFIG_SECTION_PROFILE, "number of dimensions per chart", 2);
  168. size_t SecondsToBackfill = config_get_number(CONFIG_SECTION_PROFILE, "seconds to backfill", 10 * 60);
  169. std::vector<Profiler> Profilers;
  170. for (size_t Idx = 0; Idx != NumThreads; Idx++) {
  171. Profiler P(1e8 + Idx * 1e6, NumCharts, NumDimsPerChart, SecondsToBackfill, UpdateEvery);
  172. Profilers.push_back(P);
  173. }
  174. std::vector<netdata_thread_t> Threads(NumThreads);
  175. for (size_t Idx = 0; Idx != NumThreads; Idx++) {
  176. char Tag[NETDATA_THREAD_TAG_MAX + 1];
  177. snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "PROFILER[%zu]", Idx);
  178. netdata_thread_create(&Threads[Idx], Tag, NETDATA_THREAD_OPTION_JOINABLE, subprofile_main, static_cast<void *>(&Profilers[Idx]));
  179. }
  180. for (size_t Idx = 0; Idx != NumThreads; Idx++)
  181. netdata_thread_join(Threads[Idx], nullptr);
  182. netdata_thread_cleanup_pop(1);
  183. return NULL;
  184. }