sender.c 64 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdpush.h"
  3. #define WORKER_SENDER_JOB_CONNECT 0
  4. #define WORKER_SENDER_JOB_PIPE_READ 1
  5. #define WORKER_SENDER_JOB_SOCKET_RECEIVE 2
  6. #define WORKER_SENDER_JOB_EXECUTE 3
  7. #define WORKER_SENDER_JOB_SOCKET_SEND 4
  8. #define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5
  9. #define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
  10. #define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
  11. #define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
  12. #define WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR 9
  13. #define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
  14. #define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
  15. #define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
  16. #define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
  17. #define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
  18. #define WORKER_SENDER_JOB_BUFFER_RATIO 15
  19. #define WORKER_SENDER_JOB_BYTES_RECEIVED 16
  20. #define WORKER_SENDER_JOB_BYTES_SENT 17
  21. #define WORKER_SENDER_JOB_REPLAY_REQUEST 18
  22. #define WORKER_SENDER_JOB_FUNCTION_REQUEST 19
  23. #define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20
  24. #if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
  25. #error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21
  26. #endif
  27. extern struct config stream_config;
  28. extern char *netdata_ssl_ca_path;
  29. extern char *netdata_ssl_ca_file;
  30. static __thread BUFFER *sender_thread_buffer = NULL;
  31. static __thread bool sender_thread_buffer_used = false;
  32. static __thread time_t sender_thread_buffer_last_reset_s = 0;
  33. void sender_thread_buffer_free(void) {
  34. buffer_free(sender_thread_buffer);
  35. sender_thread_buffer = NULL;
  36. sender_thread_buffer_used = false;
  37. }
  38. // Collector thread starting a transmission
  39. BUFFER *sender_start(struct sender_state *s) {
  40. if(unlikely(sender_thread_buffer_used))
  41. fatal("STREAMING: thread buffer is used multiple times concurrently.");
  42. if(unlikely(rrdpush_sender_last_buffer_recreate_get(s) > sender_thread_buffer_last_reset_s)) {
  43. if(unlikely(sender_thread_buffer && sender_thread_buffer->size > THREAD_BUFFER_INITIAL_SIZE)) {
  44. buffer_free(sender_thread_buffer);
  45. sender_thread_buffer = NULL;
  46. }
  47. }
  48. if(unlikely(!sender_thread_buffer)) {
  49. sender_thread_buffer = buffer_create(THREAD_BUFFER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_streaming);
  50. sender_thread_buffer_last_reset_s = rrdpush_sender_last_buffer_recreate_get(s);
  51. }
  52. sender_thread_buffer_used = true;
  53. buffer_flush(sender_thread_buffer);
  54. return sender_thread_buffer;
  55. }
  56. static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
  #ifdef ENABLE_COMPRESSION
  /*
   * Called when stream compression keeps failing.
   *
   * Logs the event, removes SENDER_FLAG_COMPRESSION from the sender flags and
   * closes the socket, so that the connection is restarted without compression.
   */
  static inline void deactivate_compression(struct sender_state *s) {
      RRDHOST *host = s->host;

      worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);

      error("STREAM_COMPRESSION: Compression returned error, disabling it.");
      s->flags &= ~SENDER_FLAG_COMPRESSION;

      error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(host), s->connected_to);
      rrdpush_sender_thread_close_socket(host);
  }
  #endif
  71. #define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
  72. // Collector thread finishing a transmission
  73. void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) {
  74. if(unlikely(wb != sender_thread_buffer))
  75. fatal("STREAMING: sender is trying to commit a buffer that is not this thread's buffer.");
  76. if(unlikely(!sender_thread_buffer_used))
  77. fatal("STREAMING: sender is committing a buffer twice.");
  78. sender_thread_buffer_used = false;
  79. char *src = (char *)buffer_tostring(wb);
  80. size_t src_len = buffer_strlen(wb);
  81. if(unlikely(!src || !src_len))
  82. return;
  83. netdata_mutex_lock(&s->mutex);
  84. // FILE *fp = fopen("/tmp/stream.txt", "a");
  85. // fprintf(fp,
  86. // "\n--- SEND BEGIN: %s ----\n"
  87. // "%s"
  88. // "--- SEND END ----------------------------------------\n"
  89. // , rrdhost_hostname(s->host), src);
  90. // fclose(fp);
  91. if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
  92. info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
  93. rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
  94. s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
  95. }
  96. #ifdef ENABLE_COMPRESSION
  97. if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) {
  98. while(src_len) {
  99. size_t size_to_compress = src_len;
  100. if(unlikely(size_to_compress > COMPRESSION_MAX_MSG_SIZE)) {
  101. if (stream_has_capability(s, STREAM_CAP_BINARY))
  102. size_to_compress = COMPRESSION_MAX_MSG_SIZE;
  103. else {
  104. if (size_to_compress > COMPRESSION_MAX_MSG_SIZE) {
  105. // we need to find the last newline
  106. // so that the decompressor will have a whole line to work with
  107. const char *t = &src[COMPRESSION_MAX_MSG_SIZE];
  108. while (--t >= src)
  109. if (unlikely(*t == '\n'))
  110. break;
  111. if (t <= src) {
  112. size_to_compress = COMPRESSION_MAX_MSG_SIZE;
  113. } else
  114. size_to_compress = t - src + 1;
  115. }
  116. }
  117. }
  118. char *dst;
  119. size_t dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
  120. if (!dst_len) {
  121. error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
  122. rrdhost_hostname(s->host), s->connected_to);
  123. s->compressor->reset(s->compressor);
  124. dst_len = s->compressor->compress(s->compressor, src, size_to_compress, &dst);
  125. if(!dst_len) {
  126. error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
  127. rrdhost_hostname(s->host), s->connected_to);
  128. deactivate_compression(s);
  129. netdata_mutex_unlock(&s->mutex);
  130. return;
  131. }
  132. }
  133. if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
  134. s->flags |= SENDER_FLAG_OVERFLOW;
  135. else
  136. s->sent_bytes_on_this_connection_per_type[type] += dst_len;
  137. src = src + size_to_compress;
  138. src_len -= size_to_compress;
  139. }
  140. }
  141. else if(cbuffer_add_unsafe(s->buffer, src, src_len))
  142. s->flags |= SENDER_FLAG_OVERFLOW;
  143. else
  144. s->sent_bytes_on_this_connection_per_type[type] += src_len;
  145. #else
  146. if(cbuffer_add_unsafe(s->buffer, src, src_len))
  147. s->flags |= SENDER_FLAG_OVERFLOW;
  148. else
  149. s->sent_bytes_on_this_connection_per_type[type] += src_len;
  150. #endif
  151. replication_recalculate_buffer_used_ratio_unsafe(s);
  152. bool signal_sender = false;
  153. if(!rrdpush_sender_pipe_has_pending_data(s)) {
  154. rrdpush_sender_pipe_set_pending_data(s);
  155. signal_sender = true;
  156. }
  157. netdata_mutex_unlock(&s->mutex);
  158. if(signal_sender)
  159. rrdpush_signal_sender_to_wake_up(s);
  160. }
  161. static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
  162. buffer_sprintf(
  163. wb
  164. , "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n"
  165. , rrdvar_name(rva)
  166. , rrdvar2number(rva)
  167. );
  168. debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva));
  169. }
  170. void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) {
  171. if(rrdhost_can_send_definitions_to_parent(host)) {
  172. BUFFER *wb = sender_start(host->sender);
  173. rrdpush_sender_add_host_variable_to_buffer(wb, rva);
  174. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  175. sender_thread_buffer_free();
  176. }
  177. }
  178. struct custom_host_variables_callback {
  179. BUFFER *wb;
  180. };
  181. static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdvar_ptr __maybe_unused, void *struct_ptr) {
  182. const RRDVAR_ACQUIRED *rv = (const RRDVAR_ACQUIRED *)item;
  183. struct custom_host_variables_callback *tmp = struct_ptr;
  184. BUFFER *wb = tmp->wb;
  185. if(unlikely(rrdvar_flags(rv) & RRDVAR_FLAG_CUSTOM_HOST_VAR && rrdvar_type(rv) == RRDVAR_TYPE_CALCULATED)) {
  186. rrdpush_sender_add_host_variable_to_buffer(wb, rv);
  187. return 1;
  188. }
  189. return 0;
  190. }
  191. static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
  192. if(rrdhost_can_send_definitions_to_parent(host)) {
  193. BUFFER *wb = sender_start(host->sender);
  194. struct custom_host_variables_callback tmp = {
  195. .wb = wb
  196. };
  197. int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
  198. (void)ret;
  199. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  200. sender_thread_buffer_free();
  201. debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
  202. }
  203. }
  204. // resets all the chart, so that their definitions
  205. // will be resent to the central netdata
  206. static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
  207. RRDSET *st;
  208. rrdset_foreach_read(st, host) {
  209. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
  210. rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
  211. st->upstream_resync_time_s = 0;
  212. RRDDIM *rd;
  213. rrddim_foreach_read(rd, st)
  214. rd->exposed = 0;
  215. rrddim_foreach_done(rd);
  216. }
  217. rrdset_foreach_done(st);
  218. rrdhost_sender_replicating_charts_zero(host);
  219. }
  220. static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t now_s, bool have_mutex, bool force) {
  221. static __thread time_t last_reset_time_s = 0;
  222. if(!force && now_s - last_reset_time_s < 300)
  223. return;
  224. if(!have_mutex)
  225. netdata_mutex_lock(&s->mutex);
  226. rrdpush_sender_last_buffer_recreate_set(s, now_s);
  227. last_reset_time_s = now_s;
  228. if(s->buffer && s->buffer->size > CBUFFER_INITIAL_SIZE) {
  229. size_t max = s->buffer->max_size;
  230. cbuffer_free(s->buffer);
  231. s->buffer = cbuffer_new(CBUFFER_INITIAL_SIZE, max, &netdata_buffers_statistics.cbuffers_streaming);
  232. }
  233. sender_thread_buffer_free();
  234. if(!have_mutex)
  235. netdata_mutex_unlock(&s->mutex);
  236. }
  237. static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
  238. rrdpush_sender_set_flush_time(host->sender);
  239. netdata_mutex_lock(&host->sender->mutex);
  240. // flush the output buffer from any data it may have
  241. cbuffer_flush(host->sender->buffer);
  242. rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true);
  243. replication_recalculate_buffer_used_ratio_unsafe(host->sender);
  244. netdata_mutex_unlock(&host->sender->mutex);
  245. }
  246. static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) {
  247. rrdpush_sender_set_flush_time(host->sender);
  248. // stop all replication commands inflight
  249. replication_sender_delete_pending_requests(host->sender);
  250. // reset the state of all charts
  251. rrdpush_sender_thread_reset_all_charts(host);
  252. rrdpush_sender_replicating_charts_zero(host->sender);
  253. }
  254. static void rrdpush_sender_on_connect(RRDHOST *host) {
  255. rrdpush_sender_cbuffer_flush(host);
  256. rrdpush_sender_charts_and_replication_reset(host);
  257. }
  258. static void rrdpush_sender_after_connect(RRDHOST *host) {
  259. rrdpush_sender_thread_send_custom_host_variables(host);
  260. }
  261. static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
  262. #ifdef ENABLE_HTTPS
  263. netdata_ssl_close(&host->sender->ssl);
  264. #endif
  265. if(host->sender->rrdpush_sender_socket != -1) {
  266. close(host->sender->rrdpush_sender_socket);
  267. host->sender->rrdpush_sender_socket = -1;
  268. }
  269. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  270. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  271. // do not flush the circular buffer here
  272. // this function is called sometimes with the mutex lock, sometimes without the lock
  273. rrdpush_sender_charts_and_replication_reset(host);
  274. }
  275. void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
  276. {
  277. se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz("");
  278. se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz("");
  279. se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz("");
  280. se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):strdupz("");
  281. se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz("");
  282. }
  283. void rrdpush_clean_encoded(stream_encoded_t *se)
  284. {
  285. if (se->os_name)
  286. freez(se->os_name);
  287. if (se->os_id)
  288. freez(se->os_id);
  289. if (se->os_version)
  290. freez(se->os_version);
  291. if (se->kernel_name)
  292. freez(se->kernel_name);
  293. if (se->kernel_version)
  294. freez(se->kernel_version);
  295. }
  296. struct {
  297. const char *response;
  298. size_t length;
  299. int32_t version;
  300. bool dynamic;
  301. const char *error;
  302. int worker_job_id;
  303. time_t postpone_reconnect_seconds;
  304. } stream_responses[] = {
  305. {
  306. .response = START_STREAMING_PROMPT_VN,
  307. .length = sizeof(START_STREAMING_PROMPT_VN) - 1,
  308. .version = STREAM_HANDSHAKE_OK_V3, // and above
  309. .dynamic = true, // dynamic = we will parse the version / capabilities
  310. .error = NULL,
  311. .worker_job_id = 0,
  312. .postpone_reconnect_seconds = 0,
  313. },
  314. {
  315. .response = START_STREAMING_PROMPT_V2,
  316. .length = sizeof(START_STREAMING_PROMPT_V2) - 1,
  317. .version = STREAM_HANDSHAKE_OK_V2,
  318. .dynamic = false,
  319. .error = NULL,
  320. .worker_job_id = 0,
  321. .postpone_reconnect_seconds = 0,
  322. },
  323. {
  324. .response = START_STREAMING_PROMPT_V1,
  325. .length = sizeof(START_STREAMING_PROMPT_V1) - 1,
  326. .version = STREAM_HANDSHAKE_OK_V1,
  327. .dynamic = false,
  328. .error = NULL,
  329. .worker_job_id = 0,
  330. .postpone_reconnect_seconds = 0,
  331. },
  332. {
  333. .response = START_STREAMING_ERROR_SAME_LOCALHOST,
  334. .length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1,
  335. .version = STREAM_HANDSHAKE_ERROR_LOCALHOST,
  336. .dynamic = false,
  337. .error = "remote server rejected this stream, the host we are trying to stream is its localhost",
  338. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  339. .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour
  340. },
  341. {
  342. .response = START_STREAMING_ERROR_ALREADY_STREAMING,
  343. .length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1,
  344. .version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED,
  345. .dynamic = false,
  346. .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
  347. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  348. .postpone_reconnect_seconds = 2 * 60, // 2 minutes
  349. },
  350. {
  351. .response = START_STREAMING_ERROR_NOT_PERMITTED,
  352. .length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1,
  353. .version = STREAM_HANDSHAKE_ERROR_DENIED,
  354. .dynamic = false,
  355. .error = "remote server denied access, probably we don't have the right API key?",
  356. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  357. .postpone_reconnect_seconds = 1 * 60, // 1 minute
  358. },
  359. {
  360. .response = START_STREAMING_ERROR_BUSY_TRY_LATER,
  361. .length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1,
  362. .version = STREAM_HANDSHAKE_BUSY_TRY_LATER,
  363. .dynamic = false,
  364. .error = "remote server is currently busy, we should try later",
  365. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  366. .postpone_reconnect_seconds = 2 * 60, // 2 minutes
  367. },
  368. {
  369. .response = START_STREAMING_ERROR_INTERNAL_ERROR,
  370. .length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1,
  371. .version = STREAM_HANDSHAKE_INTERNAL_ERROR,
  372. .dynamic = false,
  373. .error = "remote server is encountered an internal error, we should try later",
  374. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  375. .postpone_reconnect_seconds = 5 * 60, // 5 minutes
  376. },
  377. {
  378. .response = START_STREAMING_ERROR_INITIALIZATION,
  379. .length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1,
  380. .version = STREAM_HANDSHAKE_INITIALIZATION,
  381. .dynamic = false,
  382. .error = "remote server is initializing, we should try later",
  383. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  384. .postpone_reconnect_seconds = 2 * 60, // 2 minute
  385. },
  386. // terminator
  387. {
  388. .response = NULL,
  389. .length = 0,
  390. .version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE,
  391. .dynamic = false,
  392. .error = "remote node response is not understood, is it Netdata?",
  393. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  394. .postpone_reconnect_seconds = 1 * 60, // 1 minute
  395. }
  396. };
  397. static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender_state *s, char *http, size_t http_length) {
  398. int32_t version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE;
  399. int i;
  400. for(i = 0; stream_responses[i].response ; i++) {
  401. if(stream_responses[i].dynamic &&
  402. http_length > stream_responses[i].length && http_length < (stream_responses[i].length + 30) &&
  403. strncmp(http, stream_responses[i].response, stream_responses[i].length) == 0) {
  404. version = str2i(&http[stream_responses[i].length]);
  405. break;
  406. }
  407. else if(http_length == stream_responses[i].length && strcmp(http, stream_responses[i].response) == 0) {
  408. version = stream_responses[i].version;
  409. break;
  410. }
  411. }
  412. const char *error = stream_responses[i].error;
  413. int worker_job_id = stream_responses[i].worker_job_id;
  414. time_t delay = stream_responses[i].postpone_reconnect_seconds;
  415. if(version >= STREAM_HANDSHAKE_OK_V1) {
  416. host->destination->last_error = NULL;
  417. host->destination->last_handshake = version;
  418. host->destination->postpone_reconnection_until = 0;
  419. s->capabilities = convert_stream_version_to_capabilities(version);
  420. return true;
  421. }
  422. worker_is_busy(worker_job_id);
  423. rrdpush_sender_thread_close_socket(host);
  424. host->destination->last_error = error;
  425. host->destination->last_handshake = version;
  426. host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
  427. char buf[LOG_DATE_LENGTH];
  428. log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until);
  429. error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
  430. rrdhost_hostname(host), s->connected_to, error, delay, buf);
  431. return false;
  432. }
  // Sets up SSL on the already connected socket of the sender, when the
  // destination requires it.
  //
  // Returns true when SSL is not compiled in, is not required by the
  // destination, or the SSL connection was established (and the certificate
  // accepted, when validation is enabled). Returns false otherwise; when the
  // SSL connect or the certificate validation fails, the socket is closed and
  // reconnections are postponed for 5 minutes.
  static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
  #ifdef ENABLE_HTTPS
      RRDHOST *host = s->host;
      bool ssl_required = host->destination && host->destination->ssl;

      // drop any previous SSL state of this sender
      netdata_ssl_close(&host->sender->ssl);

      if(!ssl_required)
          return true;

      if(!netdata_ssl_open(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket)) {
          error("SSL: failed to establish connection.");
          return false;
      }

      if(!netdata_ssl_connect(&host->sender->ssl)) {
          // couldn't connect
          worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
          rrdpush_sender_thread_close_socket(host);

          host->destination->last_error = "SSL error";
          host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
          host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
          return false;
      }

      if(netdata_ssl_validate_certificate_sender && security_test_certificate(host->sender->ssl.conn)) {
          // certificate is not valid
          worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
          error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
          rrdpush_sender_thread_close_socket(host);

          host->destination->last_error = "invalid SSL certificate";
          host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
          host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
          return false;
      }

      return true;
  #else
      // SSL is not enabled
      return true;
  #endif
  }
  470. static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) {
  471. struct timeval tv = {
  472. .tv_sec = timeout,
  473. .tv_usec = 0
  474. };
  475. // make sure the socket is closed
  476. rrdpush_sender_thread_close_socket(host);
  477. s->rrdpush_sender_socket = connect_to_one_of_destinations(
  478. host
  479. , default_port
  480. , &tv
  481. , &s->reconnects_counter
  482. , s->connected_to
  483. , sizeof(s->connected_to)-1
  484. , &host->destination
  485. );
  486. if(unlikely(s->rrdpush_sender_socket == -1)) {
  487. // error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
  488. return false;
  489. }
  490. // info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
  491. // reset our capabilities to default
  492. s->capabilities = stream_our_capabilities();
  493. #ifdef ENABLE_COMPRESSION
  494. // If we don't want compression, remove it from our capabilities
  495. if(!(s->flags & SENDER_FLAG_COMPRESSION))
  496. s->capabilities &= ~STREAM_CAP_COMPRESSION;
  497. #endif // ENABLE_COMPRESSION
  498. /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
  499. version negotiation resulted in a high enough version.
  500. */
  501. stream_encoded_t se;
  502. rrdpush_encode_variable(&se, host);
  503. host->sender->hops = host->system_info->hops + 1;
  504. char http[HTTP_HEADER_SIZE + 1];
  505. int eol = snprintfz(http, HTTP_HEADER_SIZE,
  506. "STREAM "
  507. "key=%s"
  508. "&hostname=%s"
  509. "&registry_hostname=%s"
  510. "&machine_guid=%s"
  511. "&update_every=%d"
  512. "&os=%s"
  513. "&timezone=%s"
  514. "&abbrev_timezone=%s"
  515. "&utc_offset=%d"
  516. "&hops=%d"
  517. "&ml_capable=%d"
  518. "&ml_enabled=%d"
  519. "&mc_version=%d"
  520. "&tags=%s"
  521. "&ver=%u"
  522. "&NETDATA_INSTANCE_CLOUD_TYPE=%s"
  523. "&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s"
  524. "&NETDATA_INSTANCE_CLOUD_INSTANCE_REGION=%s"
  525. "&NETDATA_SYSTEM_OS_NAME=%s"
  526. "&NETDATA_SYSTEM_OS_ID=%s"
  527. "&NETDATA_SYSTEM_OS_ID_LIKE=%s"
  528. "&NETDATA_SYSTEM_OS_VERSION=%s"
  529. "&NETDATA_SYSTEM_OS_VERSION_ID=%s"
  530. "&NETDATA_SYSTEM_OS_DETECTION=%s"
  531. "&NETDATA_HOST_IS_K8S_NODE=%s"
  532. "&NETDATA_SYSTEM_KERNEL_NAME=%s"
  533. "&NETDATA_SYSTEM_KERNEL_VERSION=%s"
  534. "&NETDATA_SYSTEM_ARCHITECTURE=%s"
  535. "&NETDATA_SYSTEM_VIRTUALIZATION=%s"
  536. "&NETDATA_SYSTEM_VIRT_DETECTION=%s"
  537. "&NETDATA_SYSTEM_CONTAINER=%s"
  538. "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s"
  539. "&NETDATA_CONTAINER_OS_NAME=%s"
  540. "&NETDATA_CONTAINER_OS_ID=%s"
  541. "&NETDATA_CONTAINER_OS_ID_LIKE=%s"
  542. "&NETDATA_CONTAINER_OS_VERSION=%s"
  543. "&NETDATA_CONTAINER_OS_VERSION_ID=%s"
  544. "&NETDATA_CONTAINER_OS_DETECTION=%s"
  545. "&NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT=%s"
  546. "&NETDATA_SYSTEM_CPU_FREQ=%s"
  547. "&NETDATA_SYSTEM_TOTAL_RAM=%s"
  548. "&NETDATA_SYSTEM_TOTAL_DISK_SIZE=%s"
  549. "&NETDATA_PROTOCOL_VERSION=%s"
  550. " HTTP/1.1\r\n"
  551. "User-Agent: %s/%s\r\n"
  552. "Accept: */*\r\n\r\n"
  553. , host->rrdpush_send_api_key
  554. , rrdhost_hostname(host)
  555. , rrdhost_registry_hostname(host)
  556. , host->machine_guid
  557. , default_rrd_update_every
  558. , rrdhost_os(host)
  559. , rrdhost_timezone(host)
  560. , rrdhost_abbrev_timezone(host)
  561. , host->utc_offset
  562. , host->sender->hops
  563. , host->system_info->ml_capable
  564. , host->system_info->ml_enabled
  565. , host->system_info->mc_version
  566. , rrdhost_tags(host)
  567. , s->capabilities
  568. , (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : ""
  569. , (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : ""
  570. , (host->system_info->cloud_instance_region) ? host->system_info->cloud_instance_region : ""
  571. , se.os_name
  572. , se.os_id
  573. , (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : ""
  574. , se.os_version
  575. , (host->system_info->host_os_version_id) ? host->system_info->host_os_version_id : ""
  576. , (host->system_info->host_os_detection) ? host->system_info->host_os_detection : ""
  577. , (host->system_info->is_k8s_node) ? host->system_info->is_k8s_node : ""
  578. , se.kernel_name
  579. , se.kernel_version
  580. , (host->system_info->architecture) ? host->system_info->architecture : ""
  581. , (host->system_info->virtualization) ? host->system_info->virtualization : ""
  582. , (host->system_info->virt_detection) ? host->system_info->virt_detection : ""
  583. , (host->system_info->container) ? host->system_info->container : ""
  584. , (host->system_info->container_detection) ? host->system_info->container_detection : ""
  585. , (host->system_info->container_os_name) ? host->system_info->container_os_name : ""
  586. , (host->system_info->container_os_id) ? host->system_info->container_os_id : ""
  587. , (host->system_info->container_os_id_like) ? host->system_info->container_os_id_like : ""
  588. , (host->system_info->container_os_version) ? host->system_info->container_os_version : ""
  589. , (host->system_info->container_os_version_id) ? host->system_info->container_os_version_id : ""
  590. , (host->system_info->container_os_detection) ? host->system_info->container_os_detection : ""
  591. , (host->system_info->host_cores) ? host->system_info->host_cores : ""
  592. , (host->system_info->host_cpu_freq) ? host->system_info->host_cpu_freq : ""
  593. , (host->system_info->host_ram_total) ? host->system_info->host_ram_total : ""
  594. , (host->system_info->host_disk_space) ? host->system_info->host_disk_space : ""
  595. , STREAMING_PROTOCOL_VERSION
  596. , rrdhost_program_name(host)
  597. , rrdhost_program_version(host)
  598. );
  599. http[eol] = 0x00;
  600. rrdpush_clean_encoded(&se);
  601. if(!rrdpush_sender_connect_ssl(s))
  602. return false;
  603. ssize_t bytes, len = strlen(http);
  604. bytes = send_timeout(
  605. #ifdef ENABLE_HTTPS
  606. &host->sender->ssl,
  607. #endif
  608. s->rrdpush_sender_socket,
  609. http,
  610. len,
  611. 0,
  612. timeout);
  613. if(bytes <= 0) { // timeout is 0
  614. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
  615. rrdpush_sender_thread_close_socket(host);
  616. error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
  617. host->destination->last_error = "timeout while sending request";
  618. host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
  619. host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
  620. return false;
  621. }
  622. bytes = recv_timeout(
  623. #ifdef ENABLE_HTTPS
  624. &host->sender->ssl,
  625. #endif
  626. s->rrdpush_sender_socket,
  627. http,
  628. HTTP_HEADER_SIZE,
  629. 0,
  630. timeout);
  631. if(bytes <= 0) { // timeout is 0
  632. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
  633. rrdpush_sender_thread_close_socket(host);
  634. error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
  635. host->destination->last_error = "timeout while expecting first response";
  636. host->destination->last_handshake = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
  637. host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
  638. return false;
  639. }
  640. if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
  641. error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
  642. if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
  643. error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
  644. http[bytes] = '\0';
  645. debug(D_STREAM, "Response to sender from far end: %s", http);
  646. if(!rrdpush_sender_validate_response(host, s, http, bytes))
  647. return false;
  648. #ifdef ENABLE_COMPRESSION
  649. if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) {
  650. if(!s->compressor)
  651. s->compressor = create_compressor();
  652. else
  653. s->compressor->reset(s->compressor);
  654. }
  655. #endif //ENABLE_COMPRESSION
  656. log_sender_capabilities(s);
  657. debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
  658. return true;
  659. }
  660. static bool attempt_to_connect(struct sender_state *state)
  661. {
  662. state->send_attempts = 0;
  663. // reset the bytes we have sent for this session
  664. state->sent_bytes_on_this_connection = 0;
  665. memset(state->sent_bytes_on_this_connection_per_type, 0, sizeof(state->sent_bytes_on_this_connection_per_type));
  666. if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
  667. // reset the buffer, to properly send charts and metrics
  668. rrdpush_sender_on_connect(state->host);
  669. // send from the beginning
  670. state->begin = 0;
  671. // make sure the next reconnection will be immediate
  672. state->not_connected_loops = 0;
  673. // let the data collection threads know we are ready
  674. rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  675. rrdpush_sender_after_connect(state->host);
  676. return true;
  677. }
  678. // we couldn't connect
  679. // increase the failed connections counter
  680. state->not_connected_loops++;
  681. // slow re-connection on repeating errors
  682. usec_t now_ut = now_monotonic_usec();
  683. usec_t end_ut = now_ut + USEC_PER_SEC * state->reconnect_delay;
  684. while(now_ut < end_ut) {
  685. netdata_thread_testcancel();
  686. sleep_usec(500 * USEC_PER_MS); // seconds
  687. now_ut = now_monotonic_usec();
  688. }
  689. return false;
  690. }
  691. // TCP window is open and we have data to transmit.
  692. static ssize_t attempt_to_send(struct sender_state *s) {
  693. ssize_t ret = 0;
  694. #ifdef NETDATA_INTERNAL_CHECKS
  695. struct circular_buffer *cb = s->buffer;
  696. #endif
  697. netdata_mutex_lock(&s->mutex);
  698. char *chunk;
  699. size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
  700. debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
  701. #ifdef ENABLE_HTTPS
  702. if(SSL_connection(&s->ssl))
  703. ret = netdata_ssl_write(&s->ssl, chunk, outstanding);
  704. else
  705. ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
  706. #else
  707. ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
  708. #endif
  709. if (likely(ret > 0)) {
  710. cbuffer_remove_unsafe(s->buffer, ret);
  711. s->sent_bytes_on_this_connection += ret;
  712. s->sent_bytes += ret;
  713. debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret);
  714. }
  715. else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
  716. debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to);
  717. else if (ret == -1) {
  718. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR);
  719. debug(D_STREAM, "STREAM: Send failed - closing socket...");
  720. error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection);
  721. rrdpush_sender_thread_close_socket(s->host);
  722. }
  723. else
  724. debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
  725. replication_recalculate_buffer_used_ratio_unsafe(s);
  726. netdata_mutex_unlock(&s->mutex);
  727. return ret;
  728. }
  729. static ssize_t attempt_read(struct sender_state *s) {
  730. ssize_t ret;
  731. #ifdef ENABLE_HTTPS
  732. if (SSL_connection(&s->ssl))
  733. ret = netdata_ssl_read(&s->ssl, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1);
  734. else
  735. ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
  736. #else
  737. ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
  738. #endif
  739. if (ret > 0) {
  740. s->read_len += ret;
  741. return ret;
  742. }
  743. if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
  744. return ret;
  745. #ifdef ENABLE_HTTPS
  746. if (SSL_connection(&s->ssl))
  747. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
  748. else
  749. #endif
  750. if (ret == 0 || errno == ECONNRESET) {
  751. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED);
  752. error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to);
  753. }
  754. else {
  755. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR);
  756. error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
  757. }
  758. rrdpush_sender_thread_close_socket(s->host);
  759. return ret;
  760. }
  // State carried from a FUNCTION request received over the streaming
  // connection to the callback that sends its response back.
  // Allocated in execute_commands() and released by
  // stream_execute_function_callback().
  struct inflight_stream_function {
      struct sender_state *sender;    // the sender the request arrived on
      STRING *transaction;            // copy of the transaction id given in the request
      usec_t received_ut;             // now_realtime_usec() taken when the request was parsed
  };
  766. void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
  767. struct inflight_stream_function *tmp = data;
  768. struct sender_state *s = tmp->sender;
  769. if(rrdhost_can_send_definitions_to_parent(s->host)) {
  770. BUFFER *wb = sender_start(s);
  771. pluginsd_function_result_begin_to_buffer(wb
  772. , string2str(tmp->transaction)
  773. , code
  774. , functions_content_type_to_format(func_wb->content_type)
  775. , func_wb->expires);
  776. buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));
  777. pluginsd_function_result_end_to_buffer(wb);
  778. sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
  779. sender_thread_buffer_free();
  780. internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).",
  781. rrdhost_hostname(s->host), s->connected_to,
  782. string2str(tmp->transaction),
  783. buffer_strlen(func_wb),
  784. now_realtime_usec() - tmp->received_ut);
  785. }
  786. string_freez(tmp->transaction);
  787. buffer_free(func_wb);
  788. freez(tmp);
  789. }
  790. // This is just a placeholder until the gap filling state machine is inserted
  791. void execute_commands(struct sender_state *s) {
  792. worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
  793. char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
  794. *end = 0;
  795. while( start < end && (newline = strchr(start, '\n')) ) {
  796. *newline = '\0';
  797. log_access("STREAM: %d from '%s' for host '%s': %s",
  798. gettid(), s->connected_to, rrdhost_hostname(s->host), start);
  799. // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
  800. char *words[PLUGINSD_MAX_WORDS] = { NULL };
  801. size_t num_words = pluginsd_split_words(start, words, PLUGINSD_MAX_WORDS);
  802. const char *keyword = get_word(words, num_words, 0);
  803. if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
  804. worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
  805. char *transaction = get_word(words, num_words, 1);
  806. char *timeout_s = get_word(words, num_words, 2);
  807. char *function = get_word(words, num_words, 3);
  808. if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
  809. error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
  810. rrdhost_hostname(s->host), s->connected_to,
  811. keyword,
  812. transaction?transaction:"(unset)",
  813. timeout_s?timeout_s:"(unset)",
  814. function?function:"(unset)");
  815. }
  816. else {
  817. int timeout = str2i(timeout_s);
  818. if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  819. struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function));
  820. tmp->received_ut = now_realtime_usec();
  821. tmp->sender = s;
  822. tmp->transaction = string_strdupz(transaction);
  823. BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
  824. int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp);
  825. if(code != HTTP_RESP_OK) {
  826. rrd_call_function_error(wb, "Failed to route request to collector", code);
  827. stream_execute_function_callback(wb, code, tmp);
  828. }
  829. }
  830. }
  831. else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
  832. worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
  833. const char *chart_id = get_word(words, num_words, 1);
  834. const char *start_streaming = get_word(words, num_words, 2);
  835. const char *after = get_word(words, num_words, 3);
  836. const char *before = get_word(words, num_words, 4);
  837. if (!chart_id || !start_streaming || !after || !before) {
  838. error("STREAM %s [send to %s] %s command is incomplete"
  839. " (chart=%s, start_streaming=%s, after=%s, before=%s)",
  840. rrdhost_hostname(s->host), s->connected_to,
  841. keyword,
  842. chart_id ? chart_id : "(unset)",
  843. start_streaming ? start_streaming : "(unset)",
  844. after ? after : "(unset)",
  845. before ? before : "(unset)");
  846. }
  847. else {
  848. replication_add_request(s, chart_id,
  849. strtoll(after, NULL, 0),
  850. strtoll(before, NULL, 0),
  851. !strcmp(start_streaming, "true")
  852. );
  853. }
  854. }
  855. else {
  856. error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
  857. }
  858. worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
  859. start = newline + 1;
  860. }
  861. if (start < end) {
  862. memmove(s->read_buffer, start, end-start);
  863. s->read_len = end - start;
  864. }
  865. else {
  866. s->read_buffer[0] = '\0';
  867. s->read_len = 0;
  868. }
  869. }
  // Per-thread data of the sender thread, handed to its cleanup callback,
  // which frees both pipe_buffer and the structure itself.
  struct rrdpush_sender_thread_data {
      RRDHOST *host;      // the host this sender thread streams
      char *pipe_buffer;  // buffer the sender thread reads the internal wake-up pipe into
  };
  874. static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen) {
  875. static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
  876. bool ret = true;
  877. netdata_mutex_lock(&mutex);
  878. int new_pipe_fds[2];
  879. if(reopen) {
  880. if(pipe(new_pipe_fds) != 0) {
  881. error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host));
  882. new_pipe_fds[PIPE_READ] = -1;
  883. new_pipe_fds[PIPE_WRITE] = -1;
  884. ret = false;
  885. }
  886. }
  887. int old_pipe_fds[2];
  888. old_pipe_fds[PIPE_READ] = pipe_fds[PIPE_READ];
  889. old_pipe_fds[PIPE_WRITE] = pipe_fds[PIPE_WRITE];
  890. if(reopen) {
  891. pipe_fds[PIPE_READ] = new_pipe_fds[PIPE_READ];
  892. pipe_fds[PIPE_WRITE] = new_pipe_fds[PIPE_WRITE];
  893. }
  894. else {
  895. pipe_fds[PIPE_READ] = -1;
  896. pipe_fds[PIPE_WRITE] = -1;
  897. }
  898. if(old_pipe_fds[PIPE_READ] > 2)
  899. close(old_pipe_fds[PIPE_READ]);
  900. if(old_pipe_fds[PIPE_WRITE] > 2)
  901. close(old_pipe_fds[PIPE_WRITE]);
  902. netdata_mutex_unlock(&mutex);
  903. return ret;
  904. }
  905. void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
  906. if(unlikely(s->tid == gettid()))
  907. return;
  908. RRDHOST *host = s->host;
  909. int pipe_fd = s->rrdpush_sender_pipe[PIPE_WRITE];
  910. // signal the sender there are more data
  911. if (pipe_fd != -1 && write(pipe_fd, " ", 1) == -1) {
  912. error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host));
  913. rrdpush_sender_pipe_close(host, s->rrdpush_sender_pipe, true);
  914. }
  915. }
  916. static NETDATA_DOUBLE rrdhost_sender_replication_completion(RRDHOST *host, time_t now, size_t *instances) {
  917. size_t charts = rrdhost_sender_replicating_charts(host);
  918. NETDATA_DOUBLE completion;
  919. if(!charts || !host->sender || !host->sender->replication.oldest_request_after_t)
  920. completion = 100.0;
  921. else if(!host->sender->replication.latest_completed_before_t || host->sender->replication.latest_completed_before_t < host->sender->replication.oldest_request_after_t)
  922. completion = 0.0;
  923. else {
  924. time_t total = now - host->sender->replication.oldest_request_after_t;
  925. time_t current = host->sender->replication.latest_completed_before_t - host->sender->replication.oldest_request_after_t;
  926. completion = (NETDATA_DOUBLE) current * 100.0 / (NETDATA_DOUBLE) total;
  927. }
  928. *instances = charts;
  929. return completion;
  930. }
  931. void rrdhost_sender_to_json(BUFFER *wb, RRDHOST *host, const char *key, time_t now __maybe_unused) {
  932. bool online = rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  933. buffer_json_member_add_object(wb, key);
  934. if(host->sender)
  935. buffer_json_member_add_uint64(wb, "hops", host->sender->hops);
  936. buffer_json_member_add_boolean(wb, "online", online);
  937. if(host->sender && host->sender->last_state_since_t) {
  938. buffer_json_member_add_time_t(wb, "since", host->sender->last_state_since_t);
  939. buffer_json_member_add_time_t(wb, "age", now - host->sender->last_state_since_t);
  940. }
  941. if(!online && host->sender && host->sender->exit.reason)
  942. buffer_json_member_add_string(wb, "reason", host->sender->exit.reason);
  943. buffer_json_member_add_object(wb, "replication");
  944. {
  945. size_t instances;
  946. NETDATA_DOUBLE completion = rrdhost_sender_replication_completion(host, now, &instances);
  947. buffer_json_member_add_boolean(wb, "in_progress", instances);
  948. buffer_json_member_add_double(wb, "completion", completion);
  949. buffer_json_member_add_uint64(wb, "instances", instances);
  950. }
  951. buffer_json_object_close(wb);
  952. if(host->sender) {
  953. netdata_mutex_lock(&host->sender->mutex);
  954. buffer_json_member_add_object(wb, "destination");
  955. {
  956. char buf[1024 + 1];
  957. if(online && host->sender->rrdpush_sender_socket != -1) {
  958. SOCKET_PEERS peers = socket_peers(host->sender->rrdpush_sender_socket);
  959. bool ssl = SSL_connection(&host->sender->ssl);
  960. snprintfz(buf, 1024, "[%s]:%d%s", peers.local.ip, peers.local.port, ssl ? ":SSL" : "");
  961. buffer_json_member_add_string(wb, "local", buf);
  962. snprintfz(buf, 1024, "[%s]:%d%s", peers.peer.ip, peers.peer.port, ssl ? ":SSL" : "");
  963. buffer_json_member_add_string(wb, "remote", buf);
  964. stream_capabilities_to_json_array(wb, host->sender->capabilities, "capabilities");
  965. buffer_json_member_add_object(wb, "traffic");
  966. {
  967. bool compression = false;
  968. #ifdef ENABLE_COMPRESSION
  969. compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor);
  970. #endif
  971. buffer_json_member_add_boolean(wb, "compression", compression);
  972. buffer_json_member_add_uint64(wb, "data", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_DATA]);
  973. buffer_json_member_add_uint64(wb, "metadata", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_METADATA]);
  974. buffer_json_member_add_uint64(wb, "functions", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_FUNCTIONS]);
  975. buffer_json_member_add_uint64(wb, "replication", host->sender->sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_REPLICATION]);
  976. }
  977. buffer_json_object_close(wb); // traffic
  978. }
  979. buffer_json_member_add_array(wb, "candidates");
  980. struct rrdpush_destinations *d;
  981. for (d = host->destinations; d; d = d->next) {
  982. buffer_json_add_array_item_object(wb);
  983. {
  984. if (d->ssl) {
  985. snprintfz(buf, 1024, "%s:SSL", string2str(d->destination));
  986. buffer_json_member_add_string(wb, "destination", buf);
  987. }
  988. else
  989. buffer_json_member_add_string(wb, "destination", string2str(d->destination));
  990. buffer_json_member_add_time_t(wb, "last_check", d->last_attempt);
  991. buffer_json_member_add_time_t(wb, "age", now - d->last_attempt);
  992. buffer_json_member_add_string(wb, "last_error", d->last_error);
  993. buffer_json_member_add_string(wb, "last_handshake",
  994. stream_handshake_error_to_string(d->last_handshake));
  995. buffer_json_member_add_time_t(wb, "next_check", d->postpone_reconnection_until);
  996. buffer_json_member_add_time_t(wb, "next_in",
  997. (d->postpone_reconnection_until > now) ?
  998. d->postpone_reconnection_until - now : 0);
  999. }
  1000. buffer_json_object_close(wb); // each candidate
  1001. }
  1002. buffer_json_array_close(wb); // candidates
  1003. }
  1004. buffer_json_object_close(wb); // destination
  1005. netdata_mutex_unlock(&host->sender->mutex);
  1006. }
  1007. buffer_json_object_close(wb); // streaming
  1008. }
  1009. static bool rrdhost_set_sender(RRDHOST *host) {
  1010. if(unlikely(!host->sender)) return false;
  1011. bool ret = false;
  1012. netdata_mutex_lock(&host->sender->mutex);
  1013. if(!host->sender->tid) {
  1014. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1015. rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
  1016. host->sender->tid = gettid();
  1017. host->sender->last_state_since_t = now_realtime_sec();
  1018. host->sender->exit.reason = NULL;
  1019. ret = true;
  1020. }
  1021. netdata_mutex_unlock(&host->sender->mutex);
  1022. rrdpush_reset_destinations_postpone_time(host);
  1023. return ret;
  1024. }
  1025. static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) {
  1026. if(unlikely(!host->sender)) return;
  1027. if(host->sender->tid == gettid()) {
  1028. host->sender->tid = 0;
  1029. host->sender->exit.shutdown = false;
  1030. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1031. host->sender->last_state_since_t = now_realtime_sec();
  1032. }
  1033. rrdpush_reset_destinations_postpone_time(host);
  1034. }
  1035. static bool rrdhost_sender_should_exit(struct sender_state *s) {
  1036. // check for outstanding cancellation requests
  1037. netdata_thread_testcancel();
  1038. if(unlikely(!service_running(SERVICE_STREAMING))) {
  1039. if(!s->exit.reason)
  1040. s->exit.reason = "NETDATA EXIT";
  1041. return true;
  1042. }
  1043. if(unlikely(!rrdhost_has_rrdpush_sender_enabled(s->host))) {
  1044. if(!s->exit.reason)
  1045. s->exit.reason = "NON STREAMABLE HOST";
  1046. return true;
  1047. }
  1048. if(unlikely(s->exit.shutdown)) {
  1049. if(!s->exit.reason)
  1050. s->exit.reason = "SENDER SHUTDOWN REQUESTED";
  1051. return true;
  1052. }
  1053. if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) {
  1054. if(!s->exit.reason)
  1055. s->exit.reason = "RECEIVER LEFT (ORPHAN HOST)";
  1056. return true;
  1057. }
  1058. return false;
  1059. }
  1060. static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
  1061. struct rrdpush_sender_thread_data *s = ptr;
  1062. worker_unregister();
  1063. RRDHOST *host = s->host;
  1064. netdata_mutex_lock(&host->sender->mutex);
  1065. info("STREAM %s [send]: sending thread exits %s",
  1066. rrdhost_hostname(host),
  1067. host->sender->exit.reason ? host->sender->exit.reason : "");
  1068. rrdpush_sender_thread_close_socket(host);
  1069. rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
  1070. rrdhost_clear_sender___while_having_sender_mutex(host);
  1071. netdata_mutex_unlock(&host->sender->mutex);
  1072. freez(s->pipe_buffer);
  1073. freez(s);
  1074. }
  1075. void rrdpush_initialize_ssl_ctx(RRDHOST *host) {
  1076. #ifdef ENABLE_HTTPS
  1077. static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
  1078. netdata_spinlock_lock(&sp);
  1079. if(netdata_ssl_streaming_sender_ctx || !host) {
  1080. netdata_spinlock_unlock(&sp);
  1081. return;
  1082. }
  1083. for(struct rrdpush_destinations *d = host->destinations; d ; d = d->next) {
  1084. if (d->ssl) {
  1085. // we need to initialize SSL
  1086. netdata_ssl_initialize_ctx(NETDATA_SSL_STREAMING_SENDER_CTX);
  1087. ssl_security_location_for_context(netdata_ssl_streaming_sender_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
  1088. // stop the loop
  1089. break;
  1090. }
  1091. }
  1092. netdata_spinlock_unlock(&sp);
  1093. #endif
  1094. }
  1095. void *rrdpush_sender_thread(void *ptr) {
  1096. worker_register("STREAMSND");
  1097. worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect");
  1098. worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read");
  1099. worker_register_job_name(WORKER_SENDER_JOB_SOCKET_RECEIVE, "receive");
  1100. worker_register_job_name(WORKER_SENDER_JOB_EXECUTE, "execute");
  1101. worker_register_job_name(WORKER_SENDER_JOB_SOCKET_SEND, "send");
  1102. // disconnection reasons
  1103. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout");
  1104. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error");
  1105. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR, "disconnect socket error");
  1106. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow");
  1107. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error");
  1108. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed");
  1109. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR, "disconnect receive error");
  1110. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error");
  1111. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
  1112. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
  1113. worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
  1114. worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function");
  1115. worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
  1116. worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
  1117. worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
  1118. worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
  1119. struct sender_state *s = ptr;
  1120. if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination ||
  1121. !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
  1122. !*s->host->rrdpush_send_api_key) {
  1123. error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
  1124. rrdhost_hostname(s->host), gettid());
  1125. return NULL;
  1126. }
  1127. if(!rrdhost_set_sender(s->host)) {
  1128. error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.",
  1129. rrdhost_hostname(s->host), gettid());
  1130. return NULL;
  1131. }
  1132. rrdpush_initialize_ssl_ctx(s->host);
  1133. info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid());
  1134. s->timeout = (int)appconfig_get_number(
  1135. &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600);
  1136. s->default_port = (int)appconfig_get_number(
  1137. &stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
  1138. s->buffer->max_size = (size_t)appconfig_get_number(
  1139. &stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024 * 10);
  1140. s->reconnect_delay = (unsigned int)appconfig_get_number(
  1141. &stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
  1142. remote_clock_resync_iterations = (unsigned int)appconfig_get_number(
  1143. &stream_config, CONFIG_SECTION_STREAM,
  1144. "initial clock resync iterations",
  1145. remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
  1146. // initialize rrdpush globals
  1147. rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1148. rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  1149. int pipe_buffer_size = 10 * 1024;
  1150. #ifdef F_GETPIPE_SZ
  1151. pipe_buffer_size = fcntl(s->rrdpush_sender_pipe[PIPE_READ], F_GETPIPE_SZ);
  1152. #endif
  1153. if(pipe_buffer_size < 10 * 1024)
  1154. pipe_buffer_size = 10 * 1024;
  1155. if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
  1156. error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
  1157. rrdhost_hostname(s->host));
  1158. return NULL;
  1159. }
  1160. struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
  1161. thread_data->pipe_buffer = mallocz(pipe_buffer_size);
  1162. thread_data->host = s->host;
  1163. netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
  1164. size_t iterations = 0;
  1165. time_t now_s = now_monotonic_sec();
  1166. while(!rrdhost_sender_should_exit(s)) {
  1167. iterations++;
  1168. // The connection attempt blocks (after which we use the socket in nonblocking)
  1169. if(unlikely(s->rrdpush_sender_socket == -1)) {
  1170. worker_is_busy(WORKER_SENDER_JOB_CONNECT);
  1171. now_s = now_monotonic_sec();
  1172. rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true);
  1173. rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1174. s->flags &= ~SENDER_FLAG_OVERFLOW;
  1175. s->read_len = 0;
  1176. s->buffer->read = 0;
  1177. s->buffer->write = 0;
  1178. if(!attempt_to_connect(s))
  1179. continue;
  1180. if(rrdhost_sender_should_exit(s))
  1181. break;
  1182. now_s = s->last_traffic_seen_t = now_monotonic_sec();
  1183. rrdpush_claimed_id(s->host);
  1184. rrdpush_send_host_labels(s->host);
  1185. s->replication.oldest_request_after_t = 0;
  1186. rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1187. info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
  1188. continue;
  1189. }
  1190. if(iterations % 1000 == 0)
  1191. now_s = now_monotonic_sec();
  1192. // If the TCP window never opened then something is wrong, restart connection
  1193. if(unlikely(now_s - s->last_traffic_seen_t > s->timeout &&
  1194. !rrdpush_sender_pending_replication_requests(s) &&
  1195. !rrdpush_sender_replicating_charts(s)
  1196. )) {
  1197. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
  1198. error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
  1199. rrdpush_sender_thread_close_socket(s->host);
  1200. continue;
  1201. }
  1202. netdata_mutex_lock(&s->mutex);
  1203. size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
  1204. size_t available = cbuffer_available_size_unsafe(s->buffer);
  1205. if (unlikely(!outstanding)) {
  1206. rrdpush_sender_pipe_clear_pending_data(s);
  1207. rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
  1208. }
  1209. netdata_mutex_unlock(&s->mutex);
  1210. worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);
  1211. if(outstanding)
  1212. s->send_attempts++;
  1213. if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) {
  1214. if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
  1215. error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
  1216. rrdhost_hostname(s->host));
  1217. rrdpush_sender_thread_close_socket(s->host);
  1218. break;
  1219. }
  1220. }
  1221. worker_is_idle();
  1222. // Wait until buffer opens in the socket or a rrdset_done_push wakes us
  1223. enum {
  1224. Collector = 0,
  1225. Socket = 1,
  1226. };
  1227. struct pollfd fds[2] = {
  1228. [Collector] = {
  1229. .fd = s->rrdpush_sender_pipe[PIPE_READ],
  1230. .events = POLLIN,
  1231. .revents = 0,
  1232. },
  1233. [Socket] = {
  1234. .fd = s->rrdpush_sender_socket,
  1235. .events = POLLIN | (outstanding ? POLLOUT : 0 ),
  1236. .revents = 0,
  1237. }
  1238. };
  1239. int poll_rc = poll(fds, 2, 1000);
  1240. debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
  1241. fds[Collector].revents, fds[Socket].revents, outstanding);
  1242. if(unlikely(rrdhost_sender_should_exit(s)))
  1243. break;
  1244. internal_error(fds[Collector].fd != s->rrdpush_sender_pipe[PIPE_READ],
  1245. "STREAM %s [send to %s]: pipe changed after poll().", rrdhost_hostname(s->host), s->connected_to);
  1246. internal_error(fds[Socket].fd != s->rrdpush_sender_socket,
  1247. "STREAM %s [send to %s]: socket changed after poll().", rrdhost_hostname(s->host), s->connected_to);
  1248. // Spurious wake-ups without error - loop again
  1249. if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) {
  1250. netdata_thread_testcancel();
  1251. debug(D_STREAM, "Spurious wakeup");
  1252. now_s = now_monotonic_sec();
  1253. continue;
  1254. }
  1255. // Only errors from poll() are internal, but try restarting the connection
  1256. if(unlikely(poll_rc == -1)) {
  1257. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR);
  1258. error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to);
  1259. rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
  1260. rrdpush_sender_thread_close_socket(s->host);
  1261. continue;
  1262. }
  1263. // If we have data and have seen the TCP window open then try to close it by a transmission.
  1264. if(likely(outstanding && (fds[Socket].revents & POLLOUT))) {
  1265. worker_is_busy(WORKER_SENDER_JOB_SOCKET_SEND);
  1266. ssize_t bytes = attempt_to_send(s);
  1267. if(bytes > 0) {
  1268. s->last_traffic_seen_t = now_monotonic_sec();
  1269. worker_set_metric(WORKER_SENDER_JOB_BYTES_SENT, (NETDATA_DOUBLE)bytes);
  1270. }
  1271. }
  1272. // If the collector woke us up then empty the pipe to remove the signal
  1273. if (fds[Collector].revents & (POLLIN|POLLPRI)) {
  1274. worker_is_busy(WORKER_SENDER_JOB_PIPE_READ);
  1275. debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
  1276. if (read(fds[Collector].fd, thread_data->pipe_buffer, pipe_buffer_size) == -1)
  1277. error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to);
  1278. }
  1279. // Read as much as possible to fill the buffer, split into full lines for execution.
  1280. if (fds[Socket].revents & POLLIN) {
  1281. worker_is_busy(WORKER_SENDER_JOB_SOCKET_RECEIVE);
  1282. ssize_t bytes = attempt_read(s);
  1283. if(bytes > 0) {
  1284. s->last_traffic_seen_t = now_monotonic_sec();
  1285. worker_set_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, (NETDATA_DOUBLE)bytes);
  1286. }
  1287. }
  1288. if(unlikely(s->read_len))
  1289. execute_commands(s);
  1290. if(unlikely(fds[Collector].revents & (POLLERR|POLLHUP|POLLNVAL))) {
  1291. char *error = NULL;
  1292. if (unlikely(fds[Collector].revents & POLLERR))
  1293. error = "pipe reports errors (POLLERR)";
  1294. else if (unlikely(fds[Collector].revents & POLLHUP))
  1295. error = "pipe closed (POLLHUP)";
  1296. else if (unlikely(fds[Collector].revents & POLLNVAL))
  1297. error = "pipe is invalid (POLLNVAL)";
  1298. if(error) {
  1299. rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
  1300. error("STREAM %s [send to %s]: restarting internal pipe: %s.",
  1301. rrdhost_hostname(s->host), s->connected_to, error);
  1302. }
  1303. }
  1304. if(unlikely(fds[Socket].revents & (POLLERR|POLLHUP|POLLNVAL))) {
  1305. char *error = NULL;
  1306. if (unlikely(fds[Socket].revents & POLLERR))
  1307. error = "socket reports errors (POLLERR)";
  1308. else if (unlikely(fds[Socket].revents & POLLHUP))
  1309. error = "connection closed by remote end (POLLHUP)";
  1310. else if (unlikely(fds[Socket].revents & POLLNVAL))
  1311. error = "connection is invalid (POLLNVAL)";
  1312. if(unlikely(error)) {
  1313. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKER_ERROR);
  1314. error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.",
  1315. rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection);
  1316. rrdpush_sender_thread_close_socket(s->host);
  1317. }
  1318. }
  1319. // protection from overflow
  1320. if(unlikely(s->flags & SENDER_FLAG_OVERFLOW)) {
  1321. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW);
  1322. errno = 0;
  1323. error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection",
  1324. rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
  1325. rrdpush_sender_thread_close_socket(s->host);
  1326. }
  1327. worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE) dictionary_entries(s->replication.requests));
  1328. }
  1329. netdata_thread_cleanup_pop(1);
  1330. return NULL;
  1331. }