remote_write.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "remote_write.h"
  3. static int as_collected;
  4. static int homogeneous;
  5. char context[PROMETHEUS_ELEMENT_MAX + 1];
  6. char chart[PROMETHEUS_ELEMENT_MAX + 1];
  7. char family[PROMETHEUS_ELEMENT_MAX + 1];
  8. char units[PROMETHEUS_ELEMENT_MAX + 1] = "";
  9. /**
  10. * Prepare HTTP header
  11. *
  12. * @param instance an instance data structure.
  13. */
  14. void prometheus_remote_write_prepare_header(struct instance *instance)
  15. {
  16. struct prometheus_remote_write_specific_config *connector_specific_config =
  17. instance->config.connector_specific_config;
  18. struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
  19. buffer_sprintf(
  20. simple_connector_data->last_buffer->header,
  21. "POST %s HTTP/1.1\r\n"
  22. "Host: %s\r\n"
  23. "Accept: */*\r\n"
  24. "%s"
  25. "Content-Encoding: snappy\r\n"
  26. "Content-Type: application/x-protobuf\r\n"
  27. "X-Prometheus-Remote-Write-Version: 0.1.0\r\n"
  28. "Content-Length: %zu\r\n"
  29. "\r\n",
  30. connector_specific_config->remote_write_path,
  31. simple_connector_data->connected_to,
  32. simple_connector_data->auth_string ? simple_connector_data->auth_string : "",
  33. buffer_strlen(simple_connector_data->last_buffer->buffer));
  34. }
  35. /**
  36. * Process a response received after Prometheus remote write connector had sent data
  37. *
  38. * @param buffer a response from a remote service.
  39. * @param instance an instance data structure.
  40. * @return Returns 0 on success, 1 on failure.
  41. */
  42. int process_prometheus_remote_write_response(BUFFER *buffer, struct instance *instance)
  43. {
  44. if (unlikely(!buffer))
  45. return 1;
  46. const char *s = buffer_tostring(buffer);
  47. int len = buffer_strlen(buffer);
  48. // do nothing with HTTP responses 200 or 204
  49. while (!isspace(*s) && len) {
  50. s++;
  51. len--;
  52. }
  53. s++;
  54. len--;
  55. if (likely(len > 4 && (!strncmp(s, "200 ", 4) || !strncmp(s, "204 ", 4))))
  56. return 0;
  57. else
  58. return exporting_discard_response(buffer, instance);
  59. }
  60. /**
  61. * Release specific data allocated.
  62. *
  63. * @param instance an instance data structure.
  64. */
  65. void clean_prometheus_remote_write(struct instance *instance)
  66. {
  67. struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
  68. freez(simple_connector_data->connector_specific_data);
  69. struct prometheus_remote_write_specific_config *connector_specific_config =
  70. instance->config.connector_specific_config;
  71. freez(connector_specific_config->remote_write_path);
  72. }
  73. /**
  74. * Initialize Prometheus Remote Write connector instance
  75. *
  76. * @param instance an instance data structure.
  77. * @return Returns 0 on success, 1 on failure.
  78. */
  79. int init_prometheus_remote_write_instance(struct instance *instance)
  80. {
  81. instance->worker = simple_connector_worker;
  82. instance->start_batch_formatting = NULL;
  83. instance->start_host_formatting = format_host_prometheus_remote_write;
  84. instance->start_chart_formatting = format_chart_prometheus_remote_write;
  85. instance->metric_formatting = format_dimension_prometheus_remote_write;
  86. instance->end_chart_formatting = NULL;
  87. instance->variables_formatting = format_variables_prometheus_remote_write;
  88. instance->end_host_formatting = NULL;
  89. instance->end_batch_formatting = format_batch_prometheus_remote_write;
  90. instance->prepare_header = prometheus_remote_write_prepare_header;
  91. instance->check_response = process_prometheus_remote_write_response;
  92. instance->buffer = (void *)buffer_create(0, &netdata_buffers_statistics.buffers_exporters);
  93. if (uv_mutex_init(&instance->mutex))
  94. return 1;
  95. if (uv_cond_init(&instance->cond_var))
  96. return 1;
  97. struct simple_connector_data *simple_connector_data = callocz(1, sizeof(struct simple_connector_data));
  98. instance->connector_specific_data = simple_connector_data;
  99. #ifdef ENABLE_HTTPS
  100. simple_connector_data->ssl = NETDATA_SSL_UNSET_CONNECTION;
  101. if (instance->config.options & EXPORTING_OPTION_USE_TLS) {
  102. netdata_ssl_initialize_ctx(NETDATA_SSL_EXPORTING_CTX);
  103. }
  104. #endif
  105. struct prometheus_remote_write_specific_data *connector_specific_data =
  106. callocz(1, sizeof(struct prometheus_remote_write_specific_data));
  107. simple_connector_data->connector_specific_data = (void *)connector_specific_data;
  108. simple_connector_init(instance);
  109. connector_specific_data->write_request = init_write_request();
  110. instance->engine->protocol_buffers_initialized = 1;
  111. return 0;
  112. }
  113. struct format_remote_write_label_callback {
  114. struct instance *instance;
  115. void *write_request;
  116. };
  117. static int format_remote_write_label_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  118. struct format_remote_write_label_callback *d = (struct format_remote_write_label_callback *)data;
  119. if (!should_send_label(d->instance, ls)) return 0;
  120. char k[PROMETHEUS_ELEMENT_MAX + 1];
  121. char v[PROMETHEUS_ELEMENT_MAX + 1];
  122. prometheus_name_copy(k, name, PROMETHEUS_ELEMENT_MAX);
  123. prometheus_label_copy(v, value, PROMETHEUS_ELEMENT_MAX);
  124. add_label(d->write_request, k, v);
  125. return 1;
  126. }
  127. /**
  128. * Format host data for Prometheus Remote Write connector
  129. *
  130. * @param instance an instance data structure.
  131. * @param host a data collecting host.
  132. * @return Always returns 0.
  133. */
  134. int format_host_prometheus_remote_write(struct instance *instance, RRDHOST *host)
  135. {
  136. struct simple_connector_data *simple_connector_data =
  137. (struct simple_connector_data *)instance->connector_specific_data;
  138. struct prometheus_remote_write_specific_data *connector_specific_data =
  139. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  140. char hostname[PROMETHEUS_ELEMENT_MAX + 1];
  141. prometheus_label_copy(
  142. hostname,
  143. (host == localhost) ? instance->config.hostname : rrdhost_hostname(host),
  144. PROMETHEUS_ELEMENT_MAX);
  145. add_host_info(
  146. connector_specific_data->write_request,
  147. "netdata_info", hostname, rrdhost_program_name(host), rrdhost_program_version(host), now_realtime_usec() / USEC_PER_MS);
  148. if (unlikely(sending_labels_configured(instance))) {
  149. struct format_remote_write_label_callback tmp = {
  150. .write_request = connector_specific_data->write_request,
  151. .instance = instance
  152. };
  153. rrdlabels_walkthrough_read(host->rrdlabels, format_remote_write_label_callback, &tmp);
  154. }
  155. return 0;
  156. }
  157. /**
  158. * Format chart data for Prometheus Remote Write connector
  159. *
  160. * @param instance an instance data structure.
  161. * @param st a chart.
  162. * @return Always returns 0.
  163. */
  164. int format_chart_prometheus_remote_write(struct instance *instance, RRDSET *st)
  165. {
  166. prometheus_label_copy(
  167. chart,
  168. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? rrdset_name(st) : rrdset_id(st),
  169. PROMETHEUS_ELEMENT_MAX);
  170. prometheus_label_copy(family, rrdset_family(st), PROMETHEUS_ELEMENT_MAX);
  171. prometheus_name_copy(context, rrdset_context(st), PROMETHEUS_ELEMENT_MAX);
  172. as_collected = (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED);
  173. homogeneous = 1;
  174. if (as_collected) {
  175. if (rrdset_flag_check(st, RRDSET_FLAG_HOMOGENEOUS_CHECK))
  176. rrdset_update_heterogeneous_flag(st);
  177. if (rrdset_flag_check(st, RRDSET_FLAG_HETEROGENEOUS))
  178. homogeneous = 0;
  179. } else {
  180. if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AVERAGE)
  181. prometheus_units_copy(units, rrdset_units(st), PROMETHEUS_ELEMENT_MAX, 0);
  182. }
  183. return 0;
  184. }
  185. /**
  186. * Format dimension data for Prometheus Remote Write connector
  187. *
  188. * @param instance an instance data structure.
  189. * @param rd a dimension.
  190. * @return Always returns 0.
  191. */
  192. int format_dimension_prometheus_remote_write(struct instance *instance, RRDDIM *rd)
  193. {
  194. struct simple_connector_data *simple_connector_data =
  195. (struct simple_connector_data *)instance->connector_specific_data;
  196. struct prometheus_remote_write_specific_data *connector_specific_data =
  197. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  198. if (rd->collector.counter && !rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)) {
  199. char name[PROMETHEUS_LABELS_MAX + 1];
  200. char dimension[PROMETHEUS_ELEMENT_MAX + 1];
  201. char *suffix = "";
  202. RRDHOST *host = rd->rrdset->rrdhost;
  203. if (as_collected) {
  204. // we need as-collected / raw data
  205. if (unlikely(rd->collector.last_collected_time.tv_sec < instance->after)) {
  206. netdata_log_debug(
  207. D_EXPORTING,
  208. "EXPORTING: not sending dimension '%s' of chart '%s' from host '%s', "
  209. "its last data collection (%lu) is not within our timeframe (%lu to %lu)",
  210. rrddim_id(rd), rrdset_id(rd->rrdset),
  211. (host == localhost) ? instance->config.hostname : rrdhost_hostname(host),
  212. (unsigned long)rd->collector.last_collected_time.tv_sec,
  213. (unsigned long)instance->after,
  214. (unsigned long)instance->before);
  215. return 0;
  216. }
  217. if (rd->algorithm == RRD_ALGORITHM_INCREMENTAL || rd->algorithm == RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL) {
  218. if (strcmp(rrdset_module_name(rd->rrdset), "prometheus"))
  219. suffix = "_total";
  220. }
  221. if (homogeneous) {
  222. // all the dimensions of the chart, has the same algorithm, multiplier and divisor
  223. // we add all dimensions as labels
  224. prometheus_label_copy(
  225. dimension,
  226. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rrddim_name(rd) : rrddim_id(rd),
  227. PROMETHEUS_ELEMENT_MAX);
  228. snprintf(name, PROMETHEUS_LABELS_MAX, "%s_%s%s", instance->config.prefix, context, suffix);
  229. add_metric(
  230. connector_specific_data->write_request,
  231. name, chart, family, dimension,
  232. (host == localhost) ? instance->config.hostname : rrdhost_hostname(host),
  233. rd->collector.last_collected_value, timeval_msec(&rd->collector.last_collected_time));
  234. } else {
  235. // the dimensions of the chart, do not have the same algorithm, multiplier or divisor
  236. // we create a metric per dimension
  237. prometheus_name_copy(
  238. dimension,
  239. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rrddim_name(rd) : rrddim_id(rd),
  240. PROMETHEUS_ELEMENT_MAX);
  241. snprintf(
  242. name, PROMETHEUS_LABELS_MAX, "%s_%s_%s%s", instance->config.prefix, context, dimension,
  243. suffix);
  244. add_metric(
  245. connector_specific_data->write_request,
  246. name, chart, family, NULL,
  247. (host == localhost) ? instance->config.hostname : rrdhost_hostname(host),
  248. rd->collector.last_collected_value, timeval_msec(&rd->collector.last_collected_time));
  249. }
  250. } else {
  251. // we need average or sum of the data
  252. time_t last_t = instance->before;
  253. NETDATA_DOUBLE value = exporting_calculate_value_from_stored_data(instance, rd, &last_t);
  254. if (!isnan(value) && !isinf(value)) {
  255. if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AVERAGE)
  256. suffix = "_average";
  257. else if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_SUM)
  258. suffix = "_sum";
  259. prometheus_label_copy(
  260. dimension,
  261. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rrddim_name(rd) : rrddim_id(rd),
  262. PROMETHEUS_ELEMENT_MAX);
  263. snprintf(
  264. name, PROMETHEUS_LABELS_MAX, "%s_%s%s%s", instance->config.prefix, context, units, suffix);
  265. add_metric(
  266. connector_specific_data->write_request,
  267. name, chart, family, dimension,
  268. (host == localhost) ? instance->config.hostname : rrdhost_hostname(host),
  269. value, last_t * MSEC_PER_SEC);
  270. }
  271. }
  272. }
  273. return 0;
  274. }
  275. static int format_variable_prometheus_remote_write_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rv_ptr __maybe_unused, void *data) {
  276. const RRDVAR_ACQUIRED *rv = (const RRDVAR_ACQUIRED *)item;
  277. struct prometheus_remote_write_variables_callback_options *opts = data;
  278. if (rrdvar_flags(rv) & (RRDVAR_FLAG_CUSTOM_HOST_VAR | RRDVAR_FLAG_CUSTOM_CHART_VAR)) {
  279. RRDHOST *host = opts->host;
  280. struct instance *instance = opts->instance;
  281. struct simple_connector_data *simple_connector_data =
  282. (struct simple_connector_data *)instance->connector_specific_data;
  283. struct prometheus_remote_write_specific_data *connector_specific_data =
  284. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  285. char name[PROMETHEUS_LABELS_MAX + 1];
  286. char *suffix = "";
  287. prometheus_name_copy(context, rrdvar_name(rv), PROMETHEUS_ELEMENT_MAX);
  288. snprintf(name, PROMETHEUS_LABELS_MAX, "%s_%s%s", instance->config.prefix, context, suffix);
  289. NETDATA_DOUBLE value = rrdvar2number(rv);
  290. add_variable(connector_specific_data->write_request, name,
  291. (host == localhost) ? instance->config.hostname : rrdhost_hostname(host), value, opts->now / USEC_PER_MS);
  292. }
  293. return 0;
  294. }
  295. /**
  296. * Format a variable for Prometheus Remote Write connector
  297. *
  298. * @param rv a variable.
  299. * @param instance an instance data structure.
  300. * @return Always returns 0.
  301. */
  302. int format_variables_prometheus_remote_write(struct instance *instance, RRDHOST *host)
  303. {
  304. struct prometheus_remote_write_variables_callback_options opt = {
  305. .host = host,
  306. .instance = instance,
  307. .now = now_realtime_usec(),
  308. };
  309. return rrdvar_walkthrough_read(host->rrdvars, format_variable_prometheus_remote_write_callback, &opt);
  310. }
  311. /**
  312. * Format a batch for Prometheus Remote Write connector
  313. *
  314. * @param instance an instance data structure.
  315. * @return Returns 0 on success, 1 on failure.
  316. */
  317. int format_batch_prometheus_remote_write(struct instance *instance)
  318. {
  319. struct simple_connector_data *simple_connector_data =
  320. (struct simple_connector_data *)instance->connector_specific_data;
  321. struct prometheus_remote_write_specific_data *connector_specific_data =
  322. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  323. size_t data_size = get_write_request_size(connector_specific_data->write_request);
  324. if (unlikely(!data_size)) {
  325. netdata_log_error("EXPORTING: write request size is out of range");
  326. return 1;
  327. }
  328. BUFFER *buffer = instance->buffer;
  329. buffer_need_bytes(buffer, data_size);
  330. if (unlikely(pack_and_clear_write_request(connector_specific_data->write_request, buffer->buffer, &data_size))) {
  331. netdata_log_error("EXPORTING: cannot pack write request");
  332. return 1;
  333. }
  334. buffer->len = data_size;
  335. simple_connector_end_batch(instance);
  336. return 0;
  337. }