Browse Source

Re-write of SSL support in Netdata; restoration of SIGCHLD; detection of stale plugins; streaming improvements (#15113)

* add information about streaming connections to /api/v2/nodes; reset defer time when sender or receivers connect or disconnect

* make each streaming destination respect its SSL settings

* do not send SSL traffic over non-SSL connection

* keep track of outgoing streaming connection attempts

* retry SSL reads when SSL_read() returns SSL_ERROR_WANT_READ

* Revert "retry SSL reads when SSL_read() returns SSL_ERROR_WANT_READ"

This reverts commit 14c858677c6f2d3b08c94f298e2f45ecdb74c801.

* cleanup SSL connections properly

* initialize SSL in rpt before takeover

* sender should free SSL when talking to a non-SSL destination

* do not shutdown SSL when receiver exits

* restore operation of SIGCHLD when the reaper is not enabled

* create an fgets function that checks for data and times out

* work on error handling of plugins exiting

* remove newlines from logs

* global call to waitid(), caching the result for netdata_pclose() to process

* receiver tid

* parser times out in 2 minutes instead of 10

* fix crash when UUID is NULL in SQLite

* abstract sqlite3 parsing for uuid and text

* write proper ssl errors on read and write

* fix for SSL_ERROR_WANT_RETRY_VERIFY

* SSL WANT per function

* unified SSL error logging

* fix compilation warning

* additional logging about parser cleanup

* streaming parser should call the pluginsd parser cleanup

* SSL error handling work

* SSL initialization unification

* check for pending data when receiving SSL response with timeout

* macro to check if an SSL connection has been established

* remove SSL_pending()

* check for SSL macros

* use SSL_peek() to find if there is a response

* SSL renames

* more SSL renames & cleanup

* rrdpush ssl connection function

* abstract all SSL functions into security.c

* keep track of SSL connections and always attempt to use SSL read/write when on SSL connection

* signal openssl to skip certificate validation when configured to do so

* better SSL error handling and logging

* SSL code cleanup

* SSL retry on SSL_connect and SSL_accept

* SSL provide default return value for old compilers

* SSL read/write functions emulate system read/write functions

* fix receive/send timeout and switch from SSL_peek() to SSL_pending()

* remove SSL_pending()

* removed sender auto-retry and debug info for initial receiver response

* ssl skip certificate verification config for web server

* ssl errors log ip and port of the peer

* keep ssl with web_client for its whole lifetime

* thread safe socket peers to text

* use error_limit() for common ssl errors

* cleanup

* more cleanup

* coverity fixes

* ssl error logs include both local and remote ip/port info

* remove obsolete code
Costa Tsaousis 1 year ago
parent
commit
66c8546019

+ 1 - 1
aclk/https_client.c

@@ -528,7 +528,7 @@ int https_request(https_req_t *request, https_req_response_t *response) {
     }
     ctx->request = request;
 
-    ctx->ssl_ctx = security_initialize_openssl_client();
+    ctx->ssl_ctx = netdata_ssl_create_client_ctx(0);
     if (ctx->ssl_ctx==NULL) {
         error("Cannot allocate SSL context");
         goto exit_sock;

+ 1 - 1
collectors/plugins.d/plugins_d.c

@@ -72,7 +72,7 @@ static void pluginsd_worker_thread_cleanup(void *arg)
             info("PLUGINSD: 'host:%s', waiting for data collection child process pid %d to exit...",
                  rrdhost_hostname(cd->host), pid);
 
-            waitid(P_PID, (id_t)pid, &info, WEXITED);
+            netdata_waitid(P_PID, (id_t)pid, &info, WEXITED);
         }
     }
 }

+ 1 - 0
collectors/plugins.d/plugins_d.h

@@ -87,6 +87,7 @@ struct plugind {
 extern struct plugind *pluginsd_root;
 
 size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations);
+void pluginsd_process_thread_cleanup(void *ptr);
 
 size_t pluginsd_initialize_plugin_directories();
 

+ 17 - 13
collectors/plugins.d/pluginsd_parser.c

@@ -11,10 +11,10 @@ static int send_to_plugin(const char *txt, void *data) {
         return 0;
 
 #ifdef ENABLE_HTTPS
-    struct netdata_ssl *ssl = parser->ssl_output;
+    NETDATA_SSL *ssl = parser->ssl_output;
     if(ssl) {
-        if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
-            return (int)netdata_ssl_write(ssl->conn, (void *)txt, strlen(txt));
+        if(SSL_connection(ssl))
+            return (int)netdata_ssl_write(ssl, (void *)txt, strlen(txt));
 
         error("PLUGINSD: cannot send command (SSL)");
         return -1;
@@ -108,11 +108,12 @@ void pluginsd_rrdset_cleanup(RRDSET *st) {
     st->pluginsd.pos = 0;
 }
 
-static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const char *keyword) {
+static inline void pluginsd_unlock_previous_chart(void *user, const char *keyword, bool stale) {
     PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
 
     if(unlikely(pluginsd_unlock_rrdset_data_collection(user))) {
-        error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
+        if(stale)
+            error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
               rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword);
     }
 
@@ -120,9 +121,16 @@ static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const
         ml_chart_update_end(u->st);
         u->v2.ml_locked = false;
 
-        error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
+        if(stale)
+            error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
               rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword);
     }
+}
+
+static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const char *keyword) {
+    PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
+
+    pluginsd_unlock_previous_chart(user, keyword, true);
 
     if(st) {
         size_t dims = dictionary_entries(st->rrddim_root_index);
@@ -1783,12 +1791,7 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_
     // ------------------------------------------------------------------------
     // unblock data collection
 
-    ml_chart_update_end(st);
-    u->v2.ml_locked = false;
-
-    timing_step(TIMING_STEP_END2_ML);
-
-    pluginsd_unlock_rrdset_data_collection(user);
+    pluginsd_unlock_previous_chart(user, PLUGINSD_KEYWORD_END_V2, false);
     rrdcontext_collected_rrdset(st);
     store_metric_collection_completed();
 
@@ -1823,13 +1826,14 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_
     return PARSER_RC_OK;
 }
 
-static void pluginsd_process_thread_cleanup(void *ptr) {
+void pluginsd_process_thread_cleanup(void *ptr) {
     PARSER *parser = (PARSER *)ptr;
 
     pluginsd_cleanup_v2(parser->user);
     pluginsd_host_define_cleanup(parser->user);
 
     rrd_collector_finished();
+
     parser_destroy(parser);
 }
 

+ 1 - 1
collectors/tc.plugin/plugin_tc.c

@@ -864,7 +864,7 @@ static void tc_main_cleanup(void *ptr) {
             siginfo_t info;
 
             collector_info("TC: waiting for tc plugin child process pid %d to exit...", tc_child_pid);
-            waitid(P_PID, (id_t) tc_child_pid, &info, WEXITED);
+            netdata_waitid(P_PID, (id_t) tc_child_pid, &info, WEXITED);
         }
 
         tc_child_pid = 0;

+ 6 - 2
daemon/analytics.c

@@ -375,8 +375,12 @@ void analytics_https(void)
     BUFFER *b = buffer_create(30, NULL);
 #ifdef ENABLE_HTTPS
     analytics_exporting_connectors_ssl(b);
-    buffer_strcat(b, netdata_ssl_client_ctx && rrdhost_flag_check(localhost, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED) && localhost->sender->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE ? "streaming|" : "|");
-    buffer_strcat(b, netdata_ssl_srv_ctx ? "web" : "");
+
+    buffer_strcat(b, netdata_ssl_streaming_sender_ctx &&
+                     rrdhost_flag_check(localhost, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED) &&
+                     SSL_connection(&localhost->sender->ssl) ? "streaming|" : "|");
+
+    buffer_strcat(b, netdata_ssl_web_server_ctx ? "web" : "");
 #else
     buffer_strcat(b, "||");
 #endif

+ 2 - 2
daemon/main.c

@@ -482,7 +482,7 @@ void netdata_cleanup_and_exit(int ret) {
 
 #ifdef ENABLE_HTTPS
     delta_shutdown_time("free openssl structures");
-    security_clean_openssl();
+    netdata_ssl_cleanup();
 #endif
 
     delta_shutdown_time("remove incomplete shutdown file");
@@ -834,7 +834,7 @@ static void security_init(){
     tls_version    = config_get(CONFIG_SECTION_WEB, "tls version",  "1.3");
     tls_ciphers    = config_get(CONFIG_SECTION_WEB, "tls ciphers",  "none");
 
-    security_openssl_library();
+    netdata_ssl_initialize_openssl();
 }
 #endif
 

+ 33 - 65
daemon/signals.c

@@ -2,8 +2,6 @@
 
 #include "common.h"
 
-static int reaper_enabled = 0;
-
 typedef enum signal_action {
     NETDATA_SIGNAL_END_OF_LIST,
     NETDATA_SIGNAL_IGNORE,
@@ -78,16 +76,6 @@ void signals_init(void) {
     struct sigaction sa;
     sa.sa_flags = 0;
 
-    // Enable process tracking / reaper if running as init (pid == 1).
-    // This prevents zombie processes when running in a container.
-    if (getpid() == 1) {
-        info("SIGNAL: Enabling reaper");
-        netdata_popen_tracking_init();
-        reaper_enabled = 1;
-    } else {
-        info("SIGNAL: Not enabling reaper");
-    }
-
     // ignore all signals while we run in a signal handler
     sigfillset(&sa.sa_mask);
 
@@ -97,10 +85,6 @@ void signals_init(void) {
         case NETDATA_SIGNAL_IGNORE:
             sa.sa_handler = SIG_IGN;
             break;
-        case NETDATA_SIGNAL_CHILD:
-            if (reaper_enabled == 0)
-                continue;
-            // FALLTHROUGH
         default:
             sa.sa_handler = signal_handler;
             break;
@@ -115,9 +99,6 @@ void signals_restore_SIGCHLD(void)
 {
     struct sigaction sa;
 
-    if (reaper_enabled == 0)
-        return;
-
     sa.sa_flags = 0;
     sigfillset(&sa.sa_mask);
     sa.sa_handler = signal_handler;
@@ -137,9 +118,6 @@ void signals_reset(void) {
         if(sigaction(signals_waiting[i].signo, &sa, NULL) == -1)
             error("SIGNAL: Failed to reset signal handler for: %s", signals_waiting[i].name);
     }
-
-    if (reaper_enabled == 1)
-        netdata_popen_tracking_cleanup();
 }
 
 // reap_child reaps the child identified by pid.
@@ -147,39 +125,42 @@ static void reap_child(pid_t pid) {
     siginfo_t i;
 
     errno = 0;
-    debug(D_CHILDS, "SIGNAL: Reaping pid: %d...", pid);
-    if (waitid(P_PID, (id_t)pid, &i, WEXITED|WNOHANG) == -1) {
+    debug(D_CHILDS, "SIGNAL: reap_child(%d)...", pid);
+    if (netdata_waitid(P_PID, (id_t)pid, &i, WEXITED|WNOHANG) == -1) {
         if (errno != ECHILD)
-            error("SIGNAL: Failed to wait for: %d", pid);
+            error("SIGNAL: waitid(%d): failed to wait for child", pid);
         else
-            debug(D_CHILDS, "SIGNAL: Already reaped: %d", pid);
+            info("SIGNAL: waitid(%d): failed - it seems the child is already reaped", pid);
         return;
-    } else if (i.si_pid == 0) {
+    }
+    else if (i.si_pid == 0) {
         // Process didn't exit, this shouldn't happen.
+        error("SIGNAL: waitid(%d): reports pid 0 - child has not exited", pid);
         return;
     }
 
     switch (i.si_code) {
-    case CLD_EXITED:
-        debug(D_CHILDS, "SIGNAL: Child %d exited: %d", pid, i.si_status);
-        break;
-    case CLD_KILLED:
-        debug(D_CHILDS, "SIGNAL: Child %d killed by signal: %d", pid, i.si_status);
-        break;
-    case CLD_DUMPED:
-        debug(D_CHILDS, "SIGNAL: Child %d dumped core by signal: %d", pid, i.si_status);
-        break;
-    case CLD_STOPPED:
-        debug(D_CHILDS, "SIGNAL: Child %d stopped by signal: %d", pid, i.si_status);
-        break;
-    case CLD_TRAPPED:
-        debug(D_CHILDS, "SIGNAL: Child %d trapped by signal: %d", pid, i.si_status);
-        break;
-    case CLD_CONTINUED:
-        debug(D_CHILDS, "SIGNAL: Child %d continued by signal: %d", pid, i.si_status);
-        break;
-    default:
-        debug(D_CHILDS, "SIGNAL: Child %d gave us a SIGCHLD with code %d and status %d.", pid, i.si_code, i.si_status);
+        case CLD_EXITED:
+            info("SIGNAL: reap_child(%d) exited with code: %d", pid, i.si_status);
+            break;
+        case CLD_KILLED:
+            info("SIGNAL: reap_child(%d) killed by signal: %d", pid, i.si_status);
+            break;
+        case CLD_DUMPED:
+            info("SIGNAL: reap_child(%d) dumped core by signal: %d", pid, i.si_status);
+            break;
+        case CLD_STOPPED:
+            info("SIGNAL: reap_child(%d) stopped by signal: %d", pid, i.si_status);
+            break;
+        case CLD_TRAPPED:
+            info("SIGNAL: reap_child(%d) trapped by signal: %d", pid, i.si_status);
+            break;
+        case CLD_CONTINUED:
+            info("SIGNAL: reap_child(%d) continued by signal: %d", pid, i.si_status);
+            break;
+        default:
+            info("SIGNAL: reap_child(%d) gave us a SIGCHLD with code %d and status %d.", pid, i.si_code, i.si_status);
+            break;
     }
 }
 
@@ -187,25 +168,13 @@ static void reap_child(pid_t pid) {
 static void reap_children() {
     siginfo_t i;
 
-    while (1 == 1) {
-        // Identify which process caused the signal so we can determine
-        // if we need to reap a re-parented process.
+    while(1) {
         i.si_pid = 0;
-        if (waitid(P_ALL, (id_t)0, &i, WEXITED|WNOHANG|WNOWAIT) == -1) {
-            if (errno != ECHILD) // This shouldn't happen with WNOHANG but does.
-                error("SIGNAL: Failed to wait");
-            return;
-        } else if (i.si_pid == 0) {
-            // No child exited.
+        if (netdata_waitid(P_ALL, (id_t)0, &i, WEXITED|WNOHANG|WNOWAIT) == -1 || i.si_pid == 0)
+            // nothing to do
             return;
-        } else if (netdata_popen_tracking_pid_shoud_be_reaped(i.si_pid) == 0) {
-            // myp managed, sleep for a short time to avoid busy wait while
-            // this is handled by myp.
-            usleep(10000);
-        } else {
-            // Unknown process, likely a re-parented child, reap it.
-            reap_child(i.si_pid);
-        }
+
+        reap_child(i.si_pid);
     }
 }
 
@@ -267,7 +236,6 @@ void signals_handle(void) {
                                 break;
 
                             case NETDATA_SIGNAL_CHILD:
-                                debug(D_CHILDS, "SIGNAL: Received %s. Reaping...", name);
                                 reap_children();
                                 break;
 

+ 46 - 2
database/contexts/api_v2.c

@@ -352,6 +352,7 @@ static ssize_t rrdcontext_to_json_v2_add_host(void *data, RRDHOST *host, bool qu
                 buffer_json_member_add_string_or_empty(wb, "osVersion", host->system_info->host_os_version);
             }
 
+            time_t now = now_realtime_sec();
             buffer_json_member_add_object(wb, "status");
 
             size_t receiver_hops = host->system_info ? host->system_info->hops : (host == localhost) ? 0 : 1;
@@ -359,12 +360,55 @@ static ssize_t rrdcontext_to_json_v2_add_host(void *data, RRDHOST *host, bool qu
             buffer_json_member_add_uint64(wb, "hops", receiver_hops);
             buffer_json_member_add_boolean(wb, "online", host == localhost || !rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED));
             buffer_json_member_add_boolean(wb, "replicating", rrdhost_receiver_replicating_charts(host));
+            if(host != localhost && host->receiver) {
+                buffer_json_member_add_object(wb, "source");
+
+                char buf[1024 + 1];
+                snprintfz(buf, 1024, "%s:%s", host->receiver->client_ip ? host->receiver->client_ip : "", host->receiver->client_port ? host->receiver->client_port : "");
+                buffer_json_member_add_string(wb, "connection", buf);
+                stream_capabilities_to_json_array(wb, host->receiver->capabilities, "capabilities");
+
+                buffer_json_object_close(wb);
+            }
             buffer_json_object_close(wb); // collection
 
+            bool sender_connected = rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);
             buffer_json_member_add_object(wb, "streaming");
             buffer_json_member_add_uint64(wb, "hops", host->sender ? host->sender->hops : receiver_hops + 1);
-            buffer_json_member_add_boolean(wb, "online", rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED));
-            buffer_json_member_add_boolean(wb, "replicating", rrdhost_sender_replicating_charts(host));
+            buffer_json_member_add_boolean(wb, "online", sender_connected);
+            buffer_json_member_add_boolean(wb, "replicating", sender_connected && rrdhost_sender_replicating_charts(host));
+
+            if(host->sender) {
+                buffer_json_member_add_object(wb, "destination");
+                buffer_json_member_add_string(wb, "connected_to", sender_connected ? host->sender->connected_to : "");
+                stream_capabilities_to_json_array(wb, sender_connected ? host->sender->capabilities : 0, "capabilities");
+
+                buffer_json_member_add_array(wb, "candidates");
+                struct rrdpush_destinations *d;
+                for(d = host->destinations ; d ; d = d->next) {
+                    buffer_json_add_array_item_object(wb);
+
+                    if(d->ssl) {
+                        char buf[1024 + 1];
+                        snprintfz(buf, 1024, "%s:SSL", string2str(d->destination));
+                        buffer_json_member_add_string(wb, "destination", buf);
+                    }
+                    else
+                        buffer_json_member_add_string(wb, "destination", string2str(d->destination));
+
+                    buffer_json_member_add_time_t(wb, "last_check", d->last_attempt);
+                    buffer_json_member_add_time_t(wb, "last_check_secs_ago", now - d->last_attempt);
+                    buffer_json_member_add_string(wb, "last_error", d->last_error);
+                    buffer_json_member_add_string(wb, "last_handshake", stream_handshake_error_to_string(d->last_handshake));
+                    buffer_json_member_add_time_t(wb, "next_check", d->postpone_reconnection_until);
+                    buffer_json_member_add_time_t(wb, "next_check_in_secs", (d->postpone_reconnection_until > now) ? d->postpone_reconnection_until - now : 0);
+                    buffer_json_object_close(wb);
+                }
+                buffer_json_array_close(wb);
+
+                buffer_json_object_close(wb); // destination
+            }
+
             buffer_json_object_close(wb); // streaming
 
             buffer_json_object_close(wb); // status

+ 1 - 2
database/rrdhost.c

@@ -257,8 +257,7 @@ static void rrdhost_initialize_rrdpush_sender(RRDHOST *host,
         rrdhost_streaming_sender_structures_init(host);
 
 #ifdef ENABLE_HTTPS
-        host->sender->ssl.conn = NULL;
-        host->sender->ssl.flags = NETDATA_SSL_START;
+        host->sender->ssl = NETDATA_SSL_UNSET_CONNECTION;
 #endif
 
         host->rrdpush_send_destination = strdupz(rrdpush_destination);

Some files were not shown because too many files changed in this diff