Browse Source

fix receiver deadlock (#18440)

Costa Tsaousis 6 months ago
parent
commit
1d1c3a8062
2 changed files with 45 additions and 47 deletions
  1. 39 41
      src/streaming/receiver.c
  2. 6 6
      src/streaming/sender.c

+ 39 - 41
src/streaming/receiver.c

@@ -460,46 +460,50 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
 
 static void rrdhost_clear_receiver(struct receiver_state *rpt) {
     RRDHOST *host = rpt->host;
-    if(host) {
-        bool signal_rrdcontext = false;
-        spinlock_lock(&host->receiver_lock);
+    if(!host) return;
 
+    spinlock_lock(&host->receiver_lock);
+    {
         // Detach only when this rpt is still the receiver registered on the host, so that a freshly arriving receiver is not torn down by an older thread
-        if(host->receiver == rpt) {
+
+        if (host->receiver == rpt) {
+            spinlock_unlock(&host->receiver_lock);
+            {
+                // The receiver lock is released for the calls in this block. NOTE(review): host->receiver is not compared to rpt again once the lock is re-acquired below - confirm that no other receiver can be attached to the host in this window
+
+                stream_path_child_disconnected(host);
+                rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false);
+                rrdpush_receiver_replication_reset(host);
+                rrdcontext_host_child_disconnected(host);
+
+                if (rpt->config.health_enabled)
+                    rrdcalc_child_disconnected(host);
+
+                rrdpush_reset_destinations_postpone_time(host);
+            }
+            spinlock_lock(&host->receiver_lock);
+
+            // The receiver lock is held again from here: update the connected children counter, mark the host as disconnected and orphan, record the exit reason and detach the receiver from the host
+
             __atomic_sub_fetch(&localhost->connected_children_count, 1, __ATOMIC_RELAXED);
             rrdhost_flag_set(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
 
-            pluginsd_process_cleanup(rpt->parser);
-            __atomic_store_n(&rpt->parser, NULL, __ATOMIC_RELAXED);
-
             host->trigger_chart_obsoletion_check = 0;
             host->child_connect_time = 0;
             host->child_disconnected_time = now_realtime_sec();
-
             host->health.health_enabled = 0;
 
-            rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, false);
-
-            signal_rrdcontext = true;
-            rrdpush_receiver_replication_reset(host);
-
+            host->rrdpush_last_receiver_exit_reason = rpt->exit.reason;
             rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN);
             host->receiver = NULL;
-            host->rrdpush_last_receiver_exit_reason = rpt->exit.reason;
-
-            if(rpt->config.health_enabled)
-                rrdcalc_child_disconnected(host);
-
-            stream_path_child_disconnected(host);
         }
+    }
 
-        spinlock_unlock(&host->receiver_lock);
-
-        if(signal_rrdcontext)
-            rrdcontext_host_child_disconnected(host);
+    // The parser is cleaned up and its pointer cleared while the receiver lock is held; this runs whether or not this rpt was the receiver registered on the host
+    pluginsd_process_cleanup(rpt->parser);
+    __atomic_store_n(&rpt->parser, NULL, __ATOMIC_RELAXED);
 
-        rrdpush_reset_destinations_postpone_time(host);
-    }
+    spinlock_unlock(&host->receiver_lock);
 }
 
 bool stop_streaming_receiver(RRDHOST *host, STREAM_HANDSHAKE reason) {
@@ -859,21 +863,6 @@ cleanup:
     ;
 }
 
-static void rrdpush_receiver_thread_cleanup(void *pptr) {
-    struct receiver_state *rpt = CLEANUP_FUNCTION_GET_PTR(pptr);
-    if(!rpt) return;
-
-    netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
-         "receive thread ended (task id %d)"
-         , rpt->hostname ? rpt->hostname : "-"
-         , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-", gettid_cached());
-
-    worker_unregister();
-    rrdhost_clear_receiver(rpt);
-    receiver_state_free(rpt);
-    rrdhost_set_is_parent_label();
-}
-
 static bool stream_receiver_log_capabilities(BUFFER *wb, void *ptr) {
     struct receiver_state *rpt = ptr;
     if(!rpt)
@@ -893,7 +882,6 @@ static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) {
 }
 
 void *rrdpush_receiver_thread(void *ptr) {
-    CLEANUP_FUNCTION_REGISTER(rrdpush_receiver_thread_cleanup) cleanup_ptr = ptr;
     worker_register("STREAMRCV");
 
     worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ,
@@ -925,5 +913,15 @@ void *rrdpush_receiver_thread(void *ptr) {
                      , rpt->client_port);
 
     rrdpush_receive(rpt);
+
+    netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
+                     "receive thread ended (task id %d)"
+                     , rpt->hostname ? rpt->hostname : "-"
+                     , rpt->client_ip ? rpt->client_ip : "-", rpt->client_port ? rpt->client_port : "-", gettid_cached());
+
+    worker_unregister();
+    rrdhost_clear_receiver(rpt);
+    receiver_state_free(rpt);
+    rrdhost_set_is_parent_label();
     return NULL;
 }

+ 6 - 6
src/streaming/sender.c

@@ -731,9 +731,9 @@ void *rrdpush_sender_thread(void *ptr) {
 
     netdata_log_info("STREAM %s [send]: sending thread exits %s",
                      rrdhost_hostname(s->host),
-                     s->host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(s->host->sender->exit.reason) : "");
+                     s->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(s->exit.reason) : "");
 
-    sender_lock(s->host->sender);
+    sender_lock(s);
     {
         rrdpush_sender_thread_close_socket(s);
         rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, false);
@@ -742,13 +742,13 @@ void *rrdpush_sender_thread(void *ptr) {
         rrdhost_clear_sender___while_having_sender_mutex(s->host);
 
 #ifdef NETDATA_LOG_STREAM_SENDER
-        if (s->host->sender->stream_log_fp) {
-            fclose(s->host->sender->stream_log_fp);
-            s->host->sender->stream_log_fp = NULL;
+        if (s->stream_log_fp) {
+            fclose(s->stream_log_fp);
+            s->stream_log_fp = NULL;
         }
 #endif
     }
-    sender_unlock(s->host->sender);
+    sender_unlock(s);
 
     freez(pipe_buffer);
     freez(s);