remote_write.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "remote_write.h"
  3. static int as_collected;
  4. static int homogeneous;
  5. char context[PROMETHEUS_ELEMENT_MAX + 1];
  6. char chart[PROMETHEUS_ELEMENT_MAX + 1];
  7. char family[PROMETHEUS_ELEMENT_MAX + 1];
  8. char units[PROMETHEUS_ELEMENT_MAX + 1] = "";
  9. /**
  10. * Prepare HTTP header
  11. *
  12. * @param instance an instance data structure.
  13. */
  14. void prometheus_remote_write_prepare_header(struct instance *instance)
  15. {
  16. struct prometheus_remote_write_specific_config *connector_specific_config =
  17. instance->config.connector_specific_config;
  18. struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
  19. buffer_sprintf(
  20. simple_connector_data->last_buffer->header,
  21. "POST %s HTTP/1.1\r\n"
  22. "Host: %s\r\n"
  23. "Accept: */*\r\n"
  24. "%s"
  25. "Content-Encoding: snappy\r\n"
  26. "Content-Type: application/x-protobuf\r\n"
  27. "X-Prometheus-Remote-Write-Version: 0.1.0\r\n"
  28. "Content-Length: %zu\r\n"
  29. "\r\n",
  30. connector_specific_config->remote_write_path,
  31. simple_connector_data->connected_to,
  32. simple_connector_data->auth_string ? simple_connector_data->auth_string : "",
  33. buffer_strlen(simple_connector_data->last_buffer->buffer));
  34. }
  35. /**
  36. * Process a response received after Prometheus remote write connector had sent data
  37. *
  38. * @param buffer a response from a remote service.
  39. * @param instance an instance data structure.
  40. * @return Returns 0 on success, 1 on failure.
  41. */
  42. int process_prometheus_remote_write_response(BUFFER *buffer, struct instance *instance)
  43. {
  44. if (unlikely(!buffer))
  45. return 1;
  46. const char *s = buffer_tostring(buffer);
  47. int len = buffer_strlen(buffer);
  48. // do nothing with HTTP responses 200 or 204
  49. while (!isspace(*s) && len) {
  50. s++;
  51. len--;
  52. }
  53. s++;
  54. len--;
  55. if (likely(len > 4 && (!strncmp(s, "200 ", 4) || !strncmp(s, "204 ", 4))))
  56. return 0;
  57. else
  58. return exporting_discard_response(buffer, instance);
  59. }
  60. /**
  61. * Release specific data allocated.
  62. *
  63. * @param instance an instance data structure.
  64. */
  65. void clean_prometheus_remote_write(struct instance *instance)
  66. {
  67. struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
  68. freez(simple_connector_data->connector_specific_data);
  69. struct prometheus_remote_write_specific_config *connector_specific_config =
  70. instance->config.connector_specific_config;
  71. freez(connector_specific_config->remote_write_path);
  72. }
  73. /**
  74. * Initialize Prometheus Remote Write connector instance
  75. *
  76. * @param instance an instance data structure.
  77. * @return Returns 0 on success, 1 on failure.
  78. */
  79. int init_prometheus_remote_write_instance(struct instance *instance)
  80. {
  81. instance->worker = simple_connector_worker;
  82. instance->start_batch_formatting = NULL;
  83. instance->start_host_formatting = format_host_prometheus_remote_write;
  84. instance->start_chart_formatting = format_chart_prometheus_remote_write;
  85. instance->metric_formatting = format_dimension_prometheus_remote_write;
  86. instance->end_chart_formatting = NULL;
  87. instance->variables_formatting = format_variables_prometheus_remote_write;
  88. instance->end_host_formatting = NULL;
  89. instance->end_batch_formatting = format_batch_prometheus_remote_write;
  90. instance->prepare_header = prometheus_remote_write_prepare_header;
  91. instance->check_response = process_prometheus_remote_write_response;
  92. instance->buffer = (void *)buffer_create(0, &netdata_buffers_statistics.buffers_exporters);
  93. if (uv_mutex_init(&instance->mutex))
  94. return 1;
  95. if (uv_cond_init(&instance->cond_var))
  96. return 1;
  97. struct simple_connector_data *simple_connector_data = callocz(1, sizeof(struct simple_connector_data));
  98. instance->connector_specific_data = simple_connector_data;
  99. #ifdef ENABLE_HTTPS
  100. simple_connector_data->flags = NETDATA_SSL_START;
  101. simple_connector_data->conn = NULL;
  102. if (instance->config.options & EXPORTING_OPTION_USE_TLS) {
  103. security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING);
  104. }
  105. #endif
  106. struct prometheus_remote_write_specific_data *connector_specific_data =
  107. callocz(1, sizeof(struct prometheus_remote_write_specific_data));
  108. simple_connector_data->connector_specific_data = (void *)connector_specific_data;
  109. simple_connector_init(instance);
  110. connector_specific_data->write_request = init_write_request();
  111. instance->engine->protocol_buffers_initialized = 1;
  112. return 0;
  113. }
  114. struct format_remote_write_label_callback {
  115. struct instance *instance;
  116. void *write_request;
  117. };
  118. static int format_remote_write_label_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  119. struct format_remote_write_label_callback *d = (struct format_remote_write_label_callback *)data;
  120. if (!should_send_label(d->instance, ls)) return 0;
  121. char k[PROMETHEUS_ELEMENT_MAX + 1];
  122. char v[PROMETHEUS_ELEMENT_MAX + 1];
  123. prometheus_name_copy(k, name, PROMETHEUS_ELEMENT_MAX);
  124. prometheus_label_copy(v, value, PROMETHEUS_ELEMENT_MAX);
  125. add_label(d->write_request, k, v);
  126. return 1;
  127. }
  128. /**
  129. * Format host data for Prometheus Remote Write connector
  130. *
  131. * @param instance an instance data structure.
  132. * @param host a data collecting host.
  133. * @return Always returns 0.
  134. */
  135. int format_host_prometheus_remote_write(struct instance *instance, RRDHOST *host)
  136. {
  137. struct simple_connector_data *simple_connector_data =
  138. (struct simple_connector_data *)instance->connector_specific_data;
  139. struct prometheus_remote_write_specific_data *connector_specific_data =
  140. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  141. char hostname[PROMETHEUS_ELEMENT_MAX + 1];
  142. prometheus_label_copy(
  143. hostname,
  144. (host == localhost) ? instance->config.hostname : rrdhost_hostname(host),
  145. PROMETHEUS_ELEMENT_MAX);
  146. add_host_info(
  147. connector_specific_data->write_request,
  148. "netdata_info", hostname, rrdhost_program_name(host), rrdhost_program_version(host), now_realtime_usec() / USEC_PER_MS);
  149. if (unlikely(sending_labels_configured(instance))) {
  150. struct format_remote_write_label_callback tmp = {
  151. .write_request = connector_specific_data->write_request,
  152. .instance = instance
  153. };
  154. rrdlabels_walkthrough_read(host->rrdlabels, format_remote_write_label_callback, &tmp);
  155. }
  156. return 0;
  157. }
  158. /**
  159. * Format chart data for Prometheus Remote Write connector
  160. *
  161. * @param instance an instance data structure.
  162. * @param st a chart.
  163. * @return Always returns 0.
  164. */
  165. int format_chart_prometheus_remote_write(struct instance *instance, RRDSET *st)
  166. {
  167. prometheus_label_copy(
  168. chart,
  169. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? rrdset_name(st) : rrdset_id(st),
  170. PROMETHEUS_ELEMENT_MAX);
  171. prometheus_label_copy(family, rrdset_family(st), PROMETHEUS_ELEMENT_MAX);
  172. prometheus_name_copy(context, rrdset_context(st), PROMETHEUS_ELEMENT_MAX);
  173. as_collected = (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED);
  174. homogeneous = 1;
  175. if (as_collected) {
  176. if (rrdset_flag_check(st, RRDSET_FLAG_HOMOGENEOUS_CHECK))
  177. rrdset_update_heterogeneous_flag(st);
  178. if (rrdset_flag_check(st, RRDSET_FLAG_HETEROGENEOUS))
  179. homogeneous = 0;
  180. } else {
  181. if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AVERAGE)
  182. prometheus_units_copy(units, rrdset_units(st), PROMETHEUS_ELEMENT_MAX, 0);
  183. }
  184. return 0;
  185. }
  186. /**
  187. * Format dimension data for Prometheus Remote Write connector
  188. *
  189. * @param instance an instance data structure.
  190. * @param rd a dimension.
  191. * @return Always returns 0.
  192. */
  193. int format_dimension_prometheus_remote_write(struct instance *instance, RRDDIM *rd)
  194. {
  195. struct simple_connector_data *simple_connector_data =
  196. (struct simple_connector_data *)instance->connector_specific_data;
  197. struct prometheus_remote_write_specific_data *connector_specific_data =
  198. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  199. if (rd->collections_counter && !rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)) {
  200. char name[PROMETHEUS_LABELS_MAX + 1];
  201. char dimension[PROMETHEUS_ELEMENT_MAX + 1];
  202. char *suffix = "";
  203. RRDHOST *host = rd->rrdset->rrdhost;
  204. if (as_collected) {
  205. // we need as-collected / raw data
  206. if (unlikely(rd->last_collected_time.tv_sec < instance->after)) {
  207. debug(
  208. D_EXPORTING,
  209. "EXPORTING: not sending dimension '%s' of chart '%s' from host '%s', "
  210. "its last data collection (%lu) is not within our timeframe (%lu to %lu)",
  211. rrddim_id(rd), rrdset_id(rd->rrdset),
  212. (host == localhost) ? instance->config.hostname : rrdhost_hostname(host),
  213. (unsigned long)rd->last_collected_time.tv_sec,
  214. (unsigned long)instance->after,
  215. (unsigned long)instance->before);
  216. return 0;
  217. }
  218. if (rd->algorithm == RRD_ALGORITHM_INCREMENTAL || rd->algorithm == RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL) {
  219. if (strcmp(rrdset_module_name(rd->rrdset), "prometheus"))
  220. suffix = "_total";
  221. }
  222. if (homogeneous) {
  223. // all the dimensions of the chart, has the same algorithm, multiplier and divisor
  224. // we add all dimensions as labels
  225. prometheus_label_copy(
  226. dimension,
  227. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rrddim_name(rd) : rrddim_id(rd),
  228. PROMETHEUS_ELEMENT_MAX);
  229. snprintf(name, PROMETHEUS_LABELS_MAX, "%s_%s%s", instance->config.prefix, context, suffix);
  230. add_metric(
  231. connector_specific_data->write_request,
  232. name, chart, family, dimension,
  233. (host == localhost) ? instance->config.hostname : rrdhost_hostname(host),
  234. rd->last_collected_value, timeval_msec(&rd->last_collected_time));
  235. } else {
  236. // the dimensions of the chart, do not have the same algorithm, multiplier or divisor
  237. // we create a metric per dimension
  238. prometheus_name_copy(
  239. dimension,
  240. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rrddim_name(rd) : rrddim_id(rd),
  241. PROMETHEUS_ELEMENT_MAX);
  242. snprintf(
  243. name, PROMETHEUS_LABELS_MAX, "%s_%s_%s%s", instance->config.prefix, context, dimension,
  244. suffix);
  245. add_metric(
  246. connector_specific_data->write_request,
  247. name, chart, family, NULL,
  248. (host == localhost) ? instance->config.hostname : rrdhost_hostname(host),
  249. rd->last_collected_value, timeval_msec(&rd->last_collected_time));
  250. }
  251. } else {
  252. // we need average or sum of the data
  253. time_t last_t = instance->before;
  254. NETDATA_DOUBLE value = exporting_calculate_value_from_stored_data(instance, rd, &last_t);
  255. if (!isnan(value) && !isinf(value)) {
  256. if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AVERAGE)
  257. suffix = "_average";
  258. else if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_SUM)
  259. suffix = "_sum";
  260. prometheus_label_copy(
  261. dimension,
  262. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rrddim_name(rd) : rrddim_id(rd),
  263. PROMETHEUS_ELEMENT_MAX);
  264. snprintf(
  265. name, PROMETHEUS_LABELS_MAX, "%s_%s%s%s", instance->config.prefix, context, units, suffix);
  266. add_metric(
  267. connector_specific_data->write_request,
  268. name, chart, family, dimension,
  269. (host == localhost) ? instance->config.hostname : rrdhost_hostname(host),
  270. value, last_t * MSEC_PER_SEC);
  271. }
  272. }
  273. }
  274. return 0;
  275. }
  276. static int format_variable_prometheus_remote_write_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rv_ptr __maybe_unused, void *data) {
  277. const RRDVAR_ACQUIRED *rv = (const RRDVAR_ACQUIRED *)item;
  278. struct prometheus_remote_write_variables_callback_options *opts = data;
  279. if (rrdvar_flags(rv) & (RRDVAR_FLAG_CUSTOM_HOST_VAR | RRDVAR_FLAG_CUSTOM_CHART_VAR)) {
  280. RRDHOST *host = opts->host;
  281. struct instance *instance = opts->instance;
  282. struct simple_connector_data *simple_connector_data =
  283. (struct simple_connector_data *)instance->connector_specific_data;
  284. struct prometheus_remote_write_specific_data *connector_specific_data =
  285. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  286. char name[PROMETHEUS_LABELS_MAX + 1];
  287. char *suffix = "";
  288. prometheus_name_copy(context, rrdvar_name(rv), PROMETHEUS_ELEMENT_MAX);
  289. snprintf(name, PROMETHEUS_LABELS_MAX, "%s_%s%s", instance->config.prefix, context, suffix);
  290. NETDATA_DOUBLE value = rrdvar2number(rv);
  291. add_variable(connector_specific_data->write_request, name,
  292. (host == localhost) ? instance->config.hostname : rrdhost_hostname(host), value, opts->now / USEC_PER_MS);
  293. }
  294. return 0;
  295. }
  296. /**
  297. * Format a variable for Prometheus Remote Write connector
  298. *
  299. * @param rv a variable.
  300. * @param instance an instance data structure.
  301. * @return Always returns 0.
  302. */
  303. int format_variables_prometheus_remote_write(struct instance *instance, RRDHOST *host)
  304. {
  305. struct prometheus_remote_write_variables_callback_options opt = {
  306. .host = host,
  307. .instance = instance,
  308. .now = now_realtime_usec(),
  309. };
  310. return rrdvar_walkthrough_read(host->rrdvars, format_variable_prometheus_remote_write_callback, &opt);
  311. }
  312. /**
  313. * Format a batch for Prometheus Remote Write connector
  314. *
  315. * @param instance an instance data structure.
  316. * @return Returns 0 on success, 1 on failure.
  317. */
  318. int format_batch_prometheus_remote_write(struct instance *instance)
  319. {
  320. struct simple_connector_data *simple_connector_data =
  321. (struct simple_connector_data *)instance->connector_specific_data;
  322. struct prometheus_remote_write_specific_data *connector_specific_data =
  323. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  324. size_t data_size = get_write_request_size(connector_specific_data->write_request);
  325. if (unlikely(!data_size)) {
  326. error("EXPORTING: write request size is out of range");
  327. return 1;
  328. }
  329. BUFFER *buffer = instance->buffer;
  330. buffer_need_bytes(buffer, data_size);
  331. if (unlikely(pack_and_clear_write_request(connector_specific_data->write_request, buffer->buffer, &data_size))) {
  332. error("EXPORTING: cannot pack write request");
  333. return 1;
  334. }
  335. buffer->len = data_size;
  336. simple_connector_end_batch(instance);
  337. return 0;
  338. }