Browse Source

Try to find worker thread from parked ones (#11928)

Emmanuel Vasilakis 3 years ago
parent
commit
bf023b50fe

+ 18 - 0
database/sqlite/sqlite_aclk.c

@@ -189,6 +189,24 @@ int aclk_worker_enq_cmd(char *node_id, struct aclk_database_cmd *cmd)
     return (wc == NULL);
 }
 
+struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id)
+{
+    if (unlikely(!node_id))
+        return NULL;
+
+    uv_mutex_lock(&aclk_async_lock);
+    struct aclk_database_worker_config *wc = aclk_thread_head;
+
+    while (wc) {
+        if (!strcmp(wc->node_id, node_id))
+            break;
+        wc = wc->next;
+    }
+    uv_mutex_unlock(&aclk_async_lock);
+
+    return (wc);
+}
+
 void aclk_sync_exit_all()
 {
     rrd_wrlock();

+ 1 - 0
database/sqlite/sqlite_aclk.h

@@ -229,4 +229,5 @@ void sql_delete_aclk_table_list(struct aclk_database_worker_config *wc, struct a
 void sql_maint_aclk_sync_database(struct aclk_database_worker_config *wc, struct aclk_database_cmd cmd);
 int claimed();
 void aclk_sync_exit_all();
+struct aclk_database_worker_config *find_inactive_wc_by_node_id(char *node_id);
 #endif //NETDATA_SQLITE_ACLK_H

+ 3 - 1
database/sqlite/sqlite_aclk_alert.c

@@ -546,7 +546,9 @@ void aclk_start_alert_streaming(char *node_id, uint64_t batch_id, uint64_t start
     rrd_wrlock();
     RRDHOST *host = find_host_by_node_id(node_id);
     if (likely(host))
-        wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+        wc = (struct aclk_database_worker_config *)host->dbsync_worker ?
+                 (struct aclk_database_worker_config *)host->dbsync_worker :
+                 (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
     rrd_unlock();
 
     if (unlikely(!host->health_enabled)) {

+ 3 - 1
database/sqlite/sqlite_aclk_chart.c

@@ -682,7 +682,9 @@ void aclk_start_streaming(char *node_id, uint64_t sequence_id, time_t created_at
     while(host) {
         if (host->node_id && !(uuid_compare(*host->node_id, node_uuid))) {
             rrd_unlock();
-            wc = (struct aclk_database_worker_config *)host->dbsync_worker;
+            wc = (struct aclk_database_worker_config *)host->dbsync_worker ?
+                     (struct aclk_database_worker_config *)host->dbsync_worker :
+                     (struct aclk_database_worker_config *)find_inactive_wc_by_node_id(node_id);
             if (likely(wc)) {
                 wc->chart_reset_count++;
                 __sync_synchronize();