sender.c 59 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdpush.h"
  3. #define WORKER_SENDER_JOB_CONNECT 0
  4. #define WORKER_SENDER_JOB_PIPE_READ 1
  5. #define WORKER_SENDER_JOB_SOCKET_RECEIVE 2
  6. #define WORKER_SENDER_JOB_EXECUTE 3
  7. #define WORKER_SENDER_JOB_SOCKET_SEND 4
  8. #define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5
  9. #define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
  10. #define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
  11. #define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
  12. #define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9
  13. #define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
  14. #define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
  15. #define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
  16. #define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
  17. #define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
  18. #define WORKER_SENDER_JOB_BUFFER_RATIO 15
  19. #define WORKER_SENDER_JOB_BYTES_RECEIVED 16
  20. #define WORKER_SENDER_JOB_BYTES_SENT 17
  21. #define WORKER_SENDER_JOB_REPLAY_REQUEST 18
  22. #define WORKER_SENDER_JOB_FUNCTION_REQUEST 19
  23. #define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20
  24. #if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
  25. #error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21
  26. #endif
  27. extern struct config stream_config;
  28. extern char *netdata_ssl_ca_path;
  29. extern char *netdata_ssl_ca_file;
  30. static __thread BUFFER *sender_thread_buffer = NULL;
  31. static __thread bool sender_thread_buffer_used = false;
  32. static __thread time_t sender_thread_buffer_last_reset_s = 0;
  33. void sender_thread_buffer_free(void) {
  34. buffer_free(sender_thread_buffer);
  35. sender_thread_buffer = NULL;
  36. sender_thread_buffer_used = false;
  37. }
  38. // Collector thread starting a transmission
  39. BUFFER *sender_start(struct sender_state *s) {
  40. if(unlikely(sender_thread_buffer_used))
  41. fatal("STREAMING: thread buffer is used multiple times concurrently.");
  42. if(unlikely(rrdpush_sender_last_buffer_recreate_get(s) > sender_thread_buffer_last_reset_s)) {
  43. if(unlikely(sender_thread_buffer && sender_thread_buffer->size > THREAD_BUFFER_INITIAL_SIZE)) {
  44. buffer_free(sender_thread_buffer);
  45. sender_thread_buffer = NULL;
  46. }
  47. }
  48. if(unlikely(!sender_thread_buffer)) {
  49. sender_thread_buffer = buffer_create(THREAD_BUFFER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_streaming);
  50. sender_thread_buffer_last_reset_s = rrdpush_sender_last_buffer_recreate_get(s);
  51. }
  52. sender_thread_buffer_used = true;
  53. buffer_flush(sender_thread_buffer);
  54. return sender_thread_buffer;
  55. }
  56. static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
  57. #ifdef ENABLE_RRDPUSH_COMPRESSION
  58. /*
  59. * In case of stream compression buffer overflow
  60. * Inform the user through the error log file and
  61. * deactivate compression by downgrading the stream protocol.
  62. */
  63. static inline void deactivate_compression(struct sender_state *s) {
  64. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
  65. netdata_log_error("STREAM_COMPRESSION: Compression returned error, disabling it.");
  66. s->flags &= ~SENDER_FLAG_COMPRESSION;
  67. netdata_log_error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
  68. rrdpush_sender_thread_close_socket(s->host);
  69. }
  70. #endif
  71. #define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
  72. // Collector thread finishing a transmission
  73. void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) {
  74. if(unlikely(wb != sender_thread_buffer))
  75. fatal("STREAMING: sender is trying to commit a buffer that is not this thread's buffer.");
  76. if(unlikely(!sender_thread_buffer_used))
  77. fatal("STREAMING: sender is committing a buffer twice.");
  78. sender_thread_buffer_used = false;
  79. char *src = (char *)buffer_tostring(wb);
  80. size_t src_len = buffer_strlen(wb);
  81. if(unlikely(!src || !src_len))
  82. return;
  83. sender_lock(s);
  84. // FILE *fp = fopen("/tmp/stream.txt", "a");
  85. // fprintf(fp,
  86. // "\n--- SEND BEGIN: %s ----\n"
  87. // "%s"
  88. // "--- SEND END ----------------------------------------\n"
  89. // , rrdhost_hostname(s->host), src);
  90. // fclose(fp);
  91. if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
  92. netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
  93. rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
  94. s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
  95. }
  96. #ifdef ENABLE_RRDPUSH_COMPRESSION
  97. if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) {
  98. while(src_len) {
  99. size_t size_to_compress = src_len;
  100. if(unlikely(size_to_compress > COMPRESSION_MAX_MSG_SIZE)) {
  101. if (stream_has_capability(s, STREAM_CAP_BINARY))
  102. size_to_compress = COMPRESSION_MAX_MSG_SIZE;
  103. else {
  104. if (size_to_compress > COMPRESSION_MAX_MSG_SIZE) {
  105. // we need to find the last newline
  106. // so that the decompressor will have a whole line to work with
  107. const char *t = &src[COMPRESSION_MAX_MSG_SIZE];
  108. while (--t >= src)
  109. if (unlikely(*t == '\n'))
  110. break;
  111. if (t <= src) {
  112. size_to_compress = COMPRESSION_MAX_MSG_SIZE;
  113. } else
  114. size_to_compress = t - src + 1;
  115. }
  116. }
  117. }
  118. char *dst;
  119. size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
  120. if (!dst_len) {
  121. netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
  122. rrdhost_hostname(s->host), s->connected_to);
  123. rrdpush_compressor_reset(&s->compressor);
  124. dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
  125. if(!dst_len) {
  126. netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
  127. rrdhost_hostname(s->host), s->connected_to);
  128. deactivate_compression(s);
  129. sender_unlock(s);
  130. return;
  131. }
  132. }
  133. if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
  134. s->flags |= SENDER_FLAG_OVERFLOW;
  135. else
  136. s->sent_bytes_on_this_connection_per_type[type] += dst_len;
  137. src = src + size_to_compress;
  138. src_len -= size_to_compress;
  139. }
  140. }
  141. else if(cbuffer_add_unsafe(s->buffer, src, src_len))
  142. s->flags |= SENDER_FLAG_OVERFLOW;
  143. else
  144. s->sent_bytes_on_this_connection_per_type[type] += src_len;
  145. #else
  146. if(cbuffer_add_unsafe(s->buffer, src, src_len))
  147. s->flags |= SENDER_FLAG_OVERFLOW;
  148. else
  149. s->sent_bytes_on_this_connection_per_type[type] += src_len;
  150. #endif
  151. replication_recalculate_buffer_used_ratio_unsafe(s);
  152. bool signal_sender = false;
  153. if(!rrdpush_sender_pipe_has_pending_data(s)) {
  154. rrdpush_sender_pipe_set_pending_data(s);
  155. signal_sender = true;
  156. }
  157. sender_unlock(s);
  158. if(signal_sender)
  159. rrdpush_signal_sender_to_wake_up(s);
  160. }
  161. static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
  162. buffer_sprintf(
  163. wb
  164. , "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n"
  165. , rrdvar_name(rva)
  166. , rrdvar2number(rva)
  167. );
  168. netdata_log_debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva));
  169. }
  170. void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) {
  171. if(rrdhost_can_send_definitions_to_parent(host)) {
  172. BUFFER *wb = sender_start(host->sender);
  173. rrdpush_sender_add_host_variable_to_buffer(wb, rva);
  174. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  175. sender_thread_buffer_free();
  176. }
  177. }
  178. struct custom_host_variables_callback {
  179. BUFFER *wb;
  180. };
  181. static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdvar_ptr __maybe_unused, void *struct_ptr) {
  182. const RRDVAR_ACQUIRED *rv = (const RRDVAR_ACQUIRED *)item;
  183. struct custom_host_variables_callback *tmp = struct_ptr;
  184. BUFFER *wb = tmp->wb;
  185. if(unlikely(rrdvar_flags(rv) & RRDVAR_FLAG_CUSTOM_HOST_VAR && rrdvar_type(rv) == RRDVAR_TYPE_CALCULATED)) {
  186. rrdpush_sender_add_host_variable_to_buffer(wb, rv);
  187. return 1;
  188. }
  189. return 0;
  190. }
  191. static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
  192. if(rrdhost_can_send_definitions_to_parent(host)) {
  193. BUFFER *wb = sender_start(host->sender);
  194. struct custom_host_variables_callback tmp = {
  195. .wb = wb
  196. };
  197. int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
  198. (void)ret;
  199. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  200. sender_thread_buffer_free();
  201. netdata_log_debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
  202. }
  203. }
  204. // resets all the chart, so that their definitions
  205. // will be resent to the central netdata
  206. static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
  207. RRDSET *st;
  208. rrdset_foreach_read(st, host) {
  209. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
  210. rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
  211. st->upstream_resync_time_s = 0;
  212. RRDDIM *rd;
  213. rrddim_foreach_read(rd, st)
  214. rrddim_clear_exposed(rd);
  215. rrddim_foreach_done(rd);
  216. }
  217. rrdset_foreach_done(st);
  218. rrdhost_sender_replicating_charts_zero(host);
  219. }
  220. static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t now_s, bool have_mutex, bool force) {
  221. static __thread time_t last_reset_time_s = 0;
  222. if(!force && now_s - last_reset_time_s < 300)
  223. return;
  224. if(!have_mutex)
  225. sender_lock(s);
  226. rrdpush_sender_last_buffer_recreate_set(s, now_s);
  227. last_reset_time_s = now_s;
  228. if(s->buffer && s->buffer->size > CBUFFER_INITIAL_SIZE) {
  229. size_t max = s->buffer->max_size;
  230. cbuffer_free(s->buffer);
  231. s->buffer = cbuffer_new(CBUFFER_INITIAL_SIZE, max, &netdata_buffers_statistics.cbuffers_streaming);
  232. }
  233. sender_thread_buffer_free();
  234. if(!have_mutex)
  235. sender_unlock(s);
  236. }
  237. static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
  238. rrdpush_sender_set_flush_time(host->sender);
  239. sender_lock(host->sender);
  240. // flush the output buffer from any data it may have
  241. cbuffer_flush(host->sender->buffer);
  242. rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true);
  243. replication_recalculate_buffer_used_ratio_unsafe(host->sender);
  244. sender_unlock(host->sender);
  245. }
  246. static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) {
  247. rrdpush_sender_set_flush_time(host->sender);
  248. // stop all replication commands inflight
  249. replication_sender_delete_pending_requests(host->sender);
  250. // reset the state of all charts
  251. rrdpush_sender_thread_reset_all_charts(host);
  252. rrdpush_sender_replicating_charts_zero(host->sender);
  253. }
  254. static void rrdpush_sender_on_connect(RRDHOST *host) {
  255. rrdpush_sender_cbuffer_flush(host);
  256. rrdpush_sender_charts_and_replication_reset(host);
  257. }
  258. static void rrdpush_sender_after_connect(RRDHOST *host) {
  259. rrdpush_sender_thread_send_custom_host_variables(host);
  260. }
  261. static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
  262. #ifdef ENABLE_HTTPS
  263. netdata_ssl_close(&host->sender->ssl);
  264. #endif
  265. if(host->sender->rrdpush_sender_socket != -1) {
  266. close(host->sender->rrdpush_sender_socket);
  267. host->sender->rrdpush_sender_socket = -1;
  268. }
  269. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  270. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  271. // do not flush the circular buffer here
  272. // this function is called sometimes with the mutex lock, sometimes without the lock
  273. rrdpush_sender_charts_and_replication_reset(host);
  274. }
  275. void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
  276. {
  277. se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz("");
  278. se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz("");
  279. se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz("");
  280. se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):strdupz("");
  281. se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz("");
  282. }
  283. void rrdpush_clean_encoded(stream_encoded_t *se)
  284. {
  285. if (se->os_name)
  286. freez(se->os_name);
  287. if (se->os_id)
  288. freez(se->os_id);
  289. if (se->os_version)
  290. freez(se->os_version);
  291. if (se->kernel_name)
  292. freez(se->kernel_name);
  293. if (se->kernel_version)
  294. freez(se->kernel_version);
  295. }
  296. struct {
  297. const char *response;
  298. size_t length;
  299. int32_t version;
  300. bool dynamic;
  301. const char *error;
  302. int worker_job_id;
  303. time_t postpone_reconnect_seconds;
  304. } stream_responses[] = {
  305. {
  306. .response = START_STREAMING_PROMPT_VN,
  307. .length = sizeof(START_STREAMING_PROMPT_VN) - 1,
  308. .version = STREAM_HANDSHAKE_OK_V3, // and above
  309. .dynamic = true, // dynamic = we will parse the version / capabilities
  310. .error = NULL,
  311. .worker_job_id = 0,
  312. .postpone_reconnect_seconds = 0,
  313. },
  314. {
  315. .response = START_STREAMING_PROMPT_V2,
  316. .length = sizeof(START_STREAMING_PROMPT_V2) - 1,
  317. .version = STREAM_HANDSHAKE_OK_V2,
  318. .dynamic = false,
  319. .error = NULL,
  320. .worker_job_id = 0,
  321. .postpone_reconnect_seconds = 0,
  322. },
  323. {
  324. .response = START_STREAMING_PROMPT_V1,
  325. .length = sizeof(START_STREAMING_PROMPT_V1) - 1,
  326. .version = STREAM_HANDSHAKE_OK_V1,
  327. .dynamic = false,
  328. .error = NULL,
  329. .worker_job_id = 0,
  330. .postpone_reconnect_seconds = 0,
  331. },
  332. {
  333. .response = START_STREAMING_ERROR_SAME_LOCALHOST,
  334. .length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1,
  335. .version = STREAM_HANDSHAKE_ERROR_LOCALHOST,
  336. .dynamic = false,
  337. .error = "remote server rejected this stream, the host we are trying to stream is its localhost",
  338. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  339. .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour
  340. },
  341. {
  342. .response = START_STREAMING_ERROR_ALREADY_STREAMING,
  343. .length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1,
  344. .version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED,
  345. .dynamic = false,
  346. .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
  347. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  348. .postpone_reconnect_seconds = 2 * 60, // 2 minutes
  349. },
  350. {
  351. .response = START_STREAMING_ERROR_NOT_PERMITTED,
  352. .length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1,
  353. .version = STREAM_HANDSHAKE_ERROR_DENIED,
  354. .dynamic = false,
  355. .error = "remote server denied access, probably we don't have the right API key?",
  356. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  357. .postpone_reconnect_seconds = 1 * 60, // 1 minute
  358. },
  359. {
  360. .response = START_STREAMING_ERROR_BUSY_TRY_LATER,
  361. .length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1,
  362. .version = STREAM_HANDSHAKE_BUSY_TRY_LATER,
  363. .dynamic = false,
  364. .error = "remote server is currently busy, we should try later",
  365. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  366. .postpone_reconnect_seconds = 2 * 60, // 2 minutes
  367. },
  368. {
  369. .response = START_STREAMING_ERROR_INTERNAL_ERROR,
  370. .length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1,
  371. .version = STREAM_HANDSHAKE_INTERNAL_ERROR,
  372. .dynamic = false,
  373. .error = "remote server is encountered an internal error, we should try later",
  374. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  375. .postpone_reconnect_seconds = 5 * 60, // 5 minutes
  376. },
  377. {
  378. .response = START_STREAMING_ERROR_INITIALIZATION,
  379. .length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1,
  380. .version = STREAM_HANDSHAKE_INITIALIZATION,
  381. .dynamic = false,
  382. .error = "remote server is initializing, we should try later",
  383. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  384. .postpone_reconnect_seconds = 2 * 60, // 2 minute
  385. },
  386. // terminator
  387. {
  388. .response = NULL,
  389. .length = 0,
  390. .version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE,
  391. .dynamic = false,
  392. .error = "remote node response is not understood, is it Netdata?",
  393. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  394. .postpone_reconnect_seconds = 1 * 60, // 1 minute
  395. }
  396. };
  397. static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender_state *s, char *http, size_t http_length) {
  398. int32_t version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE;
  399. int i;
  400. for(i = 0; stream_responses[i].response ; i++) {
  401. if(stream_responses[i].dynamic &&
  402. http_length > stream_responses[i].length && http_length < (stream_responses[i].length + 30) &&
  403. strncmp(http, stream_responses[i].response, stream_responses[i].length) == 0) {
  404. version = str2i(&http[stream_responses[i].length]);
  405. break;
  406. }
  407. else if(http_length == stream_responses[i].length && strcmp(http, stream_responses[i].response) == 0) {
  408. version = stream_responses[i].version;
  409. break;
  410. }
  411. }
  412. if(version >= STREAM_HANDSHAKE_OK_V1) {
  413. host->destination->reason = version;
  414. host->destination->postpone_reconnection_until = now_realtime_sec() + s->reconnect_delay;
  415. s->capabilities = convert_stream_version_to_capabilities(version, host, true);
  416. return true;
  417. }
  418. const char *error = stream_responses[i].error;
  419. int worker_job_id = stream_responses[i].worker_job_id;
  420. time_t delay = stream_responses[i].postpone_reconnect_seconds;
  421. worker_is_busy(worker_job_id);
  422. rrdpush_sender_thread_close_socket(host);
  423. host->destination->reason = version;
  424. host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
  425. char buf[LOG_DATE_LENGTH];
  426. log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until);
  427. netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
  428. rrdhost_hostname(host), s->connected_to, error, delay, buf);
  429. return false;
  430. }
  #ifdef ENABLE_HTTPS
  /*
   * Abandon the current SSL connection attempt:
   *  - report the SSL-error worker job,
   *  - close the socket (which also closes the SSL session),
   *  - record `reason` for the destination and postpone reconnecting to it by 5 minutes.
   */
  static void rrdpush_sender_ssl_abort(RRDHOST *host, int32_t reason) {
      worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
      rrdpush_sender_thread_close_socket(host);
      host->destination->reason = reason;
      host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
  }
  #endif

  /*
   * Set up SSL on the freshly connected stream socket, when the current
   * destination requires it. Any previous SSL session is closed first.
   *
   * Returns true when streaming may proceed: either SSL is established (with a
   * valid certificate, when certificate validation is enabled), or SSL is not
   * required, or this build has no HTTPS support.
   *
   * Returns false on any SSL failure, including a failure to open the SSL
   * session. The socket is then closed and the destination's reason and
   * reconnect time are recorded.
   */
  static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
  #ifdef ENABLE_HTTPS
      RRDHOST *host = s->host;
      bool ssl_required = host->destination && host->destination->ssl;

      netdata_ssl_close(&host->sender->ssl);

      if(!ssl_required)
          return true;

      if(!netdata_ssl_open(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket)) {
          // previously the socket was left open here and no reason was recorded
          netdata_log_error("SSL: failed to establish connection.");
          rrdpush_sender_ssl_abort(host, STREAM_HANDSHAKE_ERROR_SSL_ERROR);
          return false;
      }

      if(!netdata_ssl_connect(&host->sender->ssl)) {
          // couldn't connect
          rrdpush_sender_ssl_abort(host, STREAM_HANDSHAKE_ERROR_SSL_ERROR);
          return false;
      }

      if(netdata_ssl_validate_certificate_sender &&
         security_test_certificate(host->sender->ssl.conn)) {
          // certificate is not valid
          netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
          rrdpush_sender_ssl_abort(host, STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE);
          return false;
      }

      return true;
  #else
      // SSL is not enabled
      return true;
  #endif
  }
  466. static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) {
  467. struct timeval tv = {
  468. .tv_sec = timeout,
  469. .tv_usec = 0
  470. };
  471. // make sure the socket is closed
  472. rrdpush_sender_thread_close_socket(host);
  473. s->rrdpush_sender_socket = connect_to_one_of_destinations(
  474. host
  475. , default_port
  476. , &tv
  477. , &s->reconnects_counter
  478. , s->connected_to
  479. , sizeof(s->connected_to)-1
  480. , &host->destination
  481. );
  482. if(unlikely(s->rrdpush_sender_socket == -1)) {
  483. // netdata_log_error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
  484. return false;
  485. }
  486. // netdata_log_info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
  487. // reset our capabilities to default
  488. s->capabilities = stream_our_capabilities(host, true);
  489. #ifdef ENABLE_RRDPUSH_COMPRESSION
  490. // If we don't want compression, remove it from our capabilities
  491. if(!(s->flags & SENDER_FLAG_COMPRESSION))
  492. s->capabilities &= ~STREAM_CAP_COMPRESSION;
  493. #endif // ENABLE_RRDPUSH_COMPRESSION
  494. /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
  495. version negotiation resulted in a high enough version.
  496. */
  497. stream_encoded_t se;
  498. rrdpush_encode_variable(&se, host);
  499. host->sender->hops = host->system_info->hops + 1;
  500. char http[HTTP_HEADER_SIZE + 1];
  501. int eol = snprintfz(http, HTTP_HEADER_SIZE,
  502. "STREAM "
  503. "key=%s"
  504. "&hostname=%s"
  505. "&registry_hostname=%s"
  506. "&machine_guid=%s"
  507. "&update_every=%d"
  508. "&os=%s"
  509. "&timezone=%s"
  510. "&abbrev_timezone=%s"
  511. "&utc_offset=%d"
  512. "&hops=%d"
  513. "&ml_capable=%d"
  514. "&ml_enabled=%d"
  515. "&mc_version=%d"
  516. "&tags=%s"
  517. "&ver=%u"
  518. "&NETDATA_INSTANCE_CLOUD_TYPE=%s"
  519. "&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s"
  520. "&NETDATA_INSTANCE_CLOUD_INSTANCE_REGION=%s"
  521. "&NETDATA_SYSTEM_OS_NAME=%s"
  522. "&NETDATA_SYSTEM_OS_ID=%s"
  523. "&NETDATA_SYSTEM_OS_ID_LIKE=%s"
  524. "&NETDATA_SYSTEM_OS_VERSION=%s"
  525. "&NETDATA_SYSTEM_OS_VERSION_ID=%s"
  526. "&NETDATA_SYSTEM_OS_DETECTION=%s"
  527. "&NETDATA_HOST_IS_K8S_NODE=%s"
  528. "&NETDATA_SYSTEM_KERNEL_NAME=%s"
  529. "&NETDATA_SYSTEM_KERNEL_VERSION=%s"
  530. "&NETDATA_SYSTEM_ARCHITECTURE=%s"
  531. "&NETDATA_SYSTEM_VIRTUALIZATION=%s"
  532. "&NETDATA_SYSTEM_VIRT_DETECTION=%s"
  533. "&NETDATA_SYSTEM_CONTAINER=%s"
  534. "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s"
  535. "&NETDATA_CONTAINER_OS_NAME=%s"
  536. "&NETDATA_CONTAINER_OS_ID=%s"
  537. "&NETDATA_CONTAINER_OS_ID_LIKE=%s"
  538. "&NETDATA_CONTAINER_OS_VERSION=%s"
  539. "&NETDATA_CONTAINER_OS_VERSION_ID=%s"
  540. "&NETDATA_CONTAINER_OS_DETECTION=%s"
  541. "&NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT=%s"
  542. "&NETDATA_SYSTEM_CPU_FREQ=%s"
  543. "&NETDATA_SYSTEM_TOTAL_RAM=%s"
  544. "&NETDATA_SYSTEM_TOTAL_DISK_SIZE=%s"
  545. "&NETDATA_PROTOCOL_VERSION=%s"
  546. " HTTP/1.1\r\n"
  547. "User-Agent: %s/%s\r\n"
  548. "Accept: */*\r\n\r\n"
  549. , host->rrdpush_send_api_key
  550. , rrdhost_hostname(host)
  551. , rrdhost_registry_hostname(host)
  552. , host->machine_guid
  553. , default_rrd_update_every
  554. , rrdhost_os(host)
  555. , rrdhost_timezone(host)
  556. , rrdhost_abbrev_timezone(host)
  557. , host->utc_offset
  558. , host->sender->hops
  559. , host->system_info->ml_capable
  560. , host->system_info->ml_enabled
  561. , host->system_info->mc_version
  562. , rrdhost_tags(host)
  563. , s->capabilities
  564. , (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : ""
  565. , (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : ""
  566. , (host->system_info->cloud_instance_region) ? host->system_info->cloud_instance_region : ""
  567. , se.os_name
  568. , se.os_id
  569. , (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : ""
  570. , se.os_version
  571. , (host->system_info->host_os_version_id) ? host->system_info->host_os_version_id : ""
  572. , (host->system_info->host_os_detection) ? host->system_info->host_os_detection : ""
  573. , (host->system_info->is_k8s_node) ? host->system_info->is_k8s_node : ""
  574. , se.kernel_name
  575. , se.kernel_version
  576. , (host->system_info->architecture) ? host->system_info->architecture : ""
  577. , (host->system_info->virtualization) ? host->system_info->virtualization : ""
  578. , (host->system_info->virt_detection) ? host->system_info->virt_detection : ""
  579. , (host->system_info->container) ? host->system_info->container : ""
  580. , (host->system_info->container_detection) ? host->system_info->container_detection : ""
  581. , (host->system_info->container_os_name) ? host->system_info->container_os_name : ""
  582. , (host->system_info->container_os_id) ? host->system_info->container_os_id : ""
  583. , (host->system_info->container_os_id_like) ? host->system_info->container_os_id_like : ""
  584. , (host->system_info->container_os_version) ? host->system_info->container_os_version : ""
  585. , (host->system_info->container_os_version_id) ? host->system_info->container_os_version_id : ""
  586. , (host->system_info->container_os_detection) ? host->system_info->container_os_detection : ""
  587. , (host->system_info->host_cores) ? host->system_info->host_cores : ""
  588. , (host->system_info->host_cpu_freq) ? host->system_info->host_cpu_freq : ""
  589. , (host->system_info->host_ram_total) ? host->system_info->host_ram_total : ""
  590. , (host->system_info->host_disk_space) ? host->system_info->host_disk_space : ""
  591. , STREAMING_PROTOCOL_VERSION
  592. , rrdhost_program_name(host)
  593. , rrdhost_program_version(host)
  594. );
  595. http[eol] = 0x00;
  596. rrdpush_clean_encoded(&se);
  597. if(!rrdpush_sender_connect_ssl(s))
  598. return false;
  599. ssize_t bytes, len = (ssize_t)strlen(http);
  600. bytes = send_timeout(
  601. #ifdef ENABLE_HTTPS
  602. &host->sender->ssl,
  603. #endif
  604. s->rrdpush_sender_socket,
  605. http,
  606. len,
  607. 0,
  608. timeout);
  609. if(bytes <= 0) { // timeout is 0
  610. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
  611. rrdpush_sender_thread_close_socket(host);
  612. netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
  613. host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
  614. host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
  615. return false;
  616. }
  617. bytes = recv_timeout(
  618. #ifdef ENABLE_HTTPS
  619. &host->sender->ssl,
  620. #endif
  621. s->rrdpush_sender_socket,
  622. http,
  623. HTTP_HEADER_SIZE,
  624. 0,
  625. timeout);
  626. if(bytes <= 0) { // timeout is 0
  627. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
  628. rrdpush_sender_thread_close_socket(host);
  629. netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
  630. host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
  631. host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
  632. return false;
  633. }
  634. if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
  635. netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
  636. if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
  637. netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
  638. http[bytes] = '\0';
  639. netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http);
  640. if(!rrdpush_sender_validate_response(host, s, http, bytes))
  641. return false;
  642. #ifdef ENABLE_RRDPUSH_COMPRESSION
  643. if(stream_has_capability(s, STREAM_CAP_COMPRESSION))
  644. rrdpush_compressor_reset(&s->compressor);
  645. else
  646. rrdpush_compressor_destroy(&s->compressor);
  647. #endif // ENABLE_RRDPUSH_COMPRESSION
  648. log_sender_capabilities(s);
  649. netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
  650. return true;
  651. }
  652. static bool attempt_to_connect(struct sender_state *state)
  653. {
  654. state->send_attempts = 0;
  655. // reset the bytes we have sent for this session
  656. state->sent_bytes_on_this_connection = 0;
  657. memset(state->sent_bytes_on_this_connection_per_type, 0, sizeof(state->sent_bytes_on_this_connection_per_type));
  658. if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
  659. // reset the buffer, to properly send charts and metrics
  660. rrdpush_sender_on_connect(state->host);
  661. // send from the beginning
  662. state->begin = 0;
  663. // make sure the next reconnection will be immediate
  664. state->not_connected_loops = 0;
  665. // let the data collection threads know we are ready
  666. rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  667. rrdpush_sender_after_connect(state->host);
  668. return true;
  669. }
  670. // we couldn't connect
  671. // increase the failed connections counter
  672. state->not_connected_loops++;
  673. // slow re-connection on repeating errors
  674. usec_t now_ut = now_monotonic_usec();
  675. usec_t end_ut = now_ut + USEC_PER_SEC * state->reconnect_delay;
  676. while(now_ut < end_ut) {
  677. netdata_thread_testcancel();
  678. sleep_usec(500 * USEC_PER_MS); // seconds
  679. now_ut = now_monotonic_usec();
  680. }
  681. return false;
  682. }
  683. // TCP window is open, and we have data to transmit.
  684. static ssize_t attempt_to_send(struct sender_state *s) {
  685. ssize_t ret;
  686. #ifdef NETDATA_INTERNAL_CHECKS
  687. struct circular_buffer *cb = s->buffer;
  688. #endif
  689. sender_lock(s);
  690. char *chunk;
  691. size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
  692. netdata_log_debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
  693. #ifdef ENABLE_HTTPS
  694. if(SSL_connection(&s->ssl))
  695. ret = netdata_ssl_write(&s->ssl, chunk, outstanding);
  696. else
  697. ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
  698. #else
  699. ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
  700. #endif
  701. if (likely(ret > 0)) {
  702. cbuffer_remove_unsafe(s->buffer, ret);
  703. s->sent_bytes_on_this_connection += ret;
  704. s->sent_bytes += ret;
  705. netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret);
  706. }
  707. else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
  708. netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to);
  709. else if (ret == -1) {
  710. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR);
  711. netdata_log_debug(D_STREAM, "STREAM: Send failed - closing socket...");
  712. netdata_log_error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection);
  713. rrdpush_sender_thread_close_socket(s->host);
  714. }
  715. else
  716. netdata_log_debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
  717. replication_recalculate_buffer_used_ratio_unsafe(s);
  718. sender_unlock(s);
  719. return ret;
  720. }
  721. static ssize_t attempt_read(struct sender_state *s) {
  722. ssize_t ret;
  723. #ifdef ENABLE_HTTPS
  724. if (SSL_connection(&s->ssl))
  725. ret = netdata_ssl_read(&s->ssl, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1);
  726. else
  727. ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
  728. #else
  729. ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
  730. #endif
  731. if (ret > 0) {
  732. s->read_len += ret;
  733. return ret;
  734. }
  735. if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
  736. return ret;
  737. #ifdef ENABLE_HTTPS
  738. if (SSL_connection(&s->ssl))
  739. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
  740. else
  741. #endif
  742. if (ret == 0 || errno == ECONNRESET) {
  743. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED);
  744. netdata_log_error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to);
  745. }
  746. else {
  747. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR);
  748. netdata_log_error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
  749. }
  750. rrdpush_sender_thread_close_socket(s->host);
  751. return ret;
  752. }
// Context of a function request received from the parent, handed to
// rrd_call_function_async() and released by stream_execute_function_callback().
struct inflight_stream_function {
    struct sender_state *sender;    // sender whose connection the response is written to
    STRING *transaction;            // transaction id sent by the parent, echoed back with the result
    usec_t received_ut;             // now_realtime_usec() when the request was received
};
  758. void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
  759. struct inflight_stream_function *tmp = data;
  760. struct sender_state *s = tmp->sender;
  761. if(rrdhost_can_send_definitions_to_parent(s->host)) {
  762. BUFFER *wb = sender_start(s);
  763. pluginsd_function_result_begin_to_buffer(wb
  764. , string2str(tmp->transaction)
  765. , code
  766. , functions_content_type_to_format(func_wb->content_type)
  767. , func_wb->expires);
  768. buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));
  769. pluginsd_function_result_end_to_buffer(wb);
  770. sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
  771. sender_thread_buffer_free();
  772. internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).",
  773. rrdhost_hostname(s->host), s->connected_to,
  774. string2str(tmp->transaction),
  775. buffer_strlen(func_wb),
  776. now_realtime_usec() - tmp->received_ut);
  777. }
  778. string_freez(tmp->transaction);
  779. buffer_free(func_wb);
  780. freez(tmp);
  781. }
  782. // This is just a placeholder until the gap filling state machine is inserted
  783. void execute_commands(struct sender_state *s) {
  784. worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
  785. char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
  786. *end = 0;
  787. while( start < end && (newline = strchr(start, '\n')) ) {
  788. *newline = '\0';
  789. netdata_log_access("STREAM: %d from '%s' for host '%s': %s",
  790. gettid(), s->connected_to, rrdhost_hostname(s->host), start);
  791. // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
  792. char *words[PLUGINSD_MAX_WORDS] = { NULL };
  793. size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS);
  794. const char *keyword = get_word(words, num_words, 0);
  795. if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
  796. worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
  797. char *transaction = get_word(words, num_words, 1);
  798. char *timeout_s = get_word(words, num_words, 2);
  799. char *function = get_word(words, num_words, 3);
  800. if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
  801. netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
  802. rrdhost_hostname(s->host), s->connected_to,
  803. keyword,
  804. transaction?transaction:"(unset)",
  805. timeout_s?timeout_s:"(unset)",
  806. function?function:"(unset)");
  807. }
  808. else {
  809. int timeout = str2i(timeout_s);
  810. if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  811. struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function));
  812. tmp->received_ut = now_realtime_usec();
  813. tmp->sender = s;
  814. tmp->transaction = string_strdupz(transaction);
  815. BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
  816. int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp);
  817. if(code != HTTP_RESP_OK) {
  818. rrd_call_function_error(wb, "Failed to route request to collector", code);
  819. stream_execute_function_callback(wb, code, tmp);
  820. }
  821. }
  822. }
  823. else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
  824. worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
  825. const char *chart_id = get_word(words, num_words, 1);
  826. const char *start_streaming = get_word(words, num_words, 2);
  827. const char *after = get_word(words, num_words, 3);
  828. const char *before = get_word(words, num_words, 4);
  829. if (!chart_id || !start_streaming || !after || !before) {
  830. netdata_log_error("STREAM %s [send to %s] %s command is incomplete"
  831. " (chart=%s, start_streaming=%s, after=%s, before=%s)",
  832. rrdhost_hostname(s->host), s->connected_to,
  833. keyword,
  834. chart_id ? chart_id : "(unset)",
  835. start_streaming ? start_streaming : "(unset)",
  836. after ? after : "(unset)",
  837. before ? before : "(unset)");
  838. }
  839. else {
  840. replication_add_request(s, chart_id,
  841. strtoll(after, NULL, 0),
  842. strtoll(before, NULL, 0),
  843. !strcmp(start_streaming, "true")
  844. );
  845. }
  846. }
  847. else {
  848. netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
  849. }
  850. worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
  851. start = newline + 1;
  852. }
  853. if (start < end) {
  854. memmove(s->read_buffer, start, end-start);
  855. s->read_len = end - start;
  856. }
  857. else {
  858. s->read_buffer[0] = '\0';
  859. s->read_len = 0;
  860. }
  861. }
// Per-thread data handed to rrdpush_sender_thread_cleanup_callback(), which frees it.
struct rrdpush_sender_thread_data {
    RRDHOST *host;          // host whose sender thread owns this data
    char *pipe_buffer;      // scratch buffer used to drain the wake-up pipe
};
  866. static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen) {
  867. static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
  868. bool ret = true;
  869. netdata_mutex_lock(&mutex);
  870. int new_pipe_fds[2];
  871. if(reopen) {
  872. if(pipe(new_pipe_fds) != 0) {
  873. netdata_log_error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host));
  874. new_pipe_fds[PIPE_READ] = -1;
  875. new_pipe_fds[PIPE_WRITE] = -1;
  876. ret = false;
  877. }
  878. }
  879. int old_pipe_fds[2];
  880. old_pipe_fds[PIPE_READ] = pipe_fds[PIPE_READ];
  881. old_pipe_fds[PIPE_WRITE] = pipe_fds[PIPE_WRITE];
  882. if(reopen) {
  883. pipe_fds[PIPE_READ] = new_pipe_fds[PIPE_READ];
  884. pipe_fds[PIPE_WRITE] = new_pipe_fds[PIPE_WRITE];
  885. }
  886. else {
  887. pipe_fds[PIPE_READ] = -1;
  888. pipe_fds[PIPE_WRITE] = -1;
  889. }
  890. if(old_pipe_fds[PIPE_READ] > 2)
  891. close(old_pipe_fds[PIPE_READ]);
  892. if(old_pipe_fds[PIPE_WRITE] > 2)
  893. close(old_pipe_fds[PIPE_WRITE]);
  894. netdata_mutex_unlock(&mutex);
  895. return ret;
  896. }
  897. void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
  898. if(unlikely(s->tid == gettid()))
  899. return;
  900. RRDHOST *host = s->host;
  901. int pipe_fd = s->rrdpush_sender_pipe[PIPE_WRITE];
  902. // signal the sender there are more data
  903. if (pipe_fd != -1 && write(pipe_fd, " ", 1) == -1) {
  904. netdata_log_error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host));
  905. rrdpush_sender_pipe_close(host, s->rrdpush_sender_pipe, true);
  906. }
  907. }
  908. static bool rrdhost_set_sender(RRDHOST *host) {
  909. if(unlikely(!host->sender)) return false;
  910. bool ret = false;
  911. sender_lock(host->sender);
  912. if(!host->sender->tid) {
  913. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  914. rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
  915. host->rrdpush_sender_connection_counter++;
  916. host->sender->tid = gettid();
  917. host->sender->last_state_since_t = now_realtime_sec();
  918. host->sender->exit.reason = STREAM_HANDSHAKE_NEVER;
  919. ret = true;
  920. }
  921. sender_unlock(host->sender);
  922. rrdpush_reset_destinations_postpone_time(host);
  923. return ret;
  924. }
  925. static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) {
  926. if(unlikely(!host->sender)) return;
  927. if(host->sender->tid == gettid()) {
  928. host->sender->tid = 0;
  929. host->sender->exit.shutdown = false;
  930. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  931. host->sender->last_state_since_t = now_realtime_sec();
  932. if(host->destination) {
  933. host->destination->since = host->sender->last_state_since_t;
  934. host->destination->reason = host->sender->exit.reason;
  935. }
  936. }
  937. rrdpush_reset_destinations_postpone_time(host);
  938. }
  939. static bool rrdhost_sender_should_exit(struct sender_state *s) {
  940. // check for outstanding cancellation requests
  941. netdata_thread_testcancel();
  942. if(unlikely(!service_running(SERVICE_STREAMING))) {
  943. if(!s->exit.reason)
  944. s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT;
  945. return true;
  946. }
  947. if(unlikely(!rrdhost_has_rrdpush_sender_enabled(s->host))) {
  948. if(!s->exit.reason)
  949. s->exit.reason = STREAM_HANDSHAKE_NON_STREAMABLE_HOST;
  950. return true;
  951. }
  952. if(unlikely(s->exit.shutdown)) {
  953. if(!s->exit.reason)
  954. s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN;
  955. return true;
  956. }
  957. if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) {
  958. if(!s->exit.reason)
  959. s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST;
  960. return true;
  961. }
  962. return false;
  963. }
  964. static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
  965. struct rrdpush_sender_thread_data *s = ptr;
  966. worker_unregister();
  967. RRDHOST *host = s->host;
  968. sender_lock(host->sender);
  969. netdata_log_info("STREAM %s [send]: sending thread exits %s",
  970. rrdhost_hostname(host),
  971. host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(host->sender->exit.reason) : "");
  972. rrdpush_sender_thread_close_socket(host);
  973. rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
  974. rrdhost_clear_sender___while_having_sender_mutex(host);
  975. sender_unlock(host->sender);
  976. freez(s->pipe_buffer);
  977. freez(s);
  978. }
  979. void rrdpush_initialize_ssl_ctx(RRDHOST *host) {
  980. #ifdef ENABLE_HTTPS
  981. static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
  982. spinlock_lock(&sp);
  983. if(netdata_ssl_streaming_sender_ctx || !host) {
  984. spinlock_unlock(&sp);
  985. return;
  986. }
  987. for(struct rrdpush_destinations *d = host->destinations; d ; d = d->next) {
  988. if (d->ssl) {
  989. // we need to initialize SSL
  990. netdata_ssl_initialize_ctx(NETDATA_SSL_STREAMING_SENDER_CTX);
  991. ssl_security_location_for_context(netdata_ssl_streaming_sender_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
  992. // stop the loop
  993. break;
  994. }
  995. }
  996. spinlock_unlock(&sp);
  997. #endif
  998. }
  999. void *rrdpush_sender_thread(void *ptr) {
  1000. worker_register("STREAMSND");
  1001. worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect");
  1002. worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read");
  1003. worker_register_job_name(WORKER_SENDER_JOB_SOCKET_RECEIVE, "receive");
  1004. worker_register_job_name(WORKER_SENDER_JOB_EXECUTE, "execute");
  1005. worker_register_job_name(WORKER_SENDER_JOB_SOCKET_SEND, "send");
  1006. // disconnection reasons
  1007. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout");
  1008. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error");
  1009. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR, "disconnect socket error");
  1010. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow");
  1011. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error");
  1012. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed");
  1013. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR, "disconnect receive error");
  1014. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error");
  1015. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
  1016. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
  1017. worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
  1018. worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function");
  1019. worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
  1020. worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
  1021. worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
  1022. worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
  1023. struct sender_state *s = ptr;
  1024. if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination ||
  1025. !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
  1026. !*s->host->rrdpush_send_api_key) {
  1027. netdata_log_error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
  1028. rrdhost_hostname(s->host), gettid());
  1029. return NULL;
  1030. }
  1031. if(!rrdhost_set_sender(s->host)) {
  1032. netdata_log_error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.",
  1033. rrdhost_hostname(s->host), gettid());
  1034. return NULL;
  1035. }
  1036. rrdpush_initialize_ssl_ctx(s->host);
  1037. netdata_log_info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid());
  1038. s->timeout = (int)appconfig_get_number(
  1039. &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600);
  1040. s->default_port = (int)appconfig_get_number(
  1041. &stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
  1042. s->buffer->max_size = (size_t)appconfig_get_number(
  1043. &stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024 * 10);
  1044. s->reconnect_delay = (unsigned int)appconfig_get_number(
  1045. &stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
  1046. remote_clock_resync_iterations = (unsigned int)appconfig_get_number(
  1047. &stream_config, CONFIG_SECTION_STREAM,
  1048. "initial clock resync iterations",
  1049. remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
  1050. // initialize rrdpush globals
  1051. rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1052. rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  1053. int pipe_buffer_size = 10 * 1024;
  1054. #ifdef F_GETPIPE_SZ
  1055. pipe_buffer_size = fcntl(s->rrdpush_sender_pipe[PIPE_READ], F_GETPIPE_SZ);
  1056. #endif
  1057. if(pipe_buffer_size < 10 * 1024)
  1058. pipe_buffer_size = 10 * 1024;
  1059. if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
  1060. netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
  1061. rrdhost_hostname(s->host));
  1062. return NULL;
  1063. }
  1064. struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
  1065. thread_data->pipe_buffer = mallocz(pipe_buffer_size);
  1066. thread_data->host = s->host;
  1067. netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
  1068. size_t iterations = 0;
  1069. time_t now_s = now_monotonic_sec();
  1070. while(!rrdhost_sender_should_exit(s)) {
  1071. iterations++;
  1072. // The connection attempt blocks (after which we use the socket in nonblocking)
  1073. if(unlikely(s->rrdpush_sender_socket == -1)) {
  1074. worker_is_busy(WORKER_SENDER_JOB_CONNECT);
  1075. now_s = now_monotonic_sec();
  1076. rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true);
  1077. rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1078. s->flags &= ~SENDER_FLAG_OVERFLOW;
  1079. s->read_len = 0;
  1080. s->buffer->read = 0;
  1081. s->buffer->write = 0;
  1082. if(!attempt_to_connect(s))
  1083. continue;
  1084. if(rrdhost_sender_should_exit(s))
  1085. break;
  1086. now_s = s->last_traffic_seen_t = now_monotonic_sec();
  1087. rrdpush_send_claimed_id(s->host);
  1088. rrdpush_send_host_labels(s->host);
  1089. rrdpush_send_global_functions(s->host);
  1090. s->replication.oldest_request_after_t = 0;
  1091. rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1092. netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
  1093. continue;
  1094. }
  1095. if(iterations % 1000 == 0)
  1096. now_s = now_monotonic_sec();
  1097. // If the TCP window never opened then something is wrong, restart connection
  1098. if(unlikely(now_s - s->last_traffic_seen_t > s->timeout &&
  1099. !rrdpush_sender_pending_replication_requests(s) &&
  1100. !rrdpush_sender_replicating_charts(s)
  1101. )) {
  1102. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
  1103. netdata_log_error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
  1104. rrdpush_sender_thread_close_socket(s->host);
  1105. continue;
  1106. }
  1107. sender_lock(s);
  1108. size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
  1109. size_t available = cbuffer_available_size_unsafe(s->buffer);
  1110. if (unlikely(!outstanding)) {
  1111. rrdpush_sender_pipe_clear_pending_data(s);
  1112. rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
  1113. }
  1114. sender_unlock(s);
  1115. worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);
  1116. if(outstanding)
  1117. s->send_attempts++;
  1118. if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) {
  1119. if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
  1120. netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
  1121. rrdhost_hostname(s->host));
  1122. rrdpush_sender_thread_close_socket(s->host);
  1123. break;
  1124. }
  1125. }
  1126. worker_is_idle();
  1127. // Wait until buffer opens in the socket or a rrdset_done_push wakes us
  1128. enum {
  1129. Collector = 0,
  1130. Socket = 1,
  1131. };
  1132. struct pollfd fds[2] = {
  1133. [Collector] = {
  1134. .fd = s->rrdpush_sender_pipe[PIPE_READ],
  1135. .events = POLLIN,
  1136. .revents = 0,
  1137. },
  1138. [Socket] = {
  1139. .fd = s->rrdpush_sender_socket,
  1140. .events = POLLIN | (outstanding ? POLLOUT : 0 ),
  1141. .revents = 0,
  1142. }
  1143. };
  1144. int poll_rc = poll(fds, 2, 1000);
  1145. netdata_log_debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
  1146. fds[Collector].revents, fds[Socket].revents, outstanding);
  1147. if(unlikely(rrdhost_sender_should_exit(s)))
  1148. break;
  1149. internal_error(fds[Collector].fd != s->rrdpush_sender_pipe[PIPE_READ],
  1150. "STREAM %s [send to %s]: pipe changed after poll().", rrdhost_hostname(s->host), s->connected_to);
  1151. internal_error(fds[Socket].fd != s->rrdpush_sender_socket,
  1152. "STREAM %s [send to %s]: socket changed after poll().", rrdhost_hostname(s->host), s->connected_to);
  1153. // Spurious wake-ups without error - loop again
  1154. if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) {
  1155. netdata_thread_testcancel();
  1156. netdata_log_debug(D_STREAM, "Spurious wakeup");
  1157. now_s = now_monotonic_sec();
  1158. continue;
  1159. }
  1160. // Only errors from poll() are internal, but try restarting the connection
  1161. if(unlikely(poll_rc == -1)) {
  1162. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR);
  1163. netdata_log_error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to);
  1164. rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
  1165. rrdpush_sender_thread_close_socket(s->host);
  1166. continue;
  1167. }
  1168. // If we have data and have seen the TCP window open then try to close it by a transmission.
  1169. if(likely(outstanding && (fds[Socket].revents & POLLOUT))) {
  1170. worker_is_busy(WORKER_SENDER_JOB_SOCKET_SEND);
  1171. ssize_t bytes = attempt_to_send(s);
  1172. if(bytes > 0) {
  1173. s->last_traffic_seen_t = now_monotonic_sec();
  1174. worker_set_metric(WORKER_SENDER_JOB_BYTES_SENT, (NETDATA_DOUBLE)bytes);
  1175. }
  1176. }
  1177. // If the collector woke us up then empty the pipe to remove the signal
  1178. if (fds[Collector].revents & (POLLIN|POLLPRI)) {
  1179. worker_is_busy(WORKER_SENDER_JOB_PIPE_READ);
  1180. netdata_log_debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
  1181. if (read(fds[Collector].fd, thread_data->pipe_buffer, pipe_buffer_size) == -1)
  1182. netdata_log_error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to);
  1183. }
  1184. // Read as much as possible to fill the buffer, split into full lines for execution.
  1185. if (fds[Socket].revents & POLLIN) {
  1186. worker_is_busy(WORKER_SENDER_JOB_SOCKET_RECEIVE);
  1187. ssize_t bytes = attempt_read(s);
  1188. if(bytes > 0) {
  1189. s->last_traffic_seen_t = now_monotonic_sec();
  1190. worker_set_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, (NETDATA_DOUBLE)bytes);
  1191. }
  1192. }
  1193. if(unlikely(s->read_len))
  1194. execute_commands(s);
  1195. if(unlikely(fds[Collector].revents & (POLLERR|POLLHUP|POLLNVAL))) {
  1196. char *error = NULL;
  1197. if (unlikely(fds[Collector].revents & POLLERR))
  1198. error = "pipe reports errors (POLLERR)";
  1199. else if (unlikely(fds[Collector].revents & POLLHUP))
  1200. error = "pipe closed (POLLHUP)";
  1201. else if (unlikely(fds[Collector].revents & POLLNVAL))
  1202. error = "pipe is invalid (POLLNVAL)";
  1203. if(error) {
  1204. rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
  1205. netdata_log_error("STREAM %s [send to %s]: restarting internal pipe: %s.",
  1206. rrdhost_hostname(s->host), s->connected_to, error);
  1207. }
  1208. }
  1209. if(unlikely(fds[Socket].revents & (POLLERR|POLLHUP|POLLNVAL))) {
  1210. char *error = NULL;
  1211. if (unlikely(fds[Socket].revents & POLLERR))
  1212. error = "socket reports errors (POLLERR)";
  1213. else if (unlikely(fds[Socket].revents & POLLHUP))
  1214. error = "connection closed by remote end (POLLHUP)";
  1215. else if (unlikely(fds[Socket].revents & POLLNVAL))
  1216. error = "connection is invalid (POLLNVAL)";
  1217. if(unlikely(error)) {
  1218. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR);
  1219. netdata_log_error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.",
  1220. rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection);
  1221. rrdpush_sender_thread_close_socket(s->host);
  1222. }
  1223. }
  1224. // protection from overflow
  1225. if(unlikely(s->flags & SENDER_FLAG_OVERFLOW)) {
  1226. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW);
  1227. errno = 0;
  1228. netdata_log_error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection",
  1229. rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
  1230. rrdpush_sender_thread_close_socket(s->host);
  1231. }
  1232. worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE) dictionary_entries(s->replication.requests));
  1233. }
  1234. netdata_thread_cleanup_pop(1);
  1235. return NULL;
  1236. }