backends.c 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #define BACKENDS_INTERNALS
  3. #include "backends.h"
  4. // ----------------------------------------------------------------------------
  5. // How backends work in netdata:
  6. //
  7. // 1. There is an independent thread that runs at the required interval
  8. // (for example, once every 10 seconds)
  9. //
  10. // 2. Every time it wakes, it calls the backend formatting functions to build
  11. // a buffer of data. This is a very fast, memory only operation.
  12. //
  13. // 3. If the buffer already includes data, the new data are appended.
  14. // If the buffer becomes too big, because the data cannot be sent, a
  15. // log is written and the buffer is discarded.
  16. //
  17. // 4. Then it tries to send all the data. It blocks until all the data are sent
  18. // or the socket returns an error.
  19. // If the time required for this is above the interval, it starts skipping
  20. // intervals, but the calculated values include the entire database, without
  21. // gaps (it remembers the timestamps and continues from where it stopped).
  22. //
  23. // 5. repeats the above forever.
  24. //
  25. const char *global_backend_prefix = "netdata";
  26. int global_backend_update_every = 10;
  27. BACKEND_OPTIONS global_backend_options = BACKEND_SOURCE_DATA_AVERAGE | BACKEND_OPTION_SEND_NAMES;
  28. // ----------------------------------------------------------------------------
  29. // helper functions for backends
  /**
   * Backend name copy
   *
   * Copy a name while sanitizing it: every character that is neither '.' nor
   * alphanumeric is replaced with '_'. The destination is always
   * NUL-terminated, so it must have room for usable + 1 bytes.
   *
   * @param d the destination buffer (at least usable + 1 bytes).
   * @param s the NUL-terminated source string.
   * @param usable the maximum number of characters to copy, terminator excluded.
   *
   * @return It returns the number of characters copied, terminator excluded.
   */
  size_t backend_name_copy(char *d, const char *s, size_t usable) {
      size_t n;

      for(n = 0; *s && n < usable ; d++, s++, n++) {
          char c = *s;

          // the <ctype.h> classifiers accept only EOF or a value representable
          // as unsigned char; a negative plain char (e.g. a UTF-8 byte) would
          // be undefined behavior, so convert before classifying
          if(c != '.' && !isalnum((unsigned char)c))
              *d = '_';
          else
              *d = c;
      }

      *d = '\0';
      return n;
  }
  40. // calculate the SUM or AVERAGE of a dimension, for any timeframe
  41. // may return NAN if the database does not have any value in the give timeframe
  42. calculated_number backend_calculate_value_from_stored_data(
  43. RRDSET *st // the chart
  44. , RRDDIM *rd // the dimension
  45. , time_t after // the start timestamp
  46. , time_t before // the end timestamp
  47. , BACKEND_OPTIONS backend_options // BACKEND_SOURCE_* bitmap
  48. , time_t *first_timestamp // the first point of the database used in this response
  49. , time_t *last_timestamp // the timestamp that should be reported to backend
  50. ) {
  51. RRDHOST *host = st->rrdhost;
  52. (void)host;
  53. // find the edges of the rrd database for this chart
  54. time_t first_t = rd->state->query_ops.oldest_time(rd);
  55. time_t last_t = rd->state->query_ops.latest_time(rd);
  56. time_t update_every = st->update_every;
  57. struct rrddim_query_handle handle;
  58. storage_number n;
  59. // step back a little, to make sure we have complete data collection
  60. // for all metrics
  61. after -= update_every * 2;
  62. before -= update_every * 2;
  63. // align the time-frame
  64. after = after - (after % update_every);
  65. before = before - (before % update_every);
  66. // for before, loose another iteration
  67. // the latest point will be reported the next time
  68. before -= update_every;
  69. if(unlikely(after > before))
  70. // this can happen when update_every > before - after
  71. after = before;
  72. if(unlikely(after < first_t))
  73. after = first_t;
  74. if(unlikely(before > last_t))
  75. before = last_t;
  76. if(unlikely(before < first_t || after > last_t)) {
  77. // the chart has not been updated in the wanted timeframe
  78. debug(D_BACKEND, "BACKEND: %s.%s.%s: aligned timeframe %lu to %lu is outside the chart's database range %lu to %lu",
  79. host->hostname, st->id, rd->id,
  80. (unsigned long)after, (unsigned long)before,
  81. (unsigned long)first_t, (unsigned long)last_t
  82. );
  83. return NAN;
  84. }
  85. *first_timestamp = after;
  86. *last_timestamp = before;
  87. size_t counter = 0;
  88. calculated_number sum = 0;
  89. /*
  90. long start_at_slot = rrdset_time2slot(st, before),
  91. stop_at_slot = rrdset_time2slot(st, after),
  92. slot, stop_now = 0;
  93. for(slot = start_at_slot; !stop_now ; slot--) {
  94. if(unlikely(slot < 0)) slot = st->entries - 1;
  95. if(unlikely(slot == stop_at_slot)) stop_now = 1;
  96. storage_number n = rd->values[slot];
  97. if(unlikely(!does_storage_number_exist(n))) {
  98. // not collected
  99. continue;
  100. }
  101. calculated_number value = unpack_storage_number(n);
  102. sum += value;
  103. counter++;
  104. }
  105. */
  106. for(rd->state->query_ops.init(rd, &handle, after, before) ; !rd->state->query_ops.is_finished(&handle) ; ) {
  107. time_t curr_t;
  108. n = rd->state->query_ops.next_metric(&handle, &curr_t);
  109. if(unlikely(!does_storage_number_exist(n))) {
  110. // not collected
  111. continue;
  112. }
  113. calculated_number value = unpack_storage_number(n);
  114. sum += value;
  115. counter++;
  116. }
  117. rd->state->query_ops.finalize(&handle);
  118. if(unlikely(!counter)) {
  119. debug(D_BACKEND, "BACKEND: %s.%s.%s: no values stored in database for range %lu to %lu",
  120. host->hostname, st->id, rd->id,
  121. (unsigned long)after, (unsigned long)before
  122. );
  123. return NAN;
  124. }
  125. if(unlikely(BACKEND_OPTIONS_DATA_SOURCE(backend_options) == BACKEND_SOURCE_DATA_SUM))
  126. return sum;
  127. return sum / (calculated_number)counter;
  128. }
  129. // discard a response received by a backend
  130. // after logging a simple of it to error.log
  131. int discard_response(BUFFER *b, const char *backend) {
  132. char sample[1024];
  133. const char *s = buffer_tostring(b);
  134. char *d = sample, *e = &sample[sizeof(sample) - 1];
  135. for(; *s && d < e ;s++) {
  136. char c = *s;
  137. if(unlikely(!isprint(c))) c = ' ';
  138. *d++ = c;
  139. }
  140. *d = '\0';
  141. info("BACKEND: received %zu bytes from %s backend. Ignoring them. Sample: '%s'", buffer_strlen(b), backend, sample);
  142. buffer_flush(b);
  143. return 0;
  144. }
  145. // ----------------------------------------------------------------------------
  146. // the backend thread
  147. static SIMPLE_PATTERN *charts_pattern = NULL;
  148. static SIMPLE_PATTERN *hosts_pattern = NULL;
  149. inline int backends_can_send_rrdset(BACKEND_OPTIONS backend_options, RRDSET *st) {
  150. RRDHOST *host = st->rrdhost;
  151. (void)host;
  152. if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_BACKEND_IGNORE)))
  153. return 0;
  154. if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_BACKEND_SEND))) {
  155. // we have not checked this chart
  156. if(simple_pattern_matches(charts_pattern, st->id) || simple_pattern_matches(charts_pattern, st->name))
  157. rrdset_flag_set(st, RRDSET_FLAG_BACKEND_SEND);
  158. else {
  159. rrdset_flag_set(st, RRDSET_FLAG_BACKEND_IGNORE);
  160. debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s', because it is disabled for backends.", st->id, host->hostname);
  161. return 0;
  162. }
  163. }
  164. if(unlikely(!rrdset_is_available_for_backends(st))) {
  165. debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s', because it is not available for backends.", st->id, host->hostname);
  166. return 0;
  167. }
  168. if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE && !(BACKEND_OPTIONS_DATA_SOURCE(backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED))) {
  169. debug(D_BACKEND, "BACKEND: not sending chart '%s' of host '%s' because its memory mode is '%s' and the backend requires database access.", st->id, host->hostname, rrd_memory_mode_name(host->rrd_memory_mode));
  170. return 0;
  171. }
  172. return 1;
  173. }
  174. inline BACKEND_OPTIONS backend_parse_data_source(const char *source, BACKEND_OPTIONS backend_options) {
  175. if(!strcmp(source, "raw") || !strcmp(source, "as collected") || !strcmp(source, "as-collected") || !strcmp(source, "as_collected") || !strcmp(source, "ascollected")) {
  176. backend_options |= BACKEND_SOURCE_DATA_AS_COLLECTED;
  177. backend_options &= ~(BACKEND_OPTIONS_SOURCE_BITS ^ BACKEND_SOURCE_DATA_AS_COLLECTED);
  178. }
  179. else if(!strcmp(source, "average")) {
  180. backend_options |= BACKEND_SOURCE_DATA_AVERAGE;
  181. backend_options &= ~(BACKEND_OPTIONS_SOURCE_BITS ^ BACKEND_SOURCE_DATA_AVERAGE);
  182. }
  183. else if(!strcmp(source, "sum") || !strcmp(source, "volume")) {
  184. backend_options |= BACKEND_SOURCE_DATA_SUM;
  185. backend_options &= ~(BACKEND_OPTIONS_SOURCE_BITS ^ BACKEND_SOURCE_DATA_SUM);
  186. }
  187. else {
  188. error("BACKEND: invalid data source method '%s'.", source);
  189. }
  190. return backend_options;
  191. }
  192. static void backends_main_cleanup(void *ptr) {
  193. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  194. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  195. info("cleaning up...");
  196. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  197. }
  198. /**
  199. * Set Kinesis variables
  200. *
  201. * Set the variables necessary to work with this specific backend.
  202. *
  203. * @param default_port the default port of the backend
  204. * @param brc function called to check the result.
  205. * @param brf function called to format the message to the backend
  206. */
  207. void backend_set_kinesis_variables(int *default_port,
  208. backend_response_checker_t brc,
  209. backend_request_formatter_t brf)
  210. {
  211. (void)default_port;
  212. #ifndef HAVE_KINESIS
  213. (void)brc;
  214. (void)brf;
  215. #endif
  216. #if HAVE_KINESIS
  217. *brc = process_json_response;
  218. if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
  219. *brf = backends_format_dimension_collected_json_plaintext;
  220. else
  221. *brf = backends_format_dimension_stored_json_plaintext;
  222. #endif
  223. }
  224. /**
  225. * Set Prometheus variables
  226. *
  227. * Set the variables necessary to work with this specific backend.
  228. *
  229. * @param default_port the default port of the backend
  230. * @param brc function called to check the result.
  231. * @param brf function called to format the message to the backend
  232. */
  233. void backend_set_prometheus_variables(int *default_port,
  234. backend_response_checker_t brc,
  235. backend_request_formatter_t brf)
  236. {
  237. (void)default_port;
  238. (void)brf;
  239. #ifndef ENABLE_PROMETHEUS_REMOTE_WRITE
  240. (void)brc;
  241. #endif
  242. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  243. *brc = backends_process_prometheus_remote_write_response;
  244. #endif /* ENABLE_PROMETHEUS_REMOTE_WRITE */
  245. }
  246. /**
  247. * Set MongoDB variables
  248. *
  249. * Set the variables necessary to work with this specific backend.
  250. *
  251. * @param default_port the default port of the backend
  252. * @param brc function called to check the result.
  253. * @param brf function called to format the message to the backend
  254. */
  255. void backend_set_mongodb_variables(int *default_port,
  256. backend_response_checker_t brc,
  257. backend_request_formatter_t brf)
  258. {
  259. (void)default_port;
  260. #ifndef HAVE_MONGOC
  261. (void)brc;
  262. (void)brf;
  263. #endif
  264. #if HAVE_MONGOC
  265. *brc = process_json_response;
  266. if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
  267. *brf = backends_format_dimension_collected_json_plaintext;
  268. else
  269. *brf = backends_format_dimension_stored_json_plaintext;
  270. #endif
  271. }
  272. /**
  273. * Set JSON variables
  274. *
  275. * Set the variables necessary to work with this specific backend.
  276. *
  277. * @param default_port the default port of the backend
  278. * @param brc function called to check the result.
  279. * @param brf function called to format the message to the backend
  280. */
  281. void backend_set_json_variables(int *default_port,
  282. backend_response_checker_t brc,
  283. backend_request_formatter_t brf)
  284. {
  285. *default_port = 5448;
  286. *brc = process_json_response;
  287. if (BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
  288. *brf = backends_format_dimension_collected_json_plaintext;
  289. else
  290. *brf = backends_format_dimension_stored_json_plaintext;
  291. }
  292. /**
  293. * Set OpenTSDB HTTP variables
  294. *
  295. * Set the variables necessary to work with this specific backend.
  296. *
  297. * @param default_port the default port of the backend
  298. * @param brc function called to check the result.
  299. * @param brf function called to format the message to the backend
  300. */
  301. void backend_set_opentsdb_http_variables(int *default_port,
  302. backend_response_checker_t brc,
  303. backend_request_formatter_t brf)
  304. {
  305. *default_port = 4242;
  306. *brc = process_opentsdb_response;
  307. if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
  308. *brf = backends_format_dimension_collected_opentsdb_http;
  309. else
  310. *brf = backends_format_dimension_stored_opentsdb_http;
  311. }
  312. /**
  313. * Set OpenTSDB Telnet variables
  314. *
  315. * Set the variables necessary to work with this specific backend.
  316. *
  317. * @param default_port the default port of the backend
  318. * @param brc function called to check the result.
  319. * @param brf function called to format the message to the backend
  320. */
  321. void backend_set_opentsdb_telnet_variables(int *default_port,
  322. backend_response_checker_t brc,
  323. backend_request_formatter_t brf)
  324. {
  325. *default_port = 4242;
  326. *brc = process_opentsdb_response;
  327. if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
  328. *brf = backends_format_dimension_collected_opentsdb_telnet;
  329. else
  330. *brf = backends_format_dimension_stored_opentsdb_telnet;
  331. }
  332. /**
  333. * Set Graphite variables
  334. *
  335. * Set the variables necessary to work with this specific backend.
  336. *
  337. * @param default_port the default port of the backend
  338. * @param brc function called to check the result.
  339. * @param brf function called to format the message to the backend
  340. */
  341. void backend_set_graphite_variables(int *default_port,
  342. backend_response_checker_t brc,
  343. backend_request_formatter_t brf)
  344. {
  345. *default_port = 2003;
  346. *brc = process_graphite_response;
  347. if(BACKEND_OPTIONS_DATA_SOURCE(global_backend_options) == BACKEND_SOURCE_DATA_AS_COLLECTED)
  348. *brf = backends_format_dimension_collected_graphite_plaintext;
  349. else
  350. *brf = backends_format_dimension_stored_graphite_plaintext;
  351. }
  352. /**
  353. * Select Type
  354. *
  355. * Select the backedn type based in the user input
  356. *
  357. * @param type is the string that defines the backend type
  358. *
  359. * @return It returns the backend id.
  360. */
  361. BACKEND_TYPE backend_select_type(const char *type) {
  362. if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
  363. return BACKEND_TYPE_GRAPHITE;
  364. }
  365. else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
  366. return BACKEND_TYPE_OPENTSDB_USING_TELNET;
  367. }
  368. else if(!strcmp(type, "opentsdb:http") || !strcmp(type, "opentsdb:https")) {
  369. return BACKEND_TYPE_OPENTSDB_USING_HTTP;
  370. }
  371. else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) {
  372. return BACKEND_TYPE_JSON;
  373. }
  374. else if (!strcmp(type, "prometheus_remote_write")) {
  375. return BACKEND_TYPE_PROMETHEUS_REMOTE_WRITE;
  376. }
  377. else if (!strcmp(type, "kinesis") || !strcmp(type, "kinesis:plaintext")) {
  378. return BACKEND_TYPE_KINESIS;
  379. }
  380. else if (!strcmp(type, "mongodb") || !strcmp(type, "mongodb:plaintext")) {
  381. return BACKEND_TYPE_MONGODB;
  382. }
  383. return BACKEND_TYPE_UNKNOWN;
  384. }
  385. /**
  386. * Backend main
  387. *
  388. * The main thread used to control the backends.
  389. *
  390. * @param ptr a pointer to the struct netdata_static_thread of this thread.
  391. *
  392. * @return It always returns NULL.
  393. */
  394. void *backends_main(void *ptr) {
  395. netdata_thread_cleanup_push(backends_main_cleanup, ptr);
  396. int default_port = 0;
  397. int sock = -1;
  398. BUFFER *b = buffer_create(1), *response = buffer_create(1);
  399. int (*backend_request_formatter)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, BACKEND_OPTIONS) = NULL;
  400. int (*backend_response_checker)(BUFFER *) = NULL;
  401. #if HAVE_KINESIS
  402. int do_kinesis = 0;
  403. char *kinesis_auth_key_id = NULL, *kinesis_secure_key = NULL, *kinesis_stream_name = NULL;
  404. #endif
  405. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  406. int do_prometheus_remote_write = 0;
  407. BUFFER *http_request_header = NULL;
  408. #endif
  409. #if HAVE_MONGOC
  410. int do_mongodb = 0;
  411. char *mongodb_uri = NULL;
  412. char *mongodb_database = NULL;
  413. char *mongodb_collection = NULL;
  414. // set the default socket timeout in ms
  415. int32_t mongodb_default_socket_timeout = (int32_t)(global_backend_update_every >= 2)?(global_backend_update_every * MSEC_PER_SEC - 500):1000;
  416. #endif
  417. #ifdef ENABLE_HTTPS
  418. struct netdata_ssl opentsdb_ssl = {NULL , NETDATA_SSL_START};
  419. #endif
  420. // ------------------------------------------------------------------------
  421. // collect configuration options
  422. struct timeval timeout = {
  423. .tv_sec = 0,
  424. .tv_usec = 0
  425. };
  426. int enabled = config_get_boolean(CONFIG_SECTION_BACKEND, "enabled", 0);
  427. const char *source = config_get(CONFIG_SECTION_BACKEND, "data source", "average");
  428. const char *type = config_get(CONFIG_SECTION_BACKEND, "type", "graphite");
  429. const char *destination = config_get(CONFIG_SECTION_BACKEND, "destination", "localhost");
  430. global_backend_prefix = config_get(CONFIG_SECTION_BACKEND, "prefix", "netdata");
  431. const char *hostname = config_get(CONFIG_SECTION_BACKEND, "hostname", localhost->hostname);
  432. global_backend_update_every = (int)config_get_number(CONFIG_SECTION_BACKEND, "update every", global_backend_update_every);
  433. int buffer_on_failures = (int)config_get_number(CONFIG_SECTION_BACKEND, "buffer on failures", 10);
  434. long timeoutms = config_get_number(CONFIG_SECTION_BACKEND, "timeout ms", global_backend_update_every * 2 * 1000);
  435. if(config_get_boolean(CONFIG_SECTION_BACKEND, "send names instead of ids", (global_backend_options & BACKEND_OPTION_SEND_NAMES)))
  436. global_backend_options |= BACKEND_OPTION_SEND_NAMES;
  437. else
  438. global_backend_options &= ~BACKEND_OPTION_SEND_NAMES;
  439. charts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send charts matching", "*"), NULL, SIMPLE_PATTERN_EXACT);
  440. hosts_pattern = simple_pattern_create(config_get(CONFIG_SECTION_BACKEND, "send hosts matching", "localhost *"), NULL, SIMPLE_PATTERN_EXACT);
  441. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  442. const char *remote_write_path = config_get(CONFIG_SECTION_BACKEND, "remote write URL path", "/receive");
  443. #endif
  444. // ------------------------------------------------------------------------
  445. // validate configuration options
  446. // and prepare for sending data to our backend
  447. global_backend_options = backend_parse_data_source(source, global_backend_options);
  448. if(timeoutms < 1) {
  449. error("BACKEND: invalid timeout %ld ms given. Assuming %d ms.", timeoutms, global_backend_update_every * 2 * 1000);
  450. timeoutms = global_backend_update_every * 2 * 1000;
  451. }
  452. timeout.tv_sec = (timeoutms * 1000) / 1000000;
  453. timeout.tv_usec = (timeoutms * 1000) % 1000000;
  454. if(!enabled || global_backend_update_every < 1)
  455. goto cleanup;
  456. // ------------------------------------------------------------------------
  457. // select the backend type
  458. BACKEND_TYPE work_type = backend_select_type(type);
  459. if (work_type == BACKEND_TYPE_UNKNOWN) {
  460. error("BACKEND: Unknown backend type '%s'", type);
  461. goto cleanup;
  462. }
  463. switch (work_type) {
  464. case BACKEND_TYPE_OPENTSDB_USING_HTTP: {
  465. #ifdef ENABLE_HTTPS
  466. if (!strcmp(type, "opentsdb:https")) {
  467. security_start_ssl(NETDATA_SSL_CONTEXT_OPENTSDB);
  468. }
  469. #endif
  470. backend_set_opentsdb_http_variables(&default_port,&backend_response_checker,&backend_request_formatter);
  471. break;
  472. }
  473. case BACKEND_TYPE_PROMETHEUS_REMOTE_WRITE: {
  474. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  475. do_prometheus_remote_write = 1;
  476. http_request_header = buffer_create(1);
  477. backends_init_write_request();
  478. #else
  479. error("BACKEND: Prometheus remote write support isn't compiled");
  480. #endif // ENABLE_PROMETHEUS_REMOTE_WRITE
  481. backend_set_prometheus_variables(&default_port,&backend_response_checker,&backend_request_formatter);
  482. break;
  483. }
  484. case BACKEND_TYPE_KINESIS: {
  485. #if HAVE_KINESIS
  486. do_kinesis = 1;
  487. if(unlikely(read_kinesis_conf(netdata_configured_user_config_dir, &kinesis_auth_key_id, &kinesis_secure_key, &kinesis_stream_name))) {
  488. error("BACKEND: kinesis backend type is set but cannot read its configuration from %s/aws_kinesis.conf", netdata_configured_user_config_dir);
  489. goto cleanup;
  490. }
  491. backends_kinesis_init(destination, kinesis_auth_key_id, kinesis_secure_key, timeout.tv_sec * 1000 + timeout.tv_usec / 1000);
  492. #else
  493. error("BACKEND: AWS Kinesis support isn't compiled");
  494. #endif // HAVE_KINESIS
  495. backend_set_kinesis_variables(&default_port,&backend_response_checker,&backend_request_formatter);
  496. break;
  497. }
  498. case BACKEND_TYPE_MONGODB: {
  499. #if HAVE_MONGOC
  500. if(unlikely(read_mongodb_conf(netdata_configured_user_config_dir,
  501. &mongodb_uri,
  502. &mongodb_database,
  503. &mongodb_collection))) {
  504. error("BACKEND: mongodb backend type is set but cannot read its configuration from %s/mongodb.conf",
  505. netdata_configured_user_config_dir);
  506. goto cleanup;
  507. }
  508. if(likely(!backends_mongodb_init(mongodb_uri, mongodb_database, mongodb_collection, mongodb_default_socket_timeout))) {
  509. backend_set_mongodb_variables(&default_port, &backend_response_checker, &backend_request_formatter);
  510. do_mongodb = 1;
  511. }
  512. else {
  513. error("BACKEND: cannot initialize MongoDB backend");
  514. goto cleanup;
  515. }
  516. #else
  517. error("BACKEND: MongoDB support isn't compiled");
  518. #endif // HAVE_MONGOC
  519. break;
  520. }
  521. case BACKEND_TYPE_GRAPHITE: {
  522. backend_set_graphite_variables(&default_port,&backend_response_checker,&backend_request_formatter);
  523. break;
  524. }
  525. case BACKEND_TYPE_OPENTSDB_USING_TELNET: {
  526. backend_set_opentsdb_telnet_variables(&default_port,&backend_response_checker,&backend_request_formatter);
  527. break;
  528. }
  529. case BACKEND_TYPE_JSON: {
  530. backend_set_json_variables(&default_port,&backend_response_checker,&backend_request_formatter);
  531. break;
  532. }
  533. case BACKEND_TYPE_UNKNOWN: {
  534. break;
  535. }
  536. default: {
  537. break;
  538. }
  539. }
  540. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  541. if((backend_request_formatter == NULL && !do_prometheus_remote_write) || backend_response_checker == NULL) {
  542. #else
  543. if(backend_request_formatter == NULL || backend_response_checker == NULL) {
  544. #endif
  545. error("BACKEND: backend is misconfigured - disabling it.");
  546. goto cleanup;
  547. }
  548. // ------------------------------------------------------------------------
  549. // prepare the charts for monitoring the backend operation
  550. struct rusage thread;
  551. collected_number
  552. chart_buffered_metrics = 0,
  553. chart_lost_metrics = 0,
  554. chart_sent_metrics = 0,
  555. chart_buffered_bytes = 0,
  556. chart_received_bytes = 0,
  557. chart_sent_bytes = 0,
  558. chart_receptions = 0,
  559. chart_transmission_successes = 0,
  560. chart_transmission_failures = 0,
  561. chart_data_lost_events = 0,
  562. chart_lost_bytes = 0,
  563. chart_backend_reconnects = 0;
  564. // chart_backend_latency = 0;
  565. RRDSET *chart_metrics = rrdset_create_localhost("netdata", "backend_metrics", NULL, "backend", NULL, "Netdata Buffered Metrics", "metrics", "backends", NULL, 130600, global_backend_update_every, RRDSET_TYPE_LINE);
  566. rrddim_add(chart_metrics, "buffered", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  567. rrddim_add(chart_metrics, "lost", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  568. rrddim_add(chart_metrics, "sent", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  569. RRDSET *chart_bytes = rrdset_create_localhost("netdata", "backend_bytes", NULL, "backend", NULL, "Netdata Backend Data Size", "KiB", "backends", NULL, 130610, global_backend_update_every, RRDSET_TYPE_AREA);
  570. rrddim_add(chart_bytes, "buffered", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
  571. rrddim_add(chart_bytes, "lost", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
  572. rrddim_add(chart_bytes, "sent", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
  573. rrddim_add(chart_bytes, "received", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
  574. RRDSET *chart_ops = rrdset_create_localhost("netdata", "backend_ops", NULL, "backend", NULL, "Netdata Backend Operations", "operations", "backends", NULL, 130630, global_backend_update_every, RRDSET_TYPE_LINE);
  575. rrddim_add(chart_ops, "write", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  576. rrddim_add(chart_ops, "discard", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  577. rrddim_add(chart_ops, "reconnect", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  578. rrddim_add(chart_ops, "failure", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  579. rrddim_add(chart_ops, "read", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  580. /*
  581. * this is misleading - we can only measure the time we need to send data
  582. * this time is not related to the time required for the data to travel to
  583. * the backend database and the time that server needed to process them
  584. *
  585. * issue #1432 and https://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html
  586. *
  587. RRDSET *chart_latency = rrdset_create_localhost("netdata", "backend_latency", NULL, "backend", NULL, "Netdata Backend Latency", "ms", "backends", NULL, 130620, global_backend_update_every, RRDSET_TYPE_AREA);
  588. rrddim_add(chart_latency, "latency", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
  589. */
  590. RRDSET *chart_rusage = rrdset_create_localhost("netdata", "backend_thread_cpu", NULL, "backend", NULL, "NetData Backend Thread CPU usage", "milliseconds/s", "backends", NULL, 130630, global_backend_update_every, RRDSET_TYPE_STACKED);
  591. rrddim_add(chart_rusage, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
  592. rrddim_add(chart_rusage, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
  593. // ------------------------------------------------------------------------
  594. // prepare the backend main loop
  595. info("BACKEND: configured ('%s' on '%s' sending '%s' data, every %d seconds, as host '%s', with prefix '%s')", type, destination, source, global_backend_update_every, hostname, global_backend_prefix);
  596. send_statistics("BACKEND_START", "OK", type);
  597. usec_t step_ut = global_backend_update_every * USEC_PER_SEC;
  598. time_t after = now_realtime_sec();
  599. int failures = 0;
  600. heartbeat_t hb;
  601. heartbeat_init(&hb);
  602. while(!netdata_exit) {
  603. // ------------------------------------------------------------------------
  604. // Wait for the next iteration point.
  605. heartbeat_next(&hb, step_ut);
  606. time_t before = now_realtime_sec();
  607. debug(D_BACKEND, "BACKEND: preparing buffer for timeframe %lu to %lu", (unsigned long)after, (unsigned long)before);
  608. // ------------------------------------------------------------------------
  609. // add to the buffer the data we need to send to the backend
  610. netdata_thread_disable_cancelability();
  611. size_t count_hosts = 0;
  612. size_t count_charts_total = 0;
  613. size_t count_dims_total = 0;
  614. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  615. if(do_prometheus_remote_write)
  616. backends_clear_write_request();
  617. #endif
  618. rrd_rdlock();
  619. RRDHOST *host;
  620. rrdhost_foreach_read(host) {
  621. if(unlikely(!rrdhost_flag_check(host, RRDHOST_FLAG_BACKEND_SEND|RRDHOST_FLAG_BACKEND_DONT_SEND))) {
  622. char *name = (host == localhost)?"localhost":host->hostname;
  623. if (!hosts_pattern || simple_pattern_matches(hosts_pattern, name)) {
  624. rrdhost_flag_set(host, RRDHOST_FLAG_BACKEND_SEND);
  625. info("enabled backend for host '%s'", name);
  626. }
  627. else {
  628. rrdhost_flag_set(host, RRDHOST_FLAG_BACKEND_DONT_SEND);
  629. info("disabled backend for host '%s'", name);
  630. }
  631. }
  632. if(unlikely(!rrdhost_flag_check(host, RRDHOST_FLAG_BACKEND_SEND)))
  633. continue;
  634. rrdhost_rdlock(host);
  635. count_hosts++;
  636. size_t count_charts = 0;
  637. size_t count_dims = 0;
  638. size_t count_dims_skipped = 0;
  639. const char *__hostname = (host == localhost)?hostname:host->hostname;
  640. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  641. if(do_prometheus_remote_write) {
  642. backends_rrd_stats_remote_write_allmetrics_prometheus(
  643. host
  644. , __hostname
  645. , global_backend_prefix
  646. , global_backend_options
  647. , after
  648. , before
  649. , &count_charts
  650. , &count_dims
  651. , &count_dims_skipped
  652. );
  653. chart_buffered_metrics += count_dims;
  654. }
  655. else
  656. #endif
  657. {
  658. RRDSET *st;
  659. rrdset_foreach_read(st, host) {
  660. if(likely(backends_can_send_rrdset(global_backend_options, st))) {
  661. rrdset_rdlock(st);
  662. count_charts++;
  663. RRDDIM *rd;
  664. rrddim_foreach_read(rd, st) {
  665. if (likely(rd->last_collected_time.tv_sec >= after)) {
  666. chart_buffered_metrics += backend_request_formatter(b, global_backend_prefix, host, __hostname, st, rd, after, before, global_backend_options);
  667. count_dims++;
  668. }
  669. else {
  670. debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection (%lu) is not within our timeframe (%lu to %lu)", rd->id, st->id, __hostname, (unsigned long)rd->last_collected_time.tv_sec, (unsigned long)after, (unsigned long)before);
  671. count_dims_skipped++;
  672. }
  673. }
  674. rrdset_unlock(st);
  675. }
  676. }
  677. }
  678. debug(D_BACKEND, "BACKEND: sending host '%s', metrics of %zu dimensions, of %zu charts. Skipped %zu dimensions.", __hostname, count_dims, count_charts, count_dims_skipped);
  679. count_charts_total += count_charts;
  680. count_dims_total += count_dims;
  681. rrdhost_unlock(host);
  682. }
  683. rrd_unlock();
  684. netdata_thread_enable_cancelability();
  685. debug(D_BACKEND, "BACKEND: buffer has %zu bytes, added metrics for %zu dimensions, of %zu charts, from %zu hosts", buffer_strlen(b), count_dims_total, count_charts_total, count_hosts);
  686. // ------------------------------------------------------------------------
  687. chart_buffered_bytes = (collected_number)buffer_strlen(b);
  688. // reset the monitoring chart counters
  689. chart_received_bytes =
  690. chart_sent_bytes =
  691. chart_sent_metrics =
  692. chart_lost_metrics =
  693. chart_receptions =
  694. chart_transmission_successes =
  695. chart_transmission_failures =
  696. chart_data_lost_events =
  697. chart_lost_bytes =
  698. chart_backend_reconnects = 0;
  699. // chart_backend_latency = 0;
  700. if(unlikely(netdata_exit)) break;
  701. //fprintf(stderr, "\nBACKEND BEGIN:\n%s\nBACKEND END\n", buffer_tostring(b));
  702. //fprintf(stderr, "after = %lu, before = %lu\n", after, before);
  703. // prepare for the next iteration
  704. // to add incrementally data to buffer
  705. after = before;
  706. #if HAVE_KINESIS
  707. if(do_kinesis) {
  708. unsigned long long partition_key_seq = 0;
  709. size_t buffer_len = buffer_strlen(b);
  710. size_t sent = 0;
  711. while(sent < buffer_len) {
  712. char partition_key[KINESIS_PARTITION_KEY_MAX + 1];
  713. snprintf(partition_key, KINESIS_PARTITION_KEY_MAX, "netdata_%llu", partition_key_seq++);
  714. size_t partition_key_len = strnlen(partition_key, KINESIS_PARTITION_KEY_MAX);
  715. const char *first_char = buffer_tostring(b) + sent;
  716. size_t record_len = 0;
  717. // split buffer into chunks of maximum allowed size
  718. if(buffer_len - sent < KINESIS_RECORD_MAX - partition_key_len) {
  719. record_len = buffer_len - sent;
  720. }
  721. else {
  722. record_len = KINESIS_RECORD_MAX - partition_key_len;
  723. while(*(first_char + record_len) != '\n' && record_len) record_len--;
  724. }
  725. char error_message[ERROR_LINE_MAX + 1] = "";
  726. debug(D_BACKEND, "BACKEND: backends_kinesis_put_record(): dest = %s, id = %s, key = %s, stream = %s, partition_key = %s, \
  727. buffer = %zu, record = %zu", destination, kinesis_auth_key_id, kinesis_secure_key, kinesis_stream_name,
  728. partition_key, buffer_len, record_len);
  729. backends_kinesis_put_record(kinesis_stream_name, partition_key, first_char, record_len);
  730. sent += record_len;
  731. chart_transmission_successes++;
  732. size_t sent_bytes = 0, lost_bytes = 0;
  733. if(unlikely(backends_kinesis_get_result(error_message, &sent_bytes, &lost_bytes))) {
  734. // oops! we couldn't send (all or some of the) data
  735. error("BACKEND: %s", error_message);
  736. error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.",
  737. destination, sent_bytes, sent_bytes - lost_bytes);
  738. chart_transmission_failures++;
  739. chart_data_lost_events++;
  740. chart_lost_bytes += lost_bytes;
  741. // estimate the number of lost metrics
  742. chart_lost_metrics += (collected_number)(chart_buffered_metrics
  743. * (buffer_len && (lost_bytes > buffer_len) ? (double)lost_bytes / buffer_len : 1));
  744. break;
  745. }
  746. else {
  747. chart_receptions++;
  748. }
  749. if(unlikely(netdata_exit)) break;
  750. }
  751. chart_sent_bytes += sent;
  752. if(likely(sent == buffer_len))
  753. chart_sent_metrics = chart_buffered_metrics;
  754. buffer_flush(b);
  755. } else
  756. #endif /* HAVE_KINESIS */
  757. #if HAVE_MONGOC
  758. if(do_mongodb) {
  759. size_t buffer_len = buffer_strlen(b);
  760. size_t sent = 0;
  761. while(sent < buffer_len) {
  762. const char *first_char = buffer_tostring(b);
  763. debug(D_BACKEND, "BACKEND: backends_mongodb_insert(): uri = %s, database = %s, collection = %s, \
  764. buffer = %zu", mongodb_uri, mongodb_database, mongodb_collection, buffer_len);
  765. if(likely(!backends_mongodb_insert((char *)first_char, (size_t)chart_buffered_metrics))) {
  766. sent += buffer_len;
  767. chart_transmission_successes++;
  768. chart_receptions++;
  769. }
  770. else {
  771. // oops! we couldn't send (all or some of the) data
  772. error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zu bytes.",
  773. mongodb_uri, buffer_len, 0UL);
  774. chart_transmission_failures++;
  775. chart_data_lost_events++;
  776. chart_lost_bytes += buffer_len;
  777. // estimate the number of lost metrics
  778. chart_lost_metrics += (collected_number)chart_buffered_metrics;
  779. break;
  780. }
  781. if(unlikely(netdata_exit)) break;
  782. }
  783. chart_sent_bytes += sent;
  784. if(likely(sent == buffer_len))
  785. chart_sent_metrics = chart_buffered_metrics;
  786. buffer_flush(b);
  787. } else
  788. #endif /* HAVE_MONGOC */
  789. {
  790. // ------------------------------------------------------------------------
  791. // if we are connected, receive a response, without blocking
  792. if(likely(sock != -1)) {
  793. errno = 0;
  794. // loop through to collect all data
  795. while(sock != -1 && errno != EWOULDBLOCK) {
  796. buffer_need_bytes(response, 4096);
  797. ssize_t r;
  798. #ifdef ENABLE_HTTPS
  799. if(opentsdb_ssl.conn && !opentsdb_ssl.flags) {
  800. r = SSL_read(opentsdb_ssl.conn, &response->buffer[response->len], response->size - response->len);
  801. } else {
  802. r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
  803. }
  804. #else
  805. r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
  806. #endif
  807. if(likely(r > 0)) {
  808. // we received some data
  809. response->len += r;
  810. chart_received_bytes += r;
  811. chart_receptions++;
  812. }
  813. else if(r == 0) {
  814. error("BACKEND: '%s' closed the socket", destination);
  815. close(sock);
  816. sock = -1;
  817. }
  818. else {
  819. // failed to receive data
  820. if(errno != EAGAIN && errno != EWOULDBLOCK) {
  821. error("BACKEND: cannot receive data from backend '%s'.", destination);
  822. }
  823. }
  824. }
  825. // if we received data, process them
  826. if(buffer_strlen(response))
  827. backend_response_checker(response);
  828. }
  829. // ------------------------------------------------------------------------
  830. // if we are not connected, connect to a backend server
  831. if(unlikely(sock == -1)) {
  832. // usec_t start_ut = now_monotonic_usec();
  833. size_t reconnects = 0;
  834. sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
  835. #ifdef ENABLE_HTTPS
  836. if(sock != -1) {
  837. if(netdata_opentsdb_ctx) {
  838. if(!opentsdb_ssl.conn) {
  839. opentsdb_ssl.conn = SSL_new(netdata_opentsdb_ctx);
  840. if(!opentsdb_ssl.conn) {
  841. error("Failed to allocate SSL structure %d.", sock);
  842. opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
  843. }
  844. } else {
  845. SSL_clear(opentsdb_ssl.conn);
  846. }
  847. }
  848. if(opentsdb_ssl.conn) {
  849. if(SSL_set_fd(opentsdb_ssl.conn, sock) != 1) {
  850. error("Failed to set the socket to the SSL on socket fd %d.", host->rrdpush_sender_socket);
  851. opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
  852. } else {
  853. opentsdb_ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE;
  854. SSL_set_connect_state(opentsdb_ssl.conn);
  855. int err = SSL_connect(opentsdb_ssl.conn);
  856. if (err != 1) {
  857. err = SSL_get_error(opentsdb_ssl.conn, err);
  858. error("SSL cannot connect with the server: %s ", ERR_error_string((long)SSL_get_error(opentsdb_ssl.conn, err), NULL));
  859. opentsdb_ssl.flags = NETDATA_SSL_NO_HANDSHAKE;
  860. } //TODO: check certificate here
  861. }
  862. }
  863. }
  864. #endif
  865. chart_backend_reconnects += reconnects;
  866. // chart_backend_latency += now_monotonic_usec() - start_ut;
  867. }
  868. if(unlikely(netdata_exit)) break;
  869. // ------------------------------------------------------------------------
  870. // if we are connected, send our buffer to the backend server
  871. if(likely(sock != -1)) {
  872. size_t len = buffer_strlen(b);
  873. // usec_t start_ut = now_monotonic_usec();
  874. int flags = 0;
  875. #ifdef MSG_NOSIGNAL
  876. flags += MSG_NOSIGNAL;
  877. #endif
  878. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  879. if(do_prometheus_remote_write) {
  880. size_t data_size = backends_get_write_request_size();
  881. if(unlikely(!data_size)) {
  882. error("BACKEND: write request size is out of range");
  883. continue;
  884. }
  885. buffer_flush(b);
  886. buffer_need_bytes(b, data_size);
  887. if(unlikely(backends_pack_write_request(b->buffer, &data_size))) {
  888. error("BACKEND: cannot pack write request");
  889. continue;
  890. }
  891. b->len = data_size;
  892. chart_buffered_bytes = (collected_number)buffer_strlen(b);
  893. buffer_flush(http_request_header);
  894. buffer_sprintf(http_request_header,
  895. "POST %s HTTP/1.1\r\n"
  896. "Host: %s\r\n"
  897. "Accept: */*\r\n"
  898. "Content-Length: %zu\r\n"
  899. "Content-Type: application/x-www-form-urlencoded\r\n\r\n",
  900. remote_write_path,
  901. hostname,
  902. data_size
  903. );
  904. len = buffer_strlen(http_request_header);
  905. send(sock, buffer_tostring(http_request_header), len, flags);
  906. len = data_size;
  907. }
  908. #endif
  909. ssize_t written;
  910. #ifdef ENABLE_HTTPS
  911. if(opentsdb_ssl.conn && !opentsdb_ssl.flags) {
  912. written = SSL_write(opentsdb_ssl.conn, buffer_tostring(b), len);
  913. } else {
  914. written = send(sock, buffer_tostring(b), len, flags);
  915. }
  916. #else
  917. written = send(sock, buffer_tostring(b), len, flags);
  918. #endif
  919. // chart_backend_latency += now_monotonic_usec() - start_ut;
  920. if(written != -1 && (size_t)written == len) {
  921. // we sent the data successfully
  922. chart_transmission_successes++;
  923. chart_sent_bytes += written;
  924. chart_sent_metrics = chart_buffered_metrics;
  925. // reset the failures count
  926. failures = 0;
  927. // empty the buffer
  928. buffer_flush(b);
  929. }
  930. else {
  931. // oops! we couldn't send (all or some of the) data
  932. error("BACKEND: failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", destination, len, written);
  933. chart_transmission_failures++;
  934. if(written != -1)
  935. chart_sent_bytes += written;
  936. // increment the counter we check for data loss
  937. failures++;
  938. // close the socket - we will re-open it next time
  939. close(sock);
  940. sock = -1;
  941. }
  942. }
  943. else {
  944. error("BACKEND: failed to update database backend '%s'", destination);
  945. chart_transmission_failures++;
  946. // increment the counter we check for data loss
  947. failures++;
  948. }
  949. }
  950. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  951. if(do_prometheus_remote_write && failures) {
  952. (void) buffer_on_failures;
  953. failures = 0;
  954. chart_lost_bytes = chart_buffered_bytes = backends_get_write_request_size(); // estimated write request size
  955. chart_data_lost_events++;
  956. chart_lost_metrics = chart_buffered_metrics;
  957. } else
  958. #endif
  959. if(failures > buffer_on_failures) {
  960. // too bad! we are going to lose data
  961. chart_lost_bytes += buffer_strlen(b);
  962. error("BACKEND: reached %d backend failures. Flushing buffers to protect this host - this results in data loss on back-end server '%s'", failures, destination);
  963. buffer_flush(b);
  964. failures = 0;
  965. chart_data_lost_events++;
  966. chart_lost_metrics = chart_buffered_metrics;
  967. }
  968. if(unlikely(netdata_exit)) break;
  969. // ------------------------------------------------------------------------
  970. // update the monitoring charts
  971. if(likely(chart_ops->counter_done)) rrdset_next(chart_ops);
  972. rrddim_set(chart_ops, "read", chart_receptions);
  973. rrddim_set(chart_ops, "write", chart_transmission_successes);
  974. rrddim_set(chart_ops, "discard", chart_data_lost_events);
  975. rrddim_set(chart_ops, "failure", chart_transmission_failures);
  976. rrddim_set(chart_ops, "reconnect", chart_backend_reconnects);
  977. rrdset_done(chart_ops);
  978. if(likely(chart_metrics->counter_done)) rrdset_next(chart_metrics);
  979. rrddim_set(chart_metrics, "buffered", chart_buffered_metrics);
  980. rrddim_set(chart_metrics, "lost", chart_lost_metrics);
  981. rrddim_set(chart_metrics, "sent", chart_sent_metrics);
  982. rrdset_done(chart_metrics);
  983. if(likely(chart_bytes->counter_done)) rrdset_next(chart_bytes);
  984. rrddim_set(chart_bytes, "buffered", chart_buffered_bytes);
  985. rrddim_set(chart_bytes, "lost", chart_lost_bytes);
  986. rrddim_set(chart_bytes, "sent", chart_sent_bytes);
  987. rrddim_set(chart_bytes, "received", chart_received_bytes);
  988. rrdset_done(chart_bytes);
  989. /*
  990. if(likely(chart_latency->counter_done)) rrdset_next(chart_latency);
  991. rrddim_set(chart_latency, "latency", chart_backend_latency);
  992. rrdset_done(chart_latency);
  993. */
  994. getrusage(RUSAGE_THREAD, &thread);
  995. if(likely(chart_rusage->counter_done)) rrdset_next(chart_rusage);
  996. rrddim_set(chart_rusage, "user", thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec);
  997. rrddim_set(chart_rusage, "system", thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec);
  998. rrdset_done(chart_rusage);
  999. if(likely(buffer_strlen(b) == 0))
  1000. chart_buffered_metrics = 0;
  1001. if(unlikely(netdata_exit)) break;
  1002. }
  1003. cleanup:
  1004. #if HAVE_KINESIS
  1005. if(do_kinesis) {
  1006. backends_kinesis_shutdown();
  1007. freez(kinesis_auth_key_id);
  1008. freez(kinesis_secure_key);
  1009. freez(kinesis_stream_name);
  1010. }
  1011. #endif
  1012. #if ENABLE_PROMETHEUS_REMOTE_WRITE
  1013. buffer_free(http_request_header);
  1014. if(do_prometheus_remote_write)
  1015. backends_protocol_buffers_shutdown();
  1016. #endif
  1017. #if HAVE_MONGOC
  1018. if(do_mongodb) {
  1019. backends_mongodb_cleanup();
  1020. freez(mongodb_uri);
  1021. freez(mongodb_database);
  1022. freez(mongodb_collection);
  1023. }
  1024. #endif
  1025. if(sock != -1)
  1026. close(sock);
  1027. buffer_free(b);
  1028. buffer_free(response);
  1029. #ifdef ENABLE_HTTPS
  1030. if(netdata_opentsdb_ctx) {
  1031. if(opentsdb_ssl.conn) {
  1032. SSL_free(opentsdb_ssl.conn);
  1033. }
  1034. }
  1035. #endif
  1036. netdata_thread_cleanup_pop(1);
  1037. return NULL;
  1038. }