sender.c 62 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdpush.h"
  3. #define WORKER_SENDER_JOB_CONNECT 0
  4. #define WORKER_SENDER_JOB_PIPE_READ 1
  5. #define WORKER_SENDER_JOB_SOCKET_RECEIVE 2
  6. #define WORKER_SENDER_JOB_EXECUTE 3
  7. #define WORKER_SENDER_JOB_SOCKET_SEND 4
  8. #define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5
  9. #define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
  10. #define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
  11. #define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
  12. #define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9
  13. #define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
  14. #define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
  15. #define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
  16. #define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
  17. #define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
  18. #define WORKER_SENDER_JOB_BUFFER_RATIO 15
  19. #define WORKER_SENDER_JOB_BYTES_RECEIVED 16
  20. #define WORKER_SENDER_JOB_BYTES_SENT 17
  21. #define WORKER_SENDER_JOB_REPLAY_REQUEST 18
  22. #define WORKER_SENDER_JOB_FUNCTION_REQUEST 19
  23. #define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20
  24. #if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
  25. #error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21
  26. #endif
  27. extern struct config stream_config;
  28. extern char *netdata_ssl_ca_path;
  29. extern char *netdata_ssl_ca_file;
  30. static __thread BUFFER *sender_thread_buffer = NULL;
  31. static __thread bool sender_thread_buffer_used = false;
  32. static __thread time_t sender_thread_buffer_last_reset_s = 0;
  33. void sender_thread_buffer_free(void) {
  34. buffer_free(sender_thread_buffer);
  35. sender_thread_buffer = NULL;
  36. sender_thread_buffer_used = false;
  37. }
  38. // Collector thread starting a transmission
  39. BUFFER *sender_start(struct sender_state *s) {
  40. if(unlikely(sender_thread_buffer_used))
  41. fatal("STREAMING: thread buffer is used multiple times concurrently.");
  42. if(unlikely(rrdpush_sender_last_buffer_recreate_get(s) > sender_thread_buffer_last_reset_s)) {
  43. if(unlikely(sender_thread_buffer && sender_thread_buffer->size > THREAD_BUFFER_INITIAL_SIZE)) {
  44. buffer_free(sender_thread_buffer);
  45. sender_thread_buffer = NULL;
  46. }
  47. }
  48. if(unlikely(!sender_thread_buffer)) {
  49. sender_thread_buffer = buffer_create(THREAD_BUFFER_INITIAL_SIZE, &netdata_buffers_statistics.buffers_streaming);
  50. sender_thread_buffer_last_reset_s = rrdpush_sender_last_buffer_recreate_get(s);
  51. }
  52. sender_thread_buffer_used = true;
  53. buffer_flush(sender_thread_buffer);
  54. return sender_thread_buffer;
  55. }
  56. static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
  57. #ifdef ENABLE_RRDPUSH_COMPRESSION
  58. /*
  59. * In case of stream compression buffer overflow
  60. * Inform the user through the error log file and
  61. * deactivate compression by downgrading the stream protocol.
  62. */
  63. static inline void deactivate_compression(struct sender_state *s) {
  64. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
  65. netdata_log_error("STREAM_COMPRESSION: Compression returned error, disabling it.");
  66. s->flags &= ~SENDER_FLAG_COMPRESSION;
  67. netdata_log_error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
  68. rrdpush_sender_thread_close_socket(s->host);
  69. }
  70. #endif
  71. #define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
  72. // Collector thread finishing a transmission
  73. void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) {
  74. if(unlikely(wb != sender_thread_buffer))
  75. fatal("STREAMING: sender is trying to commit a buffer that is not this thread's buffer.");
  76. if(unlikely(!sender_thread_buffer_used))
  77. fatal("STREAMING: sender is committing a buffer twice.");
  78. sender_thread_buffer_used = false;
  79. char *src = (char *)buffer_tostring(wb);
  80. size_t src_len = buffer_strlen(wb);
  81. if(unlikely(!src || !src_len))
  82. return;
  83. sender_lock(s);
  84. // FILE *fp = fopen("/tmp/stream.txt", "a");
  85. // fprintf(fp,
  86. // "\n--- SEND BEGIN: %s ----\n"
  87. // "%s"
  88. // "--- SEND END ----------------------------------------\n"
  89. // , rrdhost_hostname(s->host), src);
  90. // fclose(fp);
  91. if(unlikely(s->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
  92. netdata_log_info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
  93. rrdhost_hostname(s->host), s->connected_to, s->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
  94. s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
  95. }
  96. #ifdef ENABLE_RRDPUSH_COMPRESSION
  97. if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) {
  98. while(src_len) {
  99. size_t size_to_compress = src_len;
  100. if(unlikely(size_to_compress > COMPRESSION_MAX_MSG_SIZE)) {
  101. if (stream_has_capability(s, STREAM_CAP_BINARY))
  102. size_to_compress = COMPRESSION_MAX_MSG_SIZE;
  103. else {
  104. if (size_to_compress > COMPRESSION_MAX_MSG_SIZE) {
  105. // we need to find the last newline
  106. // so that the decompressor will have a whole line to work with
  107. const char *t = &src[COMPRESSION_MAX_MSG_SIZE];
  108. while (--t >= src)
  109. if (unlikely(*t == '\n'))
  110. break;
  111. if (t <= src) {
  112. size_to_compress = COMPRESSION_MAX_MSG_SIZE;
  113. } else
  114. size_to_compress = t - src + 1;
  115. }
  116. }
  117. }
  118. char *dst;
  119. size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
  120. if (!dst_len) {
  121. netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
  122. rrdhost_hostname(s->host), s->connected_to);
  123. rrdpush_compressor_reset(&s->compressor);
  124. dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
  125. if(!dst_len) {
  126. netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
  127. rrdhost_hostname(s->host), s->connected_to);
  128. deactivate_compression(s);
  129. sender_unlock(s);
  130. return;
  131. }
  132. }
  133. if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
  134. s->flags |= SENDER_FLAG_OVERFLOW;
  135. else
  136. s->sent_bytes_on_this_connection_per_type[type] += dst_len;
  137. src = src + size_to_compress;
  138. src_len -= size_to_compress;
  139. }
  140. }
  141. else if(cbuffer_add_unsafe(s->buffer, src, src_len))
  142. s->flags |= SENDER_FLAG_OVERFLOW;
  143. else
  144. s->sent_bytes_on_this_connection_per_type[type] += src_len;
  145. #else
  146. if(cbuffer_add_unsafe(s->buffer, src, src_len))
  147. s->flags |= SENDER_FLAG_OVERFLOW;
  148. else
  149. s->sent_bytes_on_this_connection_per_type[type] += src_len;
  150. #endif
  151. replication_recalculate_buffer_used_ratio_unsafe(s);
  152. bool signal_sender = false;
  153. if(!rrdpush_sender_pipe_has_pending_data(s)) {
  154. rrdpush_sender_pipe_set_pending_data(s);
  155. signal_sender = true;
  156. }
  157. sender_unlock(s);
  158. if(signal_sender)
  159. rrdpush_signal_sender_to_wake_up(s);
  160. }
  161. static inline void rrdpush_sender_add_host_variable_to_buffer(BUFFER *wb, const RRDVAR_ACQUIRED *rva) {
  162. buffer_sprintf(
  163. wb
  164. , "VARIABLE HOST %s = " NETDATA_DOUBLE_FORMAT "\n"
  165. , rrdvar_name(rva)
  166. , rrdvar2number(rva)
  167. );
  168. netdata_log_debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " NETDATA_DOUBLE_FORMAT, rrdvar_name(rva), rrdvar2number(rva));
  169. }
  170. void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva) {
  171. if(rrdhost_can_send_definitions_to_parent(host)) {
  172. BUFFER *wb = sender_start(host->sender);
  173. rrdpush_sender_add_host_variable_to_buffer(wb, rva);
  174. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  175. sender_thread_buffer_free();
  176. }
  177. }
  178. struct custom_host_variables_callback {
  179. BUFFER *wb;
  180. };
  181. static int rrdpush_sender_thread_custom_host_variables_callback(const DICTIONARY_ITEM *item __maybe_unused, void *rrdvar_ptr __maybe_unused, void *struct_ptr) {
  182. const RRDVAR_ACQUIRED *rv = (const RRDVAR_ACQUIRED *)item;
  183. struct custom_host_variables_callback *tmp = struct_ptr;
  184. BUFFER *wb = tmp->wb;
  185. if(unlikely(rrdvar_flags(rv) & RRDVAR_FLAG_CUSTOM_HOST_VAR && rrdvar_type(rv) == RRDVAR_TYPE_CALCULATED)) {
  186. rrdpush_sender_add_host_variable_to_buffer(wb, rv);
  187. return 1;
  188. }
  189. return 0;
  190. }
  191. static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) {
  192. if(rrdhost_can_send_definitions_to_parent(host)) {
  193. BUFFER *wb = sender_start(host->sender);
  194. struct custom_host_variables_callback tmp = {
  195. .wb = wb
  196. };
  197. int ret = rrdvar_walkthrough_read(host->rrdvars, rrdpush_sender_thread_custom_host_variables_callback, &tmp);
  198. (void)ret;
  199. sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
  200. sender_thread_buffer_free();
  201. netdata_log_debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret);
  202. }
  203. }
  204. // resets all the chart, so that their definitions
  205. // will be resent to the central netdata
  206. static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) {
  207. RRDSET *st;
  208. rrdset_foreach_read(st, host) {
  209. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED | RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
  210. rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
  211. st->upstream_resync_time_s = 0;
  212. RRDDIM *rd;
  213. rrddim_foreach_read(rd, st)
  214. rrddim_clear_exposed(rd);
  215. rrddim_foreach_done(rd);
  216. }
  217. rrdset_foreach_done(st);
  218. rrdhost_sender_replicating_charts_zero(host);
  219. }
  220. static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t now_s, bool have_mutex, bool force) {
  221. static __thread time_t last_reset_time_s = 0;
  222. if(!force && now_s - last_reset_time_s < 300)
  223. return;
  224. if(!have_mutex)
  225. sender_lock(s);
  226. rrdpush_sender_last_buffer_recreate_set(s, now_s);
  227. last_reset_time_s = now_s;
  228. if(s->buffer && s->buffer->size > CBUFFER_INITIAL_SIZE) {
  229. size_t max = s->buffer->max_size;
  230. cbuffer_free(s->buffer);
  231. s->buffer = cbuffer_new(CBUFFER_INITIAL_SIZE, max, &netdata_buffers_statistics.cbuffers_streaming);
  232. }
  233. sender_thread_buffer_free();
  234. if(!have_mutex)
  235. sender_unlock(s);
  236. }
  237. static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
  238. rrdpush_sender_set_flush_time(host->sender);
  239. sender_lock(host->sender);
  240. // flush the output buffer from any data it may have
  241. cbuffer_flush(host->sender->buffer);
  242. rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true);
  243. replication_recalculate_buffer_used_ratio_unsafe(host->sender);
  244. sender_unlock(host->sender);
  245. }
  246. static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) {
  247. rrdpush_sender_set_flush_time(host->sender);
  248. // stop all replication commands inflight
  249. replication_sender_delete_pending_requests(host->sender);
  250. // reset the state of all charts
  251. rrdpush_sender_thread_reset_all_charts(host);
  252. rrdpush_sender_replicating_charts_zero(host->sender);
  253. }
  254. static void rrdpush_sender_on_connect(RRDHOST *host) {
  255. rrdpush_sender_cbuffer_flush(host);
  256. rrdpush_sender_charts_and_replication_reset(host);
  257. }
  258. static void rrdpush_sender_after_connect(RRDHOST *host) {
  259. rrdpush_sender_thread_send_custom_host_variables(host);
  260. }
  261. static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
  262. #ifdef ENABLE_HTTPS
  263. netdata_ssl_close(&host->sender->ssl);
  264. #endif
  265. if(host->sender->rrdpush_sender_socket != -1) {
  266. close(host->sender->rrdpush_sender_socket);
  267. host->sender->rrdpush_sender_socket = -1;
  268. }
  269. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  270. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  271. // do not flush the circular buffer here
  272. // this function is called sometimes with the mutex lock, sometimes without the lock
  273. rrdpush_sender_charts_and_replication_reset(host);
  274. }
  275. void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
  276. {
  277. se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz("");
  278. se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz("");
  279. se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz("");
  280. se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):strdupz("");
  281. se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz("");
  282. }
  283. void rrdpush_clean_encoded(stream_encoded_t *se)
  284. {
  285. if (se->os_name)
  286. freez(se->os_name);
  287. if (se->os_id)
  288. freez(se->os_id);
  289. if (se->os_version)
  290. freez(se->os_version);
  291. if (se->kernel_name)
  292. freez(se->kernel_name);
  293. if (se->kernel_version)
  294. freez(se->kernel_version);
  295. }
  296. struct {
  297. const char *response;
  298. size_t length;
  299. int32_t version;
  300. bool dynamic;
  301. const char *error;
  302. int worker_job_id;
  303. int postpone_reconnect_seconds;
  304. bool prevent_log;
  305. } stream_responses[] = {
  306. {
  307. .response = START_STREAMING_PROMPT_VN,
  308. .length = sizeof(START_STREAMING_PROMPT_VN) - 1,
  309. .version = STREAM_HANDSHAKE_OK_V3, // and above
  310. .dynamic = true, // dynamic = we will parse the version / capabilities
  311. .error = NULL,
  312. .worker_job_id = 0,
  313. .postpone_reconnect_seconds = 0,
  314. },
  315. {
  316. .response = START_STREAMING_PROMPT_V2,
  317. .length = sizeof(START_STREAMING_PROMPT_V2) - 1,
  318. .version = STREAM_HANDSHAKE_OK_V2,
  319. .dynamic = false,
  320. .error = NULL,
  321. .worker_job_id = 0,
  322. .postpone_reconnect_seconds = 0,
  323. },
  324. {
  325. .response = START_STREAMING_PROMPT_V1,
  326. .length = sizeof(START_STREAMING_PROMPT_V1) - 1,
  327. .version = STREAM_HANDSHAKE_OK_V1,
  328. .dynamic = false,
  329. .error = NULL,
  330. .worker_job_id = 0,
  331. .postpone_reconnect_seconds = 0,
  332. },
  333. {
  334. .response = START_STREAMING_ERROR_SAME_LOCALHOST,
  335. .length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1,
  336. .version = STREAM_HANDSHAKE_ERROR_LOCALHOST,
  337. .dynamic = false,
  338. .error = "remote server rejected this stream, the host we are trying to stream is its localhost",
  339. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  340. .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour
  341. .prevent_log = true,
  342. },
  343. {
  344. .response = START_STREAMING_ERROR_ALREADY_STREAMING,
  345. .length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1,
  346. .version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED,
  347. .dynamic = false,
  348. .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
  349. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  350. .postpone_reconnect_seconds = 2 * 60, // 2 minutes
  351. .prevent_log = true,
  352. },
  353. {
  354. .response = START_STREAMING_ERROR_NOT_PERMITTED,
  355. .length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1,
  356. .version = STREAM_HANDSHAKE_ERROR_DENIED,
  357. .dynamic = false,
  358. .error = "remote server denied access, probably we don't have the right API key?",
  359. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  360. .postpone_reconnect_seconds = 1 * 60, // 1 minute
  361. },
  362. {
  363. .response = START_STREAMING_ERROR_BUSY_TRY_LATER,
  364. .length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1,
  365. .version = STREAM_HANDSHAKE_BUSY_TRY_LATER,
  366. .dynamic = false,
  367. .error = "remote server is currently busy, we should try later",
  368. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  369. .postpone_reconnect_seconds = 2 * 60, // 2 minutes
  370. },
  371. {
  372. .response = START_STREAMING_ERROR_INTERNAL_ERROR,
  373. .length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1,
  374. .version = STREAM_HANDSHAKE_INTERNAL_ERROR,
  375. .dynamic = false,
  376. .error = "remote server is encountered an internal error, we should try later",
  377. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  378. .postpone_reconnect_seconds = 5 * 60, // 5 minutes
  379. },
  380. {
  381. .response = START_STREAMING_ERROR_INITIALIZATION,
  382. .length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1,
  383. .version = STREAM_HANDSHAKE_INITIALIZATION,
  384. .dynamic = false,
  385. .error = "remote server is initializing, we should try later",
  386. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  387. .postpone_reconnect_seconds = 2 * 60, // 2 minute
  388. },
  389. // terminator
  390. {
  391. .response = NULL,
  392. .length = 0,
  393. .version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE,
  394. .dynamic = false,
  395. .error = "remote node response is not understood, is it Netdata?",
  396. .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
  397. .postpone_reconnect_seconds = 1 * 60, // 1 minute
  398. .prevent_log = false,
  399. }
  400. };
  401. static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender_state *s, char *http, size_t http_length) {
  402. int32_t version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE;
  403. int i;
  404. for(i = 0; stream_responses[i].response ; i++) {
  405. if(stream_responses[i].dynamic &&
  406. http_length > stream_responses[i].length && http_length < (stream_responses[i].length + 30) &&
  407. strncmp(http, stream_responses[i].response, stream_responses[i].length) == 0) {
  408. version = str2i(&http[stream_responses[i].length]);
  409. break;
  410. }
  411. else if(http_length == stream_responses[i].length && strcmp(http, stream_responses[i].response) == 0) {
  412. version = stream_responses[i].version;
  413. break;
  414. }
  415. }
  416. if(version >= STREAM_HANDSHAKE_OK_V1) {
  417. host->destination->reason = version;
  418. host->destination->postpone_reconnection_until = now_realtime_sec() + s->reconnect_delay;
  419. s->capabilities = convert_stream_version_to_capabilities(version, host, true);
  420. return true;
  421. }
  422. bool prevent_log = stream_responses[i].prevent_log;
  423. const char *error = stream_responses[i].error;
  424. int worker_job_id = stream_responses[i].worker_job_id;
  425. int delay = stream_responses[i].postpone_reconnect_seconds;
  426. worker_is_busy(worker_job_id);
  427. rrdpush_sender_thread_close_socket(host);
  428. host->destination->reason = version;
  429. host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
  430. char buf[LOG_DATE_LENGTH];
  431. log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until);
  432. if(prevent_log)
  433. internal_error(true, "STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
  434. rrdhost_hostname(host), s->connected_to, error, delay, buf);
  435. else
  436. netdata_log_error("STREAM %s [send to %s]: %s - will retry in %d secs, at %s",
  437. rrdhost_hostname(host), s->connected_to, error, delay, buf);
  438. return false;
  439. }
  440. static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
  441. #ifdef ENABLE_HTTPS
  442. RRDHOST *host = s->host;
  443. bool ssl_required = host->destination && host->destination->ssl;
  444. netdata_ssl_close(&host->sender->ssl);
  445. if(!ssl_required)
  446. return true;
  447. if (netdata_ssl_open(&host->sender->ssl, netdata_ssl_streaming_sender_ctx, s->rrdpush_sender_socket)) {
  448. if(!netdata_ssl_connect(&host->sender->ssl)) {
  449. // couldn't connect
  450. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
  451. rrdpush_sender_thread_close_socket(host);
  452. host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
  453. host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
  454. return false;
  455. }
  456. if (netdata_ssl_validate_certificate_sender &&
  457. security_test_certificate(host->sender->ssl.conn)) {
  458. // certificate is not valid
  459. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
  460. netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
  461. rrdpush_sender_thread_close_socket(host);
  462. host->destination->reason = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
  463. host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
  464. return false;
  465. }
  466. return true;
  467. }
  468. netdata_log_error("SSL: failed to establish connection.");
  469. return false;
  470. #else
  471. // SSL is not enabled
  472. return true;
  473. #endif
  474. }
  475. static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_port, int timeout, struct sender_state *s) {
  476. struct timeval tv = {
  477. .tv_sec = timeout,
  478. .tv_usec = 0
  479. };
  480. // make sure the socket is closed
  481. rrdpush_sender_thread_close_socket(host);
  482. s->rrdpush_sender_socket = connect_to_one_of_destinations(
  483. host
  484. , default_port
  485. , &tv
  486. , &s->reconnects_counter
  487. , s->connected_to
  488. , sizeof(s->connected_to)-1
  489. , &host->destination
  490. );
  491. if(unlikely(s->rrdpush_sender_socket == -1)) {
  492. // netdata_log_error("STREAM %s [send to %s]: could not connect to parent node at this time.", rrdhost_hostname(host), host->rrdpush_send_destination);
  493. return false;
  494. }
  495. // netdata_log_info("STREAM %s [send to %s]: initializing communication...", rrdhost_hostname(host), s->connected_to);
  496. // reset our capabilities to default
  497. s->capabilities = stream_our_capabilities(host, true);
  498. #ifdef ENABLE_RRDPUSH_COMPRESSION
  499. // If we don't want compression, remove it from our capabilities
  500. if(!(s->flags & SENDER_FLAG_COMPRESSION))
  501. s->capabilities &= ~STREAM_CAP_COMPRESSION;
  502. #endif // ENABLE_RRDPUSH_COMPRESSION
  503. /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
  504. version negotiation resulted in a high enough version.
  505. */
  506. stream_encoded_t se;
  507. rrdpush_encode_variable(&se, host);
  508. host->sender->hops = host->system_info->hops + 1;
  509. char http[HTTP_HEADER_SIZE + 1];
  510. int eol = snprintfz(http, HTTP_HEADER_SIZE,
  511. "STREAM "
  512. "key=%s"
  513. "&hostname=%s"
  514. "&registry_hostname=%s"
  515. "&machine_guid=%s"
  516. "&update_every=%d"
  517. "&os=%s"
  518. "&timezone=%s"
  519. "&abbrev_timezone=%s"
  520. "&utc_offset=%d"
  521. "&hops=%d"
  522. "&ml_capable=%d"
  523. "&ml_enabled=%d"
  524. "&mc_version=%d"
  525. "&tags=%s"
  526. "&ver=%u"
  527. "&NETDATA_INSTANCE_CLOUD_TYPE=%s"
  528. "&NETDATA_INSTANCE_CLOUD_INSTANCE_TYPE=%s"
  529. "&NETDATA_INSTANCE_CLOUD_INSTANCE_REGION=%s"
  530. "&NETDATA_SYSTEM_OS_NAME=%s"
  531. "&NETDATA_SYSTEM_OS_ID=%s"
  532. "&NETDATA_SYSTEM_OS_ID_LIKE=%s"
  533. "&NETDATA_SYSTEM_OS_VERSION=%s"
  534. "&NETDATA_SYSTEM_OS_VERSION_ID=%s"
  535. "&NETDATA_SYSTEM_OS_DETECTION=%s"
  536. "&NETDATA_HOST_IS_K8S_NODE=%s"
  537. "&NETDATA_SYSTEM_KERNEL_NAME=%s"
  538. "&NETDATA_SYSTEM_KERNEL_VERSION=%s"
  539. "&NETDATA_SYSTEM_ARCHITECTURE=%s"
  540. "&NETDATA_SYSTEM_VIRTUALIZATION=%s"
  541. "&NETDATA_SYSTEM_VIRT_DETECTION=%s"
  542. "&NETDATA_SYSTEM_CONTAINER=%s"
  543. "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s"
  544. "&NETDATA_CONTAINER_OS_NAME=%s"
  545. "&NETDATA_CONTAINER_OS_ID=%s"
  546. "&NETDATA_CONTAINER_OS_ID_LIKE=%s"
  547. "&NETDATA_CONTAINER_OS_VERSION=%s"
  548. "&NETDATA_CONTAINER_OS_VERSION_ID=%s"
  549. "&NETDATA_CONTAINER_OS_DETECTION=%s"
  550. "&NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT=%s"
  551. "&NETDATA_SYSTEM_CPU_FREQ=%s"
  552. "&NETDATA_SYSTEM_TOTAL_RAM=%s"
  553. "&NETDATA_SYSTEM_TOTAL_DISK_SIZE=%s"
  554. "&NETDATA_PROTOCOL_VERSION=%s"
  555. " HTTP/1.1\r\n"
  556. "User-Agent: %s/%s\r\n"
  557. "Accept: */*\r\n\r\n"
  558. , host->rrdpush_send_api_key
  559. , rrdhost_hostname(host)
  560. , rrdhost_registry_hostname(host)
  561. , host->machine_guid
  562. , default_rrd_update_every
  563. , rrdhost_os(host)
  564. , rrdhost_timezone(host)
  565. , rrdhost_abbrev_timezone(host)
  566. , host->utc_offset
  567. , host->sender->hops
  568. , host->system_info->ml_capable
  569. , host->system_info->ml_enabled
  570. , host->system_info->mc_version
  571. , rrdhost_tags(host)
  572. , s->capabilities
  573. , (host->system_info->cloud_provider_type) ? host->system_info->cloud_provider_type : ""
  574. , (host->system_info->cloud_instance_type) ? host->system_info->cloud_instance_type : ""
  575. , (host->system_info->cloud_instance_region) ? host->system_info->cloud_instance_region : ""
  576. , se.os_name
  577. , se.os_id
  578. , (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : ""
  579. , se.os_version
  580. , (host->system_info->host_os_version_id) ? host->system_info->host_os_version_id : ""
  581. , (host->system_info->host_os_detection) ? host->system_info->host_os_detection : ""
  582. , (host->system_info->is_k8s_node) ? host->system_info->is_k8s_node : ""
  583. , se.kernel_name
  584. , se.kernel_version
  585. , (host->system_info->architecture) ? host->system_info->architecture : ""
  586. , (host->system_info->virtualization) ? host->system_info->virtualization : ""
  587. , (host->system_info->virt_detection) ? host->system_info->virt_detection : ""
  588. , (host->system_info->container) ? host->system_info->container : ""
  589. , (host->system_info->container_detection) ? host->system_info->container_detection : ""
  590. , (host->system_info->container_os_name) ? host->system_info->container_os_name : ""
  591. , (host->system_info->container_os_id) ? host->system_info->container_os_id : ""
  592. , (host->system_info->container_os_id_like) ? host->system_info->container_os_id_like : ""
  593. , (host->system_info->container_os_version) ? host->system_info->container_os_version : ""
  594. , (host->system_info->container_os_version_id) ? host->system_info->container_os_version_id : ""
  595. , (host->system_info->container_os_detection) ? host->system_info->container_os_detection : ""
  596. , (host->system_info->host_cores) ? host->system_info->host_cores : ""
  597. , (host->system_info->host_cpu_freq) ? host->system_info->host_cpu_freq : ""
  598. , (host->system_info->host_ram_total) ? host->system_info->host_ram_total : ""
  599. , (host->system_info->host_disk_space) ? host->system_info->host_disk_space : ""
  600. , STREAMING_PROTOCOL_VERSION
  601. , rrdhost_program_name(host)
  602. , rrdhost_program_version(host)
  603. );
  604. http[eol] = 0x00;
  605. rrdpush_clean_encoded(&se);
  606. if(!rrdpush_sender_connect_ssl(s))
  607. return false;
  608. ssize_t bytes, len = (ssize_t)strlen(http);
  609. bytes = send_timeout(
  610. #ifdef ENABLE_HTTPS
  611. &host->sender->ssl,
  612. #endif
  613. s->rrdpush_sender_socket,
  614. http,
  615. len,
  616. 0,
  617. timeout);
  618. if(bytes <= 0) { // timeout is 0
  619. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
  620. rrdpush_sender_thread_close_socket(host);
  621. netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
  622. host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
  623. host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
  624. return false;
  625. }
  626. bytes = recv_timeout(
  627. #ifdef ENABLE_HTTPS
  628. &host->sender->ssl,
  629. #endif
  630. s->rrdpush_sender_socket,
  631. http,
  632. HTTP_HEADER_SIZE,
  633. 0,
  634. timeout);
  635. if(bytes <= 0) { // timeout is 0
  636. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
  637. rrdpush_sender_thread_close_socket(host);
  638. netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
  639. host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
  640. host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
  641. return false;
  642. }
  643. if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
  644. netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
  645. if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
  646. netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
  647. http[bytes] = '\0';
  648. netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http);
  649. if(!rrdpush_sender_validate_response(host, s, http, bytes))
  650. return false;
  651. #ifdef ENABLE_RRDPUSH_COMPRESSION
  652. if(stream_has_capability(s, STREAM_CAP_COMPRESSION))
  653. rrdpush_compressor_reset(&s->compressor);
  654. else
  655. rrdpush_compressor_destroy(&s->compressor);
  656. #endif // ENABLE_RRDPUSH_COMPRESSION
  657. log_sender_capabilities(s);
  658. netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
  659. return true;
  660. }
  661. static bool attempt_to_connect(struct sender_state *state)
  662. {
  663. state->send_attempts = 0;
  664. // reset the bytes we have sent for this session
  665. state->sent_bytes_on_this_connection = 0;
  666. memset(state->sent_bytes_on_this_connection_per_type, 0, sizeof(state->sent_bytes_on_this_connection_per_type));
  667. if(rrdpush_sender_thread_connect_to_parent(state->host, state->default_port, state->timeout, state)) {
  668. // reset the buffer, to properly send charts and metrics
  669. rrdpush_sender_on_connect(state->host);
  670. // send from the beginning
  671. state->begin = 0;
  672. // make sure the next reconnection will be immediate
  673. state->not_connected_loops = 0;
  674. // let the data collection threads know we are ready
  675. rrdhost_flag_set(state->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  676. rrdpush_sender_after_connect(state->host);
  677. return true;
  678. }
  679. // we couldn't connect
  680. // increase the failed connections counter
  681. state->not_connected_loops++;
  682. // slow re-connection on repeating errors
  683. usec_t now_ut = now_monotonic_usec();
  684. usec_t end_ut = now_ut + USEC_PER_SEC * state->reconnect_delay;
  685. while(now_ut < end_ut) {
  686. netdata_thread_testcancel();
  687. sleep_usec(500 * USEC_PER_MS); // seconds
  688. now_ut = now_monotonic_usec();
  689. }
  690. return false;
  691. }
  692. // TCP window is open, and we have data to transmit.
  693. static ssize_t attempt_to_send(struct sender_state *s) {
  694. ssize_t ret;
  695. #ifdef NETDATA_INTERNAL_CHECKS
  696. struct circular_buffer *cb = s->buffer;
  697. #endif
  698. sender_lock(s);
  699. char *chunk;
  700. size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
  701. netdata_log_debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
  702. #ifdef ENABLE_HTTPS
  703. if(SSL_connection(&s->ssl))
  704. ret = netdata_ssl_write(&s->ssl, chunk, outstanding);
  705. else
  706. ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
  707. #else
  708. ret = send(s->rrdpush_sender_socket, chunk, outstanding, MSG_DONTWAIT);
  709. #endif
  710. if (likely(ret > 0)) {
  711. cbuffer_remove_unsafe(s->buffer, ret);
  712. s->sent_bytes_on_this_connection += ret;
  713. s->sent_bytes += ret;
  714. netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: Sent %zd bytes", rrdhost_hostname(s->host), s->connected_to, ret);
  715. }
  716. else if (ret == -1 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
  717. netdata_log_debug(D_STREAM, "STREAM %s [send to %s]: unavailable after polling POLLOUT", rrdhost_hostname(s->host), s->connected_to);
  718. else if (ret == -1) {
  719. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR);
  720. netdata_log_debug(D_STREAM, "STREAM: Send failed - closing socket...");
  721. netdata_log_error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection);
  722. rrdpush_sender_thread_close_socket(s->host);
  723. }
  724. else
  725. netdata_log_debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
  726. replication_recalculate_buffer_used_ratio_unsafe(s);
  727. sender_unlock(s);
  728. return ret;
  729. }
  730. static ssize_t attempt_read(struct sender_state *s) {
  731. ssize_t ret;
  732. #ifdef ENABLE_HTTPS
  733. if (SSL_connection(&s->ssl))
  734. ret = netdata_ssl_read(&s->ssl, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1);
  735. else
  736. ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
  737. #else
  738. ret = recv(s->rrdpush_sender_socket, s->read_buffer + s->read_len, sizeof(s->read_buffer) - s->read_len - 1,MSG_DONTWAIT);
  739. #endif
  740. if (ret > 0) {
  741. s->read_len += ret;
  742. return ret;
  743. }
  744. if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
  745. return ret;
  746. #ifdef ENABLE_HTTPS
  747. if (SSL_connection(&s->ssl))
  748. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
  749. else
  750. #endif
  751. if (ret == 0 || errno == ECONNRESET) {
  752. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED);
  753. netdata_log_error("STREAM %s [send to %s]: connection closed by far end.", rrdhost_hostname(s->host), s->connected_to);
  754. }
  755. else {
  756. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR);
  757. netdata_log_error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
  758. }
  759. rrdpush_sender_thread_close_socket(s->host);
  760. return ret;
  761. }
// Context of a function request received from the parent, kept alive until the
// function completes. It is allocated in execute_commands() and released by
// stream_execute_function_callback().
struct inflight_stream_function {
    struct sender_state *sender;    // the sender the response is streamed back through
    STRING *transaction;            // transaction id of the request, echoed back in the response
    usec_t received_ut;             // realtime (usec) when the request was received; used to log the latency
};
  767. void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
  768. struct inflight_stream_function *tmp = data;
  769. struct sender_state *s = tmp->sender;
  770. if(rrdhost_can_send_definitions_to_parent(s->host)) {
  771. BUFFER *wb = sender_start(s);
  772. pluginsd_function_result_begin_to_buffer(wb
  773. , string2str(tmp->transaction)
  774. , code
  775. , functions_content_type_to_format(func_wb->content_type)
  776. , func_wb->expires);
  777. buffer_fast_strcat(wb, buffer_tostring(func_wb), buffer_strlen(func_wb));
  778. pluginsd_function_result_end_to_buffer(wb);
  779. sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
  780. sender_thread_buffer_free();
  781. internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %"PRIu64" usec).",
  782. rrdhost_hostname(s->host), s->connected_to,
  783. string2str(tmp->transaction),
  784. buffer_strlen(func_wb),
  785. now_realtime_usec() - tmp->received_ut);
  786. }
  787. string_freez(tmp->transaction);
  788. buffer_free(func_wb);
  789. freez(tmp);
  790. }
  791. // This is just a placeholder until the gap filling state machine is inserted
  792. void execute_commands(struct sender_state *s) {
  793. worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
  794. char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
  795. *end = 0;
  796. while( start < end && (newline = strchr(start, '\n')) ) {
  797. *newline = '\0';
  798. if (s->receiving_function_payload && unlikely(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) != 0)) {
  799. if (buffer_strlen(s->function_payload.payload) != 0)
  800. buffer_strcat(s->function_payload.payload, "\n");
  801. buffer_strcat(s->function_payload.payload, start);
  802. start = newline + 1;
  803. continue;
  804. }
  805. netdata_log_access("STREAM: %d from '%s' for host '%s': %s",
  806. gettid(), s->connected_to, rrdhost_hostname(s->host), start);
  807. // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
  808. char *words[PLUGINSD_MAX_WORDS] = { NULL };
  809. size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS);
  810. const char *keyword = get_word(words, num_words, 0);
  811. if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
  812. worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
  813. char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1);
  814. char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2);
  815. char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3);
  816. if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
  817. netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
  818. rrdhost_hostname(s->host), s->connected_to,
  819. keyword,
  820. transaction?transaction:"(unset)",
  821. timeout_s?timeout_s:"(unset)",
  822. function?function:"(unset)");
  823. }
  824. else {
  825. int timeout = str2i(timeout_s);
  826. if(timeout <= 0) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  827. struct inflight_stream_function *tmp = callocz(1, sizeof(struct inflight_stream_function));
  828. tmp->received_ut = now_realtime_usec();
  829. tmp->sender = s;
  830. tmp->transaction = string_strdupz(transaction);
  831. BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
  832. char *payload = s->receiving_function_payload ? (char *)buffer_tostring(s->function_payload.payload) : NULL;
  833. int code = rrd_function_run(s->host, wb, timeout, function, false, transaction,
  834. stream_execute_function_callback, tmp, NULL, NULL, payload);
  835. if(code != HTTP_RESP_OK) {
  836. if (!buffer_strlen(wb))
  837. rrd_call_function_error(wb, "Failed to route request to collector", code);
  838. stream_execute_function_callback(wb, code, tmp);
  839. }
  840. }
  841. if (s->receiving_function_payload) {
  842. s->receiving_function_payload = false;
  843. buffer_free(s->function_payload.payload);
  844. freez(s->function_payload.txid);
  845. freez(s->function_payload.timeout);
  846. freez(s->function_payload.fn_name);
  847. memset(&s->function_payload, 0, sizeof(struct function_payload_state));
  848. }
  849. }
  850. else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
  851. if (s->receiving_function_payload) {
  852. netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword);
  853. s->receiving_function_payload = false;
  854. buffer_free(s->function_payload.payload);
  855. s->function_payload.payload = NULL;
  856. // TODO send error response
  857. }
  858. char *transaction = get_word(words, num_words, 1);
  859. char *timeout_s = get_word(words, num_words, 2);
  860. char *function = get_word(words, num_words, 3);
  861. if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
  862. netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
  863. rrdhost_hostname(s->host), s->connected_to,
  864. keyword,
  865. transaction?transaction:"(unset)",
  866. timeout_s?timeout_s:"(unset)",
  867. function?function:"(unset)");
  868. }
  869. s->receiving_function_payload = true;
  870. s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions);
  871. s->function_payload.txid = strdupz(get_word(words, num_words, 1));
  872. s->function_payload.timeout = strdupz(get_word(words, num_words, 2));
  873. s->function_payload.fn_name = strdupz(get_word(words, num_words, 3));
  874. }
  875. else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
  876. worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
  877. char *transaction = get_word(words, num_words, 1);
  878. if(transaction && *transaction)
  879. rrd_function_cancel(transaction);
  880. }
  881. else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
  882. worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
  883. const char *chart_id = get_word(words, num_words, 1);
  884. const char *start_streaming = get_word(words, num_words, 2);
  885. const char *after = get_word(words, num_words, 3);
  886. const char *before = get_word(words, num_words, 4);
  887. if (!chart_id || !start_streaming || !after || !before) {
  888. netdata_log_error("STREAM %s [send to %s] %s command is incomplete"
  889. " (chart=%s, start_streaming=%s, after=%s, before=%s)",
  890. rrdhost_hostname(s->host), s->connected_to,
  891. keyword,
  892. chart_id ? chart_id : "(unset)",
  893. start_streaming ? start_streaming : "(unset)",
  894. after ? after : "(unset)",
  895. before ? before : "(unset)");
  896. }
  897. else {
  898. replication_add_request(s, chart_id,
  899. strtoll(after, NULL, 0),
  900. strtoll(before, NULL, 0),
  901. !strcmp(start_streaming, "true")
  902. );
  903. }
  904. }
  905. else {
  906. netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
  907. }
  908. worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
  909. start = newline + 1;
  910. }
  911. if (start < end) {
  912. memmove(s->read_buffer, start, end-start);
  913. s->read_len = end - start;
  914. }
  915. else {
  916. s->read_buffer[0] = '\0';
  917. s->read_len = 0;
  918. }
  919. }
// Per-thread resources of rrdpush_sender_thread(), released by
// rrdpush_sender_thread_cleanup_callback() when the thread exits.
struct rrdpush_sender_thread_data {
    RRDHOST *host;          // the host this sender thread streams
    char *pipe_buffer;      // scratch buffer used to drain the internal wake-up pipe
};
  924. static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen) {
  925. static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
  926. bool ret = true;
  927. netdata_mutex_lock(&mutex);
  928. int new_pipe_fds[2];
  929. if(reopen) {
  930. if(pipe(new_pipe_fds) != 0) {
  931. netdata_log_error("STREAM %s [send]: cannot create required pipe.", rrdhost_hostname(host));
  932. new_pipe_fds[PIPE_READ] = -1;
  933. new_pipe_fds[PIPE_WRITE] = -1;
  934. ret = false;
  935. }
  936. }
  937. int old_pipe_fds[2];
  938. old_pipe_fds[PIPE_READ] = pipe_fds[PIPE_READ];
  939. old_pipe_fds[PIPE_WRITE] = pipe_fds[PIPE_WRITE];
  940. if(reopen) {
  941. pipe_fds[PIPE_READ] = new_pipe_fds[PIPE_READ];
  942. pipe_fds[PIPE_WRITE] = new_pipe_fds[PIPE_WRITE];
  943. }
  944. else {
  945. pipe_fds[PIPE_READ] = -1;
  946. pipe_fds[PIPE_WRITE] = -1;
  947. }
  948. if(old_pipe_fds[PIPE_READ] > 2)
  949. close(old_pipe_fds[PIPE_READ]);
  950. if(old_pipe_fds[PIPE_WRITE] > 2)
  951. close(old_pipe_fds[PIPE_WRITE]);
  952. netdata_mutex_unlock(&mutex);
  953. return ret;
  954. }
  955. void rrdpush_signal_sender_to_wake_up(struct sender_state *s) {
  956. if(unlikely(s->tid == gettid()))
  957. return;
  958. RRDHOST *host = s->host;
  959. int pipe_fd = s->rrdpush_sender_pipe[PIPE_WRITE];
  960. // signal the sender there are more data
  961. if (pipe_fd != -1 && write(pipe_fd, " ", 1) == -1) {
  962. netdata_log_error("STREAM %s [send]: cannot write to internal pipe.", rrdhost_hostname(host));
  963. rrdpush_sender_pipe_close(host, s->rrdpush_sender_pipe, true);
  964. }
  965. }
  966. static bool rrdhost_set_sender(RRDHOST *host) {
  967. if(unlikely(!host->sender)) return false;
  968. bool ret = false;
  969. sender_lock(host->sender);
  970. if(!host->sender->tid) {
  971. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  972. rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
  973. host->rrdpush_sender_connection_counter++;
  974. host->sender->tid = gettid();
  975. host->sender->last_state_since_t = now_realtime_sec();
  976. host->sender->exit.reason = STREAM_HANDSHAKE_NEVER;
  977. ret = true;
  978. }
  979. sender_unlock(host->sender);
  980. rrdpush_reset_destinations_postpone_time(host);
  981. return ret;
  982. }
  983. static void rrdhost_clear_sender___while_having_sender_mutex(RRDHOST *host) {
  984. if(unlikely(!host->sender)) return;
  985. if(host->sender->tid == gettid()) {
  986. host->sender->tid = 0;
  987. host->sender->exit.shutdown = false;
  988. rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN | RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  989. host->sender->last_state_since_t = now_realtime_sec();
  990. if(host->destination) {
  991. host->destination->since = host->sender->last_state_since_t;
  992. host->destination->reason = host->sender->exit.reason;
  993. }
  994. }
  995. rrdpush_reset_destinations_postpone_time(host);
  996. }
  997. static bool rrdhost_sender_should_exit(struct sender_state *s) {
  998. // check for outstanding cancellation requests
  999. netdata_thread_testcancel();
  1000. if(unlikely(!service_running(SERVICE_STREAMING))) {
  1001. if(!s->exit.reason)
  1002. s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT;
  1003. return true;
  1004. }
  1005. if(unlikely(!rrdhost_has_rrdpush_sender_enabled(s->host))) {
  1006. if(!s->exit.reason)
  1007. s->exit.reason = STREAM_HANDSHAKE_NON_STREAMABLE_HOST;
  1008. return true;
  1009. }
  1010. if(unlikely(s->exit.shutdown)) {
  1011. if(!s->exit.reason)
  1012. s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN;
  1013. return true;
  1014. }
  1015. if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_ORPHAN))) {
  1016. if(!s->exit.reason)
  1017. s->exit.reason = STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST;
  1018. return true;
  1019. }
  1020. return false;
  1021. }
  1022. static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
  1023. struct rrdpush_sender_thread_data *s = ptr;
  1024. worker_unregister();
  1025. RRDHOST *host = s->host;
  1026. sender_lock(host->sender);
  1027. netdata_log_info("STREAM %s [send]: sending thread exits %s",
  1028. rrdhost_hostname(host),
  1029. host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(host->sender->exit.reason) : "");
  1030. rrdpush_sender_thread_close_socket(host);
  1031. rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
  1032. rrdhost_clear_sender___while_having_sender_mutex(host);
  1033. sender_unlock(host->sender);
  1034. freez(s->pipe_buffer);
  1035. freez(s);
  1036. }
  1037. void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) {
  1038. #ifdef ENABLE_HTTPS
  1039. static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
  1040. spinlock_lock(&sp);
  1041. if(netdata_ssl_streaming_sender_ctx || !host) {
  1042. spinlock_unlock(&sp);
  1043. return;
  1044. }
  1045. for(struct rrdpush_destinations *d = host->destinations; d ; d = d->next) {
  1046. if (d->ssl) {
  1047. // we need to initialize SSL
  1048. netdata_ssl_initialize_ctx(NETDATA_SSL_STREAMING_SENDER_CTX);
  1049. ssl_security_location_for_context(netdata_ssl_streaming_sender_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path);
  1050. // stop the loop
  1051. break;
  1052. }
  1053. }
  1054. spinlock_unlock(&sp);
  1055. #endif
  1056. }
  1057. void *rrdpush_sender_thread(void *ptr) {
  1058. worker_register("STREAMSND");
  1059. worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect");
  1060. worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read");
  1061. worker_register_job_name(WORKER_SENDER_JOB_SOCKET_RECEIVE, "receive");
  1062. worker_register_job_name(WORKER_SENDER_JOB_EXECUTE, "execute");
  1063. worker_register_job_name(WORKER_SENDER_JOB_SOCKET_SEND, "send");
  1064. // disconnection reasons
  1065. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT, "disconnect timeout");
  1066. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR, "disconnect poll error");
  1067. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR, "disconnect socket error");
  1068. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW, "disconnect overflow");
  1069. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR, "disconnect ssl error");
  1070. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED, "disconnect parent closed");
  1071. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR, "disconnect receive error");
  1072. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error");
  1073. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
  1074. worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
  1075. worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
  1076. worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function");
  1077. worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
  1078. worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
  1079. worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
  1080. worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
  1081. struct sender_state *s = ptr;
  1082. if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination ||
  1083. !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
  1084. !*s->host->rrdpush_send_api_key) {
  1085. netdata_log_error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.",
  1086. rrdhost_hostname(s->host), gettid());
  1087. return NULL;
  1088. }
  1089. if(!rrdhost_set_sender(s->host)) {
  1090. netdata_log_error("STREAM %s [send]: thread created (task id %d), but there is another sender running for this host.",
  1091. rrdhost_hostname(s->host), gettid());
  1092. return NULL;
  1093. }
  1094. rrdpush_initialize_ssl_ctx(s->host);
  1095. netdata_log_info("STREAM %s [send]: thread created (task id %d)", rrdhost_hostname(s->host), gettid());
  1096. s->timeout = (int)appconfig_get_number(
  1097. &stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 600);
  1098. s->default_port = (int)appconfig_get_number(
  1099. &stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
  1100. s->buffer->max_size = (size_t)appconfig_get_number(
  1101. &stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024 * 10);
  1102. s->reconnect_delay = (unsigned int)appconfig_get_number(
  1103. &stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
  1104. remote_clock_resync_iterations = (unsigned int)appconfig_get_number(
  1105. &stream_config, CONFIG_SECTION_STREAM,
  1106. "initial clock resync iterations",
  1107. remote_clock_resync_iterations); // TODO: REMOVE FOR SLEW / GAPFILLING
  1108. // initialize rrdpush globals
  1109. rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1110. rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
  1111. int pipe_buffer_size = 10 * 1024;
  1112. #ifdef F_GETPIPE_SZ
  1113. pipe_buffer_size = fcntl(s->rrdpush_sender_pipe[PIPE_READ], F_GETPIPE_SZ);
  1114. #endif
  1115. if(pipe_buffer_size < 10 * 1024)
  1116. pipe_buffer_size = 10 * 1024;
  1117. if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
  1118. netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
  1119. rrdhost_hostname(s->host));
  1120. return NULL;
  1121. }
  1122. struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
  1123. thread_data->pipe_buffer = mallocz(pipe_buffer_size);
  1124. thread_data->host = s->host;
  1125. netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, thread_data);
  1126. size_t iterations = 0;
  1127. time_t now_s = now_monotonic_sec();
  1128. while(!rrdhost_sender_should_exit(s)) {
  1129. iterations++;
  1130. // The connection attempt blocks (after which we use the socket in nonblocking)
  1131. if(unlikely(s->rrdpush_sender_socket == -1)) {
  1132. worker_is_busy(WORKER_SENDER_JOB_CONNECT);
  1133. now_s = now_monotonic_sec();
  1134. rrdpush_sender_cbuffer_recreate_timed(s, now_s, false, true);
  1135. rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1136. s->flags &= ~SENDER_FLAG_OVERFLOW;
  1137. s->read_len = 0;
  1138. s->buffer->read = 0;
  1139. s->buffer->write = 0;
  1140. if(!attempt_to_connect(s))
  1141. continue;
  1142. if(rrdhost_sender_should_exit(s))
  1143. break;
  1144. now_s = s->last_traffic_seen_t = now_monotonic_sec();
  1145. rrdpush_send_claimed_id(s->host);
  1146. rrdpush_send_host_labels(s->host);
  1147. rrdpush_send_global_functions(s->host);
  1148. rrdpush_send_dyncfg(s->host);
  1149. s->replication.oldest_request_after_t = 0;
  1150. rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
  1151. netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
  1152. continue;
  1153. }
  1154. if(iterations % 1000 == 0)
  1155. now_s = now_monotonic_sec();
  1156. // If the TCP window never opened then something is wrong, restart connection
  1157. if(unlikely(now_s - s->last_traffic_seen_t > s->timeout &&
  1158. !rrdpush_sender_pending_replication_requests(s) &&
  1159. !rrdpush_sender_replicating_charts(s)
  1160. )) {
  1161. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
  1162. netdata_log_error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
  1163. rrdpush_sender_thread_close_socket(s->host);
  1164. continue;
  1165. }
  1166. sender_lock(s);
  1167. size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
  1168. size_t available = cbuffer_available_size_unsafe(s->buffer);
  1169. if (unlikely(!outstanding)) {
  1170. rrdpush_sender_pipe_clear_pending_data(s);
  1171. rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
  1172. }
  1173. sender_unlock(s);
  1174. worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);
  1175. if(outstanding)
  1176. s->send_attempts++;
  1177. if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) {
  1178. if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
  1179. netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
  1180. rrdhost_hostname(s->host));
  1181. rrdpush_sender_thread_close_socket(s->host);
  1182. break;
  1183. }
  1184. }
  1185. worker_is_idle();
  1186. // Wait until buffer opens in the socket or a rrdset_done_push wakes us
  1187. enum {
  1188. Collector = 0,
  1189. Socket = 1,
  1190. };
  1191. struct pollfd fds[2] = {
  1192. [Collector] = {
  1193. .fd = s->rrdpush_sender_pipe[PIPE_READ],
  1194. .events = POLLIN,
  1195. .revents = 0,
  1196. },
  1197. [Socket] = {
  1198. .fd = s->rrdpush_sender_socket,
  1199. .events = POLLIN | (outstanding ? POLLOUT : 0 ),
  1200. .revents = 0,
  1201. }
  1202. };
  1203. int poll_rc = poll(fds, 2, 1000);
  1204. netdata_log_debug(D_STREAM, "STREAM: poll() finished collector=%d socket=%d (current chunk %zu bytes)...",
  1205. fds[Collector].revents, fds[Socket].revents, outstanding);
  1206. if(unlikely(rrdhost_sender_should_exit(s)))
  1207. break;
  1208. internal_error(fds[Collector].fd != s->rrdpush_sender_pipe[PIPE_READ],
  1209. "STREAM %s [send to %s]: pipe changed after poll().", rrdhost_hostname(s->host), s->connected_to);
  1210. internal_error(fds[Socket].fd != s->rrdpush_sender_socket,
  1211. "STREAM %s [send to %s]: socket changed after poll().", rrdhost_hostname(s->host), s->connected_to);
  1212. // Spurious wake-ups without error - loop again
  1213. if (poll_rc == 0 || ((poll_rc == -1) && (errno == EAGAIN || errno == EINTR))) {
  1214. netdata_thread_testcancel();
  1215. netdata_log_debug(D_STREAM, "Spurious wakeup");
  1216. now_s = now_monotonic_sec();
  1217. continue;
  1218. }
  1219. // Only errors from poll() are internal, but try restarting the connection
  1220. if(unlikely(poll_rc == -1)) {
  1221. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR);
  1222. netdata_log_error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to);
  1223. rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
  1224. rrdpush_sender_thread_close_socket(s->host);
  1225. continue;
  1226. }
  1227. // If we have data and have seen the TCP window open then try to close it by a transmission.
  1228. if(likely(outstanding && (fds[Socket].revents & POLLOUT))) {
  1229. worker_is_busy(WORKER_SENDER_JOB_SOCKET_SEND);
  1230. ssize_t bytes = attempt_to_send(s);
  1231. if(bytes > 0) {
  1232. s->last_traffic_seen_t = now_monotonic_sec();
  1233. worker_set_metric(WORKER_SENDER_JOB_BYTES_SENT, (NETDATA_DOUBLE)bytes);
  1234. }
  1235. }
  1236. // If the collector woke us up then empty the pipe to remove the signal
  1237. if (fds[Collector].revents & (POLLIN|POLLPRI)) {
  1238. worker_is_busy(WORKER_SENDER_JOB_PIPE_READ);
  1239. netdata_log_debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
  1240. if (read(fds[Collector].fd, thread_data->pipe_buffer, pipe_buffer_size) == -1)
  1241. netdata_log_error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to);
  1242. }
  1243. // Read as much as possible to fill the buffer, split into full lines for execution.
  1244. if (fds[Socket].revents & POLLIN) {
  1245. worker_is_busy(WORKER_SENDER_JOB_SOCKET_RECEIVE);
  1246. ssize_t bytes = attempt_read(s);
  1247. if(bytes > 0) {
  1248. s->last_traffic_seen_t = now_monotonic_sec();
  1249. worker_set_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, (NETDATA_DOUBLE)bytes);
  1250. }
  1251. }
  1252. if(unlikely(s->read_len))
  1253. execute_commands(s);
  1254. if(unlikely(fds[Collector].revents & (POLLERR|POLLHUP|POLLNVAL))) {
  1255. char *error = NULL;
  1256. if (unlikely(fds[Collector].revents & POLLERR))
  1257. error = "pipe reports errors (POLLERR)";
  1258. else if (unlikely(fds[Collector].revents & POLLHUP))
  1259. error = "pipe closed (POLLHUP)";
  1260. else if (unlikely(fds[Collector].revents & POLLNVAL))
  1261. error = "pipe is invalid (POLLNVAL)";
  1262. if(error) {
  1263. rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
  1264. netdata_log_error("STREAM %s [send to %s]: restarting internal pipe: %s.",
  1265. rrdhost_hostname(s->host), s->connected_to, error);
  1266. }
  1267. }
  1268. if(unlikely(fds[Socket].revents & (POLLERR|POLLHUP|POLLNVAL))) {
  1269. char *error = NULL;
  1270. if (unlikely(fds[Socket].revents & POLLERR))
  1271. error = "socket reports errors (POLLERR)";
  1272. else if (unlikely(fds[Socket].revents & POLLHUP))
  1273. error = "connection closed by remote end (POLLHUP)";
  1274. else if (unlikely(fds[Socket].revents & POLLNVAL))
  1275. error = "connection is invalid (POLLNVAL)";
  1276. if(unlikely(error)) {
  1277. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR);
  1278. netdata_log_error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.",
  1279. rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection);
  1280. rrdpush_sender_thread_close_socket(s->host);
  1281. }
  1282. }
  1283. // protection from overflow
  1284. if(unlikely(s->flags & SENDER_FLAG_OVERFLOW)) {
  1285. worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW);
  1286. errno = 0;
  1287. netdata_log_error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection",
  1288. rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
  1289. rrdpush_sender_thread_close_socket(s->host);
  1290. }
  1291. worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE) dictionary_entries(s->replication.requests));
  1292. }
  1293. netdata_thread_cleanup_pop(1);
  1294. return NULL;
  1295. }