backends.c 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #define BACKENDS_INTERNALS
  3. #include "backends.h"
  4. // ----------------------------------------------------------------------------
  5. // How backends work in netdata:
  6. //
  7. // 1. There is an independent thread that runs at the required interval
  8. // (for example, once every 10 seconds)
  9. //
  10. // 2. Every time it wakes, it calls the backend formatting functions to build
  11. // a buffer of data. This is a very fast, memory only operation.
  12. //
  13. // 3. If the buffer already includes data, the new data are appended.
  14. // If the buffer becomes too big, because the data cannot be sent, a
  15. // log is written and the buffer is discarded.
  16. //
  17. // 4. Then it tries to send all the data. It blocks until all the data are sent
  18. // or the socket returns an error.
  19. // If the time required for this is above the interval, it starts skipping
  20. // intervals, but the calculated values include the entire database, without
  21. // gaps (it remembers the timestamps and continues from where it stopped).
  22. //
  23. // 5. repeats the above forever.
  24. //
// Process-wide backend settings. The values below are only the built-in
// defaults: backends_main() overwrites them from the backend configuration
// section ("prefix", "update every", "send names instead of ids",
// "data source") when the backend thread starts.
const char *global_backend_prefix = "netdata";
// interval between two backend iterations ("every %d seconds" in the startup log)
int global_backend_update_every = 10;
// bitmap combining one BACKEND_SOURCE_DATA_* selection with BACKEND_OPTION_* flags
BACKEND_OPTIONS global_backend_options = BACKEND_SOURCE_DATA_AVERAGE | BACKEND_OPTION_SEND_NAMES;
// the configured "data source" string; NULL until backends_main() has read it
const char *global_backend_source = NULL;
  29. // ----------------------------------------------------------------------------
  30. // helper functions for backends
  31. size_t backend_name_copy(char *d, const char *s, size_t usable) {
  32. size_t n;
  33. for(n = 0; *s && n < usable ; d++, s++, n++) {
  34. char c = *s;
  35. if(c != '.' && !isalnum(c)) *d = '_';
  36. else *d = c;
  37. }
  38. *d = '\0';
  39. return n;
  40. }
  41. // calculate the SUM or AVERAGE of a dimension, for any timeframe
  42. // may return NAN if the database does not have any value in the give timeframe
  43. calculated_number backend_calculate_value_from_stored_data(
  44. RRDSET *st // the chart
  45. , RRDDIM *rd // the dimension
  46. , time_t after // the start timestamp
  47. , time_t before // the end timestamp
  48. , BACKEND_OPTIONS backend_options // BACKEND_SOURCE_* bitmap
  49. , time_t *first_timestamp // the first point of the database used in this response
  50. , time_t *last_timestamp // the timestamp that should be reported to backend
  51. ) {
  52. RRDHOST *host = st->rrdhost;
  53. (void)host;
  54. // find the edges of the rrd database for this chart
  55. time_t first_t = rd->state->query_ops.oldest_time(rd);
  56. time_t last_t = rd->state->query_ops.latest_time(rd);
  57. time_t update_every = st->update_every;
  58. struct rrddim_query_handle handle;
  59. storage_number n;
  60. // step back a little, to make sure we have complete data collection
  61. // for all metrics
  62. after -= update_every * 2;
  63. before -= update_every * 2;
  64. // align the time-frame
  65. after = after - (after % update_every);
  66. before = before - (before % update_every);
  67. // for before, loose another iteration
  68. // the latest point will be reported the next time
  69. before -= update_every;
  70. if(unlikely(after > before))
  71. // this can happen when update_every > before - after
  72. after = before;
  73. if(unlikely(after < first_t))
  74. after = first_t;
  75. if(unlikely(before > last_t))
  76. before = last_t;
  77. if(unlikely(before < first_t || after > last_t)) {
  78. // the chart has not been updated in the wanted timeframe
  79. debug(D_BACKEND, "BACKEND: %s.%s.%s: aligned timeframe %lu to %lu is outside the chart's database range %lu to %lu",
  80. host->hostname, st->id, rd->id,
  81. (unsigned long)after, (unsigned long)before,
  82. (unsigned long)first_t, (unsigned long)last_t
  83. );
  84. return NAN;
  85. }
  86. *first_timestamp = after;
  87. *last_timestamp = before;
  88. size_t counter = 0;
  89. calculated_number sum = 0;
  90. /*
  91. long start_at_slot = rrdset_time2slot(st, before),
  92. stop_at_slot = rrdset_time2slot(st, after),
  93. slot, stop_now = 0;
  94. for(slot = start_at_slot; !stop_now ; slot--) {
  95. if(unlikely(slot < 0)) slot = st->entries - 1;
  96. if(unlikely(slot == stop_at_slot)) stop_now = 1;
  97. storage_number n = rd->values[slot];
  98. if(unlikely(!does_storage_number_exist(n))) {
  99. // not collected
  100. continue;
  101. }
  102. calculated_number value = unpack_storage_number(n);
  103. sum += value;
  104. counter++;
  105. }
  106. */
  107. for(rd->state->query_ops.init(rd, &handle, after, before) ; !rd->state->query_ops.is_finished(&handle) ; ) {
  108. time_t curr_t;
  109. n = rd->state->query_ops.next_metric(&handle, &curr_t);
  110. if(unlikely(!does_storage_number_exist(n))) {
  111. // not collected
  112. continue;
  113. }
  114. calculated_number value = unpack_storage_number(n);
  115. sum += value;
  116. counter++;
  117. }
  118. rd->state->query_ops.finalize(&handle);
  119. if(unlikely(!counter)) {
  120. debug(D_BACKEND, "BACKEND: %s.%s.%s: no values stored in database for range %lu to %lu",
  121. host->hostname, st->id, rd->id,
  122. (unsigned long)after, (unsigned long)before
  123. );
  124. return NAN;
  125. }
  126. if(unlikely(BACKEND_OPTIONS_DATA_SOURCE(backend_options) == BACKEND_SOURCE_DATA_SUM))
  127. return sum;
  128. return sum / (calculated_number)counter;
  129. }
  130. // discard a response received by a backend
  131. // after logging a simple of it to error.log
  132. int discard_response(BUFFER *b, const char *backend) {
  133. char sample[1024];
  134. const char *s = buffer_tostring(b);
  135. char *d = sample, *e = &sample[sizeof(sample) - 1];
  136. for(; *s && d < e ;s++) {
  137. char c = *s;
  138. if(unlikely(!isprint(c))) c = ' ';
  139. *d++ = c;
  140. }
  141. *d = '\0';
  142. info("BACKEND: received %zu bytes from %s backend. Ignoring them. Sample: '%s'", buffer_strlen(b), backend, sample);
  143. buffer_flush(b);
  144. return 0;
  145. }
  146. // ----------------------------------------------------------------------------
  147. // the backend thread
// built by backends_main() from the "send charts matching" option;
// consulted by backends_can_send_rrdset() against the chart id and name
static SIMPLE_PATTERN *charts_pattern = NULL;
// built by backends_main() from the "send hosts matching" option;
// a NULL pattern is treated by backends_main() as "send every host"
static SIMPLE_PATTERN *hosts_pattern = NULL;
  150. inline int backends_can_send_rrdset(BACKEND_OPTIONS backend_options, RRDSET *st) {
  151. RRDHOST *host = st->rrdhost;
  152. (void)host;
  153. if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_BACKEND_IGNORE)))
  154. return 0;
  155. if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_BACKEND_SEND))) {
  156. // we have not checked this chart
  157. if(simple_pattern_matches(charts_pattern, st->id) || simple_pattern_matches(charts_pattern, st->name))
  158. rrdset_flag_set(st, RRDSET_FLAG_BACKEND_SEND);
  159. else {
  160. rrdset_flag_set(st, RRDSET_FLAG_BACKEND_IGNORE);
  161. debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s', because it is disabled for backends.", st->id, host->hostname);
  162. return 0;
  163. }
  164. }
  165. if(unlikely(!rrdset_is_available_for_exporting_and_alarms(st))) {
  166. debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s', because it is not available for backends.", st->id, host->hostname);
  167. return 0;
  168. }
  169. if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE && !(BACKEND_OPTIONS_DATA_SOURCE(backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED))) {
  170. debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s' because its memory mode is '%s' and the backend requires database access.", st->id, host->hostname, rrd_memory_mode_name(host->rrd_memory_mode));
  171. return 0;
  172. }
  173. return 1;
  174. }
  175. inline BACKEND_OPTIONS backend_parse_data_source(const char *source, BACKEND_OPTIONS backend_options) {
  176. if(!strcmp(source, "raw") || !strcmp(source, "as collected") || !strcmp(source, "as-collected") || !strcmp(source, "as_collected") || !strcmp(source, "ascollected")) {
  177. backend_options |= BACKEND_SOURCE_DATA_AS_COLLECTED;
  178. backend_options &= ~(BACKEND_OPTIONS_SOURCE_BITS ^ BACKEND_SOURCE_DATA_AS_COLLECTED);
  179. }
  180. else if(!strcmp(source, "average")) {
  181. backend_options |= BACKEND_SOURCE_DATA_AVERAGE;
  182. backend_options &= ~(BACKEND_OPTIONS_SOURCE_BITS ^ BACKEND_SOURCE_DATA_AVERAGE);
  183. }
  184. else if(!strcmp(source, "sum") || !strcmp(source, "volume")) {
  185. backend_options |= BACKEND_SOURCE_DATA_SUM;
  186. backend_options &= ~(BACKEND_OPTIONS_SOURCE_BITS ^ BACKEND_SOURCE_DATA_SUM);
  187. }
  188. else {
  189. error("BACKEND: invalid data source method '%s'.", source);
  190. }
  191. return backend_options;
  192. }
  193. static void backends_main_cleanup(void *ptr) {
  194. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  195. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  196. info("cleaning up...");
  197. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  198. }
  199. /**
  200. * Set Kinesis variables
  201. *
  202. * Set the variables necessary to work with this specific backend.
  203. *
  204. * @param default_port the default port of the backend
  205. * @param brc function called to check the result.
  206. * @param brf function called to format the message to the backend
  207. */
  208. void backend_set_kinesis_variables(int *default_port,
  209. backend_response_checker_t brc,
  210. backend_request_formatter_t brf)
  211. {
  212. (void)default_port;
  213. #ifndef HAVE_KINESIS
  214. (void)brc;
  215. (void)brf;
  216. #endif
  217. #if HAVE_KINESIS
  218. *brc = process_json_response;
  219. if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
  220. *brf = backends_format_dimension_collected_json_plaintext;
  221. else
  222. *brf = backends_format_dimension_stored_json_plaintext;
  223. #endif
  224. }
  225. /**
  226. * Set Prometheus variables
  227. *
  228. * Set the variables necessary to work with this specific backend.
  229. *
  230. * @param default_port the default port of the backend
  231. * @param brc function called to check the result.
  232. * @param brf function called to format the message to the backend
  233. */
  234. void backend_set_prometheus_variables(int *default_port,
  235. backend_response_checker_t brc,
  236. backend_request_formatter_t brf)
  237. {
  238. (void)default_port;
  239. (void)brf;
  240. #ifndef ENABLE_PROMETHEUS_REMOTE_WRITE
  241. (void)brc;
  242. #endif
  243. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  244. *brc = backends_process_prometheus_remote_write_response;
  245. #endif /* ENABLE_PROMETHEUS_REMOTE_WRITE */
  246. }
  247. /**
  248. * Set MongoDB variables
  249. *
  250. * Set the variables necessary to work with this specific backend.
  251. *
  252. * @param default_port the default port of the backend
  253. * @param brc function called to check the result.
  254. * @param brf function called to format the message to the backend
  255. */
  256. void backend_set_mongodb_variables(int *default_port,
  257. backend_response_checker_t brc,
  258. backend_request_formatter_t brf)
  259. {
  260. (void)default_port;
  261. #ifndef HAVE_MONGOC
  262. (void)brc;
  263. (void)brf;
  264. #endif
  265. #if HAVE_MONGOC
  266. *brc = process_json_response;
  267. if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
  268. *brf = backends_format_dimension_collected_json_plaintext;
  269. else
  270. *brf = backends_format_dimension_stored_json_plaintext;
  271. #endif
  272. }
  273. /**
  274. * Set JSON variables
  275. *
  276. * Set the variables necessary to work with this specific backend.
  277. *
  278. * @param default_port the default port of the backend
  279. * @param brc function called to check the result.
  280. * @param brf function called to format the message to the backend
  281. */
  282. void backend_set_json_variables(int *default_port,
  283. backend_response_checker_t brc,
  284. backend_request_formatter_t brf)
  285. {
  286. *default_port = 5448;
  287. *brc = process_json_response;
  288. if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
  289. *brf = backends_format_dimension_collected_json_plaintext;
  290. else
  291. *brf = backends_format_dimension_stored_json_plaintext;
  292. }
  293. /**
  294. * Set OpenTSDB HTTP variables
  295. *
  296. * Set the variables necessary to work with this specific backend.
  297. *
  298. * @param default_port the default port of the backend
  299. * @param brc function called to check the result.
  300. * @param brf function called to format the message to the backend
  301. */
  302. void backend_set_opentsdb_http_variables(int *default_port,
  303. backend_response_checker_t brc,
  304. backend_request_formatter_t brf)
  305. {
  306. *default_port = 4242;
  307. *brc = process_opentsdb_response;
  308. if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
  309. *brf = backends_format_dimension_collected_opentsdb_http;
  310. else
  311. *brf = backends_format_dimension_stored_opentsdb_http;
  312. }
  313. /**
  314. * Set OpenTSDB Telnet variables
  315. *
  316. * Set the variables necessary to work with this specific backend.
  317. *
  318. * @param default_port the default port of the backend
  319. * @param brc function called to check the result.
  320. * @param brf function called to format the message to the backend
  321. */
  322. void backend_set_opentsdb_telnet_variables(int *default_port,
  323. backend_response_checker_t brc,
  324. backend_request_formatter_t brf)
  325. {
  326. *default_port = 4242;
  327. *brc = process_opentsdb_response;
  328. if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
  329. *brf = backends_format_dimension_collected_opentsdb_telnet;
  330. else
  331. *brf = backends_format_dimension_stored_opentsdb_telnet;
  332. }
  333. /**
  334. * Set Graphite variables
  335. *
  336. * Set the variables necessary to work with this specific backend.
  337. *
  338. * @param default_port the default port of the backend
  339. * @param brc function called to check the result.
  340. * @param brf function called to format the message to the backend
  341. */
  342. void backend_set_graphite_variables(int *default_port,
  343. backend_response_checker_t brc,
  344. backend_request_formatter_t brf)
  345. {
  346. *default_port = 2003;
  347. *brc = process_graphite_response;
  348. if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
  349. *brf = backends_format_dimension_collected_graphite_plaintext;
  350. else
  351. *brf = backends_format_dimension_stored_graphite_plaintext;
  352. }
  353. /**
  354. * Select Type
  355. *
  356. * Select the backend type based in the user input
  357. *
  358. * @param type is the string that defines the backend type
  359. *
  360. * @return It returns the backend id.
  361. */
  362. BACKEND_TYPE backend_select_type(const char *type) {
  363. if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
  364. return BACKEND_TYPE_GRAPHITE;
  365. }
  366. else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
  367. return BACKEND_TYPE_OPENTSDB_USING_TELNET;
  368. }
  369. else if(!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) {
  370. return BACKEND_TYPE_OPENTSDB_USING_HTTP;
  371. }
  372. else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) {
  373. return BACKEND_TYPE_JSON;
  374. }
  375. else if (!strcmp(type, "prometheus_remote_write")) {
  376. return BACKEND_TYPE_PROMETHEUS_REMOTE_WRITE;
  377. }
  378. else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {
  379. return BACKEND_TYPE_KINESIS;
  380. }
  381. else if (!strcmp(type, "mongodb") || !strcmp(type, "mongodb:plaintext")) {
  382. return BACKEND_TYPE_MONGODB;
  383. }
  384. return BACKEND_TYPE_UNKNOWN;
  385. }
  386. /**
  387. * Backend main
  388. *
  389. * The main thread used to control the backends.
  390. *
  391. * @param ptr a pointer to netdata_static_structure.
  392. *
  393. * @return It always return NULL.
  394. */
  395. void *backends_main(void *ptr) {
  396. netdata_thread_cleanup_push(backends_main_cleanup, ptr);
  397. int default_port = 0;
  398. int sock = -1;
  399. BUFFER *b = buffer_create(1), *response = buffer_create(1);
  400. int (*backend_request_formatter)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, BACKEND_OPTIONS) = NULL;
  401. int (*backend_response_checker)(BUFFER *) = NULL;
  402. #if HAVE_KINESIS
  403. int do_kinesis = 0;
  404. char *kinesis_auth_key_id = NULL, *kinesis_secure_key = NULL, *kinesis_stream_name = NULL;
  405. #endif
  406. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  407. int do_prometheus_remote_write = 0;
  408. BUFFER *http_request_header = NULL;
  409. #endif
  410. #if HAVE_MONGOC
  411. int do_mongodb = 0;
  412. char *mongodb_uri = NULL;
  413. char *mongodb_database = NULL;
  414. char *mongodb_collection = NULL;
  415. // set the default socket timeout in ms
  416. int32_t mongodb_default_socket_timeout = (int32_t)(global_backend_update_every >= 2)?(global_backend_update_every * MSEC_PER_SEC - 500):1000;
  417. #endif
  418. #ifdef ENABLE_HTTPS
  419. struct netdata_ssl opentsdb_ssl = {NULL , NETDATA_SSL_START};
  420. #endif
  421. // ------------------------------------------------------------------------
  422. // collect configuration options
  423. struct timeval timeout = {
  424. .tv_sec = 0,
  425. .tv_usec = 0
  426. };
  427. int enabled = config_get_boolean(CONFIG_SECTION_BACKEND, "enabled", 0);
  428. const char *source = config_get(CONFIG_SECTION_BACKEND, "data source", "average");
  429. const char *type = config_get(CONFIG_SECTION_BACKEND, "type", "graphite");
  430. const char *destination = config_get(CONFIG_SECTION_BACKEND, "destination", "localhost");
  431. global_backend_prefix = config_get(CONFIG_SECTION_BACKEND, "prefix", "netdata");
  432. const char *hostname = config_get(CONFIG_SECTION_BACKEND, "hostname", localhost->hostname);
  433. global_backend_update_every = (int)config_get_number(CONFIG_SECTION_BACKEND, "update every", global_backend_update_every);
  434. int buffer_on_failures = (int)config_get_number(CONFIG_SECTION_BACKEND, "buffer on failures", 10);
  435. long timeoutms = config_get_number(CONFIG_SECTION_BACKEND, "timeout ms", global_backend_update_every * 2 * 1000);
  436. if(config_get_boolean(CONFIG_SECTION_BACKEND, "send names instead of ids", (global_backend_options & BACKEND_OPTION_SEND_NAMES)))
  437. global_backend_options |= BACKEND_OPTION_SEND_NAMES;
  438. else
  439. global_backend_options &= ~BACKEND_OPTION_SEND_NAMES;
  440. charts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send charts matching", "*"), NULL, SIMPLE_PATTERN_EXACT);
  441. hosts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send hosts matching", "localhost *"), NULL, SIMPLE_PATTERN_EXACT);
  442. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  443. const char *remote_write_path = config_get(CONFIG_SECTION_BACKEND, "remote write URL path", "/receive");
  444. #endif
  445. // ------------------------------------------------------------------------
  446. // validate configuration options
  447. // and prepare for sending data to our backend
  448. global_backend_options = backend_parse_data_source(source, global_backend_options);
  449. global_backend_source = source;
  450. if(timeoutms < 1) {
  451. error("BACKEND: invalid timeout %ld ms given. Assuming %d ms.", timeoutms, global_backend_update_every * 2 * 1000);
  452. timeoutms = global_backend_update_every * 2 * 1000;
  453. }
  454. timeout.tv_sec = (timeoutms * 1000) / 1000000;
  455. timeout.tv_usec = (timeoutms * 1000) % 1000000;
  456. if(!enabled || global_backend_update_every < 1)
  457. goto cleanup;
  458. // ------------------------------------------------------------------------
  459. // select the backend type
  460. BACKEND_TYPE work_type = backend_select_type(type);
  461. if (work_type == BACKEND_TYPE_UNKNOWN) {
  462. error("BACKEND: Unknown backend type '%s'", type);
  463. goto cleanup;
  464. }
  465. switch (work_type) {
  466. case BACKEND_TYPE_OPENTSDB_USING_HTTP: {
  467. #ifdef ENABLE_HTTPS
  468. if (!strcmp(type, "opentsdb:https")) {
  469. security_start_ssl(NETDATA_SSL_CONTEXT_EXPORTING);
  470. }
  471. #endif
  472. backend_set_opentsdb_http_variables(&default_port,&backend_response_checker,&backend_request_formatter);
  473. break;
  474. }
  475. case BACKEND_TYPE_PROMETHEUS_REMOTE_WRITE: {
  476. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  477. do_prometheus_remote_write = 1;
  478. http_request_header = buffer_create(1);
  479. backends_init_write_request();
  480. #else
  481. error("BACKEND: Prometheus remote write support isn't compiled");
  482. #endif // ENABLE_PROMETHEUS_REMOTE_WRITE
  483. backend_set_prometheus_variables(&default_port,&backend_response_checker,&backend_request_formatter);
  484. break;
  485. }
  486. case BACKEND_TYPE_KINESIS: {
  487. #if HAVE_KINESIS
  488. do_kinesis = 1;
  489. if(unlikely(read_kinesis_conf(netdata_configured_user_config_dir, &kinesis_auth_key_id, &kinesis_secure_key, &kinesis_stream_name))) {
  490. error("BACKEND: kinesis backend type is set but cannot read its configuration from %s/aws_kinesis.conf", netdata_configured_user_config_dir);
  491. goto cleanup;
  492. }
  493. backends_kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000);
  494. #else
  495. error("BACKEND: AWS Kinesis support isn't compiled");
  496. #endif // HAVE_KINESIS
  497. backend_set_kinesis_variables(&default_port,&backend_response_checker,&backend_request_formatter);
  498. break;
  499. }
  500. case BACKEND_TYPE_MONGODB: {
  501. #if HAVE_MONGOC
  502. if(unlikely(read_mongodb_conf(netdata_configured_user_config_dir,
  503. &mongodb_uri,
  504. &mongodb_database,
  505. &mongodb_collection))) {
  506. error("BACKEND: mongodb backend type is set but cannot read its configuration from %s/mongodb.conf",
  507. netdata_configured_user_config_dir);
  508. goto cleanup;
  509. }
  510. if(likely(!backends_mongodb_init(mongodb_uri, mongodb_database, mongodb_collection, mongodb_default_socket_timeout))) {
  511. backend_set_mongodb_variables(&default_port, &backend_response_checker, &backend_request_formatter);
  512. do_mongodb = 1;
  513. }
  514. else {
  515. error("BACKEND: cannot initialize MongoDB backend");
  516. goto cleanup;
  517. }
  518. #else
  519. error("BACKEND: MongoDB support isn't compiled");
  520. #endif // HAVE_MONGOC
  521. break;
  522. }
  523. case BACKEND_TYPE_GRAPHITE: {
  524. backend_set_graphite_variables(&default_port,&backend_response_checker,&backend_request_formatter);
  525. break;
  526. }
  527. case BACKEND_TYPE_OPENTSDB_USING_TELNET: {
  528. backend_set_opentsdb_telnet_variables(&default_port,&backend_response_checker,&backend_request_formatter);
  529. break;
  530. }
  531. case BACKEND_TYPE_JSON: {
  532. backend_set_json_variables(&default_port,&backend_response_checker,&backend_request_formatter);
  533. break;
  534. }
  535. case BACKEND_TYPE_UNKNOWN: {
  536. break;
  537. }
  538. default: {
  539. break;
  540. }
  541. }
  542. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  543. if((backend_request_formatter == NULL && !do_prometheus_remote_write) || backend_response_checker == NULL) {
  544. #else
  545. if(backend_request_formatter == NULL || backend_response_checker == NULL) {
  546. #endif
  547. error("BACKEND: backend is misconfigured - disabling it.");
  548. goto cleanup;
  549. }
  550. // ------------------------------------------------------------------------
  551. // prepare the charts for monitoring the backend operation
  552. struct rusage thread;
  553. collected_number
  554. chart_buffered_metrics = 0,
  555. chart_lost_metrics = 0,
  556. chart_sent_metrics = 0,
  557. chart_buffered_bytes = 0,
  558. chart_received_bytes = 0,
  559. chart_sent_bytes = 0,
  560. chart_receptions = 0,
  561. chart_transmission_successes = 0,
  562. chart_transmission_failures = 0,
  563. chart_data_lost_events = 0,
  564. chart_lost_bytes = 0,
  565. chart_backend_reconnects = 0;
  566. // chart_backend_latency = 0;
  567. RRDSET *chart_metrics = rrdset_create_localhost("netdata", "backend_metrics", NULL, "backend", NULL, "Netdata Buffered Metrics", "metrics", "backends", NULL, 130600, global_backend_update_every, RRDSET_TYPE_LINE);
  568. rrddim_add(chart_metrics, "buffered", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  569. rrddim_add(chart_metrics, "lost", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  570. rrddim_add(chart_metrics, "sent", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  571. RRDSET *chart_bytes = rrdset_create_localhost("netdata", "backend_bytes", NULL, "backend", NULL, "Netdata Backend Data Size", "KiB", "backends", NULL, 130610, global_backend_update_every, RRDSET_TYPE_AREA);
  572. rrddim_add(chart_bytes, "buffered", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
  573. rrddim_add(chart_bytes, "lost", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
  574. rrddim_add(chart_bytes, "sent", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
  575. rrddim_add(chart_bytes, "received", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
  576. RRDSET *chart_ops = rrdset_create_localhost("netdata", "backend_ops", NULL, "backend", NULL, "Netdata Backend Operations", "operations", "backends", NULL, 130630, global_backend_update_every, RRDSET_TYPE_LINE);
  577. rrddim_add(chart_ops, "write", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  578. rrddim_add(chart_ops, "discard", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  579. rrddim_add(chart_ops, "reconnect", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  580. rrddim_add(chart_ops, "failure", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  581. rrddim_add(chart_ops, "read", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  582. /*
  583. * this is misleading - we can only measure the time we need to send data
  584. * this time is not related to the time required for the data to travel to
  585. * the backend database and the time that server needed to process them
  586. *
  587. * issue #1432 and https://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html
  588. *
  589. RRDSET *chart_latency = rrdset_create_localhost("netdata", "backend_latency", NULL, "backend", NULL, "Netdata Backend Latency", "ms", "backends", NULL, 130620, global_backend_update_every, RRDSET_TYPE_AREA);
  590. rrddim_add(chart_latency, "latency", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
  591. */
  592. RRDSET *chart_rusage = rrdset_create_localhost("netdata", "backend_thread_cpu", NULL, "backend", NULL, "NetData Backend Thread CPU usage", "milliseconds/s", "backends", NULL, 130630, global_backend_update_every, RRDSET_TYPE_STACKED);
  593. rrddim_add(chart_rusage, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
  594. rrddim_add(chart_rusage, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
  595. // ------------------------------------------------------------------------
  596. // prepare the backend main loop
  597. info("BACKEND: configured ('%s' on '%s' sending '%s' data, every %d seconds, as host '%s', with prefix '%s')", type, destination, source, global_backend_update_every, hostname, global_backend_prefix);
  598. send_statistics("BACKEND_START", "OK", type);
  599. usec_t step_ut = global_backend_update_every * USEC_PER_SEC;
  600. time_t after = now_realtime_sec();
  601. int failures = 0;
  602. heartbeat_t hb;
  603. heartbeat_init(&hb);
  604. while(!netdata_exit) {
  605. // ------------------------------------------------------------------------
  606. // Wait for the next iteration point.
  607. heartbeat_next(&hb, step_ut);
  608. time_t before = now_realtime_sec();
  609. debug(D_BACKEND, "BACKEND: preparing buffer for timeframe %lu to %lu", (unsigned long)after, (unsigned long)before);
  610. // ------------------------------------------------------------------------
  611. // add to the buffer the data we need to send to the backend
  612. netdata_thread_disable_cancelability();
  613. size_t count_hosts = 0;
  614. size_t count_charts_total = 0;
  615. size_t count_dims_total = 0;
  616. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  617. if(do_prometheus_remote_write)
  618. backends_clear_write_request();
  619. #endif
  620. rrd_rdlock();
  621. RRDHOST *host;
  622. rrdhost_foreach_read(host) {
  623. if(unlikely(!rrdhost_flag_check(host, RRDHOST_FLAG_BACKEND_SEND|RRDHOST_FLAG_BACKEND_DONT_SEND))) {
  624. char *name = (host == localhost)?"localhost":host->hostname;
  625. if (!hosts_pattern || simple_pattern_matches(hosts_pattern, name)) {
  626. rrdhost_flag_set(host, RRDHOST_FLAG_BACKEND_SEND);
  627. info("enabled backend for host '%s'", name);
  628. }
  629. else {
  630. rrdhost_flag_set(host, RRDHOST_FLAG_BACKEND_DONT_SEND);
  631. info("disabled backend for host '%s'", name);
  632. }
  633. }
  634. if(unlikely(!rrdhost_flag_check(host, RRDHOST_FLAG_BACKEND_SEND)))
  635. continue;
  636. rrdhost_rdlock(host);
  637. count_hosts++;
  638. size_t count_charts = 0;
  639. size_t count_dims = 0;
  640. size_t count_dims_skipped = 0;
  641. const char *__hostname = (host == localhost)?hostname:host->hostname;
  642. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  643. if(do_prometheus_remote_write) {
  644. backends_rrd_stats_remote_write_allmetrics_prometheus(
  645. host
  646. , __hostname
  647. , global_backend_prefix
  648. , global_backend_options
  649. , after
  650. , before
  651. , &count_charts
  652. , &count_dims
  653. , &count_dims_skipped
  654. );
  655. chart_buffered_metrics += count_dims;
  656. }
  657. else
  658. #endif
  659. {
  660. RRDSET *st;
  661. rrdset_foreach_read(st, host) {
  662. if(likely(backends_can_send_rrdset(global_backend_options, st))) {
  663. rrdset_rdlock(st);
  664. count_charts++;
  665. RRDDIM *rd;
  666. rrddim_foreach_read(rd, st) {
  667. if (likely(rd->last_collected_time.tv_sec >= after)) {
  668. chart_buffered_metrics += backend_request_formatter(b, global_backend_prefix, host, __hostname, st, rd, after, before, global_backend_options);
  669. count_dims++;
  670. }
  671. else {
  672. debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection (%lu) is not within our timeframe (%lu to %lu)", rd->id, st->id, __hostname, (unsigned long)rd->last_collected_time.tv_sec, (unsigned long)after, (unsigned long)before);
  673. count_dims_skipped++;
  674. }
  675. }
  676. rrdset_unlock(st);
  677. }
  678. }
  679. }
  680. debug(D_BACKEND, "BACKEND: sending host '%s', metrics of %zu dimensions, of %zu charts. Skipped %zu dimensions.", __hostname, count_dims, count_charts, count_dims_skipped);
  681. count_charts_total += count_charts;
  682. count_dims_total += count_dims;
  683. rrdhost_unlock(host);
  684. }
  685. rrd_unlock();
  686. netdata_thread_enable_cancelability();
  687. debug(D_BACKEND, "BACKEND: buffer has %zu bytes, added metrics for %zu dimensions, of %zu charts, from %zu hosts", buffer_strlen(b), count_dims_total, count_charts_total, count_hosts);
  688. // ------------------------------------------------------------------------
  689. chart_buffered_bytes = (collected_number)buffer_strlen(b);
  690. // reset the monitoring chart counters
  691. chart_received_bytes =
  692. chart_sent_bytes =
  693. chart_sent_metrics =
  694. chart_lost_metrics =
  695. chart_receptions =
  696. chart_transmission_successes =
  697. chart_transmission_failures =
  698. chart_data_lost_events =
  699. chart_lost_bytes =
  700. chart_backend_reconnects = 0;
  701. // chart_backend_latency = 0;
  702. if(unlikely(netdata_exit)) break;
  703. //fprintf(stderr, "\nBACKEND BEGIN:\n%s\nBACKEND END\n", buffer_tostring(b));
  704. //fprintf(stderr, "after = %lu, before = %lu\n", after, before);
  705. // prepare for the next iteration,
  706. // so that data is added to the buffer incrementally
  707. after = before;
  708. #if HAVE_KINESIS
  709. if(do_kinesis) {
  710. unsigned long long partition_key_seq = 0;
  711. size_t buffer_len = buffer_strlen(b);
  712. size_t sent = 0;
  713. while(sent < buffer_len) {
  714. char partition_key[KINESIS_PARTITION_KEY_MAX + 1];
  715. snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++);
  716. size_t partition_key_len = strnlen(partition_key, KINESIS_PARTITION_KEY_MAX);
  717. const char *first_char = buffer_tostring(b) + sent;
  718. size_t record_len = 0;
  719. // split buffer into chunks of maximum allowed size
  720. if(buffer_len - sent < KINESIS_RECORD_MAX - partition_key_len) {
  721. record_len = buffer_len - sent;
  722. }
  723. else {
  724. record_len = KINESIS_RECORD_MAX - partition_key_len;
  725. while(*(first_char + record_len) != '\n' && record_len) record_len--;
  726. }
  727. char error_message[ERROR_LINE_MAX + 1] = "";
  728. debug(D_BACKEND, "BACKEND: backends_kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \
  729. buffer = %zu, record = %zu", destination, kinesis_auth_key_id, kinesis_secure_key, kinesis_stream_name,
  730. partition_key, buffer_len, record_len);
  731. backends_kinesis_put_record(kinesis_stream_name, partition_key, first_char, record_len);
  732. sent += record_len;
  733. chart_transmission_successes++;
  734. size_t sent_bytes = 0, lost_bytes = 0;
  735. if(unlikely(backends_kinesis_get_result(error_message, &sent_bytes, &lost_bytes))) {
  736. // oops! we couldn't send (all or some of the) data
  737. error("BACKEND: %s", error_message);
  738. error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.",
  739. destination, sent_bytes, sent_bytes - lost_bytes);
  740. chart_transmission_failures++;
  741. chart_data_lost_events++;
  742. chart_lost_bytes += lost_bytes;
  743. // estimate the number of lost metrics
  744. chart_lost_metrics += (collected_number)(chart_buffered_metrics
  745. * (buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1));
  746. break;
  747. }
  748. else {
  749. chart_receptions++;
  750. }
  751. if(unlikely(netdata_exit)) break;
  752. }
  753. chart_sent_bytes += sent;
  754. if(likely(sent == buffer_len))
  755. chart_sent_metrics = chart_buffered_metrics;
  756. buffer_flush(b);
  757. } else
  758. #endif /* HAVE_KINESIS */
  759. #if HAVE_MONGOC
  760. if(do_mongodb) {
  761. size_t buffer_len = buffer_strlen(b);
  762. size_t sent = 0;
  763. while(sent < buffer_len) {
  764. const char *first_char = buffer_tostring(b);
  765. debug(D_BACKEND, "BACKEND: backends_mongodb_insert(): uri = %s, database = %s, collection = %s, \
  766. buffer = %zu", mongodb_uri, mongodb_database, mongodb_collection, buffer_len);
  767. if(likely(!backends_mongodb_insert((char *)first_char, (size_t)chart_buffered_metrics))) {
  768. sent += buffer_len;
  769. chart_transmission_successes++;
  770. chart_receptions++;
  771. }
  772. else {
  773. // oops! we couldn't send (all or some of the) data
  774. error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.",
  775. mongodb_uri, buffer_len, 0UL);
  776. chart_transmission_failures++;
  777. chart_data_lost_events++;
  778. chart_lost_bytes += buffer_len;
  779. // estimate the number of lost metrics
  780. chart_lost_metrics += (collected_number)chart_buffered_metrics;
  781. break;
  782. }
  783. if(unlikely(netdata_exit)) break;
  784. }
  785. chart_sent_bytes += sent;
  786. if(likely(sent == buffer_len))
  787. chart_sent_metrics = chart_buffered_metrics;
  788. buffer_flush(b);
  789. } else
  790. #endif /* HAVE_MONGOC */
  791. {
  792. // ------------------------------------------------------------------------
  793. // if we are connected, receive a response, without blocking
  794. if(likely(sock != -1)) {
  795. errno = 0;
  796. // loop through to collect all data
  797. while(sock != -1 && errno != EWOULDBLOCK) {
  798. buffer_need_bytes(response, 4096);
  799. ssize_t r;
  800. #ifdef ENABLE_HTTPS
  801. if(opentsdb_ssl.conn && !opentsdb_ssl.flags) {
  802. r = SSL_read(opentsdb_ssl.conn, &response->buffer[response->len], response->size - response->len);
  803. } else {
  804. r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
  805. }
  806. #else
  807. r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
  808. #endif
  809. if(likely(r > 0)) {
  810. // we received some data
  811. response->len += r;
  812. chart_received_bytes += r;
  813. chart_receptions++;
  814. }
  815. else if(r == 0) {
  816. error("BACKEND: '%s' closed the socket", destination);
  817. close(sock);
  818. sock = -1;
  819. }
  820. else {
  821. // failed to receive data
  822. if(errno != EAGAIN && errno != EWOULDBLOCK) {
  823. error("BACKEND: cannot receive data from backend '%s'.", destination);
  824. }
  825. }
  826. }
  827. // if we received data, process it
  828. if(buffer_strlen(response))
  829. backend_response_checker(response);
  830. }
  831. // ------------------------------------------------------------------------
  832. // if we are not connected, connect to a backend server
  833. if(unlikely(sock == -1)) {
  834. // usec_t start_ut = now_monotonic_usec();
  835. size_t reconnects = 0;
  836. sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
  837. #ifdef ENABLE_HTTPS
  838. if(sock != -1) {
  839. if(netdata_exporting_ctx) {
  840. if(!opentsdb_ssl.conn) {
  841. opentsdb_ssl.conn = SSL_new(netdata_exporting_ctx);
  842. if(!opentsdb_ssl.conn) {
  843. error("Failed to allocate SSL structure %d.", sock);
  844. opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
  845. }
  846. } else {
  847. SSL_clear(opentsdb_ssl.conn);
  848. }
  849. }
  850. if(opentsdb_ssl.conn) {
  851. if(SSL_set_fd(opentsdb_ssl.conn, sock) != 1) {
  852. error("Failed to set the socket to the SSL on socket fd %d.", host->rrdpush_sender_socket);
  853. opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
  854. } else {
  855. opentsdb_ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
  856. SSL_set_connect_state(opentsdb_ssl.conn);
  857. int err = SSL_connect(opentsdb_ssl.conn);
  858. if (err != 1) {
  859. err = SSL_get_error(opentsdb_ssl.conn, err);
  860. error("SSL cannot connect with the server: %s ", ERR_error_string((long)SSL_get_error(opentsdb_ssl.conn, err), NULL));
  861. opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
  862. } //TODO: check certificate here
  863. }
  864. }
  865. }
  866. #endif
  867. chart_backend_reconnects += reconnects;
  868. // chart_backend_latency += now_monotonic_usec() - start_ut;
  869. }
  870. if(unlikely(netdata_exit)) break;
  871. // ------------------------------------------------------------------------
  872. // if we are connected, send our buffer to the backend server
  873. if(likely(sock != -1)) {
  874. size_t len = buffer_strlen(b);
  875. // usec_t start_ut = now_monotonic_usec();
  876. int flags = 0;
  877. #ifdef MSG_NOSIGNAL
  878. flags += MSG_NOSIGNAL;
  879. #endif
  880. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  881. if(do_prometheus_remote_write) {
  882. size_t data_size = backends_get_write_request_size();
  883. if(unlikely(!data_size)) {
  884. error("BACKEND: write request size is out of range");
  885. continue;
  886. }
  887. buffer_flush(b);
  888. buffer_need_bytes(b, data_size);
  889. if(unlikely(backends_pack_write_request(b->buffer, &data_size))) {
  890. error("BACKEND: cannot pack write request");
  891. continue;
  892. }
  893. b->len = data_size;
  894. chart_buffered_bytes = (collected_number)buffer_strlen(b);
  895. buffer_flush(http_request_header);
  896. buffer_sprintf(http_request_header,
  897. "POST %s HTTP/1.1\r\n"
  898. "Host: %s\r\n"
  899. "Accept: */*\r\n"
  900. "Content-Length: %zu\r\n"
  901. "Content-Type: application/x-www-form-urlencoded\r\n\r\n",
  902. remote_write_path,
  903. destination,
  904. data_size
  905. );
  906. len = buffer_strlen(http_request_header);
  907. send(sock, buffer_tostring(http_request_header), len, flags);
  908. len = data_size;
  909. }
  910. #endif
  911. ssize_t written;
  912. #ifdef ENABLE_HTTPS
  913. if(opentsdb_ssl.conn && !opentsdb_ssl.flags) {
  914. written = SSL_write(opentsdb_ssl.conn, buffer_tostring(b), len);
  915. } else {
  916. written = send(sock, buffer_tostring(b), len, flags);
  917. }
  918. #else
  919. written = send(sock, buffer_tostring(b), len, flags);
  920. #endif
  921. // chart_backend_latency += now_monotonic_usec() - start_ut;
  922. if(written != -1 && (size_t)written == len) {
  923. // we sent the data successfully
  924. chart_transmission_successes++;
  925. chart_sent_bytes += written;
  926. chart_sent_metrics = chart_buffered_metrics;
  927. // reset the failures count
  928. failures = 0;
  929. // empty the buffer
  930. buffer_flush(b);
  931. }
  932. else {
  933. // oops! we couldn't send (all or some of the) data
  934. error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", destination, len, written);
  935. chart_transmission_failures++;
  936. if(written != -1)
  937. chart_sent_bytes += written;
  938. // increment the counter we check for data loss
  939. failures++;
  940. // close the socket - we will re-open it next time
  941. close(sock);
  942. sock = -1;
  943. }
  944. }
  945. else {
  946. error("BACKEND: failed to update database backend '%s'", destination);
  947. chart_transmission_failures++;
  948. // increment the counter we check for data loss
  949. failures++;
  950. }
  951. }
  952. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  953. if(do_prometheus_remote_write && failures) {
  954. (void) buffer_on_failures;
  955. failures = 0;
  956. chart_lost_bytes = chart_buffered_bytes = backends_get_write_request_size(); // estimated write request size
  957. chart_data_lost_events++;
  958. chart_lost_metrics = chart_buffered_metrics;
  959. } else
  960. #endif
  961. if(failures > buffer_on_failures) {
  962. // too bad! we are going to lose data
  963. chart_lost_bytes += buffer_strlen(b);
  964. error("BACKEND: reached %d backend failures. Flushing buffers to protect this host - this results in data loss on back-end server '%s'", failures, destination);
  965. buffer_flush(b);
  966. failures = 0;
  967. chart_data_lost_events++;
  968. chart_lost_metrics = chart_buffered_metrics;
  969. }
  970. if(unlikely(netdata_exit)) break;
  971. // ------------------------------------------------------------------------
  972. // update the monitoring charts
  973. if(likely(chart_ops->counter_done)) rrdset_next(chart_ops);
  974. rrddim_set(chart_ops, "read", chart_receptions);
  975. rrddim_set(chart_ops, "write", chart_transmission_successes);
  976. rrddim_set(chart_ops, "discard", chart_data_lost_events);
  977. rrddim_set(chart_ops, "failure", chart_transmission_failures);
  978. rrddim_set(chart_ops, "reconnect", chart_backend_reconnects);
  979. rrdset_done(chart_ops);
  980. if(likely(chart_metrics->counter_done)) rrdset_next(chart_metrics);
  981. rrddim_set(chart_metrics, "buffered", chart_buffered_metrics);
  982. rrddim_set(chart_metrics, "lost", chart_lost_metrics);
  983. rrddim_set(chart_metrics, "sent", chart_sent_metrics);
  984. rrdset_done(chart_metrics);
  985. if(likely(chart_bytes->counter_done)) rrdset_next(chart_bytes);
  986. rrddim_set(chart_bytes, "buffered", chart_buffered_bytes);
  987. rrddim_set(chart_bytes, "lost", chart_lost_bytes);
  988. rrddim_set(chart_bytes, "sent", chart_sent_bytes);
  989. rrddim_set(chart_bytes, "received", chart_received_bytes);
  990. rrdset_done(chart_bytes);
  991. /*
  992. if(likely(chart_latency->counter_done)) rrdset_next(chart_latency);
  993. rrddim_set(chart_latency, "latency", chart_backend_latency);
  994. rrdset_done(chart_latency);
  995. */
  996. getrusage(RUSAGE_THREAD, &thread);
  997. if(likely(chart_rusage->counter_done)) rrdset_next(chart_rusage);
  998. rrddim_set(chart_rusage, "user", thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec);
  999. rrddim_set(chart_rusage, "system", thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec);
  1000. rrdset_done(chart_rusage);
  1001. if(likely(buffer_strlen(b) == 0))
  1002. chart_buffered_metrics = 0;
  1003. if(unlikely(netdata_exit)) break;
  1004. }
  1005. cleanup:
  1006. #if HAVE_KINESIS
  1007. if(do_kinesis) {
  1008. backends_kinesis_shutdown();
  1009. freez(kinesis_auth_key_id);
  1010. freez(kinesis_secure_key);
  1011. freez(kinesis_stream_name);
  1012. }
  1013. #endif
  1014. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  1015. buffer_free(http_request_header);
  1016. if(do_prometheus_remote_write)
  1017. backends_protocol_buffers_shutdown();
  1018. #endif
  1019. #if HAVE_MONGOC
  1020. if(do_mongodb) {
  1021. backends_mongodb_cleanup();
  1022. freez(mongodb_uri);
  1023. freez(mongodb_database);
  1024. freez(mongodb_collection);
  1025. }
  1026. #endif
  1027. if(sock != -1)
  1028. close(sock);
  1029. buffer_free(b);
  1030. buffer_free(response);
  1031. #ifdef ENABLE_HTTPS
  1032. if(netdata_exporting_ctx) {
  1033. if(opentsdb_ssl.conn) {
  1034. SSL_free(opentsdb_ssl.conn);
  1035. }
  1036. }
  1037. #endif
  1038. netdata_thread_cleanup_pop(1);
  1039. return NULL;
  1040. }