Browse Source

remove deadlock from sender (#18438)

* remove deadlock from sender

* rrdpush_sender_thread_close_socket() takes sender state as param
Costa Tsaousis 6 months ago
parent
commit
3465ff45c6

+ 51 - 64
src/streaming/sender.c

@@ -130,22 +130,14 @@ void rrdpush_sender_after_connect(RRDHOST *host) {
     rrdpush_sender_thread_send_custom_host_variables(host);
 }
 
-void rrdpush_sender_disconnect_and_cleanup(RRDHOST *host) {
-    rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
-
-    rrdpush_sender_thread_close_socket(host);
-
+static void rrdpush_sender_on_disconnect(RRDHOST *host) {
     // we have been connected to this parent - let's cleanup
 
-    // do not flush the circular buffer here
-    // this function is called sometimes with the sender lock, sometimes without the lock
-
     rrdpush_sender_charts_and_replication_reset(host);
 
     // clear the parent's claim id
     rrdpush_sender_clear_parent_claim_id(host);
     rrdpush_receiver_send_node_and_claim_id_to_child(host);
-
     stream_path_parent_disconnected(host);
 }
 
@@ -179,7 +171,7 @@ static ssize_t attempt_to_send(struct sender_state *s) {
         worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR);
         netdata_log_debug(D_STREAM, "STREAM: Send failed - closing socket...");
         netdata_log_error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.",  rrdhost_hostname(s->host), s->connected_to, s->sent_bytes_on_this_connection);
-        rrdpush_sender_disconnect_and_cleanup(s->host);
+        rrdpush_sender_thread_close_socket(s);
     }
     else
         netdata_log_debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
@@ -217,16 +209,11 @@ static ssize_t attempt_read(struct sender_state *s) {
         netdata_log_error("STREAM %s [send to %s]: error during receive (%zd) - closing connection.", rrdhost_hostname(s->host), s->connected_to, ret);
     }
 
-    rrdpush_sender_disconnect_and_cleanup(s->host);
+    rrdpush_sender_thread_close_socket(s);
 
     return ret;
 }
 
-struct rrdpush_sender_thread_data {
-    RRDHOST *host;
-    char *pipe_buffer;
-};
-
 static bool rrdpush_sender_pipe_close(RRDHOST *host, int *pipe_fds, bool reopen) {
     static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
 
@@ -354,38 +341,6 @@ static bool rrdhost_sender_should_exit(struct sender_state *s) {
     return false;
 }
 
-static void rrdpush_sender_thread_cleanup_callback(void *pptr) {
-    struct rrdpush_sender_thread_data *s = CLEANUP_FUNCTION_GET_PTR(pptr);
-    if(!s) return;
-
-    worker_unregister();
-
-    RRDHOST *host = s->host;
-
-    sender_lock(host->sender);
-    netdata_log_info("STREAM %s [send]: sending thread exits %s",
-         rrdhost_hostname(host),
-         host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(host->sender->exit.reason) : "");
-
-    rrdpush_sender_disconnect_and_cleanup(host);
-    rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
-    rrdpush_sender_execute_commands_cleanup(host->sender);
-
-    rrdhost_clear_sender___while_having_sender_mutex(host);
-
-#ifdef NETDATA_LOG_STREAM_SENDER
-    if(host->sender->stream_log_fp) {
-        fclose(host->sender->stream_log_fp);
-        host->sender->stream_log_fp = NULL;
-    }
-#endif
-
-    sender_unlock(host->sender);
-
-    freez(s->pipe_buffer);
-    freez(s);
-}
-
 void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) {
     static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
     spinlock_lock(&sp);
@@ -546,12 +501,9 @@ void *rrdpush_sender_thread(void *ptr) {
         return NULL;
     }
 
-    struct rrdpush_sender_thread_data *thread_data = callocz(1, sizeof(struct rrdpush_sender_thread_data));
-    thread_data->pipe_buffer = mallocz(pipe_buffer_size);
-    thread_data->host = s->host;
-
-    CLEANUP_FUNCTION_REGISTER(rrdpush_sender_thread_cleanup_callback) cleanup_ptr = thread_data;
+    char *pipe_buffer = mallocz(pipe_buffer_size);
 
+    bool was_connected = false;
     size_t iterations = 0;
     time_t now_s = now_monotonic_sec();
     while(!rrdhost_sender_should_exit(s)) {
@@ -559,6 +511,11 @@ void *rrdpush_sender_thread(void *ptr) {
 
         // The connection attempt blocks (after which we use the socket in nonblocking)
         if(unlikely(s->rrdpush_sender_socket == -1)) {
+            if(was_connected) {
+                rrdpush_sender_on_disconnect(s->host);
+                was_connected = false;
+            }
+
             worker_is_busy(WORKER_SENDER_JOB_CONNECT);
 
             now_s = now_monotonic_sec();
@@ -583,6 +540,7 @@ void *rrdpush_sender_thread(void *ptr) {
             rrdpush_send_host_labels(s->host);
             rrdpush_send_global_functions(s->host);
             s->replication.oldest_request_after_t = 0;
+            was_connected = true;
 
             rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
 
@@ -603,7 +561,7 @@ void *rrdpush_sender_thread(void *ptr) {
         )) {
             worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
             netdata_log_error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", rrdhost_hostname(s->host), s->connected_to, s->timeout, s->sent_bytes_on_this_connection, s->send_attempts);
-            rrdpush_sender_disconnect_and_cleanup(s->host);
+            rrdpush_sender_thread_close_socket(s);
             continue;
         }
 
@@ -632,9 +590,9 @@ void *rrdpush_sender_thread(void *ptr) {
 
         if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) {
             if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {
-                netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. Disabling streaming.",
-                      rrdhost_hostname(s->host));
-                rrdpush_sender_disconnect_and_cleanup(s->host);
+                netdata_log_error("STREAM %s [send]: cannot create inter-thread communication pipe. "
+                                  "Disabling streaming.", rrdhost_hostname(s->host));
+                rrdpush_sender_thread_close_socket(s);
                 break;
             }
         }
@@ -685,7 +643,7 @@ void *rrdpush_sender_thread(void *ptr) {
             worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR);
             netdata_log_error("STREAM %s [send to %s]: failed to poll(). Closing socket.", rrdhost_hostname(s->host), s->connected_to);
             rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
-            rrdpush_sender_disconnect_and_cleanup(s->host);
+            rrdpush_sender_thread_close_socket(s);
             continue;
         }
 
@@ -704,7 +662,7 @@ void *rrdpush_sender_thread(void *ptr) {
             worker_is_busy(WORKER_SENDER_JOB_PIPE_READ);
             netdata_log_debug(D_STREAM, "STREAM: Data added to send buffer (current buffer chunk %zu bytes)...", outstanding);
 
-            if (read(fds[Collector].fd, thread_data->pipe_buffer, pipe_buffer_size) == -1)
+            if (read(fds[Collector].fd, pipe_buffer, pipe_buffer_size) == -1)
                 netdata_log_error("STREAM %s [send to %s]: cannot read from internal pipe.", rrdhost_hostname(s->host), s->connected_to);
         }
 
@@ -734,7 +692,7 @@ void *rrdpush_sender_thread(void *ptr) {
             if(error) {
                 rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true);
                 netdata_log_error("STREAM %s [send to %s]: restarting internal pipe: %s.",
-                      rrdhost_hostname(s->host), s->connected_to, error);
+                                  rrdhost_hostname(s->host), s->connected_to, error);
             }
         }
 
@@ -751,8 +709,8 @@ void *rrdpush_sender_thread(void *ptr) {
             if(unlikely(error)) {
                 worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR);
                 netdata_log_error("STREAM %s [send to %s]: restarting connection: %s - %zu bytes transmitted.",
-                      rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection);
-                rrdpush_sender_disconnect_and_cleanup(s->host);
+                                  rrdhost_hostname(s->host), s->connected_to, error, s->sent_bytes_on_this_connection);
+                rrdpush_sender_thread_close_socket(s);
             }
         }
 
@@ -761,12 +719,41 @@ void *rrdpush_sender_thread(void *ptr) {
             worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_OVERFLOW);
             errno_clear();
             netdata_log_error("STREAM %s [send to %s]: buffer full (allocated %zu bytes) after sending %zu bytes. Restarting connection",
-                  rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
-            rrdpush_sender_disconnect_and_cleanup(s->host);
+                              rrdhost_hostname(s->host), s->connected_to, s->buffer->size, s->sent_bytes_on_this_connection);
+            rrdpush_sender_thread_close_socket(s);
         }
 
         worker_set_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, (NETDATA_DOUBLE) dictionary_entries(s->replication.requests));
     }
 
+    if(was_connected)
+        rrdpush_sender_on_disconnect(s->host);
+
+    netdata_log_info("STREAM %s [send]: sending thread exits %s",
+                     rrdhost_hostname(s->host),
+                     s->host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(s->host->sender->exit.reason) : "");
+
+    sender_lock(s->host->sender);
+    {
+        rrdpush_sender_thread_close_socket(s);
+        rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, false);
+        rrdpush_sender_execute_commands_cleanup(s);
+
+        rrdhost_clear_sender___while_having_sender_mutex(s->host);
+
+#ifdef NETDATA_LOG_STREAM_SENDER
+        if (s->host->sender->stream_log_fp) {
+            fclose(s->host->sender->stream_log_fp);
+            s->host->sender->stream_log_fp = NULL;
+        }
+#endif
+    }
+    sender_unlock(s->host->sender);
+
+    freez(pipe_buffer);
+    freez(s);
+
+    worker_unregister();
+
     return NULL;
 }

+ 1 - 1
src/streaming/sender_commit.c

@@ -118,7 +118,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
 
                     worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
                     rrdpush_compression_deactivate(s);
-                    rrdpush_sender_disconnect_and_cleanup(s->host);
+                    rrdpush_sender_thread_close_socket(s);
                     sender_unlock(s);
                     return;
                 }

+ 19 - 14
src/streaming/sender_connect.c

@@ -2,13 +2,18 @@
 
 #include "sender_internals.h"
 
-void rrdpush_sender_thread_close_socket(RRDHOST *host) {
-    netdata_ssl_close(&host->sender->ssl);
+void rrdpush_sender_thread_close_socket(struct sender_state *s) {
+    rrdhost_flag_clear(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+
+    netdata_ssl_close(&s->ssl);
 
-    if(host->sender->rrdpush_sender_socket != -1) {
-        close(host->sender->rrdpush_sender_socket);
-        host->sender->rrdpush_sender_socket = -1;
+    if(s->rrdpush_sender_socket != -1) {
+        close(s->rrdpush_sender_socket);
+        s->rrdpush_sender_socket = -1;
     }
+
+    // do not flush the circular buffer here
+    // this function is called sometimes with the sender lock, sometimes without the lock
 }
 
 void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) {
@@ -204,7 +209,7 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
     int delay = stream_responses[i].postpone_reconnect_seconds;
 
     worker_is_busy(worker_job_id);
-    rrdpush_sender_thread_close_socket(host);
+    rrdpush_sender_thread_close_socket(s);
     host->destination->reason = version;
     host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
 
@@ -231,9 +236,9 @@ unsigned char alpn_proto_list[] = {
 
 #define CONN_UPGRADE_VAL "upgrade"
 
-static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
+static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
     RRDHOST *host = s->host;
-    bool ssl_required = host->destination && host->destination->ssl;
+    bool ssl_required = host && host->destination && host->destination->ssl;
 
     netdata_ssl_close(&host->sender->ssl);
 
@@ -251,7 +256,7 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
             ND_LOG_STACK_PUSH(lgs);
 
             worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
-            rrdpush_sender_thread_close_socket(host);
+            rrdpush_sender_thread_close_socket(s);
             host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
             host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
             return false;
@@ -269,7 +274,7 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
 
             worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
             netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
-            rrdpush_sender_thread_close_socket(host);
+            rrdpush_sender_thread_close_socket(s);
             host->destination->reason = STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE;
             host->destination->postpone_reconnection_until = now_realtime_sec() + 5 * 60;
             return false;
@@ -390,7 +395,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
     };
 
     // make sure the socket is closed
-    rrdpush_sender_thread_close_socket(host);
+    rrdpush_sender_thread_close_socket(s);
 
     s->rrdpush_sender_socket = connect_to_one_of_destinations(
         host
@@ -527,7 +532,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
         ND_LOG_STACK_PUSH(lgs);
 
         worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION);
-        rrdpush_sender_thread_close_socket(host);
+        rrdpush_sender_thread_close_socket(s);
         host->destination->reason = STREAM_HANDSHAKE_ERROR_HTTP_UPGRADE;
         host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
         return false;
@@ -550,7 +555,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
         ND_LOG_STACK_PUSH(lgs);
 
         worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
-        rrdpush_sender_thread_close_socket(host);
+        rrdpush_sender_thread_close_socket(s);
 
         nd_log(NDLS_DAEMON, NDLP_ERR,
                "STREAM %s [send to %s]: failed to send HTTP header to remote netdata.",
@@ -577,7 +582,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
         ND_LOG_STACK_PUSH(lgs);
 
         worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
-        rrdpush_sender_thread_close_socket(host);
+        rrdpush_sender_thread_close_socket(s);
 
         nd_log(NDLS_DAEMON, NDLP_ERR,
                "STREAM %s [send to %s]: remote netdata does not respond.",

+ 1 - 3
src/streaming/sender_internals.h

@@ -44,9 +44,7 @@ extern char *netdata_ssl_ca_file;
 bool attempt_to_connect(struct sender_state *state);
 void rrdpush_sender_on_connect(RRDHOST *host);
 void rrdpush_sender_after_connect(RRDHOST *host);
-void rrdpush_sender_thread_close_socket(RRDHOST *host);
-
-void rrdpush_sender_disconnect_and_cleanup(RRDHOST *host);
+void rrdpush_sender_thread_close_socket(struct sender_state *s);
 
 void rrdpush_sender_execute_commands_cleanup(struct sender_state *s);
 void rrdpush_sender_execute_commands(struct sender_state *s);