sender.c 64 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdpush.h"
  3. #define WORKER_SENDER_JOB_CONNECT 0
  4. #define WORKER_SENDER_JOB_PIPE_READ 1
  5. #define WORKER_SENDER_JOB_SOCKET_RECEIVE 2
  6. #define WORKER_SENDER_JOB_EXECUTE 3
  7. #define WORKER_SENDER_JOB_SOCKET_SEND 4
  8. #define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5
  9. #define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
  10. #define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
  11. #define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
  12. #define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9
  13. #define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
  14. #define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
  15. #define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
  16. #define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
  17. #define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
  18. #define WORKER_SENDER_JOB_BUFFER_RATIO 15
  19. #define WORKER_SENDER_JOB_BYTES_RECEIVED 16
  20. #define WORKER_SENDER_JOB_BYTES_SENT 17
  21. #define WORKER_SENDER_JOB_BYTES_COMPRESSED 18
  22. #define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED 19
  23. #define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO 20
  24. #define WORKER_SENDER_JOB_REPLAY_REQUEST 21
  25. #define WORKER_SENDER_JOB_FUNCTION_REQUEST 22
  26. #define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 23
  27. #if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
  28. #error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21
  29. #endif
  30. extern struct config stream_config;
  31. extern char *netdata_ssl_ca_path;
  32. extern char *netdata_ssl_ca_file;
  33. static __thread BUFFER *sender_thread_buffer = NULL;
  34. static __thread bool sender_thread_buffer_used = false;
  35. static __thread time_t sender_thread_buffer_last_reset_s = 0;
  36. void sender_thread_buffer_free(void) {
  37. buffer_free(sender_thread_buffer);
  38. sender_thread_buffer = NULL;
  39. sender_thread_buffer_used = false;
  40. }
  41. // Collector thread starting a transmission
  42. BUFFER *sender_start(struct sender_state *s) {
  43. if(unlikely(sender_thread_buffer_used))
  44. fatal("STREAMING: thread buffer is used multiple times concurrently.");
  45. if(unlikely(rrdpush_sender_last_buffer_recreate_get(s) > sender_thread_buffer_last_reset_s)) {
  46. if(unlikely(sender_thread_buffer && sender_thread_buffer->size > THREAD_BUFFER_INITIAL_SIZE)) {
  47. buffer_free(sender_thread_buffer);
  48. sender_thread_buffer = NULL;
  49. }
  50. }
  51. if(unlikely(!sender_thread_buffer)) {
  52. sender_thread_buffer = buffer_create(THREAD_BUFFER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_streaming);
  53. sender_thread_buffer_last_reset_s = rrdpush_sender_last_buffer_recreate_get(s);
  54. }
  55. sender_thread_buffer_used = true;
  56. buffer_flush(sender_thread_buffer);
  57. return sender_thread_buffer;
  58. }
  59. static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
  60. #define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
  61. // Collector thread finishing a transmission
  62. void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) {
  63. if(unlikely(wb != sender_thread_buffer))
  64. fatal("STREAMING: sender is trying to commit a buffer that is not this thread's buffer.");
  65. if(unlikely(!sender_thread_buffer_used))
  66. fatal("STREAMING: sender is committing a buffer twice.");
  67. sender_thread_buffer_used = false;
  68. char *src = (char *)buffer_tostring(wb);
  69. size_t src_len = buffer_strlen(wb);
  70. if(unlikely(!src || !src_len))
  71. return;
  72. sender_lock(s);
  73. // if(s->host == localhost && type == STREAM_TRAFFIC_TYPE_METADATA) {
  74. // FILE *fp = fopen("/tmp/stream.txt", "a");
  75. // fprintf(fp, "\n--- SEND MESSAGE START: %s ----\n"
  76. // "%s"
  77. // "--- SEND MESSAGE END ----------------------------------------\n"
  78. // , rrdhost_hostname(s->host), src
  79. // );
  80. // fclose(fp);
  81. // }
  82. if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
  83. netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
  84. rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
  85. s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
  86. }
  87. if (s->compressor.initialized) {
  88. while(src_len) {
  89. size_t size_to_compress = src_len;
  90. if(unlikely(size_to_compress > COMPRESSION_MAX_MSG_SIZE)) {
  91. if (stream_has_capability(s, STREAM_CAP_BINARY))
  92. size_to_compress = COMPRESSION_MAX_MSG_SIZE;
  93. else {
  94. if (size_to_compress > COMPRESSION_MAX_MSG_SIZE) {
  95. // we need to find the last newline
  96. // so that the decompressor will have a whole line to work with
  97. const char *t = &src[COMPRESSION_MAX_MSG_SIZE];
  98. while (--t >= src)
  99. if (unlikely(*t == '\n'))
  100. break;
  101. if (t <= src) {
  102. size_to_compress = COMPRESSION_MAX_MSG_SIZE;
  103. } else
  104. size_to_compress = t - src + 1;
  105. }
  106. }
  107. }
  108. const char *dst;
  109. size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
  110. if (!dst_len) {
  111. netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
  112. rrdhost_hostname(s->host), s->connected_to);
  113. rrdpush_compression_initialize(s);
  114. dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
  115. if(!dst_len) {
  116. netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
  117. rrdhost_hostname(s->host), s->connected_to);
  118. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
  119. rrdpush_compression_deactivate(s);
  120. rrdpush_sender_thread_close_socket(s->host);
  121. sender_unlock(s);
  122. return;
  123. }
  124. }
  125. rrdpush_signature_t signature = rrdpush_compress_encode_signature(dst_len);
  126. #ifdef NETDATA_INTERNAL_CHECKS
  127. // check if reversing the signature provides the same length
  128. size_t decoded_dst_len = rrdpush_decompress_decode_signature((const char *)&signature, sizeof(signature));
  129. if(decoded_dst_len != dst_len)
  130. fatal("RRDPUSH COMPRESSION: invalid signature, original payload %zu bytes, "
  131. "compressed payload length %zu bytes, but signature says payload is %zu bytes",
  132. size_to_compress, dst_len, decoded_dst_len);
  133. #endif
  134. if(cbuffer_add_unsafe(s->buffer, (const char *)&signature, sizeof(signature)))
  135. s->flags |= SENDER_FLAG_OVERFLOW;
  136. else {
  137. if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
  138. s->flags |= SENDER_FLAG_OVERFLOW;
  139. else
  140. s->sent_bytes_on_this_connection_per_type[type] += dst_len + sizeof(signature);
  141. }
  142. src = src + size_to_compress;
  143. src_len -= size_to_compress;
  144. }
  145. }
  146. else if(cbuffer_add_unsafe(s->buffer, src, src_len))
  147. s->flags |= SENDER_FLAG_OVERFLOW;
  148. else
  149. s->sent_bytes_on_this_connection_per_type[type] += src_len;
  150. replication_recalculate_buffer_used_ratio_unsafe(s);
  151. bool signal_sender = false;
  152. if(!rrdpush_sender_pipe_has_pending_data(s)) {
  153. rrdpush_sender_pipe_set_pending_data(s);
  154. signal_sender = true;
  155. }
  156. sender_unlock(s);
  157. if(signal_sender && (!stream_has_capability(s, STREAM_CAP_INTERPOLATED) || type != STREAM_TRAFFIC_TYPE_DATA))
  158. rrdpush_signal_sender_to_wake_up(s);
  159. }
  160. static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
  161. buffer_sprintf(
  162. wb
  163. , "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n"
  164. , rrdvar_name(rva)
  165. , rrdvar2number(rva)
  166. );
  167. netdata_log_debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva));
  168. }
  169. void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) {
  170. if(rrdhost_can_send_definitions_to_parent(host)) {
  171. BUFFER *wb = sender_start(host->sender);
  172. rrdpush_sender_add_host_variable_to_buffer(wb, rva);
  173. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  174. sender_thread_buffer_free();
  175. }
  176. }
  177. struct custom_host_variables_callback {
  178. BUFFER *wb;
  179. };
  180. static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdvar_ptr __maybe_unused, void *struct_ptr) {
  181. const RRDVAR_ACQUIRED *rv = (const RRDVAR_ACQUIRED *)item;
  182. struct custom_host_variables_callback *tmp = struct_ptr;
  183. BUFFER *wb = tmp->wb;
  184. if(unlikely(rrdvar_flags(rv) & RRDVAR_FLAG_CUSTOM_HOST_VAR && rrdvar_type(rv) == RRDVAR_TYPE_CALCULATED)) {
  185. rrdpush_sender_add_host_variable_to_buffer(wb, rv);
  186. return 1;
  187. }
  188. return 0;
  189. }
  190. static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
  191. if(rrdhost_can_send_definitions_to_parent(host)) {
  192. BUFFER *wb = sender_start(host->sender);
  193. struct custom_host_variables_callback tmp = {
  194. .wb = wb
  195. };
  196. int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
  197. (void)ret;
  198. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  199. sender_thread_buffer_free();
  200. netdata_log_debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
  201. }
  202. }
  203. // resets all the chart, so that their definitions
  204. // will be resent to the central netdata
  205. static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
  206. RRDSET *st;
  207. rrdset_foreach_read(st, host) {
  208. rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
  209. rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
  210. st->rrdpush.sender.resync_time_s = 0;
  211. rrdset_metadata_updated(st);
  212. RRDDIM *rd;
  213. rrddim_foreach_read(rd, st)
  214. rrddim_metadata_exposed_upstream_clear(rd);
  215. rrddim_foreach_done(rd);
  216. }
  217. rrdset_foreach_done(st);
  218. rrdhost_sender_replicating_charts_zero(host);
  219. }
  220. static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t now_s, bool have_mutex, bool force) {
  221. static __thread time_t last_reset_time_s = 0;
  222. if(!force && now_s - last_reset_time_s < 300)
  223. return;
  224. if(!have_mutex)
  225. sender_lock(s);
  226. rrdpush_sender_last_buffer_recreate_set(s, now_s);
  227. last_reset_time_s = now_s;
  228. if(s->buffer && s->buffer->size > CBUFFER_INITIAL_SIZE) {
  229. size_t max = s->buffer->max_size;
  230. cbuffer_free(s->buffer);
  231. s->buffer = cbuffer_new(CBUFFER_INITIAL_SIZE, max, &netdata_buffers_statistics.cbuffers_streaming);
  232. }
  233. sender_thread_buffer_free();
  234. if(!have_mutex)
  235. sender_unlock(s);
  236. }
  237. static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
  238. rrdpush_sender_set_flush_time(host->sender);
  239. sender_lock(host->sender);
  240. // flush the output buffer from any data it may have
  241. cbuffer_flush(host->sender->buffer);
  242. rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true);
  243. replication_recalculate_buffer_used_ratio_unsafe(host->sender);
  244. sender_unlock(host->sender);
  245. }
  246. static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) {
  247. rrdpush_sender_set_flush_time(host->sender);
  248. // stop all replication commands inflight
  249. replication_sender_delete_pending_requests(host->sender);
  250. // reset the state of all charts
  251. rrdpush_sender_thread_reset_all_charts(host);
  252. rrdpush_sender_replicating_charts_zero(host->sender);
  253. }
  254. static void rrdpush_sender_on_connect(RRDHOST *host) {
  255. rrdpush_sender_cbuffer_flush(host);
  256. rrdpush_sender_charts_and_replication_reset(host);
  257. }
  258. static void rrdpush_sender_after_connect(RRDHOST *host) {
  259. rrdpush_sender_thread_send_custom_host_variables(host);
  260. }
  261. static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
  262. #ifdef ENABLE_HTTPS
  263. netdata_ssl_close(&host->sender->ssl);
  264. #endif
  265. if(host->sender->rrdpush_sender_socket != -1) {
  266. close(host->sender->rrdpush_sender_socket);
  267. host->sender->rrdpush_sender_socket = -1;
  268. }
  269. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  270. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  271. // do not flush the circular buffer here
  272. // this function is called sometimes with the mutex lock, sometimes without the lock
  273. rrdpush_sender_charts_and_replication_reset(host);
  274. }
  275. void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
  276. {
  277. se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz("");
  278. se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz("");
  279. se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz("");
  280. se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):strdupz("");
  281. se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz("");
  282. }
  283. void rrdpush_clean_encoded(stream_encoded_t *se)
  284. {
  285. if (se->os_name)
  286. freez(se->os_name);
  287. if (se->os_id)
  288. freez(se->os_id);
  289. if (se->os_version)
  290. freez(se->os_version);
  291. if (se->kernel_name)
  292. freez(se->kernel_name);
  293. if (se->kernel_version)
  294. freez(se->kernel_version);
  295. }
  296. struct {
  297. const char *response;
  298. size_t length;
  299. int32_t version;
  300. bool dynamic;
  301. const char *error;
  302. int worker_job_id;
  303. int postpone_reconnect_seconds;
  304. bool prevent_log;
  305. } stream_responses[] = {
  306. {
  307. .response = START_STREAMING_PROMPT_VN,
  308. .length = sizeof(START_STREAMING_PROMPT_VN) - 1,
  309. .version = STREAM_HANDSHAKE_OK_V3, // and above
  310. .dynamic = true, // dynamic = we will parse the version / capabilities
  311. .error = NULL,
  312. .worker_job_id = 0,
  313. .postpone_reconnect_seconds = 0,
  314. },
  315. {
  316. .response = START_STREAMING_PROMPT_V2,
  317. .length = sizeof(START_STREAMING_PROMPT_V2) - 1,
  318. .version = STREAM_HANDSHAKE_OK_V2,
  319. .dynamic = false,
  320. .error = NULL,
  321. .worker_job_id = 0,
  322. .postpone_reconnect_seconds = 0,
  323. },
  324. {
  325. .response = START_STREAMING_PROMPT_V1,
  326. .length = sizeof(START_STREAMING_PROMPT_V1) - 1,
  327. .version = STREAM_HANDSHAKE_OK_V1,
  328. .dynamic = false,
  329. .error = NULL,
  330. .worker_job_id = 0,
  331. .postpone_reconnect_seconds = 0,
  332. },
  333. {
  334. .response = START_STREAMING_ERROR_SAME_LOCALHOST,
  335. .length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1,
  336. .version = STREAM_HANDSHAKE_ERROR_LOCALHOST,
  337. .dynamic = false,
  338. .error = "remote server rejected this stream, the host we are trying to stream is its localhost",
  339. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  340. .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour
  341. .prevent_log = true,
  342. },
  343. {
  344. .response = START_STREAMING_ERROR_ALREADY_STREAMING,
  345. .length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1,
  346. .version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED,
  347. .dynamic = false,
  348. .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
  349. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  350. .postpone_reconnect_seconds = 2 * 60, // 2 minutes
  351. .prevent_log = true,
  352. },
  353. {
  354. .response = START_STREAMING_ERROR_NOT_PERMITTED,
  355. .length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1,
  356. .version = STREAM_HANDSHAKE_ERROR_DENIED,
  357. .dynamic = false,
  358. .error = "remote server denied access, probably we don't have the right API key?",
  359. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  360. .postpone_reconnect_seconds = 1 * 60, // 1 minute
  361. },
  362. {
  363. .response = START_STREAMING_ERROR_BUSY_TRY_LATER,
  364. .length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1,
  365. .version = STREAM_HANDSHAKE_BUSY_TRY_LATER,
  366. .dynamic = false,
  367. .error = "remote server is currently busy, we should try later",
  368. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  369. .postpone_reconnect_seconds = 2 * 60, // 2 minutes
  370. },
  371. {
  372. .response = START_STREAMING_ERROR_INTERNAL_ERROR,
  373. .length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1,
  374. .version = STREAM_HANDSHAKE_INTERNAL_ERROR,
  375. .dynamic = false,
  376. .error = "remote server is encountered an internal error, we should try later",
  377. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  378. .postpone_reconnect_seconds = 5 * 60, // 5 minutes
  379. },
  380. {
  381. .response = START_STREAMING_ERROR_INITIALIZATION,
  382. .length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1,
  383. .version = STREAM_HANDSHAKE_INITIALIZATION,
  384. .dynamic = false,
  385. .error = "remote server is initializing, we should try later",
  386. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  387. .postpone_reconnect_seconds = 2 * 60, // 2 minute
  388. },
  389. // terminator
  390. {
  391. .response = NULL,
  392. .length = 0,
  393. .version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE,
  394. .dynamic = false,
  395. .error = "remote node response is not understood, is it Netdata?",
  396. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  397. .postpone_reconnect_seconds = 1 * 60, // 1 minute
  398. .prevent_log = false,
  399. }
  400. };
  401. static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender_state *s, char *http, size_t http_length) {
  402. int32_t version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE;
  403. int i;
  404. for(i = 0; stream_responses[i].response ; i++) {
  405. if(stream_responses[i].dynamic &&
  406. http_length > stream_responses[i].length && http_length < (stream_responses[i].length + 30) &&
  407. strncmp(http, stream_responses[i].response, stream_responses[i].length) == 0) {
  408. version = str2i(&http[stream_responses[i].length]);
  409. break;
  410. }
  411. else if(http_length == stream_responses[i].length && strcmp(http, stream_responses[i].response) == 0) {
  412. version = stream_responses[i].version;
  413. break;
  414. }
  415. }
  416. if(version >= STREAM_HANDSHAKE_OK_V1) {
  417. host->destination->reason = version;
  418. host->destination->postpone_reconnection_until = now_realtime_sec() + s->reconnect_delay;
  419. s->capabilities = convert_stream_version_to_capabilities(version, host, true);
  420. return true;
  421. }
  422. bool prevent_log = stream_responses[i].prevent_log;
  423. const char *error = stream_responses[i].error;
  424. int worker_job_id = stream_responses[i].worker_job_id;
  425. int delay = stream_responses[i].postpone_reconnect_seconds;
  426. worker_is_busy(worker_job_id);
  427. rrdpush_sender_thread_close_socket(host);
  428. host->destination->reason = version;
  429. host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
  430. char buf[LOG_DATE_LENGTH];
  431. log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until);
  432. if(prevent_log)
  433. internal_error(true, "STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
  434. rrdhost_hostname(host), s->connected_to, error, delay, buf);
  435. else
  436. netdata_log_error("STREAM %s [send to %s]: %s - will retry in %d secs, at %s",
  437. rrdhost_hostname(host), s->connected_to, error, delay, buf);
  438. return false;
  439. }
  440. static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
  441. #ifdef ENABLE_HTTPS
  442. RRDHOST *host = s->host;
  443. bool ssl_required = host->destination && host->destination->ssl;
  444. netdata_ssl_close(&host->sender->ssl);
  445. if(!ssl_required)
  446. return true;
  447. if (netdata_ssl_open(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket)) {
  448. if(!netdata_ssl_connect(&host->sender->ssl)) {
  449. // couldn't connect
  450. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
  451. rrdpush_sender_thread_close_socket(host);
  452. host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
  453. host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
  454. return false;
  455. }
  456. if (netdata_ssl_validate_certificate_sender &&
  457. security_test_certificate(host->sender->ssl.conn)) {
  458. // certificate is not valid
  459. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
  460. netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
  461. rrdpush_sender_thread_close_socket(host);
  462. host->destination->reason = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
  463. host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
  464. return false;
  465. }
  466. return true;
  467. }
  468. netdata_log_error("SSL: failed to establish connection.");
  469. return false;
  470. #else
  471. // SSL is not enabled
  472. return true;
  473. #endif
  474. }
  475. static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) {
  476. struct timeval tv = {
  477. .tv_sec = timeout,
  478. .tv_usec = 0
  479. };
  480. // make sure the socket is closed
  481. rrdpush_sender_thread_close_socket(host);
  482. s->rrdpush_sender_socket = connect_to_one_of_destinations(
  483. host
  484. , default_port
  485. , &tv
  486. , &s->reconnects_counter
  487. , s->connected_to
  488. , sizeof(s->connected_to)-1
  489. , &host->destination
  490. );
  491. if(unlikely(s->rrdpush_sender_socket == -1)) {
  492. // netdata_log_error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
  493. return false;
  494. }
  495. // netdata_log_info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
  496. // reset our capabilities to default
  497. s->capabilities = stream_our_capabilities(host, true);
  498. /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
  499. version negotiation resulted in a high enough version.
  500. */
  501. stream_encoded_t se;
  502. rrdpush_encode_variable(&se, host);
  503. host->sender->hops = host->system_info->hops + 1;
  504. char http[HTTP_HEADER_SIZE + 1];
  505. int eol = snprintfz(http, HTTP_HEADER_SIZE,
  506. "STREAM "
  507. "key=%s"
  508. "&hostname=%s"
  509. "&registry_hostname=%s"
  510. "&machine_guid=%s"
  511. "&update_every=%d"
  512. "&os=%s"
  513. "&timezone=%s"
  514. "&abbrev_timezone=%s"
  515. "&utc_offset=%d"
  516. "&hops=%d"
  517. "&ml_capable=%d"
  518. "&ml_enabled=%d"
  519. "&mc_version=%d"
  520. "&tags=%s"
  521. "&ver=%u"
  522. "&NETDATA_INSTANCE_CLOUD_TYPE=%s"
  523. "&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s"
  524. "&NETDATA_INSTANCE_CLOUD_INSTANCE_REGION=%s"
  525. "&NETDATA_SYSTEM_OS_NAME=%s"
  526. "&NETDATA_SYSTEM_OS_ID=%s"
  527. "&NETDATA_SYSTEM_OS_ID_LIKE=%s"
  528. "&NETDATA_SYSTEM_OS_VERSION=%s"
  529. "&NETDATA_SYSTEM_OS_VERSION_ID=%s"
  530. "&NETDATA_SYSTEM_OS_DETECTION=%s"
  531. "&NETDATA_HOST_IS_K8S_NODE=%s"
  532. "&NETDATA_SYSTEM_KERNEL_NAME=%s"
  533. "&NETDATA_SYSTEM_KERNEL_VERSION=%s"
  534. "&NETDATA_SYSTEM_ARCHITECTURE=%s"
  535. "&NETDATA_SYSTEM_VIRTUALIZATION=%s"
  536. "&NETDATA_SYSTEM_VIRT_DETECTION=%s"
  537. "&NETDATA_SYSTEM_CONTAINER=%s"
  538. "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s"
  539. "&NETDATA_CONTAINER_OS_NAME=%s"
  540. "&NETDATA_CONTAINER_OS_ID=%s"
  541. "&NETDATA_CONTAINER_OS_ID_LIKE=%s"
  542. "&NETDATA_CONTAINER_OS_VERSION=%s"
  543. "&NETDATA_CONTAINER_OS_VERSION_ID=%s"
  544. "&NETDATA_CONTAINER_OS_DETECTION=%s"
  545. "&NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT=%s"
  546. "&NETDATA_SYSTEM_CPU_FREQ=%s"
  547. "&NETDATA_SYSTEM_TOTAL_RAM=%s"
  548. "&NETDATA_SYSTEM_TOTAL_DISK_SIZE=%s"
  549. "&NETDATA_PROTOCOL_VERSION=%s"
  550. " HTTP/1.1\r\n"
  551. "User-Agent: %s/%s\r\n"
  552. "Accept: */*\r\n\r\n"
  553. , host->rrdpush_send_api_key
  554. , rrdhost_hostname(host)
  555. , rrdhost_registry_hostname(host)
  556. , host->machine_guid
  557. , default_rrd_update_every
  558. , rrdhost_os(host)
  559. , rrdhost_timezone(host)
  560. , rrdhost_abbrev_timezone(host)
  561. , host->utc_offset
  562. , host->sender->hops
  563. , host->system_info->ml_capable
  564. , host->system_info->ml_enabled
  565. , host->system_info->mc_version
  566. , rrdhost_tags(host)
  567. , s->capabilities
  568. , (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : ""
  569. , (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : ""
  570. , (host->system_info->cloud_instance_region) ? host->system_info->cloud_instance_region : ""
  571. , se.os_name
  572. , se.os_id
  573. , (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : ""
  574. , se.os_version
  575. , (host->system_info->host_os_version_id) ? host->system_info->host_os_version_id : ""
  576. , (host->system_info->host_os_detection) ? host->system_info->host_os_detection : ""
  577. , (host->system_info->is_k8s_node) ? host->system_info->is_k8s_node : ""
  578. , se.kernel_name
  579. , se.kernel_version
  580. , (host->system_info->architecture) ? host->system_info->architecture : ""
  581. , (host->system_info->virtualization) ? host->system_info->virtualization : ""
  582. , (host->system_info->virt_detection) ? host->system_info->virt_detection : ""
  583. , (host->system_info->container) ? host->system_info->container : ""
  584. , (host->system_info->container_detection) ? host->system_info->container_detection : ""
  585. , (host->system_info->container_os_name) ? host->system_info->container_os_name : ""
  586. , (host->system_info->container_os_id) ? host->system_info->container_os_id : ""
  587. , (host->system_info->container_os_id_like) ? host->system_info->container_os_id_like : ""
  588. , (host->system_info->container_os_version) ? host->system_info->container_os_version : ""
  589. , (host->system_info->container_os_version_id) ? host->system_info->container_os_version_id : ""
  590. , (host->system_info->container_os_detection) ? host->system_info->container_os_detection : ""
  591. , (host->system_info->host_cores) ? host->system_info->host_cores : ""
  592. , (host->system_info->host_cpu_freq) ? host->system_info->host_cpu_freq : ""
  593. , (host->system_info->host_ram_total) ? host->system_info->host_ram_total : ""
  594. , (host->system_info->host_disk_space) ? host->system_info->host_disk_space : ""
  595. , STREAMING_PROTOCOL_VERSION
  596. , rrdhost_program_name(host)
  597. , rrdhost_program_version(host)
  598. );
  599. http[eol] = 0x00;
  600. rrdpush_clean_encoded(&se);
  601. if(!rrdpush_sender_connect_ssl(s))
  602. return false;
  603. ssize_t bytes, len = (ssize_t)strlen(http);
  604. bytes = send_timeout(
  605. #ifdef ENABLE_HTTPS
  606. &host->sender->ssl,
  607. #endif
  608. s->rrdpush_sender_socket,
  609. http,
  610. len,
  611. 0,
  612. timeout);
  613. if(bytes <= 0) { // timeout is 0
  614. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
  615. rrdpush_sender_thread_close_socket(host);
  616. netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
  617. host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
  618. host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
  619. return false;
  620. }
  621. bytes = recv_timeout(
  622. #ifdef ENABLE_HTTPS
  623. &host->sender->ssl,
  624. #endif
  625. s->rrdpush_sender_socket,
  626. http,
  627. HTTP_HEADER_SIZE,
  628. 0,
  629. timeout);
  630. if(bytes <= 0) { // timeout is 0
  631. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
  632. rrdpush_sender_thread_close_socket(host);
  633. netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
  634. host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
  635. host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
  636. return false;
  637. }
  638. if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
  639. netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
  640. if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
  641. netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
  642. http[bytes] = '\0';
  643. netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http);
  644. if(!rrdpush_sender_validate_response(host, s, http, bytes))
  645. return false;
  646. rrdpush_compression_initialize(s);
  647. log_sender_capabilities(s);
  648. netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
  649. return true;
  650. }
  651. static bool attempt_to_connect(struct sender_state *state)
  652. {
  653. state->send_attempts = 0;
  654. // reset the bytes we have sent for this session
  655. state->sent_bytes_on_this_connection = 0;
  656. memset(state->sent_bytes_on_this_connection_per_type, 0, sizeof(state->sent_bytes_on_this_connection_per_type));
  657. if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
  658. // reset the buffer, to properly send charts and metrics
  659. rrdpush_sender_on_connect(state->host);
  660. // send from the beginning
  661. state->begin = 0;
  662. // make sure the next reconnection will be immediate
  663. state->not_connected_loops = 0;
  664. // let the data collection threads know we are ready
  665. rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  666. rrdpush_sender_after_connect(state->host);
  667. return true;
  668. }
  669. // we couldn't connect
  670. // increase the failed connections counter
  671. state->not_connected_loops++;
  672. // slow re-connection on repeating errors
  673. usec_t now_ut = now_monotonic_usec();
  674. usec_t end_ut = now_ut + USEC_PER_SEC * state->reconnect_delay;
  675. while(now_ut < end_ut) {
  676. netdata_thread_testcancel();
  677. sleep_usec(500 * USEC_PER_MS); // seconds
  678. now_ut = now_monotonic_usec();
  679. }
  680. return false;
  681. }
  682. // TCP window is open, and we have data to transmit.
  683. static ssize_t attempt_to_send(struct sender_state *s) {
  684. ssize_t ret;
  685. #ifdef NETDATA_INTERNAL_CHECKS
  686. struct circular_buffer *cb = s->buffer;
  687. #endif
  688. sender_lock(s);
  689. char *chunk;
  690. size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
  691. netdata_log_debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
  692. #ifdef ENABLE_HTTPS
  693. if(SSL_connection(&s->ssl))
  694. ret = netdata_ssl_write(&s->ssl, chunk, outstanding);
  695. else
  696. ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
  697. #else
  698. ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
  699. #endif
  700. if (likely(ret > 0)) {
  701. cbuffer_remove_unsafe(s->buffer, ret);
  702. s->sent_bytes_on_this_connection += ret;
  703. s->sent_bytes += ret;
  704. netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret);
  705. }
  706. else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
  707. netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to);
  708. else if (ret == -1) {
  709. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR);
  710. netdata_log_debug(D_STREAM, "STREAM: Send failed - closing socket...");
  711. netdata_log_error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection);
  712. rrdpush_sender_thread_close_socket(s->host);
  713. }
  714. else
  715. netdata_log_debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
  716. replication_recalculate_buffer_used_ratio_unsafe(s);
  717. sender_unlock(s);
  718. return ret;
  719. }
  720. static ssize_t attempt_read(struct sender_state *s) {
  721. ssize_t ret;
  722. #ifdef ENABLE_HTTPS
  723. if (SSL_connection(&s->ssl))
  724. ret = netdata_ssl_read(&s->ssl, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1);
  725. else
  726. ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
  727. #else
  728. ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
  729. #endif
  730. if (ret > 0) {
  731. s->read_len += ret;
  732. return ret;
  733. }
  734. if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
  735. return ret;
  736. #ifdef ENABLE_HTTPS
  737. if (SSL_connection(&s->ssl))
  738. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
  739. else
  740. #endif
  741. if (ret == 0 || errno == ECONNRESET) {
  742. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED);
  743. netdata_log_error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to);
  744. }
  745. else {
  746. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR);
  747. netdata_log_error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
  748. }
  749. rrdpush_sender_thread_close_socket(s->host);
  750. return ret;
  751. }
  752. struct inflight_stream_function {
  753. struct sender_state *sender;
  754. STRING *transaction;
  755. usec_t received_ut;
  756. };
  757. void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
  758. struct inflight_stream_function *tmp = data;
  759. struct sender_state *s = tmp->sender;
  760. if(rrdhost_can_send_definitions_to_parent(s->host)) {
  761. BUFFER *wb = sender_start(s);
  762. pluginsd_function_result_begin_to_buffer(wb
  763. , string2str(tmp->transaction)
  764. , code
  765. , functions_content_type_to_format(func_wb->content_type)
  766. , func_wb->expires);
  767. buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));
  768. pluginsd_function_result_end_to_buffer(wb);
  769. sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
  770. sender_thread_buffer_free();
  771. internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %"PRIu64" usec).",
  772. rrdhost_hostname(s->host), s->connected_to,
  773. string2str(tmp->transaction),
  774. buffer_strlen(func_wb),
  775. now_realtime_usec() - tmp->received_ut);
  776. }
  777. string_freez(tmp->transaction);
  778. buffer_free(func_wb);
  779. freez(tmp);
  780. }
  781. // This is just a placeholder until the gap filling state machine is inserted
  782. void execute_commands(struct sender_state *s) {
  783. worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
  784. char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
  785. *end = 0;
  786. while( start < end && (newline = strchr(start, '\n')) ) {
  787. *newline = '\0';
  788. if (s->receiving_function_payload && unlikely(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) != 0)) {
  789. if (buffer_strlen(s->function_payload.payload) != 0)
  790. buffer_strcat(s->function_payload.payload, "\n");
  791. buffer_strcat(s->function_payload.payload, start);
  792. start = newline + 1;
  793. continue;
  794. }
  795. netdata_log_access("STREAM: %d from '%s' for host '%s': %s",
  796. gettid(), s->connected_to, rrdhost_hostname(s->host), start);
  797. // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
  798. char *words[PLUGINSD_MAX_WORDS] = { NULL };
  799. size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS);
  800. const char *keyword = get_word(words, num_words, 0);
  801. if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
  802. worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
  803. char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1);
  804. char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2);
  805. char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3);
  806. if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
  807. netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
  808. rrdhost_hostname(s->host), s->connected_to,
  809. keyword,
  810. transaction?transaction:"(unset)",
  811. timeout_s?timeout_s:"(unset)",
  812. function?function:"(unset)");
  813. }
  814. else {
  815. int timeout = str2i(timeout_s);
  816. if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  817. struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function));
  818. tmp->received_ut = now_realtime_usec();
  819. tmp->sender = s;
  820. tmp->transaction = string_strdupz(transaction);
  821. BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
  822. char *payload = s->receiving_function_payload ? (char *)buffer_tostring(s->function_payload.payload) : NULL;
  823. int code = rrd_function_run(s->host, wb, timeout, function, false, transaction,
  824. stream_execute_function_callback, tmp, NULL, NULL, payload);
  825. if(code != HTTP_RESP_OK) {
  826. if (!buffer_strlen(wb))
  827. rrd_call_function_error(wb, "Failed to route request to collector", code);
  828. stream_execute_function_callback(wb, code, tmp);
  829. }
  830. }
  831. if (s->receiving_function_payload) {
  832. s->receiving_function_payload = false;
  833. buffer_free(s->function_payload.payload);
  834. freez(s->function_payload.txid);
  835. freez(s->function_payload.timeout);
  836. freez(s->function_payload.fn_name);
  837. memset(&s->function_payload, 0, sizeof(struct function_payload_state));
  838. }
  839. }
  840. else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
  841. if (s->receiving_function_payload) {
  842. netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword);
  843. s->receiving_function_payload = false;
  844. buffer_free(s->function_payload.payload);
  845. s->function_payload.payload = NULL;
  846. // TODO send error response
  847. }
  848. char *transaction = get_word(words, num_words, 1);
  849. char *timeout_s = get_word(words, num_words, 2);
  850. char *function = get_word(words, num_words, 3);
  851. if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
  852. netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
  853. rrdhost_hostname(s->host), s->connected_to,
  854. keyword,
  855. transaction?transaction:"(unset)",
  856. timeout_s?timeout_s:"(unset)",
  857. function?function:"(unset)");
  858. }
  859. s->receiving_function_payload = true;
  860. s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions);
  861. s->function_payload.txid = strdupz(get_word(words, num_words, 1));
  862. s->function_payload.timeout = strdupz(get_word(words, num_words, 2));
  863. s->function_payload.fn_name = strdupz(get_word(words, num_words, 3));
  864. }
  865. else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
  866. worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
  867. char *transaction = get_word(words, num_words, 1);
  868. if(transaction && *transaction)
  869. rrd_function_cancel(transaction);
  870. }
  871. else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
  872. worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
  873. const char *chart_id = get_word(words, num_words, 1);
  874. const char *start_streaming = get_word(words, num_words, 2);
  875. const char *after = get_word(words, num_words, 3);
  876. const char *before = get_word(words, num_words, 4);
  877. if (!chart_id || !start_streaming || !after || !before) {
  878. netdata_log_error("STREAM %s [send to %s] %s command is incomplete"
  879. " (chart=%s, start_streaming=%s, after=%s, before=%s)",
  880. rrdhost_hostname(s->host), s->connected_to,
  881. keyword,
  882. chart_id ? chart_id : "(unset)",
  883. start_streaming ? start_streaming : "(unset)",
  884. after ? after : "(unset)",
  885. before ? before : "(unset)");
  886. }
  887. else {
  888. replication_add_request(s, chart_id,
  889. strtoll(after, NULL, 0),
  890. strtoll(before, NULL, 0),
  891. !strcmp(start_streaming, "true")
  892. );
  893. }
  894. }
  895. else {
  896. netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
  897. }
  898. worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
  899. start = newline + 1;
  900. }
  901. if (start < end) {
  902. memmove(s->read_buffer, start, end-start);
  903. s->read_len = end - start;
  904. }
  905. else {
  906. s->read_buffer[0] = '\0';
  907. s->read_len = 0;
  908. }
  909. }
  910. struct rrdpush_sender_thread_data {
  911. RRDHOST *host;
  912. char *pipe_buffer;
  913. };
  914. static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen) {
  915. static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
  916. bool ret = true;
  917. netdata_mutex_lock(&mutex);
  918. int new_pipe_fds[2];
  919. if(reopen) {
  920. if(pipe(new_pipe_fds) != 0) {
  921. netdata_log_error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host));
  922. new_pipe_fds[PIPE_READ] = -1;
  923. new_pipe_fds[PIPE_WRITE] = -1;
  924. ret = false;
  925. }
  926. }
  927. int old_pipe_fds[2];
  928. old_pipe_fds[PIPE_READ] = pipe_fds[PIPE_READ];
  929. old_pipe_fds[PIPE_WRITE] = pipe_fds[PIPE_WRITE];
  930. if(reopen) {
  931. pipe_fds[PIPE_READ] = new_pipe_fds[PIPE_READ];
  932. pipe_fds[PIPE_WRITE] = new_pipe_fds[PIPE_WRITE];
  933. }
  934. else {
  935. pipe_fds[PIPE_READ] = -1;
  936. pipe_fds[PIPE_WRITE] = -1;
  937. }
  938. if(old_pipe_fds[PIPE_READ] > 2)
  939. close(old_pipe_fds[PIPE_READ]);
  940. if(old_pipe_fds[PIPE_WRITE] > 2)
  941. close(old_pipe_fds[PIPE_WRITE]);
  942. netdata_mutex_unlock(&mutex);
  943. return ret;
  944. }
  945. void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
  946. if(unlikely(s->tid == gettid()))
  947. return;
  948. RRDHOST *host = s->host;
  949. int pipe_fd = s->rrdpush_sender_pipe[PIPE_WRITE];
  950. // signal the sender there are more data
  951. if (pipe_fd != -1 && write(pipe_fd, " ", 1) == -1) {
  952. netdata_log_error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host));
  953. rrdpush_sender_pipe_close(host, s->rrdpush_sender_pipe, true);
  954. }
  955. }
  956. static bool rrdhost_set_sender(RRDHOST *host) {
  957. if(unlikely(!host->sender)) return false;
  958. bool ret = false;
  959. sender_lock(host->sender);
  960. if(!host->sender->tid) {
  961. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  962. rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
  963. host->rrdpush_sender_connection_counter++;
  964. host->sender->tid = gettid();
  965. host->sender->last_state_since_t = now_realtime_sec();
  966. host->sender->exit.reason = STREAM_HANDSHAKE_NEVER;
  967. ret = true;
  968. }
  969. sender_unlock(host->sender);
  970. rrdpush_reset_destinations_postpone_time(host);
  971. return ret;
  972. }
  973. static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) {
  974. if(unlikely(!host->sender)) return;
  975. if(host->sender->tid == gettid()) {
  976. host->sender->tid = 0;
  977. host->sender->exit.shutdown = false;
  978. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  979. host->sender->last_state_since_t = now_realtime_sec();
  980. if(host->destination) {
  981. host->destination->since = host->sender->last_state_since_t;
  982. host->destination->reason = host->sender->exit.reason;
  983. }
  984. }
  985. rrdpush_reset_destinations_postpone_time(host);
  986. }
  987. static bool rrdhost_sender_should_exit(struct sender_state *s) {
  988. // check for outstanding cancellation requests
  989. netdata_thread_testcancel();
  990. if(unlikely(!service_running(SERVICE_STREAMING))) {
  991. if(!s->exit.reason)
  992. s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT;
  993. return true;
  994. }
  995. if(unlikely(!rrdhost_has_rrdpush_sender_enabled(s->host))) {
  996. if(!s->exit.reason)
  997. s->exit.reason = STREAM_HANDSHAKE_NON_STREAMABLE_HOST;
  998. return true;
  999. }
  1000. if(unlikely(s->exit.shutdown)) {
  1001. if(!s->exit.reason)
  1002. s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN;
  1003. return true;
  1004. }
  1005. if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) {
  1006. if(!s->exit.reason)
  1007. s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST;
  1008. return true;
  1009. }
  1010. return false;
  1011. }
  1012. static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
  1013. struct rrdpush_sender_thread_data *s = ptr;
  1014. worker_unregister();
  1015. RRDHOST *host = s->host;
  1016. sender_lock(host->sender);
  1017. netdata_log_info("STREAM %s [send]: sending thread exits %s",
  1018. rrdhost_hostname(host),
  1019. host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(host->sender->exit.reason) : "");
  1020. rrdpush_sender_thread_close_socket(host);
  1021. rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
  1022. rrdhost_clear_sender___while_having_sender_mutex(host);
  1023. sender_unlock(host->sender);
  1024. freez(s->pipe_buffer);
  1025. freez(s);
  1026. }
  1027. void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) {
  1028. #ifdef ENABLE_HTTPS
  1029. static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
  1030. spinlock_lock(&sp);
  1031. if(netdata_ssl_streaming_sender_ctx || !host) {
  1032. spinlock_unlock(&sp);
  1033. return;
  1034. }
  1035. for(struct rrdpush_destinations *d = host->destinations; d ; d = d->next) {
  1036. if (d->ssl) {
  1037. // we need to initialize SSL
  1038. netdata_ssl_initialize_ctx(NETDATA_SSL_STREAMING_SENDER_CTX);
  1039. ssl_security_location_for_context(netdata_ssl_streaming_sender_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
  1040. // stop the loop
  1041. break;
  1042. }
  1043. }
  1044. spinlock_unlock(&sp);
  1045. #endif
  1046. }
  1047. void *rrdpush_sender_thread(void *ptr) {
  1048. worker_register("STREAMSND");
  1049. worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect");
  1050. worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read");
  1051. worker_register_job_name(WORKER_SENDER_JOB_SOCKET_RECEIVE, "receive");
  1052. worker_register_job_name(WORKER_SENDER_JOB_EXECUTE, "execute");
  1053. worker_register_job_name(WORKER_SENDER_JOB_SOCKET_SEND, "send");
  1054. // disconnection reasons
  1055. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout");
  1056. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error");
  1057. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR, "disconnect socket error");
  1058. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow");
  1059. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error");
  1060. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed");
  1061. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR, "disconnect receive error");
  1062. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error");
  1063. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
  1064. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
  1065. worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
  1066. worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function");
  1067. worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
  1068. worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
  1069. worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
  1070. worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, "bytes compressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL);
  1071. worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, "bytes uncompressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL);
  1072. worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, "cumulative compression savings ratio", "%", WORKER_METRIC_ABSOLUTE);
  1073. worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
  1074. struct sender_state *s = ptr;
  1075. if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination ||
  1076. !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
  1077. !*s->host->rrdpush_send_api_key) {
  1078. netdata_log_error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
  1079. rrdhost_hostname(s->host), gettid());
  1080. return NULL;
  1081. }
  1082. if(!rrdhost_set_sender(s->host)) {
  1083. netdata_log_error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.",
  1084. rrdhost_hostname(s->host), gettid());
  1085. return NULL;
  1086. }
  1087. rrdpush_initialize_ssl_ctx(s->host);
  1088. netdata_log_info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid());
  1089. s->timeout = (int)appconfig_get_number(
  1090. &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600);
  1091. s->default_port = (int)appconfig_get_number(
  1092. &stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
  1093. s->buffer->max_size = (size_t)appconfig_get_number(
  1094. &stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024 * 10);
  1095. s->reconnect_delay = (unsigned int)appconfig_get_number(
  1096. &stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
  1097. remote_clock_resync_iterations = (unsigned int)appconfig_get_number(
  1098. &stream_config, CONFIG_SECTION_STREAM,
  1099. "initial clock resync iterations",
  1100. remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
  1101. // initialize rrdpush globals
  1102. rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1103. rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  1104. int pipe_buffer_size = 10 * 1024;
  1105. #ifdef F_GETPIPE_SZ
  1106. pipe_buffer_size = fcntl(s->rrdpush_sender_pipe[PIPE_READ], F_GETPIPE_SZ);
  1107. #endif
  1108. if(pipe_buffer_size < 10 * 1024)
  1109. pipe_buffer_size = 10 * 1024;
  1110. if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
  1111. netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
  1112. rrdhost_hostname(s->host));
  1113. return NULL;
  1114. }
  1115. struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
  1116. thread_data->pipe_buffer = mallocz(pipe_buffer_size);
  1117. thread_data->host = s->host;
  1118. netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
  1119. size_t iterations = 0;
  1120. time_t now_s = now_monotonic_sec();
  1121. while(!rrdhost_sender_should_exit(s)) {
  1122. iterations++;
  1123. // The connection attempt blocks (after which we use the socket in nonblocking)
  1124. if(unlikely(s->rrdpush_sender_socket == -1)) {
  1125. worker_is_busy(WORKER_SENDER_JOB_CONNECT);
  1126. now_s = now_monotonic_sec();
  1127. rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true);
  1128. rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1129. s->flags &= ~SENDER_FLAG_OVERFLOW;
  1130. s->read_len = 0;
  1131. s->buffer->read = 0;
  1132. s->buffer->write = 0;
  1133. if(!attempt_to_connect(s))
  1134. continue;
  1135. if(rrdhost_sender_should_exit(s))
  1136. break;
  1137. now_s = s->last_traffic_seen_t = now_monotonic_sec();
  1138. rrdpush_send_claimed_id(s->host);
  1139. rrdpush_send_host_labels(s->host);
  1140. rrdpush_send_global_functions(s->host);
  1141. rrdpush_send_dyncfg(s->host);
  1142. s->replication.oldest_request_after_t = 0;
  1143. rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1144. netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
  1145. continue;
  1146. }
  1147. if(iterations % 1000 == 0)
  1148. now_s = now_monotonic_sec();
  1149. // If the TCP window never opened then something is wrong, restart connection
  1150. if(unlikely(now_s - s->last_traffic_seen_t > s->timeout &&
  1151. !rrdpush_sender_pending_replication_requests(s) &&
  1152. !rrdpush_sender_replicating_charts(s)
  1153. )) {
  1154. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
  1155. netdata_log_error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
  1156. rrdpush_sender_thread_close_socket(s->host);
  1157. continue;
  1158. }
  1159. sender_lock(s);
  1160. size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
  1161. size_t available = cbuffer_available_size_unsafe(s->buffer);
  1162. if (unlikely(!outstanding)) {
  1163. rrdpush_sender_pipe_clear_pending_data(s);
  1164. rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
  1165. }
  1166. if(s->compressor.initialized) {
  1167. size_t bytes_uncompressed = s->compressor.sender_locked.total_uncompressed;
  1168. size_t bytes_compressed = s->compressor.sender_locked.total_compressed + s->compressor.sender_locked.total_compressions * sizeof(rrdpush_signature_t);
  1169. NETDATA_DOUBLE ratio = 100.0 - ((NETDATA_DOUBLE)bytes_compressed * 100.0 / (NETDATA_DOUBLE)bytes_uncompressed);
  1170. worker_set_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_uncompressed);
  1171. worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, (NETDATA_DOUBLE)bytes_compressed);
  1172. worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, ratio);
  1173. }
  1174. sender_unlock(s);
  1175. worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);
  1176. if(outstanding)
  1177. s->send_attempts++;
  1178. if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) {
  1179. if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
  1180. netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
  1181. rrdhost_hostname(s->host));
  1182. rrdpush_sender_thread_close_socket(s->host);
  1183. break;
  1184. }
  1185. }
  1186. worker_is_idle();
  1187. // Wait until buffer opens in the socket or a rrdset_done_push wakes us
  1188. enum {
  1189. Collector = 0,
  1190. Socket = 1,
  1191. };
  1192. struct pollfd fds[2] = {
  1193. [Collector] = {
  1194. .fd = s->rrdpush_sender_pipe[PIPE_READ],
  1195. .events = POLLIN,
  1196. .revents = 0,
  1197. },
  1198. [Socket] = {
  1199. .fd = s->rrdpush_sender_socket,
  1200. .events = POLLIN | (outstanding ? POLLOUT : 0 ),
  1201. .revents = 0,
  1202. }
  1203. };
  1204. int poll_rc = poll(fds, 2, 50); // timeout in milliseconds
  1205. netdata_log_debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
  1206. fds[Collector].revents, fds[Socket].revents, outstanding);
  1207. if(unlikely(rrdhost_sender_should_exit(s)))
  1208. break;
  1209. internal_error(fds[Collector].fd != s->rrdpush_sender_pipe[PIPE_READ],
  1210. "STREAM %s [send to %s]: pipe changed after poll().", rrdhost_hostname(s->host), s->connected_to);
  1211. internal_error(fds[Socket].fd != s->rrdpush_sender_socket,
  1212. "STREAM %s [send to %s]: socket changed after poll().", rrdhost_hostname(s->host), s->connected_to);
  1213. // Spurious wake-ups without error - loop again
  1214. if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) {
  1215. netdata_thread_testcancel();
  1216. netdata_log_debug(D_STREAM, "Spurious wakeup");
  1217. now_s = now_monotonic_sec();
  1218. continue;
  1219. }
  1220. // Only errors from poll() are internal, but try restarting the connection
  1221. if(unlikely(poll_rc == -1)) {
  1222. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR);
  1223. netdata_log_error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to);
  1224. rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
  1225. rrdpush_sender_thread_close_socket(s->host);
  1226. continue;
  1227. }
  1228. // If we have data and have seen the TCP window open then try to close it by a transmission.
  1229. if(likely(outstanding && (fds[Socket].revents & POLLOUT))) {
  1230. worker_is_busy(WORKER_SENDER_JOB_SOCKET_SEND);
  1231. ssize_t bytes = attempt_to_send(s);
  1232. if(bytes > 0) {
  1233. s->last_traffic_seen_t = now_monotonic_sec();
  1234. worker_set_metric(WORKER_SENDER_JOB_BYTES_SENT, (NETDATA_DOUBLE)bytes);
  1235. }
  1236. }
  1237. // If the collector woke us up then empty the pipe to remove the signal
  1238. if (fds[Collector].revents & (POLLIN|POLLPRI)) {
  1239. worker_is_busy(WORKER_SENDER_JOB_PIPE_READ);
  1240. netdata_log_debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
  1241. if (read(fds[Collector].fd, thread_data->pipe_buffer, pipe_buffer_size) == -1)
  1242. netdata_log_error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to);
  1243. }
  1244. // Read as much as possible to fill the buffer, split into full lines for execution.
  1245. if (fds[Socket].revents & POLLIN) {
  1246. worker_is_busy(WORKER_SENDER_JOB_SOCKET_RECEIVE);
  1247. ssize_t bytes = attempt_read(s);
  1248. if(bytes > 0) {
  1249. s->last_traffic_seen_t = now_monotonic_sec();
  1250. worker_set_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, (NETDATA_DOUBLE)bytes);
  1251. }
  1252. }
  1253. if(unlikely(s->read_len))
  1254. execute_commands(s);
  1255. if(unlikely(fds[Collector].revents & (POLLERR|POLLHUP|POLLNVAL))) {
  1256. char *error = NULL;
  1257. if (unlikely(fds[Collector].revents & POLLERR))
  1258. error = "pipe reports errors (POLLERR)";
  1259. else if (unlikely(fds[Collector].revents & POLLHUP))
  1260. error = "pipe closed (POLLHUP)";
  1261. else if (unlikely(fds[Collector].revents & POLLNVAL))
  1262. error = "pipe is invalid (POLLNVAL)";
  1263. if(error) {
  1264. rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
  1265. netdata_log_error("STREAM %s [send to %s]: restarting internal pipe: %s.",
  1266. rrdhost_hostname(s->host), s->connected_to, error);
  1267. }
  1268. }
  1269. if(unlikely(fds[Socket].revents & (POLLERR|POLLHUP|POLLNVAL))) {
  1270. char *error = NULL;
  1271. if (unlikely(fds[Socket].revents & POLLERR))
  1272. error = "socket reports errors (POLLERR)";
  1273. else if (unlikely(fds[Socket].revents & POLLHUP))
  1274. error = "connection closed by remote end (POLLHUP)";
  1275. else if (unlikely(fds[Socket].revents & POLLNVAL))
  1276. error = "connection is invalid (POLLNVAL)";
  1277. if(unlikely(error)) {
  1278. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR);
  1279. netdata_log_error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.",
  1280. rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection);
  1281. rrdpush_sender_thread_close_socket(s->host);
  1282. }
  1283. }
  1284. // protection from overflow
  1285. if(unlikely(s->flags & SENDER_FLAG_OVERFLOW)) {
  1286. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW);
  1287. errno = 0;
  1288. netdata_log_error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection",
  1289. rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
  1290. rrdpush_sender_thread_close_socket(s->host);
  1291. }
  1292. worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE) dictionary_entries(s->replication.requests));
  1293. }
  1294. netdata_thread_cleanup_pop(1);
  1295. return NULL;
  1296. }