remote_write.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "remote_write.h"
  3. static int as_collected;
  4. static int homogeneous;
  5. char context[PROMETHEUS_ELEMENT_MAX + 1];
  6. char chart[PROMETHEUS_ELEMENT_MAX + 1];
  7. char family[PROMETHEUS_ELEMENT_MAX + 1];
  8. char units[PROMETHEUS_ELEMENT_MAX + 1] = "";
  9. /**
  10. * Prepare HTTP header
  11. *
  12. * @param instance an instance data structure.
  13. */
  14. void prometheus_remote_write_prepare_header(struct instance *instance)
  15. {
  16. struct prometheus_remote_write_specific_config *connector_specific_config =
  17. instance->config.connector_specific_config;
  18. struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
  19. buffer_sprintf(
  20. simple_connector_data->last_buffer->header,
  21. "POST %s HTTP/1.1\r\n"
  22. "Host: %s\r\n"
  23. "Accept: */*\r\n"
  24. "%s"
  25. "Content-Encoding: snappy\r\n"
  26. "Content-Type: application/x-protobuf\r\n"
  27. "X-Prometheus-Remote-Write-Version: 0.1.0\r\n"
  28. "Content-Length: %zu\r\n"
  29. "\r\n",
  30. connector_specific_config->remote_write_path,
  31. simple_connector_data->connected_to,
  32. simple_connector_data->auth_string ? simple_connector_data->auth_string : "",
  33. buffer_strlen(simple_connector_data->last_buffer->buffer));
  34. }
  35. /**
  36. * Process a response received after Prometheus remote write connector had sent data
  37. *
  38. * @param buffer a response from a remote service.
  39. * @param instance an instance data structure.
  40. * @return Returns 0 on success, 1 on failure.
  41. */
  42. int process_prometheus_remote_write_response(BUFFER *buffer, struct instance *instance)
  43. {
  44. if (unlikely(!buffer))
  45. return 1;
  46. const char *s = buffer_tostring(buffer);
  47. int len = buffer_strlen(buffer);
  48. // do nothing with HTTP responses 200 or 204
  49. while (!isspace(*s) && len) {
  50. s++;
  51. len--;
  52. }
  53. s++;
  54. len--;
  55. if (likely(len > 4 && (!strncmp(s, "200 ", 4) || !strncmp(s, "204 ", 4))))
  56. return 0;
  57. else
  58. return exporting_discard_response(buffer, instance);
  59. }
  60. /**
  61. * Release specific data allocated.
  62. *
  63. * @param instance an instance data structure.
  64. */
  65. void clean_prometheus_remote_write(struct instance *instance)
  66. {
  67. struct simple_connector_data *simple_connector_data = instance->connector_specific_data;
  68. freez(simple_connector_data->connector_specific_data);
  69. struct prometheus_remote_write_specific_config *connector_specific_config =
  70. instance->config.connector_specific_config;
  71. freez(connector_specific_config->remote_write_path);
  72. }
  73. /**
  74. * Initialize Prometheus Remote Write connector instance
  75. *
  76. * @param instance an instance data structure.
  77. * @return Returns 0 on success, 1 on failure.
  78. */
  79. int init_prometheus_remote_write_instance(struct instance *instance)
  80. {
  81. instance->worker = simple_connector_worker;
  82. instance->start_batch_formatting = NULL;
  83. instance->start_host_formatting = format_host_prometheus_remote_write;
  84. instance->start_chart_formatting = format_chart_prometheus_remote_write;
  85. instance->metric_formatting = format_dimension_prometheus_remote_write;
  86. instance->end_chart_formatting = NULL;
  87. instance->end_host_formatting = NULL;
  88. instance->end_batch_formatting = format_batch_prometheus_remote_write;
  89. instance->prepare_header = prometheus_remote_write_prepare_header;
  90. instance->check_response = process_prometheus_remote_write_response;
  91. instance->buffer = (void *)buffer_create(0);
  92. if (uv_mutex_init(&instance->mutex))
  93. return 1;
  94. if (uv_cond_init(&instance->cond_var))
  95. return 1;
  96. struct simple_connector_data *simple_connector_data = callocz(1, sizeof(struct simple_connector_data));
  97. instance->connector_specific_data = simple_connector_data;
  98. #ifdef ENABLE_HTTPS
  99. simple_connector_data->flags = NETDATA_SSL_START;
  100. simple_connector_data->conn = NULL;
  101. if (instance->config.options & EXPORTING_OPTION_USE_TLS) {
  102. security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING);
  103. }
  104. #endif
  105. struct prometheus_remote_write_specific_data *connector_specific_data =
  106. callocz(1, sizeof(struct prometheus_remote_write_specific_data));
  107. simple_connector_data->connector_specific_data = (void *)connector_specific_data;
  108. simple_connector_init(instance);
  109. connector_specific_data->write_request = init_write_request();
  110. instance->engine->protocol_buffers_initialized = 1;
  111. return 0;
  112. }
  113. /**
  114. * Format host data for Prometheus Remote Write connector
  115. *
  116. * @param instance an instance data structure.
  117. * @param host a data collecting host.
  118. * @return Always returns 0.
  119. */
  120. int format_host_prometheus_remote_write(struct instance *instance, RRDHOST *host)
  121. {
  122. struct simple_connector_data *simple_connector_data =
  123. (struct simple_connector_data *)instance->connector_specific_data;
  124. struct prometheus_remote_write_specific_data *connector_specific_data =
  125. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  126. char hostname[PROMETHEUS_ELEMENT_MAX + 1];
  127. prometheus_label_copy(
  128. hostname,
  129. (host == localhost) ? instance->config.hostname : host->hostname,
  130. PROMETHEUS_ELEMENT_MAX);
  131. add_host_info(
  132. connector_specific_data->write_request,
  133. "netdata_info", hostname, host->program_name, host->program_version, now_realtime_usec() / USEC_PER_MS);
  134. if (unlikely(sending_labels_configured(instance))) {
  135. rrdhost_check_rdlock(host);
  136. netdata_rwlock_rdlock(&host->labels.labels_rwlock);
  137. for (struct label *label = host->labels.head; label; label = label->next) {
  138. if (!should_send_label(instance, label))
  139. continue;
  140. char key[PROMETHEUS_ELEMENT_MAX + 1];
  141. prometheus_name_copy(key, label->key, PROMETHEUS_ELEMENT_MAX);
  142. char value[PROMETHEUS_ELEMENT_MAX + 1];
  143. prometheus_label_copy(value, label->value, PROMETHEUS_ELEMENT_MAX);
  144. add_label(connector_specific_data->write_request, key, value);
  145. }
  146. netdata_rwlock_unlock(&host->labels.labels_rwlock);
  147. }
  148. return 0;
  149. }
  150. /**
  151. * Format chart data for Prometheus Remote Write connector
  152. *
  153. * @param instance an instance data structure.
  154. * @param st a chart.
  155. * @return Always returns 0.
  156. */
  157. int format_chart_prometheus_remote_write(struct instance *instance, RRDSET *st)
  158. {
  159. prometheus_label_copy(
  160. chart,
  161. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && st->name) ? st->name : st->id,
  162. PROMETHEUS_ELEMENT_MAX);
  163. prometheus_label_copy(family, st->family, PROMETHEUS_ELEMENT_MAX);
  164. prometheus_name_copy(context, st->context, PROMETHEUS_ELEMENT_MAX);
  165. as_collected = (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED);
  166. homogeneous = 1;
  167. if (as_collected) {
  168. if (rrdset_flag_check(st, RRDSET_FLAG_HOMOGENEOUS_CHECK))
  169. rrdset_update_heterogeneous_flag(st);
  170. if (rrdset_flag_check(st, RRDSET_FLAG_HETEROGENEOUS))
  171. homogeneous = 0;
  172. } else {
  173. if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AVERAGE)
  174. prometheus_units_copy(units, st->units, PROMETHEUS_ELEMENT_MAX, 0);
  175. }
  176. return 0;
  177. }
  178. /**
  179. * Format dimension data for Prometheus Remote Write connector
  180. *
  181. * @param instance an instance data structure.
  182. * @param rd a dimension.
  183. * @return Always returns 0.
  184. */
  185. int format_dimension_prometheus_remote_write(struct instance *instance, RRDDIM *rd)
  186. {
  187. struct simple_connector_data *simple_connector_data =
  188. (struct simple_connector_data *)instance->connector_specific_data;
  189. struct prometheus_remote_write_specific_data *connector_specific_data =
  190. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  191. if (rd->collections_counter && !rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)) {
  192. char name[PROMETHEUS_LABELS_MAX + 1];
  193. char dimension[PROMETHEUS_ELEMENT_MAX + 1];
  194. char *suffix = "";
  195. RRDHOST *host = rd->rrdset->rrdhost;
  196. if (as_collected) {
  197. // we need as-collected / raw data
  198. if (unlikely(rd->last_collected_time.tv_sec < instance->after)) {
  199. debug(
  200. D_EXPORTING,
  201. "EXPORTING: not sending dimension '%s' of chart '%s' from host '%s', "
  202. "its last data collection (%lu) is not within our timeframe (%lu to %lu)",
  203. rd->id, rd->rrdset->id,
  204. (host == localhost) ? instance->config.hostname : host->hostname,
  205. (unsigned long)rd->last_collected_time.tv_sec,
  206. (unsigned long)instance->after,
  207. (unsigned long)instance->before);
  208. return 0;
  209. }
  210. if (homogeneous) {
  211. // all the dimensions of the chart, has the same algorithm, multiplier and divisor
  212. // we add all dimensions as labels
  213. prometheus_label_copy(
  214. dimension,
  215. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
  216. PROMETHEUS_ELEMENT_MAX);
  217. snprintf(name, PROMETHEUS_LABELS_MAX, "%s_%s%s", instance->config.prefix, context, suffix);
  218. add_metric(
  219. connector_specific_data->write_request,
  220. name, chart, family, dimension,
  221. (host == localhost) ? instance->config.hostname : host->hostname,
  222. rd->last_collected_value, timeval_msec(&rd->last_collected_time));
  223. } else {
  224. // the dimensions of the chart, do not have the same algorithm, multiplier or divisor
  225. // we create a metric per dimension
  226. prometheus_name_copy(
  227. dimension,
  228. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
  229. PROMETHEUS_ELEMENT_MAX);
  230. snprintf(
  231. name, PROMETHEUS_LABELS_MAX, "%s_%s_%s%s", instance->config.prefix, context, dimension,
  232. suffix);
  233. add_metric(
  234. connector_specific_data->write_request,
  235. name, chart, family, NULL,
  236. (host == localhost) ? instance->config.hostname : host->hostname,
  237. rd->last_collected_value, timeval_msec(&rd->last_collected_time));
  238. }
  239. } else {
  240. // we need average or sum of the data
  241. time_t last_t = instance->before;
  242. calculated_number value = exporting_calculate_value_from_stored_data(instance, rd, &last_t);
  243. if (!isnan(value) && !isinf(value)) {
  244. if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AVERAGE)
  245. suffix = "_average";
  246. else if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_SUM)
  247. suffix = "_sum";
  248. prometheus_label_copy(
  249. dimension,
  250. (instance->config.options & EXPORTING_OPTION_SEND_NAMES && rd->name) ? rd->name : rd->id,
  251. PROMETHEUS_ELEMENT_MAX);
  252. snprintf(
  253. name, PROMETHEUS_LABELS_MAX, "%s_%s%s%s", instance->config.prefix, context, units, suffix);
  254. add_metric(
  255. connector_specific_data->write_request,
  256. name, chart, family, dimension,
  257. (host == localhost) ? instance->config.hostname : host->hostname,
  258. value, last_t * MSEC_PER_SEC);
  259. }
  260. }
  261. }
  262. return 0;
  263. }
  264. /**
  265. * Format a batch for Prometheus Remote Write connector
  266. *
  267. * @param instance an instance data structure.
  268. * @return Returns 0 on success, 1 on failure.
  269. */
  270. int format_batch_prometheus_remote_write(struct instance *instance)
  271. {
  272. struct simple_connector_data *simple_connector_data =
  273. (struct simple_connector_data *)instance->connector_specific_data;
  274. struct prometheus_remote_write_specific_data *connector_specific_data =
  275. (struct prometheus_remote_write_specific_data *)simple_connector_data->connector_specific_data;
  276. size_t data_size = get_write_request_size(connector_specific_data->write_request);
  277. if (unlikely(!data_size)) {
  278. error("EXPORTING: write request size is out of range");
  279. return 1;
  280. }
  281. BUFFER *buffer = instance->buffer;
  282. buffer_need_bytes(buffer, data_size);
  283. if (unlikely(pack_and_clear_write_request(connector_specific_data->write_request, buffer->buffer, &data_size))) {
  284. error("EXPORTING: cannot pack write request");
  285. return 1;
  286. }
  287. buffer->len = data_size;
  288. instance->stats.buffered_bytes = (collected_number)buffer_strlen(buffer);
  289. simple_connector_end_batch(instance);
  290. return 0;
  291. }