process_data.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "exporting_engine.h"
  /**
   * Normalize chart and dimension names
   *
   * Copy at most max_len characters of src into dst, substituting '_' for every character that
   * is neither alphanumeric nor '.'. The result is always NUL-terminated, so dst must have room
   * for max_len + 1 bytes.
   *
   * @param dst where to copy name to.
   * @param src where to copy name from (NUL-terminated).
   * @param max_len the maximum number of characters to copy, not counting the terminating NUL.
   * @return Returns the number of characters written to dst, excluding the terminating NUL.
   */
  size_t exporting_name_copy(char *dst, const char *src, size_t max_len)
  {
      size_t n = 0;

      // Test the limit first so src[max_len] is never read once the limit is reached.
      for (; n < max_len && src[n] != '\0'; n++) {
          // <ctype.h> functions are undefined for negative arguments, which a plain (possibly
          // signed) char holds for non-ASCII bytes such as UTF-8 sequences: widen first.
          unsigned char c = (unsigned char)src[n];
          dst[n] = (c == '.' || isalnum(c)) ? (char)c : '_';
      }
      dst[n] = '\0';

      return n;
  }
  26. /**
  27. * Mark scheduled instances
  28. *
  29. * Any instance can have its own update interval. On every exporting engine update only those instances are picked,
  30. * which are scheduled for the update.
  31. *
  32. * @param engine an engine data structure.
  33. * @return Returns 1 if there are instances to process
  34. */
  35. int mark_scheduled_instances(struct engine *engine)
  36. {
  37. int instances_were_scheduled = 0;
  38. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  39. if (!instance->disabled && (engine->now % instance->config.update_every >=
  40. instance->config.update_every - localhost->rrd_update_every)) {
  41. instance->scheduled = 1;
  42. instances_were_scheduled = 1;
  43. instance->before = engine->now;
  44. }
  45. }
  46. return instances_were_scheduled;
  47. }
  48. /**
  49. * Calculate the SUM or AVERAGE of a dimension, for any timeframe
  50. *
  51. * May return NAN if the database does not have any value in the give timeframe.
  52. *
  53. * @param instance an instance data structure.
  54. * @param rd a dimension(metric) in the Netdata database.
  55. * @param last_timestamp the timestamp that should be reported to the exporting connector instance.
  56. * @return Returns the value, calculated over the given period.
  57. */
  58. NETDATA_DOUBLE exporting_calculate_value_from_stored_data(
  59. struct instance *instance,
  60. RRDDIM *rd,
  61. time_t *last_timestamp)
  62. {
  63. RRDSET *st = rd->rrdset;
  64. #ifdef NETDATA_INTERNAL_CHECKS
  65. RRDHOST *host = st->rrdhost;
  66. #endif
  67. time_t after = instance->after;
  68. time_t before = instance->before;
  69. // find the edges of the rrd database for this chart
  70. time_t first_t = rd->tiers[0]->query_ops.oldest_time(rd->tiers[0]->db_metric_handle);
  71. time_t last_t = rd->tiers[0]->query_ops.latest_time(rd->tiers[0]->db_metric_handle);
  72. time_t update_every = st->update_every;
  73. struct rrddim_query_handle handle;
  74. // step back a little, to make sure we have complete data collection
  75. // for all metrics
  76. after -= update_every * 2;
  77. before -= update_every * 2;
  78. // align the time-frame
  79. after = after - (after % update_every);
  80. before = before - (before % update_every);
  81. // for before, loose another iteration
  82. // the latest point will be reported the next time
  83. before -= update_every;
  84. if (unlikely(after > before))
  85. // this can happen when update_every > before - after
  86. after = before;
  87. if (unlikely(after < first_t))
  88. after = first_t;
  89. if (unlikely(before > last_t))
  90. before = last_t;
  91. if (unlikely(before < first_t || after > last_t)) {
  92. // the chart has not been updated in the wanted timeframe
  93. debug(
  94. D_EXPORTING,
  95. "EXPORTING: %s.%s.%s: aligned timeframe %lu to %lu is outside the chart's database range %lu to %lu",
  96. host->hostname,
  97. st->id,
  98. rd->id,
  99. (unsigned long)after,
  100. (unsigned long)before,
  101. (unsigned long)first_t,
  102. (unsigned long)last_t);
  103. return NAN;
  104. }
  105. *last_timestamp = before;
  106. size_t counter = 0;
  107. NETDATA_DOUBLE sum = 0;
  108. for (rd->tiers[0]->query_ops.init(rd->tiers[0]->db_metric_handle, &handle, after, before, TIER_QUERY_FETCH_SUM); !rd->tiers[0]->query_ops.is_finished(&handle);) {
  109. STORAGE_POINT sp = rd->tiers[0]->query_ops.next_metric(&handle);
  110. if (unlikely(storage_point_is_empty(sp))) {
  111. // not collected
  112. continue;
  113. }
  114. sum += sp.sum;
  115. counter += sp.count;
  116. }
  117. rd->tiers[0]->query_ops.finalize(&handle);
  118. if (unlikely(!counter)) {
  119. debug(
  120. D_EXPORTING,
  121. "EXPORTING: %s.%s.%s: no values stored in database for range %lu to %lu",
  122. host->hostname,
  123. st->id,
  124. rd->id,
  125. (unsigned long)after,
  126. (unsigned long)before);
  127. return NAN;
  128. }
  129. if (unlikely(EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_SUM))
  130. return sum;
  131. return sum / (NETDATA_DOUBLE)counter;
  132. }
  133. /**
  134. * Start batch formatting for every connector instance's buffer
  135. *
  136. * @param engine an engine data structure.
  137. */
  138. void start_batch_formatting(struct engine *engine)
  139. {
  140. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  141. if (instance->scheduled) {
  142. uv_mutex_lock(&instance->mutex);
  143. if (instance->start_batch_formatting && instance->start_batch_formatting(instance) != 0) {
  144. error("EXPORTING: cannot start batch formatting for %s", instance->config.name);
  145. disable_instance(instance);
  146. }
  147. }
  148. }
  149. }
  150. /**
  151. * Start host formatting for every connector instance's buffer
  152. *
  153. * @param engine an engine data structure.
  154. * @param host a data collecting host.
  155. */
  156. void start_host_formatting(struct engine *engine, RRDHOST *host)
  157. {
  158. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  159. if (instance->scheduled) {
  160. if (rrdhost_is_exportable(instance, host)) {
  161. if (instance->start_host_formatting && instance->start_host_formatting(instance, host) != 0) {
  162. error("EXPORTING: cannot start host formatting for %s", instance->config.name);
  163. disable_instance(instance);
  164. }
  165. } else {
  166. instance->skip_host = 1;
  167. }
  168. }
  169. }
  170. }
  171. /**
  172. * Start chart formatting for every connector instance's buffer
  173. *
  174. * @param engine an engine data structure.
  175. * @param st a chart.
  176. */
  177. void start_chart_formatting(struct engine *engine, RRDSET *st)
  178. {
  179. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  180. if (instance->scheduled && !instance->skip_host) {
  181. if (rrdset_is_exportable(instance, st)) {
  182. if (instance->start_chart_formatting && instance->start_chart_formatting(instance, st) != 0) {
  183. error("EXPORTING: cannot start chart formatting for %s", instance->config.name);
  184. disable_instance(instance);
  185. }
  186. } else {
  187. instance->skip_chart = 1;
  188. }
  189. }
  190. }
  191. }
  192. /**
  193. * Format metric for every connector instance's buffer
  194. *
  195. * @param engine an engine data structure.
  196. * @param rd a dimension(metric) in the Netdata database.
  197. */
  198. void metric_formatting(struct engine *engine, RRDDIM *rd)
  199. {
  200. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  201. if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
  202. if (instance->metric_formatting && instance->metric_formatting(instance, rd) != 0) {
  203. error("EXPORTING: cannot format metric for %s", instance->config.name);
  204. disable_instance(instance);
  205. continue;
  206. }
  207. instance->stats.buffered_metrics++;
  208. }
  209. }
  210. }
  211. /**
  212. * End chart formatting for every connector instance's buffer
  213. *
  214. * @param engine an engine data structure.
  215. * @param a chart.
  216. */
  217. void end_chart_formatting(struct engine *engine, RRDSET *st)
  218. {
  219. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  220. if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
  221. if (instance->end_chart_formatting && instance->end_chart_formatting(instance, st) != 0) {
  222. error("EXPORTING: cannot end chart formatting for %s", instance->config.name);
  223. disable_instance(instance);
  224. continue;
  225. }
  226. }
  227. instance->skip_chart = 0;
  228. }
  229. }
  230. /**
  231. * Format variables for every connector instance's buffer
  232. *
  233. * @param engine an engine data structure.
  234. * @param host a data collecting host.
  235. */
  236. void variables_formatting(struct engine *engine, RRDHOST *host)
  237. {
  238. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  239. if (instance->scheduled && !instance->skip_host && should_send_variables(instance)) {
  240. if (instance->variables_formatting && instance->variables_formatting(instance, host) != 0){
  241. error("EXPORTING: cannot format variables for %s", instance->config.name);
  242. disable_instance(instance);
  243. continue;
  244. }
  245. // sum all variables as one metrics
  246. instance->stats.buffered_metrics++;
  247. }
  248. }
  249. }
  250. /**
  251. * End host formatting for every connector instance's buffer
  252. *
  253. * @param engine an engine data structure.
  254. * @param host a data collecting host.
  255. */
  256. void end_host_formatting(struct engine *engine, RRDHOST *host)
  257. {
  258. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  259. if (instance->scheduled && !instance->skip_host) {
  260. if (instance->end_host_formatting && instance->end_host_formatting(instance, host) != 0) {
  261. error("EXPORTING: cannot end host formatting for %s", instance->config.name);
  262. disable_instance(instance);
  263. continue;
  264. }
  265. }
  266. instance->skip_host = 0;
  267. }
  268. }
  269. /**
  270. * End batch formatting for every connector instance's buffer
  271. *
  272. * @param engine an engine data structure.
  273. */
  274. void end_batch_formatting(struct engine *engine)
  275. {
  276. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  277. if (instance->scheduled) {
  278. if (instance->end_batch_formatting && instance->end_batch_formatting(instance) != 0) {
  279. error("EXPORTING: cannot end batch formatting for %s", instance->config.name);
  280. disable_instance(instance);
  281. continue;
  282. }
  283. uv_mutex_unlock(&instance->mutex);
  284. instance->data_is_ready = 1;
  285. uv_cond_signal(&instance->cond_var);
  286. instance->scheduled = 0;
  287. instance->after = instance->before;
  288. }
  289. }
  290. }
  291. /**
  292. * Prepare buffers
  293. *
  294. * Walk through the Netdata database and fill buffers for every scheduled exporting connector instance according to
  295. * configured rules.
  296. *
  297. * @param engine an engine data structure.
  298. */
  299. void prepare_buffers(struct engine *engine)
  300. {
  301. netdata_thread_disable_cancelability();
  302. start_batch_formatting(engine);
  303. rrd_rdlock();
  304. RRDHOST *host;
  305. rrdhost_foreach_read(host)
  306. {
  307. rrdhost_rdlock(host);
  308. start_host_formatting(engine, host);
  309. RRDSET *st;
  310. rrdset_foreach_read(st, host)
  311. {
  312. rrdset_rdlock(st);
  313. start_chart_formatting(engine, st);
  314. RRDDIM *rd;
  315. rrddim_foreach_read(rd, st)
  316. metric_formatting(engine, rd);
  317. end_chart_formatting(engine, st);
  318. rrdset_unlock(st);
  319. }
  320. variables_formatting(engine, host);
  321. end_host_formatting(engine, host);
  322. rrdhost_unlock(host);
  323. }
  324. rrd_unlock();
  325. netdata_thread_enable_cancelability();
  326. end_batch_formatting(engine);
  327. }
  328. /**
  329. * Flush a buffer with host labels
  330. *
  331. * @param instance an instance data structure.
  332. * @param host a data collecting host.
  333. * @return Always returns 0.
  334. */
  335. int flush_host_labels(struct instance *instance, RRDHOST *host)
  336. {
  337. (void)host;
  338. if (instance->labels_buffer)
  339. buffer_flush(instance->labels_buffer);
  340. return 0;
  341. }
  342. /**
  343. * End a batch for a simple connector
  344. *
  345. * @param instance an instance data structure.
  346. * @return Returns 0 on success, 1 on failure.
  347. */
  348. int simple_connector_end_batch(struct instance *instance)
  349. {
  350. struct simple_connector_data *simple_connector_data =
  351. (struct simple_connector_data *)instance->connector_specific_data;
  352. struct stats *stats = &instance->stats;
  353. BUFFER *instance_buffer = (BUFFER *)instance->buffer;
  354. struct simple_connector_buffer *last_buffer = simple_connector_data->last_buffer;
  355. if (!last_buffer->buffer) {
  356. last_buffer->buffer = buffer_create(0);
  357. }
  358. if (last_buffer->used) {
  359. // ring buffer is full, reuse the oldest element
  360. simple_connector_data->first_buffer = simple_connector_data->first_buffer->next;
  361. stats->data_lost_events++;
  362. stats->lost_metrics += last_buffer->buffered_metrics;
  363. stats->lost_bytes += last_buffer->buffered_bytes;
  364. }
  365. // swap buffers
  366. BUFFER *tmp_buffer = last_buffer->buffer;
  367. last_buffer->buffer = instance_buffer;
  368. instance->buffer = instance_buffer = tmp_buffer;
  369. buffer_flush(instance_buffer);
  370. if (last_buffer->header)
  371. buffer_flush(last_buffer->header);
  372. else
  373. last_buffer->header = buffer_create(0);
  374. if (instance->prepare_header)
  375. instance->prepare_header(instance);
  376. // The stats->buffered_metrics is used in the simple connector batch formatting as a variable for the number
  377. // of metrics, added in the current iteration, so we are clearing it here. We will use the
  378. // simple_connector_data->total_buffered_metrics in the worker to show the statistics.
  379. size_t buffered_metrics = (size_t)stats->buffered_metrics;
  380. stats->buffered_metrics = 0;
  381. size_t buffered_bytes = buffer_strlen(last_buffer->buffer);
  382. last_buffer->buffered_metrics = buffered_metrics;
  383. last_buffer->buffered_bytes = buffered_bytes;
  384. last_buffer->used++;
  385. simple_connector_data->total_buffered_metrics += buffered_metrics;
  386. stats->buffered_bytes += buffered_bytes;
  387. simple_connector_data->last_buffer = simple_connector_data->last_buffer->next;
  388. return 0;
  389. }