receiver.c 35 KB

  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdpush.h"
  3. #include "web/server/h2o/http_server.h"
  4. extern struct config stream_config;
  5. void receiver_state_free(struct receiver_state *rpt) {
  6. freez(rpt->key);
  7. freez(rpt->hostname);
  8. freez(rpt->registry_hostname);
  9. freez(rpt->machine_guid);
  10. freez(rpt->os);
  11. freez(rpt->timezone);
  12. freez(rpt->abbrev_timezone);
  13. freez(rpt->client_ip);
  14. freez(rpt->client_port);
  15. freez(rpt->program_name);
  16. freez(rpt->program_version);
  17. #ifdef ENABLE_HTTPS
  18. netdata_ssl_close(&rpt->ssl);
  19. #endif
  20. if(rpt->fd != -1) {
  21. internal_error(true, "closing socket...");
  22. close(rpt->fd);
  23. }
  24. rrdpush_decompressor_destroy(&rpt->decompressor);
  25. if(rpt->system_info)
  26. rrdhost_system_info_free(rpt->system_info);
  27. __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
  28. freez(rpt);
  29. }
  30. #include "collectors/plugins.d/pluginsd_parser.h"
  31. // IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly
  34. // this has to be the same at parser.h
  37. #error The define WORKER_PARSER_FIRST_JOB needs to be at least 1
  38. #endif
  39. static inline int read_stream(struct receiver_state *r, char* buffer, size_t size) {
  40. if(unlikely(!size)) {
  41. internal_error(true, "%s() asked to read zero bytes", __FUNCTION__);
  42. return 0;
  43. }
  44. #ifdef ENABLE_H2O
  45. if (is_h2o_rrdpush(r))
  46. return (int)h2o_stream_read(r->h2o_ctx, buffer, size);
  47. #endif
  48. int tries = 100;
  49. ssize_t bytes_read;
  50. do {
  51. errno = 0;
  52. #ifdef ENABLE_HTTPS
  53. if (SSL_connection(&r->ssl))
  54. bytes_read = netdata_ssl_read(&r->ssl, buffer, size);
  55. else
  56. bytes_read = read(r->fd, buffer, size);
  57. #else
  58. bytes_read = read(r->fd, buffer, size);
  59. #endif
  60. } while(bytes_read < 0 && errno == EINTR && tries--);
  61. if((bytes_read == 0 || bytes_read == -1) && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) {
  62. netdata_log_error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__);
  63. bytes_read = -3;
  64. }
  65. else if (bytes_read == 0) {
  66. netdata_log_error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__);
  67. bytes_read = -1;
  68. }
  69. else if (bytes_read < 0) {
  70. netdata_log_error("STREAM: %s() failed to read from socket!", __FUNCTION__);
  71. bytes_read = -2;
  72. }
  73. return (int)bytes_read;
  74. }
  75. static inline STREAM_HANDSHAKE read_stream_error_to_reason(int code) {
  76. if(code > 0)
  77. return 0;
  78. switch(code) {
  79. case 0:
  80. // asked to read zero bytes
  82. case -1:
  83. // EOF
  85. case -2:
  86. // failed to read
  88. case -3:
  89. // timeout
  91. default:
  92. // anything else
  94. }
  95. }
  96. static inline bool receiver_read_uncompressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) {
  98. if(r->reader.read_buffer[r->reader.read_len] != '\0')
  99. fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
  100. #endif
  101. int bytes_read = read_stream(r, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1);
  102. if(unlikely(bytes_read <= 0)) {
  103. *reason = read_stream_error_to_reason(bytes_read);
  104. return false;
  105. }
  106. worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
  108. r->reader.read_len += bytes_read;
  109. r->reader.read_buffer[r->reader.read_len] = '\0';
  110. return true;
  111. }
  112. static inline bool receiver_read_compressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) {
  113. internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0',
  114. "%s: read_buffer does not start with zero #2", __FUNCTION__ );
  115. // first use any available uncompressed data
  116. if (likely(rrdpush_decompressed_bytes_in_buffer(&r->decompressor))) {
  117. size_t available = sizeof(r->reader.read_buffer) - r->reader.read_len - 1;
  118. if (likely(available)) {
  119. size_t len = rrdpush_decompressor_get(&r->decompressor, r->reader.read_buffer + r->reader.read_len, available);
  120. if (unlikely(!len)) {
  121. internal_error(true, "decompressor returned zero length #1");
  122. return false;
  123. }
  124. r->reader.read_len += (int)len;
  125. r->reader.read_buffer[r->reader.read_len] = '\0';
  126. }
  127. else
  128. internal_fatal(true, "The line to read is too big! Already have %zd bytes in read_buffer.", r->reader.read_len);
  129. return true;
  130. }
  131. // no decompressed data available
  132. // read the compression signature of the next block
  133. if(unlikely(r->reader.read_len + r->decompressor.signature_size > sizeof(r->reader.read_buffer) - 1)) {
  134. internal_error(true, "The last incomplete line does not leave enough room for the next compression header! "
  135. "Already have %zd bytes in read_buffer.", r->reader.read_len);
  136. return false;
  137. }
  138. // read the compression signature from the stream
  139. // we have to do a loop here, because read_stream() may return less than the data we need
  140. int bytes_read = 0;
  141. do {
  142. int ret = read_stream(r, r->reader.read_buffer + r->reader.read_len + bytes_read, r->decompressor.signature_size - bytes_read);
  143. if (unlikely(ret <= 0)) {
  144. *reason = read_stream_error_to_reason(ret);
  145. return false;
  146. }
  147. bytes_read += ret;
  148. } while(unlikely(bytes_read < (int)r->decompressor.signature_size));
  149. worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
  150. if(unlikely(bytes_read != (int)r->decompressor.signature_size))
  151. fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor.signature_size);
  152. size_t compressed_message_size = rrdpush_decompressor_start(&r->decompressor, r->reader.read_buffer + r->reader.read_len, bytes_read);
  153. if (unlikely(!compressed_message_size)) {
  154. internal_error(true, "multiplexed uncompressed data in compressed stream!");
  155. r->reader.read_len += bytes_read;
  156. r->reader.read_buffer[r->reader.read_len] = '\0';
  157. return true;
  158. }
  159. if(unlikely(compressed_message_size > COMPRESSION_MAX_MSG_SIZE)) {
  160. netdata_log_error("received a compressed message of %zu bytes, which is bigger than the max compressed message size supported of %zu. Ignoring message.",
  161. compressed_message_size, (size_t)COMPRESSION_MAX_MSG_SIZE);
  162. return false;
  163. }
  164. // delete compression header from our read buffer
  165. r->reader.read_buffer[r->reader.read_len] = '\0';
  166. // Read the entire compressed block of compressed data
  167. char compressed[compressed_message_size];
  168. size_t compressed_bytes_read = 0;
  169. do {
  170. size_t start = compressed_bytes_read;
  171. size_t remaining = compressed_message_size - start;
  172. int last_read_bytes = read_stream(r, &compressed[start], remaining);
  173. if (unlikely(last_read_bytes <= 0)) {
  174. *reason = read_stream_error_to_reason(last_read_bytes);
  175. return false;
  176. }
  177. compressed_bytes_read += last_read_bytes;
  178. } while(unlikely(compressed_message_size > compressed_bytes_read));
  179. worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)compressed_bytes_read);
  180. // decompress the compressed block
  181. size_t bytes_to_parse = rrdpush_decompress(&r->decompressor, compressed, compressed_bytes_read);
  182. if (unlikely(!bytes_to_parse)) {
  183. internal_error(true, "no bytes to parse.");
  184. return false;
  185. }
  186. worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_to_parse);
  187. // fill read buffer with decompressed data
  188. size_t len = (int) rrdpush_decompressor_get(&r->decompressor, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1);
  189. if (unlikely(!len)) {
  190. internal_error(true, "decompressor returned zero length #2");
  191. return false;
  192. }
  193. r->reader.read_len += (int)len;
  194. r->reader.read_buffer[r->reader.read_len] = '\0';
  195. return true;
  196. }
  197. bool plugin_is_enabled(struct plugind *cd);
  198. static void receiver_set_exit_reason(struct receiver_state *rpt, STREAM_HANDSHAKE reason, bool force) {
  199. if(force || !rpt->exit.reason)
  200. rpt->exit.reason = reason;
  201. }
  202. static inline bool receiver_should_stop(struct receiver_state *rpt) {
  203. static __thread size_t counter = 0;
  204. if(unlikely(rpt->exit.shutdown)) {
  205. receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false);
  206. return true;
  207. }
  208. if(unlikely(!service_running(SERVICE_STREAMING))) {
  209. receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, false);
  210. return true;
  211. }
  212. if(unlikely((counter++ % 1000) == 0)) {
  213. // check every 1000 lines read
  214. netdata_thread_testcancel();
  215. rpt->last_msg_t = now_monotonic_sec();
  216. }
  217. return false;
  218. }
  219. static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) {
  220. size_t result = 0;
  221. PARSER *parser = NULL;
  222. {
  223. PARSER_USER_OBJECT user = {
  224. .enabled = plugin_is_enabled(cd),
  225. .host = rpt->host,
  226. .opaque = rpt,
  227. .cd = cd,
  228. .trust_durations = 1,
  229. .capabilities = rpt->capabilities,
  230. };
  231. parser = parser_init(&user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
  232. }
  233. #ifdef ENABLE_H2O
  234. parser->h2o_ctx = rpt->h2o_ctx;
  235. #endif
  236. pluginsd_keywords_init(parser, PARSER_INIT_STREAMING);
  237. rrd_collector_started();
  238. // this keeps the parser with its current value
  239. // so, parser needs to be allocated before pushing it
  240. netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser) {
  241. bool compressed_connection = rrdpush_decompression_initialize(rpt);
  242. buffered_reader_init(&rpt->reader);
  244. {
  245. char filename[FILENAME_MAX + 1];
  246. snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(
  247. rpt->host) : "unknown"
  248. );
  249. parser->user.stream_log_fp = fopen(filename, "w");
  250. parser->user.stream_log_repertoire = PARSER_REP_METADATA;
  251. }
  252. #endif
  253. CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
  254. ND_LOG_STACK lgs[] = {
  255. ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
  256. ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
  257. ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
  258. ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
  259. ND_LOG_FIELD_END(),
  260. };
  261. ND_LOG_STACK_PUSH(lgs);
  262. while(!receiver_should_stop(rpt)) {
  263. if(!buffered_reader_next_line(&rpt->reader, buffer)) {
  265. bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason)
  266. : receiver_read_uncompressed(rpt, &reason);
  267. if(unlikely(!have_new_data)) {
  268. receiver_set_exit_reason(rpt, reason, false);
  269. break;
  270. }
  271. continue;
  272. }
  273. if(unlikely(parser_action(parser, buffer->buffer))) {
  274. receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
  275. break;
  276. }
  277. buffer->len = 0;
  278. buffer->buffer[0] = '\0';
  279. }
  280. result = parser->user.data_collections_count;
  281. }
  282. netdata_thread_cleanup_pop(1); // free parser with the pop function
  283. return result;
  284. }
  285. static void rrdpush_receiver_replication_reset(RRDHOST *host) {
  286. RRDSET *st;
  287. rrdset_foreach_read(st, host) {
  290. }
  291. rrdset_foreach_done(st);
  292. rrdhost_receiver_replicating_charts_zero(host);
  293. }
  294. static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
  295. bool signal_rrdcontext = false;
  296. bool set_this = false;
  297. netdata_mutex_lock(&host->receiver_lock);
  298. if (!host->receiver) {
  299. rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
  300. host->rrdpush_receiver_connection_counter++;
  301. __atomic_add_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED);
  302. host->receiver = rpt;
  303. rpt->host = host;
  304. host->child_connect_time = now_realtime_sec();
  305. host->child_disconnected_time = 0;
  306. host->child_last_chart_command = 0;
  307. host->trigger_chart_obsoletion_check = 1;
  308. if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) {
  309. if (rpt->config.alarms_delay > 0) {
  310. host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay;
  311. nd_log(NDLS_DAEMON, NDLP_DEBUG,
  312. "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
  313. rrdhost_hostname(host),
  314. (int64_t) rpt->config.alarms_delay);
  315. }
  316. }
  317. host->health_log.health_log_history = rpt->config.alarms_history;
  318. // this is a test
  319. // if(rpt->hops <= host->sender->hops)
  320. // rrdpush_sender_thread_stop(host, "HOPS MISMATCH", false);
  321. signal_rrdcontext = true;
  322. rrdpush_receiver_replication_reset(host);
  323. rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
  324. aclk_queue_node_info(rpt->host, true);
  325. rrdpush_reset_destinations_postpone_time(host);
  326. set_this = true;
  327. }
  328. netdata_mutex_unlock(&host->receiver_lock);
  329. if(signal_rrdcontext)
  330. rrdcontext_host_child_connected(host);
  331. return set_this;
  332. }
  333. static void rrdhost_clear_receiver(struct receiver_state *rpt) {
  334. bool signal_rrdcontext = false;
  335. RRDHOST *host = rpt->host;
  336. if(host) {
  337. netdata_mutex_lock(&host->receiver_lock);
  338. // Make sure that we detach this thread and don't kill a freshly arriving receiver
  339. if(host->receiver == rpt) {
  340. __atomic_sub_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED);
  341. rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
  342. host->trigger_chart_obsoletion_check = 0;
  343. host->child_connect_time = 0;
  344. host->child_disconnected_time = now_realtime_sec();
  345. if (rpt->config.health_enabled == CONFIG_BOOLEAN_AUTO)
  346. host->health.health_enabled = 0;
  347. rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false);
  348. signal_rrdcontext = true;
  349. rrdpush_receiver_replication_reset(host);
  350. rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN);
  351. host->receiver = NULL;
  352. host->rrdpush_last_receiver_exit_reason = rpt->exit.reason;
  353. }
  354. netdata_mutex_unlock(&host->receiver_lock);
  355. if(signal_rrdcontext)
  356. rrdcontext_host_child_disconnected(host);
  357. rrdpush_reset_destinations_postpone_time(host);
  358. }
  359. }
  360. bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) {
  361. bool ret = false;
  362. netdata_mutex_lock(&host->receiver_lock);
  363. if(host->receiver) {
  364. if(!host->receiver->exit.shutdown) {
  365. host->receiver->exit.shutdown = true;
  366. receiver_set_exit_reason(host->receiver, reason, true);
  367. shutdown(host->receiver->fd, SHUT_RDWR);
  368. }
  369. netdata_thread_cancel(host->receiver->thread);
  370. }
  371. int count = 2000;
  372. while (host->receiver && count-- > 0) {
  373. netdata_mutex_unlock(&host->receiver_lock);
  374. // let the lock for the receiver thread to exit
  375. sleep_usec(1 * USEC_PER_MS);
  376. netdata_mutex_lock(&host->receiver_lock);
  377. }
  378. if(host->receiver)
  379. netdata_log_error("STREAM '%s' [receive from [%s]:%s]: "
  380. "thread %d takes too long to stop, giving up..."
  381. , rrdhost_hostname(host)
  382. , host->receiver->client_ip, host->receiver->client_port
  383. , host->receiver->tid);
  384. else
  385. ret = true;
  386. netdata_mutex_unlock(&host->receiver_lock);
  387. return ret;
  388. }
  389. static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *rpt, const char *msg) {
  390. (void) send_timeout(
  391. #ifdef ENABLE_HTTPS
  392. &rpt->ssl,
  393. #endif
  394. rpt->fd,
  395. (char *)msg,
  396. strlen(msg),
  397. 0,
  398. 5);
  399. }
  400. void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority) {
  401. // this function may be called BEFORE we spawn the receiver thread
  402. // so, we need to add the fields again (it does not harm)
  403. ND_LOG_STACK lgs[] = {
  404. ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
  405. ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port),
  406. ND_LOG_FIELD_TXT(NDF_NIDL_NODE, (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""),
  408. ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_from_child_msgid),
  409. ND_LOG_FIELD_END(),
  410. };
  411. ND_LOG_STACK_PUSH(lgs);
  412. nd_log(NDLS_ACCESS, priority, "api_key:'%s' machine_guid:'%s' msg:'%s'"
  413. , (rpt->key && *rpt->key)? rpt->key : ""
  414. , (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : ""
  415. , msg);
  416. nd_log(NDLS_DAEMON, priority, "STREAM_RECEIVER for '%s': %s %s%s%s"
  417. , (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""
  418. , msg
  419. , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":""
  420. , stream_handshake_error_to_string(rpt->exit.reason)
  421. , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":""
  422. );
  423. }
  424. static void rrdpush_receive(struct receiver_state *rpt)
  425. {
  426. rpt->config.mode = default_rrd_memory_mode;
  427. rpt->config.history = default_rrd_history_entries;
  428. rpt->config.health_enabled = health_plugin_enabled();
  429. rpt->config.alarms_delay = 60;
  430. rpt->config.alarms_history = HEALTH_LOG_DEFAULT_HISTORY;
  431. rpt->config.rrdpush_enabled = (int)default_rrdpush_enabled;
  432. rpt->config.rrdpush_destination = default_rrdpush_destination;
  433. rpt->config.rrdpush_api_key = default_rrdpush_api_key;
  434. rpt->config.rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
  435. rpt->config.rrdpush_enable_replication = default_rrdpush_enable_replication;
  436. rpt->config.rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate;
  437. rpt->config.rrdpush_replication_step = default_rrdpush_replication_step;
  438. rpt->config.update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->config.update_every);
  439. if(rpt->config.update_every < 0) rpt->config.update_every = 1;
  440. rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", rpt->config.history);
  441. rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", rpt->config.history);
  442. if(rpt->config.history < 5) rpt->config.history = 5;
  443. rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(rpt->config.mode)));
  444. rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode)));
  445. if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) {
  446. netdata_log_error("STREAM '%s' [receive from %s:%s]: "
  447. "dbengine is not enabled, falling back to default."
  448. , rpt->hostname
  449. , rpt->client_ip, rpt->client_port
  450. );
  451. rpt->config.mode = default_rrd_memory_mode;
  452. }
  453. rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", rpt->config.health_enabled);
  454. rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", rpt->config.health_enabled);
  455. rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", rpt->config.alarms_delay);
  456. rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", rpt->config.alarms_delay);
  457. rpt->config.alarms_history = appconfig_get_number(&stream_config, rpt->key, "default health log history", rpt->config.alarms_history);
  458. rpt->config.alarms_history = appconfig_get_number(&stream_config, rpt->machine_guid, "health log history", rpt->config.alarms_history);
  459. rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rpt->config.rrdpush_enabled);
  460. rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rpt->config.rrdpush_enabled);
  461. rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rpt->config.rrdpush_destination);
  462. rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rpt->config.rrdpush_destination);
  463. rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rpt->config.rrdpush_api_key);
  464. rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rpt->config.rrdpush_api_key);
  465. rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rpt->config.rrdpush_send_charts_matching);
  466. rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rpt->config.rrdpush_send_charts_matching);
  467. rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rpt->config.rrdpush_enable_replication);
  468. rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rpt->config.rrdpush_enable_replication);
  469. rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate);
  470. rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate);
  471. rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step);
  472. rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step);
  473. rpt->config.rrdpush_compression = default_rrdpush_compression_enabled;
  474. rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression);
  475. rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression);
  476. bool is_ephemeral = false;
  477. is_ephemeral = appconfig_get_boolean(&stream_config, rpt->key, "is ephemeral node", CONFIG_BOOLEAN_NO);
  478. is_ephemeral = appconfig_get_boolean(&stream_config, rpt->machine_guid, "is ephemeral node", is_ephemeral);
  479. if(rpt->config.rrdpush_compression) {
  480. char *order = appconfig_get(&stream_config, rpt->key, "compression algorithms order", RRDPUSH_COMPRESSION_ALGORITHMS_ORDER);
  481. order = appconfig_get(&stream_config, rpt->machine_guid, "compression algorithms order", order);
  482. rrdpush_parse_compression_order(rpt, order);
  483. }
  484. // find the host for this receiver
  485. {
  486. // this will also update the host with our system_info
  487. RRDHOST *host = rrdhost_find_or_create(
  488. rpt->hostname,
  489. rpt->registry_hostname,
  490. rpt->machine_guid,
  491. rpt->os,
  492. rpt->timezone,
  493. rpt->abbrev_timezone,
  494. rpt->utc_offset,
  495. rpt->program_name,
  496. rpt->program_version,
  497. rpt->config.update_every,
  498. rpt->config.history,
  499. rpt->config.mode,
  500. (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO),
  501. (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination &&
  502. *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key &&
  503. *rpt->config.rrdpush_api_key),
  504. rpt->config.rrdpush_destination,
  505. rpt->config.rrdpush_api_key,
  506. rpt->config.rrdpush_send_charts_matching,
  507. rpt->config.rrdpush_enable_replication,
  508. rpt->config.rrdpush_seconds_to_replicate,
  509. rpt->config.rrdpush_replication_step,
  510. rpt->system_info,
  511. 0);
  512. if(!host) {
  513. rrdpush_receive_log_status(
  514. rpt,"failed to find/create host structure, rejecting connection",
  516. rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INTERNAL_ERROR);
  517. goto cleanup;
  518. }
  519. if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
  520. rrdpush_receive_log_status(
  521. rpt, "host is initializing, retry later",
  523. rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION);
  524. goto cleanup;
  525. }
  526. // system_info has been consumed by the host structure
  527. rpt->system_info = NULL;
  528. if(!rrdhost_set_receiver(host, rpt)) {
  529. rrdpush_receive_log_status(
  530. rpt, "host is already served by another receiver",
  532. rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_ALREADY_STREAMING);
  533. goto cleanup;
  534. }
  535. }
  537. netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
  538. "client willing to stream metrics for host '%s' with machine_guid '%s': "
  539. "update every = %d, history = %d, memory mode = %s, health %s,%s"
  540. , rpt->hostname
  541. , rpt->client_ip
  542. , rpt->client_port
  543. , rrdhost_hostname(rpt->host)
  544. , rpt->host->machine_guid
  545. , rpt->host->rrd_update_every
  546. , rpt->host->rrd_history_entries
  547. , rrd_memory_mode_name(rpt->host->rrd_memory_mode)
  548. , (rpt->config.health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((rpt->config.health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
  549. #ifdef ENABLE_HTTPS
  550. , (rpt->ssl.conn != NULL) ? " SSL," : ""
  551. #else
  552. , ""
  553. #endif
  554. );
  556. struct plugind cd = {
  557. .update_every = default_rrd_update_every,
  558. .unsafe = {
  560. .running = true,
  561. .enabled = true,
  562. },
  563. .started_t = now_realtime_sec(),
  564. };
  565. // put the client IP and port into the buffers used by plugins.d
  566. snprintfz(, CONFIG_MAX_NAME, "%s:%s", rpt->client_ip, rpt->client_port);
  567. snprintfz(cd.filename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
  568. snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
  569. snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
  570. rrdpush_select_receiver_compression_algorithm(rpt);
  571. {
  572. // netdata_log_info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
  573. char initial_response[HTTP_HEADER_SIZE];
  574. if (stream_has_capability(rpt, STREAM_CAP_VCAPS)) {
  575. log_receiver_capabilities(rpt);
  576. sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->capabilities);
  577. }
  578. else if (stream_has_capability(rpt, STREAM_CAP_VN)) {
  579. log_receiver_capabilities(rpt);
  580. sprintf(initial_response, "%s%d", START_STREAMING_PROMPT_VN, stream_capabilities_to_vn(rpt->capabilities));
  581. }
  582. else if (stream_has_capability(rpt, STREAM_CAP_V2)) {
  583. log_receiver_capabilities(rpt);
  584. sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
  585. }
  586. else { // stream_has_capability(rpt, STREAM_CAP_V1)
  587. log_receiver_capabilities(rpt);
  588. sprintf(initial_response, "%s", START_STREAMING_PROMPT_V1);
  589. }
  590. netdata_log_debug(D_STREAM, "Initial response to %s: %s", rpt->client_ip, initial_response);
  591. #ifdef ENABLE_H2O
  592. if (is_h2o_rrdpush(rpt)) {
  593. h2o_stream_write(rpt->h2o_ctx, initial_response, strlen(initial_response));
  594. } else {
  595. #endif
  596. ssize_t bytes_sent = send_timeout(
  597. #ifdef ENABLE_HTTPS
  598. &rpt->ssl,
  599. #endif
  600. rpt->fd, initial_response, strlen(initial_response), 0, 60);
  601. if(bytes_sent != (ssize_t)strlen(initial_response)) {
  602. internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response));
  603. rrdpush_receive_log_status(
  604. rpt, "cannot reply back, dropping connection",
  606. goto cleanup;
  607. }
  608. #ifdef ENABLE_H2O
  609. }
  610. #endif
  611. }
  612. #ifdef ENABLE_H2O
  613. unless_h2o_rrdpush(rpt)
  614. #endif
  615. {
  616. // remove the non-blocking flag from the socket
  617. if(sock_delnonblock(rpt->fd) < 0)
  618. netdata_log_error("STREAM '%s' [receive from [%s]:%s]: "
  619. "cannot remove the non-blocking flag from socket %d"
  620. , rrdhost_hostname(rpt->host)
  621. , rpt->client_ip, rpt->client_port
  622. , rpt->fd);
  623. struct timeval timeout;
  624. timeout.tv_sec = 600;
  625. timeout.tv_usec = 0;
  626. if (unlikely(setsockopt(rpt->fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) != 0))
  627. netdata_log_error("STREAM '%s' [receive from [%s]:%s]: "
  628. "cannot set timeout for socket %d"
  629. , rrdhost_hostname(rpt->host)
  630. , rpt->client_ip, rpt->client_port
  631. , rpt->fd);
  632. }
  633. rrdpush_receive_log_status(
  634. rpt, "connected and ready to receive data",
  636. #ifdef ENABLE_ACLK
  637. // in case we have cloud connection we inform cloud
  638. // new child connected
  639. if (netdata_cloud_enabled)
  640. aclk_host_state_update(rpt->host, 1, 1);
  641. #endif
  642. rrdhost_set_is_parent_label();
  643. if (is_ephemeral)
  644. rrdhost_option_set(rpt->host, RRDHOST_OPTION_EPHEMERAL_HOST);
  645. // let it reconnect to parent immediately
  646. rrdpush_reset_destinations_postpone_time(rpt->host);
  647. size_t count = streaming_parser(rpt, &cd, rpt->fd,
  648. #ifdef ENABLE_HTTPS
  649. (rpt->ssl.conn) ? &rpt->ssl : NULL
  650. #else
  651. NULL
  652. #endif
  653. );
  654. receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, false);
  655. {
  656. char msg[100 + 1];
  657. snprintfz(msg, sizeof(msg) - 1, "disconnected (completed %zu updates)", count);
  658. rrdpush_receive_log_status(
  659. rpt, msg,
  661. }
  662. #ifdef ENABLE_ACLK
  663. // in case we have cloud connection we inform cloud
  664. // a child disconnected
  665. if (netdata_cloud_enabled)
  666. aclk_host_state_update(rpt->host, 0, 1);
  667. #endif
  668. cleanup:
  669. ;
  670. }
  671. static void rrdpush_receiver_thread_cleanup(void *ptr) {
  672. struct receiver_state *rpt = (struct receiver_state *) ptr;
  673. worker_unregister();
  674. rrdhost_clear_receiver(rpt);
  675. netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
  676. "receive thread ended (task id %d)"
  677. , rpt->hostname ? rpt->hostname : "-"
  678. , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-"
  679. , gettid());
  680. receiver_state_free(rpt);
  681. rrdhost_set_is_parent_label();
  682. }
  683. static bool stream_receiver_log_capabilities(BUFFER *wb, void *ptr) {
  684. struct receiver_state *rpt = ptr;
  685. if(!rpt)
  686. return false;
  687. stream_capabilities_to_string(wb, rpt->capabilities);
  688. return true;
  689. }
  690. static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) {
  691. struct receiver_state *rpt = ptr;
  692. if(!rpt)
  693. return false;
  694. #ifdef ENABLE_HTTPS
  695. buffer_strcat(wb, SSL_connection(&rpt->ssl) ? "https" : "http");
  696. #else
  697. buffer_strcat(wb, "http");
  698. #endif
  699. return true;
  700. }
  701. void *rrdpush_receiver_thread(void *ptr) {
  702. netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
  703. {
  704. worker_register("STREAMRCV");
  705. worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ,
  706. "received bytes", "bytes/s",
  708. worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED,
  709. "uncompressed bytes", "bytes/s",
  711. worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
  712. "replication completion", "%",
  714. struct receiver_state *rpt = (struct receiver_state *) ptr;
  715. rpt->tid = gettid();
  716. ND_LOG_STACK lgs[] = {
  717. ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
  718. ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port),
  719. ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rpt->hostname),
  720. ND_LOG_FIELD_CB(NDF_SRC_TRANSPORT, stream_receiver_log_transport, rpt),
  721. ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_receiver_log_capabilities, rpt),
  722. ND_LOG_FIELD_END(),
  723. };
  724. ND_LOG_STACK_PUSH(lgs);
  725. netdata_log_info("STREAM %s [%s]:%s: receive thread started", rpt->hostname, rpt->client_ip
  726. , rpt->client_port);
  727. rrdpush_receive(rpt);
  728. }
  729. netdata_thread_cleanup_pop(1);
  730. return NULL;
  731. }