process_data.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "exporting_engine.h"
  /**
   * Normalize chart and dimension names
   *
   * Copy src into dst, substituting '_' for every character that is neither
   * alphanumeric nor '.'. Copying stops at the end of src or after max_len
   * characters, whichever comes first, and dst is always NUL-terminated.
   *
   * @param dst where to copy the name to; must have room for max_len + 1 bytes
   *            (max_len characters plus the terminating NUL).
   * @param src where to copy the name from (NUL-terminated).
   * @param max_len the maximum number of characters to copy, excluding the terminating NUL.
   * @return Returns the number of characters copied, i.e. strlen(dst) afterwards.
   */
  size_t exporting_name_copy(char *dst, const char *src, size_t max_len)
  {
      size_t n;

      for (n = 0; n < max_len && src[n]; n++) {
          // <ctype.h> functions require a value representable as unsigned char (or EOF);
          // passing a negative plain char (e.g. a UTF-8 byte where char is signed) is UB.
          unsigned char c = (unsigned char)src[n];
          dst[n] = (c == '.' || isalnum(c)) ? src[n] : '_';
      }
      dst[n] = '\0';

      return n;
  }
  26. /**
  27. * Mark scheduled instances
  28. *
  29. * Any instance can have its own update interval. On every exporting engine update only those instances are picked,
  30. * which are scheduled for the update.
  31. *
  32. * @param engine an engine data structure.
  33. * @return Returns 1 if there are instances to process
  34. */
  35. int mark_scheduled_instances(struct engine *engine)
  36. {
  37. int instances_were_scheduled = 0;
  38. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  39. if (!instance->disabled && (engine->now % instance->config.update_every >=
  40. instance->config.update_every - localhost->rrd_update_every)) {
  41. instance->scheduled = 1;
  42. instances_were_scheduled = 1;
  43. instance->before = engine->now;
  44. }
  45. }
  46. return instances_were_scheduled;
  47. }
  48. /**
  49. * Calculate the SUM or AVERAGE of a dimension, for any timeframe
  50. *
  51. * May return NAN if the database does not have any value in the give timeframe.
  52. *
  53. * @param instance an instance data structure.
  54. * @param rd a dimension(metric) in the Netdata database.
  55. * @param last_timestamp the timestamp that should be reported to the exporting connector instance.
  56. * @return Returns the value, calculated over the given period.
  57. */
  58. NETDATA_DOUBLE exporting_calculate_value_from_stored_data(
  59. struct instance *instance,
  60. RRDDIM *rd,
  61. time_t *last_timestamp)
  62. {
  63. RRDSET *st = rd->rrdset;
  64. #ifdef NETDATA_INTERNAL_CHECKS
  65. RRDHOST *host = st->rrdhost;
  66. #endif
  67. time_t after = instance->after;
  68. time_t before = instance->before;
  69. // find the edges of the rrd database for this chart
  70. time_t first_t = storage_engine_oldest_time_s(rd->tiers[0].seb, rd->tiers[0].smh);
  71. time_t last_t = storage_engine_latest_time_s(rd->tiers[0].seb, rd->tiers[0].smh);
  72. time_t update_every = st->update_every;
  73. struct storage_engine_query_handle handle;
  74. // step back a little, to make sure we have complete data collection
  75. // for all metrics
  76. after -= update_every * 2;
  77. before -= update_every * 2;
  78. // align the time-frame
  79. after = after - (after % update_every);
  80. before = before - (before % update_every);
  81. // for before, loose another iteration
  82. // the latest point will be reported the next time
  83. before -= update_every;
  84. if (unlikely(after > before))
  85. // this can happen when update_every > before - after
  86. after = before;
  87. if (unlikely(after < first_t))
  88. after = first_t;
  89. if (unlikely(before > last_t))
  90. before = last_t;
  91. if (unlikely(before < first_t || after > last_t)) {
  92. // the chart has not been updated in the wanted timeframe
  93. netdata_log_debug(
  94. D_EXPORTING,
  95. "EXPORTING: %s.%s.%s: aligned timeframe %lu to %lu is outside the chart's database range %lu to %lu",
  96. rrdhost_hostname(host),
  97. rrdset_id(st),
  98. rrddim_id(rd),
  99. (unsigned long)after,
  100. (unsigned long)before,
  101. (unsigned long)first_t,
  102. (unsigned long)last_t);
  103. return NAN;
  104. }
  105. *last_timestamp = before;
  106. size_t points_read = 0;
  107. size_t counter = 0;
  108. NETDATA_DOUBLE sum = 0;
  109. for (storage_engine_query_init(rd->tiers[0].seb, rd->tiers[0].smh, &handle, after, before, STORAGE_PRIORITY_SYNCHRONOUS); !storage_engine_query_is_finished(&handle);) {
  110. STORAGE_POINT sp = storage_engine_query_next_metric(&handle);
  111. points_read++;
  112. if (unlikely(storage_point_is_gap(sp))) {
  113. // not collected
  114. continue;
  115. }
  116. sum += sp.sum;
  117. counter += sp.count;
  118. }
  119. storage_engine_query_finalize(&handle);
  120. global_statistics_exporters_query_completed(points_read);
  121. if (unlikely(!counter)) {
  122. netdata_log_debug(
  123. D_EXPORTING,
  124. "EXPORTING: %s.%s.%s: no values stored in database for range %lu to %lu",
  125. rrdhost_hostname(host),
  126. rrdset_id(st),
  127. rrddim_id(rd),
  128. (unsigned long)after,
  129. (unsigned long)before);
  130. return NAN;
  131. }
  132. if (unlikely(EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_SUM))
  133. return sum;
  134. return sum / (NETDATA_DOUBLE)counter;
  135. }
  136. /**
  137. * Start batch formatting for every connector instance's buffer
  138. *
  139. * @param engine an engine data structure.
  140. */
  141. void start_batch_formatting(struct engine *engine)
  142. {
  143. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  144. if (instance->scheduled) {
  145. uv_mutex_lock(&instance->mutex);
  146. if (instance->start_batch_formatting && instance->start_batch_formatting(instance) != 0) {
  147. netdata_log_error("EXPORTING: cannot start batch formatting for %s", instance->config.name);
  148. disable_instance(instance);
  149. }
  150. }
  151. }
  152. }
  153. /**
  154. * Start host formatting for every connector instance's buffer
  155. *
  156. * @param engine an engine data structure.
  157. * @param host a data collecting host.
  158. */
  159. void start_host_formatting(struct engine *engine, RRDHOST *host)
  160. {
  161. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  162. if (instance->scheduled) {
  163. if (rrdhost_is_exportable(instance, host)) {
  164. if (instance->start_host_formatting && instance->start_host_formatting(instance, host) != 0) {
  165. netdata_log_error("EXPORTING: cannot start host formatting for %s", instance->config.name);
  166. disable_instance(instance);
  167. }
  168. } else {
  169. instance->skip_host = 1;
  170. }
  171. }
  172. }
  173. }
  174. /**
  175. * Start chart formatting for every connector instance's buffer
  176. *
  177. * @param engine an engine data structure.
  178. * @param st a chart.
  179. */
  180. void start_chart_formatting(struct engine *engine, RRDSET *st)
  181. {
  182. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  183. if (instance->scheduled && !instance->skip_host) {
  184. if (rrdset_is_exportable(instance, st)) {
  185. if (instance->start_chart_formatting && instance->start_chart_formatting(instance, st) != 0) {
  186. netdata_log_error("EXPORTING: cannot start chart formatting for %s", instance->config.name);
  187. disable_instance(instance);
  188. }
  189. } else {
  190. instance->skip_chart = 1;
  191. }
  192. }
  193. }
  194. }
  195. /**
  196. * Format metric for every connector instance's buffer
  197. *
  198. * @param engine an engine data structure.
  199. * @param rd a dimension(metric) in the Netdata database.
  200. */
  201. void metric_formatting(struct engine *engine, RRDDIM *rd)
  202. {
  203. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  204. if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
  205. if (instance->metric_formatting && instance->metric_formatting(instance, rd) != 0) {
  206. netdata_log_error("EXPORTING: cannot format metric for %s", instance->config.name);
  207. disable_instance(instance);
  208. continue;
  209. }
  210. instance->stats.buffered_metrics++;
  211. }
  212. }
  213. }
  214. /**
  215. * End chart formatting for every connector instance's buffer
  216. *
  217. * @param engine an engine data structure.
  218. * @param a chart.
  219. */
  220. void end_chart_formatting(struct engine *engine, RRDSET *st)
  221. {
  222. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  223. if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
  224. if (instance->end_chart_formatting && instance->end_chart_formatting(instance, st) != 0) {
  225. netdata_log_error("EXPORTING: cannot end chart formatting for %s", instance->config.name);
  226. disable_instance(instance);
  227. continue;
  228. }
  229. }
  230. instance->skip_chart = 0;
  231. }
  232. }
  233. /**
  234. * Format variables for every connector instance's buffer
  235. *
  236. * @param engine an engine data structure.
  237. * @param host a data collecting host.
  238. */
  239. void variables_formatting(struct engine *engine, RRDHOST *host)
  240. {
  241. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  242. if (instance->scheduled && !instance->skip_host && should_send_variables(instance)) {
  243. if (instance->variables_formatting && instance->variables_formatting(instance, host) != 0){
  244. netdata_log_error("EXPORTING: cannot format variables for %s", instance->config.name);
  245. disable_instance(instance);
  246. continue;
  247. }
  248. // sum all variables as one metrics
  249. instance->stats.buffered_metrics++;
  250. }
  251. }
  252. }
  253. /**
  254. * End host formatting for every connector instance's buffer
  255. *
  256. * @param engine an engine data structure.
  257. * @param host a data collecting host.
  258. */
  259. void end_host_formatting(struct engine *engine, RRDHOST *host)
  260. {
  261. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  262. if (instance->scheduled && !instance->skip_host) {
  263. if (instance->end_host_formatting && instance->end_host_formatting(instance, host) != 0) {
  264. netdata_log_error("EXPORTING: cannot end host formatting for %s", instance->config.name);
  265. disable_instance(instance);
  266. continue;
  267. }
  268. }
  269. instance->skip_host = 0;
  270. }
  271. }
  272. /**
  273. * End batch formatting for every connector instance's buffer
  274. *
  275. * @param engine an engine data structure.
  276. */
  277. void end_batch_formatting(struct engine *engine)
  278. {
  279. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  280. if (instance->scheduled) {
  281. if (instance->end_batch_formatting && instance->end_batch_formatting(instance) != 0) {
  282. netdata_log_error("EXPORTING: cannot end batch formatting for %s", instance->config.name);
  283. disable_instance(instance);
  284. continue;
  285. }
  286. uv_mutex_unlock(&instance->mutex);
  287. instance->data_is_ready = 1;
  288. uv_cond_signal(&instance->cond_var);
  289. instance->scheduled = 0;
  290. instance->after = instance->before;
  291. }
  292. }
  293. }
  294. /**
  295. * Prepare buffers
  296. *
  297. * Walk through the Netdata database and fill buffers for every scheduled exporting connector instance according to
  298. * configured rules.
  299. *
  300. * @param engine an engine data structure.
  301. */
  302. void prepare_buffers(struct engine *engine)
  303. {
  304. netdata_thread_disable_cancelability();
  305. start_batch_formatting(engine);
  306. rrd_rdlock();
  307. RRDHOST *host;
  308. rrdhost_foreach_read(host) {
  309. start_host_formatting(engine, host);
  310. RRDSET *st;
  311. rrdset_foreach_read(st, host) {
  312. start_chart_formatting(engine, st);
  313. RRDDIM *rd;
  314. rrddim_foreach_read(rd, st)
  315. metric_formatting(engine, rd);
  316. rrddim_foreach_done(rd);
  317. end_chart_formatting(engine, st);
  318. }
  319. rrdset_foreach_done(st);
  320. variables_formatting(engine, host);
  321. end_host_formatting(engine, host);
  322. }
  323. rrd_unlock();
  324. netdata_thread_enable_cancelability();
  325. end_batch_formatting(engine);
  326. }
  327. /**
  328. * Flush a buffer with host labels
  329. *
  330. * @param instance an instance data structure.
  331. * @param host a data collecting host.
  332. * @return Always returns 0.
  333. */
  334. int flush_host_labels(struct instance *instance, RRDHOST *host)
  335. {
  336. (void)host;
  337. if (instance->labels_buffer)
  338. buffer_flush(instance->labels_buffer);
  339. return 0;
  340. }
  341. /**
  342. * End a batch for a simple connector
  343. *
  344. * @param instance an instance data structure.
  345. * @return Returns 0 on success, 1 on failure.
  346. */
  347. int simple_connector_end_batch(struct instance *instance)
  348. {
  349. struct simple_connector_data *simple_connector_data =
  350. (struct simple_connector_data *)instance->connector_specific_data;
  351. struct stats *stats = &instance->stats;
  352. BUFFER *instance_buffer = (BUFFER *)instance->buffer;
  353. struct simple_connector_buffer *last_buffer = simple_connector_data->last_buffer;
  354. if (!last_buffer->buffer) {
  355. last_buffer->buffer = buffer_create(0, &netdata_buffers_statistics.buffers_exporters);
  356. }
  357. if (last_buffer->used) {
  358. // ring buffer is full, reuse the oldest element
  359. simple_connector_data->first_buffer = simple_connector_data->first_buffer->next;
  360. stats->data_lost_events++;
  361. stats->lost_metrics += last_buffer->buffered_metrics;
  362. stats->lost_bytes += last_buffer->buffered_bytes;
  363. }
  364. // swap buffers
  365. BUFFER *tmp_buffer = last_buffer->buffer;
  366. last_buffer->buffer = instance_buffer;
  367. instance->buffer = instance_buffer = tmp_buffer;
  368. buffer_flush(instance_buffer);
  369. if (last_buffer->header)
  370. buffer_flush(last_buffer->header);
  371. else
  372. last_buffer->header = buffer_create(0, &netdata_buffers_statistics.buffers_exporters);
  373. if (instance->prepare_header)
  374. instance->prepare_header(instance);
  375. // The stats->buffered_metrics is used in the simple connector batch formatting as a variable for the number
  376. // of metrics, added in the current iteration, so we are clearing it here. We will use the
  377. // simple_connector_data->total_buffered_metrics in the worker to show the statistics.
  378. size_t buffered_metrics = (size_t)stats->buffered_metrics;
  379. stats->buffered_metrics = 0;
  380. size_t buffered_bytes = buffer_strlen(last_buffer->buffer);
  381. last_buffer->buffered_metrics = buffered_metrics;
  382. last_buffer->buffered_bytes = buffered_bytes;
  383. last_buffer->used++;
  384. simple_connector_data->total_buffered_metrics += buffered_metrics;
  385. stats->buffered_bytes += buffered_bytes;
  386. simple_connector_data->last_buffer = simple_connector_data->last_buffer->next;
  387. return 0;
  388. }