Browse Source

Prepare the main cleanup function for the exporting engine (#9099)

Vladimir Kobal 4 years ago
parent
commit
98c7260d92

+ 19 - 18
exporting/clean_connectors.c

@@ -3,35 +3,36 @@
 #include "exporting_engine.h"
 
 /**
- * Clean the instance config.
- * @param ptr
+ * Clean the instance config
+ *
+ * @param config an instance config structure.
  */
-static void clean_instance_config(struct instance_config *ptr)
+static void clean_instance_config(struct instance_config *config)
 {
-    if (ptr->name)
-        freez((void *)ptr->name);
+    if (config->name)
+        freez((void *)config->name);
 
-    if (ptr->destination)
-        freez((void *)ptr->destination);
+    if (config->destination)
+        freez((void *)config->destination);
 
-    if (ptr->charts_pattern)
-        simple_pattern_free(ptr->charts_pattern);
+    if (config->charts_pattern)
+        simple_pattern_free(config->charts_pattern);
 
-    if (ptr->hosts_pattern)
-        simple_pattern_free(ptr->hosts_pattern);
+    if (config->hosts_pattern)
+        simple_pattern_free(config->hosts_pattern);
 }
 
 /**
  * Clean the allocated variables
  *
- * @param ptr a pointer to the structure with variables to clean.
+ * @param instance an instance data structure.
  */
-void clean_instance(struct instance *ptr)
+void clean_instance(struct instance *instance)
 {
-    clean_instance_config(&ptr->config);
-    if (ptr->labels)
-        buffer_free(ptr->labels);
+    clean_instance_config(&instance->config);
+    if (instance->labels)
+        buffer_free(instance->labels);
 
-    uv_mutex_destroy(&ptr->mutex);
-    uv_cond_destroy(&ptr->cond_var);
+    uv_cond_destroy(&instance->cond_var);
+    // uv_mutex_destroy(&instance->mutex);
 }

+ 54 - 3
exporting/exporting_engine.c

@@ -2,12 +2,63 @@
 
 #include "exporting_engine.h"
 
-static void exporting_main_cleanup(void *ptr) {
+static struct engine *engine = NULL;
+
+/**
+ * Clean up the main exporting thread and all connector workers on Netdata exit
+ *
+ * @param ptr thread data.
+ */
+static void exporting_main_cleanup(void *ptr)
+{
     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
     static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
 
     info("cleaning up...");
 
+    if (!engine) {
+        static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+        return;
+    }
+
+    engine->exit = 1;
+
+    int found = 0;
+    usec_t max = 2 * USEC_PER_SEC, step = 50000;
+
+    for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+        if (!instance->exited) {
+            found++;
+            info("stopping worker for instance %s", instance->config.name);
+            uv_cond_signal(&instance->cond_var);
+        } else
+            info("found stopped worker for instance %s", instance->config.name);
+    }
+
+    while (found && max > 0) {
+        max -= step;
+        info("Waiting %d exporting connectors to finish...", found);
+        sleep_usec(step);
+        found = 0;
+
+        for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
+            if (!instance->exited)
+                found++;
+        }
+    }
+
+    for (struct instance *instance = engine->instance_root; instance;) {
+        struct instance *current_instance = instance;
+        instance = instance->next;
+        clean_instance(current_instance);
+    }
+
+    if (engine->config.prefix)
+        freez((void *)engine->config.prefix);
+    if (engine->config.hostname)
+        freez((void *)engine->config.hostname);
+    freez(engine);
+
     static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
 }
 
@@ -24,7 +75,7 @@ void *exporting_main(void *ptr)
 {
     netdata_thread_cleanup_push(exporting_main_cleanup, ptr);
 
-    struct engine *engine = read_exporting_config();
+    engine = read_exporting_config();
     if (!engine) {
         info("EXPORTING: no exporting connectors configured");
         goto cleanup;
@@ -54,7 +105,7 @@ void *exporting_main(void *ptr)
         send_main_rusage(st_main_rusage, rd_main_user, rd_main_system);
 
 #ifdef UNIT_TESTING
-        break;
+        return NULL;
 #endif
     }
 

+ 4 - 0
exporting/exporting_engine.h

@@ -180,6 +180,8 @@ struct instance {
     size_t index;
     struct instance *next;
     struct engine *engine;
+
+    volatile sig_atomic_t exited;
 };
 
 struct engine {
@@ -192,6 +194,8 @@ struct engine {
     int mongoc_initialized;
 
     struct instance *instance_root;
+
+    volatile sig_atomic_t exit;
 };
 
 extern struct instance *prometheus_exporter_instance;

+ 21 - 2
exporting/send_data.c

@@ -140,6 +140,21 @@ void simple_connector_send_buffer(int *sock, int *failures, struct instance *ins
     }
 }
 
+/**
+ * Clean up a simple connector instance on Netdata exit
+ *
+ * @param instance an instance data structure.
+ */
+void simple_connector_cleanup(struct instance *instance)
+{
+    info("EXPORTING: cleaning up instance %s ...", instance->config.name);
+
+    // TODO free allocated resources
+
+    info("EXPORTING: instance %s exited", instance->config.name);
+    instance->exited = 1;
+}
+
 /**
  * Simple connector worker
  *
@@ -159,7 +174,7 @@ void simple_connector_worker(void *instance_p)
                               .tv_usec = (instance->config.timeoutms * 1000) % 1000000};
     int failures = 0;
 
-    while(!netdata_exit) {
+    while(!instance->engine->exit) {
 
         // reset the monitoring chart counters
         stats->received_bytes =
@@ -195,7 +210,7 @@ void simple_connector_worker(void *instance_p)
             stats->reconnects += reconnects;
         }
 
-        if(unlikely(netdata_exit)) break;
+        if(unlikely(instance->engine->exit)) break;
 
         // ------------------------------------------------------------------------
         // if we are connected, send our buffer to the data collecting server
@@ -203,6 +218,8 @@ void simple_connector_worker(void *instance_p)
         uv_mutex_lock(&instance->mutex);
         uv_cond_wait(&instance->cond_var, &instance->mutex);
 
+        if(unlikely(instance->engine->exit)) break;
+
         if (likely(sock != -1)) {
             simple_connector_send_buffer(&sock, &failures, instance);
         } else {
@@ -238,4 +255,6 @@ void simple_connector_worker(void *instance_p)
         break;
 #endif
     }
+
+    simple_connector_cleanup(instance);
 }

+ 3 - 2
exporting/tests/test_exporting_engine.c

@@ -65,8 +65,6 @@ static void test_exporting_engine(void **state)
     expect_value(__wrap_send_main_rusage, rd_user, NULL);
     expect_value(__wrap_send_main_rusage, rd_system, NULL);
 
-    expect_function_call(__wrap_info_int);
-
     void *ptr = malloc(sizeof(struct netdata_static_thread));
     assert_ptr_equal(exporting_main(ptr), NULL);
     assert_int_equal(engine->now, 2);
@@ -668,6 +666,9 @@ static void test_simple_connector_worker(void **state)
     expect_value(__wrap_send_internal_metrics, instance, instance);
     will_return(__wrap_send_internal_metrics, 0);
 
+    expect_function_call(__wrap_info_int);
+    expect_function_call(__wrap_info_int);
+
     simple_connector_worker(instance);
 
     assert_int_equal(stats->buffered_metrics, 0);