Browse Source

Fix Coverity defects 359164, 359165 and 358989. (#9268)

Removed uses of the host lock that could deadlock senders and replaced with the new fine-grained mutex.
Andrew Moss 4 years ago
parent
commit
ea7d7ea31d
4 changed files with 11 additions and 14 deletions
  1. 1 1
      daemon/commands.c
  2. 5 8
      streaming/rrdpush.c
  3. 3 3
      streaming/rrdpush.h
  4. 2 2
      streaming/sender.c

+ 1 - 1
daemon/commands.c

@@ -673,7 +673,7 @@ void commands_init(void)
 
     info("Initializing command server.");
     for (i = 0 ; i < CMD_TOTAL_COMMANDS ; ++i) {
-        uv_mutex_init(&command_lock_array[i]);
+        assert(0 == uv_mutex_init(&command_lock_array[i]));
     }
     assert(0 == uv_rwlock_init(&exclusive_rwlock));
 

+ 5 - 8
streaming/rrdpush.c

@@ -368,12 +368,10 @@ void rrdpush_send_labels(RRDHOST *host) {
 // rrdpush sender thread
 
 // Either the receiver lost the connection or the host is being destroyed.
-// Don't lock the sender buffer - doesn't affect consistency in either case.
-// TODO-GAPS During the host destruction sequence we should make sure the disconnect happens early enough to lock
-//           out collectors hitting the sender. Locking the mutex means there may be waiting threads when we free.
+// The sender mutex guards thread creation, any spurious data is wiped on reconnection.
 void rrdpush_sender_thread_stop(RRDHOST *host) {
-    rrdhost_wrlock(host);
 
+    netdata_mutex_lock(&host->sender->mutex);
     netdata_thread_t thr = 0;
 
     if(host->rrdpush_sender_spawn) {
@@ -390,7 +388,7 @@ void rrdpush_sender_thread_stop(RRDHOST *host) {
         netdata_thread_cancel(host->rrdpush_sender_thread);
     }
 
-    rrdhost_unlock(host);
+    netdata_mutex_unlock(&host->sender->mutex);
 
     if(thr != 0) {
         info("STREAM %s [send]: waiting for the sending thread to stop...", host->hostname);
@@ -410,7 +408,7 @@ void log_stream_connection(const char *client_ip, const char *client_port, const
 
 
 static void rrdpush_sender_thread_spawn(RRDHOST *host) {
-    rrdhost_wrlock(host);
+    netdata_mutex_lock(&host->sender->mutex);
 
     if(!host->rrdpush_sender_spawn) {
         char tag[NETDATA_THREAD_TAG_MAX + 1];
@@ -421,8 +419,7 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) {
         else
             host->rrdpush_sender_spawn = 1;
     }
-
-    rrdhost_unlock(host);
+    netdata_mutex_unlock(&host->sender->mutex);
 }
 
 int rrdpush_receiver_permission_denied(struct web_client *w) {

+ 3 - 3
streaming/rrdpush.h

@@ -52,9 +52,9 @@ struct sender_state {
     size_t send_attempts;
     time_t last_sent_t;
     size_t not_connected_loops;
-    // metrics may be collected asynchronously
-    // these synchronize all the threads willing the write to our sending buffer
-    netdata_mutex_t mutex;    // Guard access to buffer / build
+    // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
+    // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
+    netdata_mutex_t mutex;
     struct circular_buffer *buffer;
     BUFFER *build;
     char read_buffer[512];

+ 2 - 2
streaming/sender.c

@@ -520,7 +520,7 @@ void execute_commands(struct sender_state *s) {
 static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
     RRDHOST *host = (RRDHOST *)ptr;
 
-    rrdhost_wrlock(host);
+    netdata_mutex_lock(&host->sender->mutex);
 
     info("STREAM %s [send]: sending thread cleans up...", host->hostname);
 
@@ -546,7 +546,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
 
     info("STREAM %s [send]: sending thread now exits.", host->hostname);
 
-    rrdhost_unlock(host);
+    netdata_mutex_unlock(&host->sender->mutex);
 }
 
 void sender_init(struct sender_state *s, RRDHOST *parent) {