Просмотр исходного кода

Fixes the race-hazard in streaming during the shutdown sequence (#9370)

The streaming component detects when a receiver stream has closed and stops an attached sender on the same host. This supports proxy configurations where the stream is passed through. During the shutdown sequence, once netdata_exit has been set, no thread should touch any RRDHOST structure, because the non-static threads are not joined before the database shuts down.

The destruction of the thread state has been separated from the cleanup and can be called from two points. If the thread can detach itself from the host (i.e. the shutdown sequence has not started), it does so and destroys the state. During shutdown, the thread leaves the state intact so that it can be destroyed during host destruction; host destruction now cancels the thread to ensure a consistent sequence of events.
Andrew Moss 4 лет назад
Родитель
Commit
85752833ad
4 измененных файлов с 66 добавлено и 40 удалено
  1. 15 1
      database/rrdhost.c
  2. 47 36
      streaming/receiver.c
  3. 1 2
      streaming/rrdpush.c
  4. 3 1
      streaming/rrdpush.h

+ 15 - 1
database/rrdhost.c

@@ -666,6 +666,7 @@ void rrdhost_system_info_free(struct rrdhost_system_info *system_info) {
     }
 }
 
+void destroy_receiver_state(struct receiver_state *rpt);
 void rrdhost_free(RRDHOST *host) {
     if(!host) return;
 
@@ -674,12 +675,25 @@ void rrdhost_free(RRDHOST *host) {
     rrd_check_wrlock();     // make sure the RRDs are write locked
 
     // ------------------------------------------------------------------------
-    // clean up the sender
+    // clean up streaming
     rrdpush_sender_thread_stop(host); // stop a possibly running thread
     cbuffer_free(host->sender->buffer);
     buffer_free(host->sender->build);
     freez(host->sender);
     host->sender = NULL;
+    if (netdata_exit) {
+        netdata_mutex_lock(&host->receiver_lock);
+        if (host->receiver) {
+            if (!host->receiver->exited)
+                netdata_thread_cancel(host->receiver->thread);
+            while (!host->receiver->exited)
+                sleep_usec(50 * USEC_PER_MS);
+            destroy_receiver_state(host->receiver);
+        }
+        netdata_mutex_unlock(&host->receiver_lock);
+    }
+
+
 
     rrdhost_wrlock(host);   // lock this RRDHOST
 

+ 47 - 36
streaming/receiver.c

@@ -4,14 +4,40 @@
 
 extern struct config stream_config;
 
+void destroy_receiver_state(struct receiver_state *rpt) {
+    freez(rpt->key);
+    freez(rpt->hostname);
+    freez(rpt->registry_hostname);
+    freez(rpt->machine_guid);
+    freez(rpt->os);
+    freez(rpt->timezone);
+    freez(rpt->tags);
+    freez(rpt->client_ip);
+    freez(rpt->client_port);
+    freez(rpt->program_name);
+    freez(rpt->program_version);
+#ifdef ENABLE_HTTPS
+    if(rpt->ssl.conn){
+        SSL_free(rpt->ssl.conn);
+    }
+#endif
+    freez(rpt);
+}
+
 static void rrdpush_receiver_thread_cleanup(void *ptr) {
     static __thread int executed = 0;
     if(!executed) {
         executed = 1;
         struct receiver_state *rpt = (struct receiver_state *) ptr;
+        // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch
+        // the host pointer as it is unpredictable when the RRDHOST is deleted. Do the cleanup from rrdhost_free().
+        if (netdata_exit && rpt->host) {
+            rpt->exited = 1;
+            return;
+        }
 
         // Make sure that we detach this thread and don't kill a freshly arriving receiver
-        if (rpt->host) {
+        if (!netdata_exit && rpt->host) {
             netdata_mutex_lock(&rpt->host->receiver_lock);
             if (rpt->host->receiver == rpt)
                 rpt->host->receiver = NULL;
@@ -19,25 +45,7 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) {
         }
 
         info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
-
-        freez(rpt->key);
-        freez(rpt->hostname);
-        freez(rpt->registry_hostname);
-        freez(rpt->machine_guid);
-        freez(rpt->os);
-        freez(rpt->timezone);
-        freez(rpt->tags);
-        freez(rpt->client_ip);
-        freez(rpt->client_port);
-        freez(rpt->program_name);
-        freez(rpt->program_version);
-#ifdef ENABLE_HTTPS
-        if(rpt->ssl.conn){
-            SSL_free(rpt->ssl.conn);
-        }
-#endif
-        freez(rpt);
-
+        destroy_receiver_state(rpt);
     }
 }
 
@@ -413,26 +421,29 @@ static int rrdpush_receive(struct receiver_state *rpt)
 
 
     size_t count = streaming_parser(rpt, &cd, fp);
-    //size_t count = pluginsd_process(host, &cd, fp, 1);
-
-    log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "DISCONNECTED");
-    error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->host->hostname, rpt->client_ip, rpt->client_port, count);
-
-    netdata_mutex_lock(&rpt->host->receiver_lock);
-    if (rpt->host->receiver == rpt) {
-        rrdhost_wrlock(rpt->host);
-        rpt->host->senders_disconnected_time = now_realtime_sec();
-        rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN);
-        if(health_enabled == CONFIG_BOOLEAN_AUTO)
-            rpt->host->health_enabled = 0;
-        rrdhost_unlock(rpt->host);
-        rrdpush_sender_thread_stop(rpt->host);
+
+    log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname,
+                          "DISCONNECTED");
+    error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
+          rpt->client_port, count);
+
+    // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread
+    if (!netdata_exit && rpt->host) {
+        netdata_mutex_lock(&rpt->host->receiver_lock);
+        if (rpt->host->receiver == rpt) {
+            rrdhost_wrlock(rpt->host);
+            rpt->host->senders_disconnected_time = now_realtime_sec();
+            rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN);
+            if(health_enabled == CONFIG_BOOLEAN_AUTO)
+                rpt->host->health_enabled = 0;
+            rrdhost_unlock(rpt->host);
+            rrdpush_sender_thread_stop(rpt->host);
+        }
+        netdata_mutex_unlock(&rpt->host->receiver_lock);
     }
-    netdata_mutex_unlock(&rpt->host->receiver_lock);
 
     // cleanup
     fclose(fp);
-
     return (int)count;
 }
 

+ 1 - 2
streaming/rrdpush.c

@@ -673,14 +673,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
     }
 
 
-    netdata_thread_t thread;
 
     debug(D_SYSTEM, "starting STREAM receive thread.");
 
     char tag[FILENAME_MAX + 1];
     snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
 
-    if(netdata_thread_create(&thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt))
+    if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt))
         error("Failed to create new STREAM receive thread for client.");
 
     // prevent the caller from closing the streaming socket

+ 3 - 1
streaming/rrdpush.h

@@ -64,6 +64,7 @@ struct sender_state {
 
 struct receiver_state {
     RRDHOST *host;
+    netdata_thread_t thread;
     int fd;
     char *key;
     char *hostname;
@@ -85,7 +86,8 @@ struct receiver_state {
 #ifdef ENABLE_HTTPS
     struct netdata_ssl ssl;
 #endif
-    unsigned int shutdown:1;
+    unsigned int shutdown:1;    // Tell the thread to exit
+    unsigned int exited;      // Indicates that the thread has exited  (NOT A BITFIELD!)
 };