remote_write.c 15 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "remote_write.h"
  // File-scope formatting state. It is written by format_chart_prometheus_remote_write() and read by
  // format_dimension_prometheus_remote_write() for the dimensions formatted afterwards.

  // Non-zero when the instance's data source is EXPORTING_SOURCE_DATA_AS_COLLECTED.
  3. static int as_collected;
  // Set to 1 for every chart, then cleared to 0 when as-collected data is exported and the chart
  // carries RRDSET_FLAG_HETEROGENEOUS.
  4. static int homogeneous;
  // Chart context copied with prometheus_name_copy(); also overwritten with the variable name by
  // format_variable_prometheus_remote_write_callback().
  5. char context[PROMETHEUS_ELEMENT_MAX + 1];
  // Chart name or id (depending on EXPORTING_OPTION_SEND_NAMES) copied with prometheus_label_copy().
  6. char chart[PROMETHEUS_ELEMENT_MAX + 1];
  // Chart family copied with prometheus_label_copy().
  7. char family[PROMETHEUS_ELEMENT_MAX + 1];
  // Units fragment appended to metric names for stored (non as-collected) data. It starts empty and
  // is only refreshed when the data source is EXPORTING_SOURCE_DATA_AVERAGE.
  8. char units[PROMETHEUS_ELEMENT_MAX + 1] = "";
  9. /**
  10. * Prepare HTTP header
  11. *
  12. * @param instance an instance data structure.
  13. */
  14. void prometheus_remote_write_prepare_header(struct instance *instance)
  15. {
  16. struct prometheus_remote_write_specific_config *connector_specific_config =
  17. instance->config.connector_specific_config;
  18. struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
  19. buffer_sprintf(
  20. simple_connector_data->last_buffer->header,
  21. "POST %s HTTP/1.1\r\n"
  22. "Host: %s\r\n"
  23. "Accept: */*\r\n"
  24. "%s"
  25. "Content-Encoding: snappy\r\n"
  26. "Content-Type: application/x-protobuf\r\n"
  27. "X-Prometheus-Remote-Write-Version: 0.1.0\r\n"
  28. "Content-Length: %zu\r\n"
  29. "\r\n",
  30. connector_specific_config->remote_write_path,
  31. simple_connector_data->connected_to,
  32. simple_connector_data->auth_string ? simple_connector_data->auth_string : "",
  33. buffer_strlen(simple_connector_data->last_buffer->buffer));
  34. }
  35. /**
  36. * Process a response received after Prometheus remote write connector had sent data
  37. *
  38. * @param buffer a response from a remote service.
  39. * @param instance an instance data structure.
  40. * @return Returns 0 on success, 1 on failure.
  41. */
  42. int process_prometheus_remote_write_response(BUFFER *buffer, struct instance *instance)
  43. {
  44. if (unlikely(!buffer))
  45. return 1;
  46. const char *s = buffer_tostring(buffer);
  47. int len = buffer_strlen(buffer);
  48. // do nothing with HTTP responses 200 or 204
  49. while (!isspace(*s) && len) {
  50. s++;
  51. len--;
  52. }
  53. s++;
  54. len--;
  55. if (likely(len > 4 && (!strncmp(s, "200 ", 4) || !strncmp(s, "204 ", 4))))
  56. return 0;
  57. else
  58. return exporting_discard_response(buffer, instance);
  59. }
  60. /**
  61. * Release specific data allocated.
  62. *
  63. * @param instance an instance data structure.
  64. */
  65. void clean_prometheus_remote_write(struct instance *instance)
  66. {
  67. struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
  68. freez(simple_connector_data->connector_specific_data);
  69. struct prometheus_remote_write_specific_config *connector_specific_config =
  70. instance->config.connector_specific_config;
  71. freez(connector_specific_config->remote_write_path);
  72. }
  73. /**
  74. * Initialize Prometheus Remote Write connector instance
  75. *
  76. * @param instance an instance data structure.
  77. * @return Returns 0 on success, 1 on failure.
  78. */
  79. int init_prometheus_remote_write_instance(struct instance *instance)
  80. {
  81. instance->worker = simple_connector_worker;
  82. instance->start_batch_formatting = NULL;
  83. instance->start_host_formatting = format_host_prometheus_remote_write;
  84. instance->start_chart_formatting = format_chart_prometheus_remote_write;
  85. instance->metric_formatting = format_dimension_prometheus_remote_write;
  86. instance->end_chart_formatting = NULL;
  87. instance->variables_formatting = format_variables_prometheus_remote_write;
  88. instance->end_host_formatting = NULL;
  89. instance->end_batch_formatting = format_batch_prometheus_remote_write;
  90. instance->prepare_header = prometheus_remote_write_prepare_header;
  91. instance->check_response = process_prometheus_remote_write_response;
  92. instance->buffer = (void *)buffer_create(0);
  93. if (uv_mutex_init(&instance->mutex))
  94. return 1;
  95. if (uv_cond_init(&instance->cond_var))
  96. return 1;
  97. struct simple_connector_data *simple_connector_data = callocz(1, sizeof(struct simple_connector_data));
  98. instance->connector_specific_data = simple_connector_data;
  99. #ifdef ENABLE_HTTPS
  100. simple_connector_data->flags = NETDATA_SSL_START;
  101. simple_connector_data->conn = NULL;
  102. if (instance->config.options & EXPORTING_OPTION_USE_TLS) {
  103. security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING);
  104. }
  105. #endif
  106. struct prometheus_remote_write_specific_data *connector_specific_data =
  107. callocz(1, sizeof(struct prometheus_remote_write_specific_data));
  108. simple_connector_data->connector_specific_data = (void *)connector_specific_data;
  109. simple_connector_init(instance);
  110. connector_specific_data->write_request = init_write_request();
  111. instance->engine->protocol_buffers_initialized = 1;
  112. return 0;
  113. }
  114. struct format_remote_write_label_callback {
  115. struct instance *instance;
  116. void *write_request;
  117. };
  118. static int format_remote_write_label_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  119. struct format_remote_write_label_callback *d = (struct format_remote_write_label_callback *)data;
  120. if (!should_send_label(d->instance, ls)) return 0;
  121. char k[PROMETHEUS_ELEMENT_MAX + 1];
  122. char v[PROMETHEUS_ELEMENT_MAX + 1];
  123. prometheus_name_copy(k, name, PROMETHEUS_ELEMENT_MAX);
  124. prometheus_label_copy(v, value, PROMETHEUS_ELEMENT_MAX);
  125. add_label(d->write_request, k, v);
  126. return 1;
  127. }
  128. /**
  129. * Format host data for Prometheus Remote Write connector
  130. *
  131. * @param instance an instance data structure.
  132. * @param host a data collecting host.
  133. * @return Always returns 0.
  134. */
  135. int format_host_prometheus_remote_write(struct instance *instance, RRDHOST *host)
  136. {
  137. struct simple_connector_data *simple_connector_data =
  138. (struct simple_connector_data *)instance->connector_specific_data;
  139. struct prometheus_remote_write_specific_data *connector_specific_data =
  140. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  141. char hostname[PROMETHEUS_ELEMENT_MAX + 1];
  142. prometheus_label_copy(
  143. hostname,
  144. (host == localhost) ? instance->config.hostname : host->hostname,
  145. PROMETHEUS_ELEMENT_MAX);
  146. add_host_info(
  147. connector_specific_data->write_request,
  148. "netdata_info", hostname, host->program_name, host->program_version, now_realtime_usec() / USEC_PER_MS);
  149. if (unlikely(sending_labels_configured(instance))) {
  150. struct format_remote_write_label_callback tmp = {
  151. .write_request = connector_specific_data->write_request,
  152. .instance = instance
  153. };
  154. rrdlabels_walkthrough_read(host->host_labels, format_remote_write_label_callback, &tmp);
  155. }
  156. return 0;
  157. }
  158. /**
  159. * Format chart data for Prometheus Remote Write connector
  160. *
  161. * @param instance an instance data structure.
  162. * @param st a chart.
  163. * @return Always returns 0.
  164. */
  165. int format_chart_prometheus_remote_write(struct instance *instance, RRDSET *st)
  166. {
  167. prometheus_label_copy(
  168. chart,
  169. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? st->name : st->id,
  170. PROMETHEUS_ELEMENT_MAX);
  171. prometheus_label_copy(family, st->family, PROMETHEUS_ELEMENT_MAX);
  172. prometheus_name_copy(context, st->context, PROMETHEUS_ELEMENT_MAX);
  173. as_collected = (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED);
  174. homogeneous = 1;
  175. if (as_collected) {
  176. if (rrdset_flag_check(st, RRDSET_FLAG_HOMOGENEOUS_CHECK))
  177. rrdset_update_heterogeneous_flag(st);
  178. if (rrdset_flag_check(st, RRDSET_FLAG_HETEROGENEOUS))
  179. homogeneous = 0;
  180. } else {
  181. if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AVERAGE)
  182. prometheus_units_copy(units, st->units, PROMETHEUS_ELEMENT_MAX, 0);
  183. }
  184. return 0;
  185. }
  186. /**
  187. * Format dimension data for Prometheus Remote Write connector
  188. *
  189. * @param instance an instance data structure.
  190. * @param rd a dimension.
  191. * @return Always returns 0.
  192. */
  193. int format_dimension_prometheus_remote_write(struct instance *instance, RRDDIM *rd)
  194. {
  195. struct simple_connector_data *simple_connector_data =
  196. (struct simple_connector_data *)instance->connector_specific_data;
  197. struct prometheus_remote_write_specific_data *connector_specific_data =
  198. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  199. if (rd->collections_counter && !rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)) {
  200. char name[PROMETHEUS_LABELS_MAX + 1];
  201. char dimension[PROMETHEUS_ELEMENT_MAX + 1];
  202. char *suffix = "";
  203. RRDHOST *host = rd->rrdset->rrdhost;
  204. if (as_collected) {
  205. // we need as-collected / raw data
  206. if (unlikely(rd->last_collected_time.tv_sec < instance->after)) {
  207. debug(
  208. D_EXPORTING,
  209. "EXPORTING: not sending dimension '%s' of chart '%s' from host '%s', "
  210. "its last data collection (%lu) is not within our timeframe (%lu to %lu)",
  211. rd->id, rd->rrdset->id,
  212. (host == localhost) ? instance->config.hostname : host->hostname,
  213. (unsigned long)rd->last_collected_time.tv_sec,
  214. (unsigned long)instance->after,
  215. (unsigned long)instance->before);
  216. return 0;
  217. }
  218. if (homogeneous) {
  219. // all the dimensions of the chart, has the same algorithm, multiplier and divisor
  220. // we add all dimensions as labels
  221. prometheus_label_copy(
  222. dimension,
  223. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
  224. PROMETHEUS_ELEMENT_MAX);
  225. snprintf(name, PROMETHEUS_LABELS_MAX, "%s_%s%s", instance->config.prefix, context, suffix);
  226. add_metric(
  227. connector_specific_data->write_request,
  228. name, chart, family, dimension,
  229. (host == localhost) ? instance->config.hostname : host->hostname,
  230. rd->last_collected_value, timeval_msec(&rd->last_collected_time));
  231. } else {
  232. // the dimensions of the chart, do not have the same algorithm, multiplier or divisor
  233. // we create a metric per dimension
  234. prometheus_name_copy(
  235. dimension,
  236. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
  237. PROMETHEUS_ELEMENT_MAX);
  238. snprintf(
  239. name, PROMETHEUS_LABELS_MAX, "%s_%s_%s%s", instance->config.prefix, context, dimension,
  240. suffix);
  241. add_metric(
  242. connector_specific_data->write_request,
  243. name, chart, family, NULL,
  244. (host == localhost) ? instance->config.hostname : host->hostname,
  245. rd->last_collected_value, timeval_msec(&rd->last_collected_time));
  246. }
  247. } else {
  248. // we need average or sum of the data
  249. time_t last_t = instance->before;
  250. NETDATA_DOUBLE value = exporting_calculate_value_from_stored_data(instance, rd, &last_t);
  251. if (!isnan(value) && !isinf(value)) {
  252. if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AVERAGE)
  253. suffix = "_average";
  254. else if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_SUM)
  255. suffix = "_sum";
  256. prometheus_label_copy(
  257. dimension,
  258. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
  259. PROMETHEUS_ELEMENT_MAX);
  260. snprintf(
  261. name, PROMETHEUS_LABELS_MAX, "%s_%s%s%s", instance->config.prefix, context, units, suffix);
  262. add_metric(
  263. connector_specific_data->write_request,
  264. name, chart, family, dimension,
  265. (host == localhost) ? instance->config.hostname : host->hostname,
  266. value, last_t * MSEC_PER_SEC);
  267. }
  268. }
  269. }
  270. return 0;
  271. }
  272. int format_variable_prometheus_remote_write_callback(RRDVAR *rv, void *data) {
  273. struct prometheus_remote_write_variables_callback_options *opts = data;
  274. if (rv->options & (RRDVAR_OPTION_CUSTOM_HOST_VAR | RRDVAR_OPTION_CUSTOM_CHART_VAR)) {
  275. RRDHOST *host = opts->host;
  276. struct instance *instance = opts->instance;
  277. struct simple_connector_data *simple_connector_data =
  278. (struct simple_connector_data *)instance->connector_specific_data;
  279. struct prometheus_remote_write_specific_data *connector_specific_data =
  280. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  281. char name[PROMETHEUS_LABELS_MAX + 1];
  282. char *suffix = "";
  283. prometheus_name_copy(context, rv->name, PROMETHEUS_ELEMENT_MAX);
  284. snprintf(name, PROMETHEUS_LABELS_MAX, "%s_%s%s", instance->config.prefix, context, suffix);
  285. NETDATA_DOUBLE value = rrdvar2number(rv);
  286. add_variable(connector_specific_data->write_request, name,
  287. (host == localhost) ? instance->config.hostname : host->hostname, value, opts->now / USEC_PER_MS);
  288. }
  289. return 0;
  290. }
  291. /**
  292. * Format a variable for Prometheus Remote Write connector
  293. *
  294. * @param rv a variable.
  295. * @param instance an instance data structure.
  296. * @return Always returns 0.
  297. */
  298. int format_variables_prometheus_remote_write(struct instance *instance, RRDHOST *host)
  299. {
  300. struct prometheus_remote_write_variables_callback_options opt = {
  301. .host = host,
  302. .instance = instance,
  303. .now = now_realtime_usec(),
  304. };
  305. return foreach_host_variable_callback(host, format_variable_prometheus_remote_write_callback, &opt);
  306. }
  307. /**
  308. * Format a batch for Prometheus Remote Write connector
  309. *
  310. * @param instance an instance data structure.
  311. * @return Returns 0 on success, 1 on failure.
  312. */
  313. int format_batch_prometheus_remote_write(struct instance *instance)
  314. {
  315. struct simple_connector_data *simple_connector_data =
  316. (struct simple_connector_data *)instance->connector_specific_data;
  317. struct prometheus_remote_write_specific_data *connector_specific_data =
  318. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  319. size_t data_size = get_write_request_size(connector_specific_data->write_request);
  320. if (unlikely(!data_size)) {
  321. error("EXPORTING: write request size is out of range");
  322. return 1;
  323. }
  324. BUFFER *buffer = instance->buffer;
  325. buffer_need_bytes(buffer, data_size);
  326. if (unlikely(pack_and_clear_write_request(connector_specific_data->write_request, buffer->buffer, &data_size))) {
  327. error("EXPORTING: cannot pack write request");
  328. return 1;
  329. }
  330. buffer->len = data_size;
  331. simple_connector_end_batch(instance);
  332. return 0;
  333. }