process_data.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "exporting_engine.h"
  3. /**
  4. * Normalize chart and dimension names
  5. *
  6. * Substitute '_' for any special character except '.'.
  7. *
  8. * @param dst where to copy name to.
  9. * @param src where to copy name from.
  10. * @param max_len the maximum size of copied name.
  11. * @return Returns the size of the copied name.
  12. */
  13. size_t exporting_name_copy(char *dst, const char *src, size_t max_len)
  14. {
  15. size_t n;
  16. for (n = 0; *src && n < max_len; dst++, src++, n++) {
  17. char c = *src;
  18. if (c != '.' && !isalnum(c))
  19. *dst = '_';
  20. else
  21. *dst = c;
  22. }
  23. *dst = '\0';
  24. return n;
  25. }
  26. /**
  27. * Mark scheduled instances
  28. *
  29. * Any instance can have its own update interval. On every exporting engine update only those instances are picked,
  30. * which are scheduled for the update.
  31. *
  32. * @param engine an engine data structure.
  33. * @return Returns 1 if there are instances to process
  34. */
  35. int mark_scheduled_instances(struct engine *engine)
  36. {
  37. int instances_were_scheduled = 0;
  38. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  39. if (!instance->disabled && (engine->now % instance->config.update_every < localhost->rrd_update_every)) {
  40. instance->scheduled = 1;
  41. instances_were_scheduled = 1;
  42. instance->before = engine->now;
  43. }
  44. }
  45. return instances_were_scheduled;
  46. }
  47. /**
  48. * Calculate the SUM or AVERAGE of a dimension, for any timeframe
  49. *
  50. * May return NAN if the database does not have any value in the give timeframe.
  51. *
  52. * @param instance an instance data structure.
  53. * @param rd a dimension(metric) in the Netdata database.
  54. * @param last_timestamp the timestamp that should be reported to the exporting connector instance.
  55. * @return Returns the value, calculated over the given period.
  56. */
  57. calculated_number exporting_calculate_value_from_stored_data(
  58. struct instance *instance,
  59. RRDDIM *rd,
  60. time_t *last_timestamp)
  61. {
  62. RRDSET *st = rd->rrdset;
  63. RRDHOST *host = st->rrdhost;
  64. time_t after = instance->after;
  65. time_t before = instance->before;
  66. // find the edges of the rrd database for this chart
  67. time_t first_t = rd->state->query_ops.oldest_time(rd);
  68. time_t last_t = rd->state->query_ops.latest_time(rd);
  69. time_t update_every = st->update_every;
  70. struct rrddim_query_handle handle;
  71. storage_number n;
  72. // step back a little, to make sure we have complete data collection
  73. // for all metrics
  74. after -= update_every * 2;
  75. before -= update_every * 2;
  76. // align the time-frame
  77. after = after - (after % update_every);
  78. before = before - (before % update_every);
  79. // for before, loose another iteration
  80. // the latest point will be reported the next time
  81. before -= update_every;
  82. if (unlikely(after > before))
  83. // this can happen when update_every > before - after
  84. after = before;
  85. if (unlikely(after < first_t))
  86. after = first_t;
  87. if (unlikely(before > last_t))
  88. before = last_t;
  89. if (unlikely(before < first_t || after > last_t)) {
  90. // the chart has not been updated in the wanted timeframe
  91. debug(
  92. D_BACKEND,
  93. "EXPORTING: %s.%s.%s: aligned timeframe %lu to %lu is outside the chart's database range %lu to %lu",
  94. host->hostname,
  95. st->id,
  96. rd->id,
  97. (unsigned long)after,
  98. (unsigned long)before,
  99. (unsigned long)first_t,
  100. (unsigned long)last_t);
  101. return NAN;
  102. }
  103. *last_timestamp = before;
  104. size_t counter = 0;
  105. calculated_number sum = 0;
  106. for (rd->state->query_ops.init(rd, &handle, after, before); !rd->state->query_ops.is_finished(&handle);) {
  107. time_t curr_t;
  108. n = rd->state->query_ops.next_metric(&handle, &curr_t);
  109. if (unlikely(!does_storage_number_exist(n))) {
  110. // not collected
  111. continue;
  112. }
  113. calculated_number value = unpack_storage_number(n);
  114. sum += value;
  115. counter++;
  116. }
  117. rd->state->query_ops.finalize(&handle);
  118. if (unlikely(!counter)) {
  119. debug(
  120. D_BACKEND,
  121. "EXPORTING: %s.%s.%s: no values stored in database for range %lu to %lu",
  122. host->hostname,
  123. st->id,
  124. rd->id,
  125. (unsigned long)after,
  126. (unsigned long)before);
  127. return NAN;
  128. }
  129. if (unlikely(EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_SUM))
  130. return sum;
  131. return sum / (calculated_number)counter;
  132. }
  133. /**
  134. * Start batch formatting for every connector instance's buffer
  135. *
  136. * @param engine an engine data structure.
  137. */
  138. void start_batch_formatting(struct engine *engine)
  139. {
  140. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  141. if (instance->scheduled) {
  142. uv_mutex_lock(&instance->mutex);
  143. if (instance->start_batch_formatting && instance->start_batch_formatting(instance) != 0) {
  144. error("EXPORTING: cannot start batch formatting for %s", instance->config.name);
  145. disable_instance(instance);
  146. }
  147. }
  148. }
  149. }
  150. /**
  151. * Start host formatting for every connector instance's buffer
  152. *
  153. * @param engine an engine data structure.
  154. * @param host a data collecting host.
  155. */
  156. void start_host_formatting(struct engine *engine, RRDHOST *host)
  157. {
  158. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  159. if (instance->scheduled) {
  160. if (rrdhost_is_exportable(instance, host)) {
  161. if (instance->start_host_formatting && instance->start_host_formatting(instance, host) != 0) {
  162. error("EXPORTING: cannot start host formatting for %s", instance->config.name);
  163. disable_instance(instance);
  164. }
  165. } else {
  166. instance->skip_host = 1;
  167. }
  168. }
  169. }
  170. }
  171. /**
  172. * Start chart formatting for every connector instance's buffer
  173. *
  174. * @param engine an engine data structure.
  175. * @param st a chart.
  176. */
  177. void start_chart_formatting(struct engine *engine, RRDSET *st)
  178. {
  179. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  180. if (instance->scheduled && !instance->skip_host) {
  181. if (rrdset_is_exportable(instance, st)) {
  182. if (instance->start_chart_formatting && instance->start_chart_formatting(instance, st) != 0) {
  183. error("EXPORTING: cannot start chart formatting for %s", instance->config.name);
  184. disable_instance(instance);
  185. }
  186. } else {
  187. instance->skip_chart = 1;
  188. }
  189. }
  190. }
  191. }
  192. /**
  193. * Format metric for every connector instance's buffer
  194. *
  195. * @param engine an engine data structure.
  196. * @param rd a dimension(metric) in the Netdata database.
  197. */
  198. void metric_formatting(struct engine *engine, RRDDIM *rd)
  199. {
  200. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  201. if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
  202. if (instance->metric_formatting && instance->metric_formatting(instance, rd) != 0) {
  203. error("EXPORTING: cannot format metric for %s", instance->config.name);
  204. disable_instance(instance);
  205. continue;
  206. }
  207. instance->stats.buffered_metrics++;
  208. }
  209. }
  210. }
  211. /**
  212. * End chart formatting for every connector instance's buffer
  213. *
  214. * @param engine an engine data structure.
  215. * @param a chart.
  216. */
  217. void end_chart_formatting(struct engine *engine, RRDSET *st)
  218. {
  219. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  220. if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
  221. if (instance->end_chart_formatting && instance->end_chart_formatting(instance, st) != 0) {
  222. error("EXPORTING: cannot end chart formatting for %s", instance->config.name);
  223. disable_instance(instance);
  224. continue;
  225. }
  226. }
  227. instance->skip_chart = 0;
  228. }
  229. }
  230. /**
  231. * End host formatting for every connector instance's buffer
  232. *
  233. * @param engine an engine data structure.
  234. * @param host a data collecting host.
  235. */
  236. void end_host_formatting(struct engine *engine, RRDHOST *host)
  237. {
  238. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  239. if (instance->scheduled && !instance->skip_host) {
  240. if (instance->end_host_formatting && instance->end_host_formatting(instance, host) != 0) {
  241. error("EXPORTING: cannot end host formatting for %s", instance->config.name);
  242. disable_instance(instance);
  243. continue;
  244. }
  245. }
  246. instance->skip_host = 0;
  247. }
  248. }
  249. /**
  250. * End batch formatting for every connector instance's buffer
  251. *
  252. * @param engine an engine data structure.
  253. */
  254. void end_batch_formatting(struct engine *engine)
  255. {
  256. for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
  257. if (instance->scheduled) {
  258. if (instance->end_batch_formatting && instance->end_batch_formatting(instance) != 0) {
  259. error("EXPORTING: cannot end batch formatting for %s", instance->config.name);
  260. disable_instance(instance);
  261. continue;
  262. }
  263. uv_mutex_unlock(&instance->mutex);
  264. uv_cond_signal(&instance->cond_var);
  265. instance->scheduled = 0;
  266. instance->after = instance->before;
  267. }
  268. }
  269. }
  270. /**
  271. * Prepare buffers
  272. *
  273. * Walk through the Netdata database and fill buffers for every scheduled exporting connector instance according to
  274. * configured rules.
  275. *
  276. * @param engine an engine data structure.
  277. */
  278. void prepare_buffers(struct engine *engine)
  279. {
  280. netdata_thread_disable_cancelability();
  281. start_batch_formatting(engine);
  282. rrd_rdlock();
  283. RRDHOST *host;
  284. rrdhost_foreach_read(host)
  285. {
  286. rrdhost_rdlock(host);
  287. start_host_formatting(engine, host);
  288. RRDSET *st;
  289. rrdset_foreach_read(st, host)
  290. {
  291. rrdset_rdlock(st);
  292. start_chart_formatting(engine, st);
  293. RRDDIM *rd;
  294. rrddim_foreach_read(rd, st)
  295. metric_formatting(engine, rd);
  296. end_chart_formatting(engine, st);
  297. rrdset_unlock(st);
  298. }
  299. end_host_formatting(engine, host);
  300. rrdhost_unlock(host);
  301. }
  302. rrd_unlock();
  303. netdata_thread_enable_cancelability();
  304. end_batch_formatting(engine);
  305. }
  306. /**
  307. * Flush a buffer with host labels
  308. *
  309. * @param instance an instance data structure.
  310. * @param host a data collecting host.
  311. * @return Always returns 0.
  312. */
  313. int flush_host_labels(struct instance *instance, RRDHOST *host)
  314. {
  315. (void)host;
  316. if (instance->labels)
  317. buffer_flush(instance->labels);
  318. return 0;
  319. }
  320. /**
  321. * Update stats for buffered bytes
  322. *
  323. * @param instance an instance data structure.
  324. * @return Always returns 0.
  325. */
  326. int simple_connector_update_buffered_bytes(struct instance *instance)
  327. {
  328. instance->stats.buffered_bytes = (collected_number)buffer_strlen((BUFFER *)(instance->buffer));
  329. return 0;
  330. }