Просмотр исходного кода

Fix error handling in exporting connector (#8910)

Vladimir Kobal 4 лет назад
Родитель
Сommit
a606a27f16

+ 0 - 1
CMakeLists.txt

@@ -1152,7 +1152,6 @@ endif()
         -Wl,--wrap=rrdset_is_exportable
         -Wl,--wrap=exporting_calculate_value_from_stored_data
         -Wl,--wrap=prepare_buffers
-        -Wl,--wrap=notify_workers
         -Wl,--wrap=send_internal_metrics
         -Wl,--wrap=now_realtime_sec
         -Wl,--wrap=uv_thread_set_name_np

+ 0 - 1
Makefile.am

@@ -878,7 +878,6 @@ if ENABLE_UNITTESTS
         -Wl,--wrap=rrdset_is_exportable \
         -Wl,--wrap=exporting_calculate_value_from_stored_data \
         -Wl,--wrap=prepare_buffers \
-        -Wl,--wrap=notify_workers \
         -Wl,--wrap=send_internal_metrics \
         -Wl,--wrap=now_realtime_sec \
         -Wl,--wrap=uv_thread_set_name_np \

+ 2 - 11
exporting/exporting_engine.c

@@ -48,17 +48,8 @@ void *exporting_main(void *ptr)
         heartbeat_next(&hb, step_ut);
         engine->now = now_realtime_sec();
 
-        if (mark_scheduled_instances(engine)) {
-            if (prepare_buffers(engine) != 0) {
-                error("EXPORTING: cannot prepare data to send");
-                break;
-            }
-        }
-
-        if (notify_workers(engine) != 0) {
-            error("EXPORTING: cannot communicate with exporting connector instance working threads");
-            break;
-        }
+        if (mark_scheduled_instances(engine))
+            prepare_buffers(engine);
 
         send_main_rusage(st_main_rusage, rd_main_user, rd_main_system);
 

+ 17 - 9
exporting/exporting_engine.h

@@ -151,6 +151,7 @@ struct instance {
     struct stats stats;
 
     int scheduled;
+    int disabled;
     int skip_host;
     int skip_chart;
 
@@ -203,8 +204,7 @@ EXPORTING_CONNECTOR_TYPE exporting_select_type(const char *type);
 int init_connectors(struct engine *engine);
 
 int mark_scheduled_instances(struct engine *engine);
-int prepare_buffers(struct engine *engine);
-int notify_workers(struct engine *engine);
+void prepare_buffers(struct engine *engine);
 
 size_t exporting_name_copy(char *dst, const char *src, size_t max_len);
 
@@ -216,13 +216,13 @@ calculated_number exporting_calculate_value_from_stored_data(
     RRDDIM *rd,
     time_t *last_timestamp);
 
-int start_batch_formatting(struct engine *engine);
-int start_host_formatting(struct engine *engine, RRDHOST *host);
-int start_chart_formatting(struct engine *engine, RRDSET *st);
-int metric_formatting(struct engine *engine, RRDDIM *rd);
-int end_chart_formatting(struct engine *engine, RRDSET *st);
-int end_host_formatting(struct engine *engine, RRDHOST *host);
-int end_batch_formatting(struct engine *engine);
+void start_batch_formatting(struct engine *engine);
+void start_host_formatting(struct engine *engine, RRDHOST *host);
+void start_chart_formatting(struct engine *engine, RRDSET *st);
+void metric_formatting(struct engine *engine, RRDDIM *rd);
+void end_chart_formatting(struct engine *engine, RRDSET *st);
+void end_host_formatting(struct engine *engine, RRDHOST *host);
+void end_batch_formatting(struct engine *engine);
 int flush_host_labels(struct instance *instance, RRDHOST *host);
 int simple_connector_update_buffered_bytes(struct instance *instance);
 
@@ -235,6 +235,14 @@ void create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_
 void send_main_rusage(RRDSET *st_rusage, RRDDIM *rd_user, RRDDIM *rd_system);
 void send_internal_metrics(struct instance *instance);
 
+static inline void disable_instance(struct instance *instance)
+{
+    instance->disabled = 1;
+    instance->scheduled = 0;
+    uv_mutex_unlock(&instance->mutex);
+    error("EXPORTING: Instance %s disabled", instance->config.name);
+}
+
 #include "exporting/prometheus/prometheus.h"
 
 #endif /* NETDATA_EXPORTING_ENGINE_H */

+ 28 - 72
exporting/process_data.c

@@ -43,7 +43,7 @@ int mark_scheduled_instances(struct engine *engine)
     int instances_were_scheduled = 0;
 
     for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
-        if (engine->now % instance->config.update_every < localhost->rrd_update_every) {
+        if (!instance->disabled && (engine->now % instance->config.update_every < localhost->rrd_update_every)) {
             instance->scheduled = 1;
             instances_were_scheduled = 1;
             instance->before = engine->now;
@@ -160,21 +160,18 @@ calculated_number exporting_calculate_value_from_stored_data(
  * Start batch formatting for every connector instance's buffer
  *
  * @param engine an engine data structure.
- * @return Returns 0 on success, 1 on failure.
  */
-int start_batch_formatting(struct engine *engine)
+void start_batch_formatting(struct engine *engine)
 {
     for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
         if (instance->scheduled) {
             uv_mutex_lock(&instance->mutex);
             if (instance->start_batch_formatting && instance->start_batch_formatting(instance) != 0) {
                 error("EXPORTING: cannot start batch formatting for %s", instance->config.name);
-                return 1;
+                disable_instance(instance);
             }
         }
     }
-
-    return 0;
 }
 
 /**
@@ -182,24 +179,21 @@ int start_batch_formatting(struct engine *engine)
  *
  * @param engine an engine data structure.
  * @param host a data collecting host.
- * @return Returns 0 on success, 1 on failure.
  */
-int start_host_formatting(struct engine *engine, RRDHOST *host)
+void start_host_formatting(struct engine *engine, RRDHOST *host)
 {
     for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
         if (instance->scheduled) {
             if (rrdhost_is_exportable(instance, host)) {
                 if (instance->start_host_formatting && instance->start_host_formatting(instance, host) != 0) {
                     error("EXPORTING: cannot start host formatting for %s", instance->config.name);
-                    return 1;
+                    disable_instance(instance);
                 }
             } else {
                 instance->skip_host = 1;
             }
         }
     }
-
-    return 0;
 }
 
 /**
@@ -207,24 +201,21 @@ int start_host_formatting(struct engine *engine, RRDHOST *host)
  *
  * @param engine an engine data structure.
  * @param st a chart.
- * @return Returns 0 on success, 1 on failure.
  */
-int start_chart_formatting(struct engine *engine, RRDSET *st)
+void start_chart_formatting(struct engine *engine, RRDSET *st)
 {
     for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
         if (instance->scheduled && !instance->skip_host) {
             if (rrdset_is_exportable(instance, st)) {
                 if (instance->start_chart_formatting && instance->start_chart_formatting(instance, st) != 0) {
                     error("EXPORTING: cannot start chart formatting for %s", instance->config.name);
-                    return 1;
+                    disable_instance(instance);
                 }
             } else {
                 instance->skip_chart = 1;
             }
         }
     }
-
-    return 0;
 }
 
 /**
@@ -232,21 +223,19 @@ int start_chart_formatting(struct engine *engine, RRDSET *st)
  *
  * @param engine an engine data structure.
  * @param rd a dimension(metric) in the Netdata database.
- * @return Returns 0 on success, 1 on failure.
  */
-int metric_formatting(struct engine *engine, RRDDIM *rd)
+void metric_formatting(struct engine *engine, RRDDIM *rd)
 {
     for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
         if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
             if (instance->metric_formatting && instance->metric_formatting(instance, rd) != 0) {
                 error("EXPORTING: cannot format metric for %s", instance->config.name);
-                return 1;
+                disable_instance(instance);
+                continue;
             }
             instance->stats.buffered_metrics++;
         }
     }
-
-    return 0;
 }
 
 /**
@@ -254,21 +243,19 @@ int metric_formatting(struct engine *engine, RRDDIM *rd)
  *
  * @param engine an engine data structure.
  * @param a chart.
- * @return Returns 0 on success, 1 on failure.
  */
-int end_chart_formatting(struct engine *engine, RRDSET *st)
+void end_chart_formatting(struct engine *engine, RRDSET *st)
 {
     for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
         if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
             if (instance->end_chart_formatting && instance->end_chart_formatting(instance, st) != 0) {
                 error("EXPORTING: cannot end chart formatting for %s", instance->config.name);
-                return 1;
+                disable_instance(instance);
+                continue;
             }
         }
         instance->skip_chart = 0;
     }
-
-    return 0;
 }
 
 /**
@@ -276,36 +263,34 @@ int end_chart_formatting(struct engine *engine, RRDSET *st)
  *
  * @param engine an engine data structure.
  * @param host a data collecting host.
- * @return Returns 0 on success, 1 on failure.
  */
-int end_host_formatting(struct engine *engine, RRDHOST *host)
+void end_host_formatting(struct engine *engine, RRDHOST *host)
 {
     for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
         if (instance->scheduled && !instance->skip_host) {
             if (instance->end_host_formatting && instance->end_host_formatting(instance, host) != 0) {
                 error("EXPORTING: cannot end host formatting for %s", instance->config.name);
-                return 1;
+                disable_instance(instance);
+                continue;
             }
         }
         instance->skip_host = 0;
     }
-
-    return 0;
 }
 
 /**
  * End batch formatting for every connector instance's buffer
  *
  * @param engine an engine data structure.
- * @return Returns 0 on success, 1 on failure.
  */
-int end_batch_formatting(struct engine *engine)
+void end_batch_formatting(struct engine *engine)
 {
     for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
         if (instance->scheduled) {
             if (instance->end_batch_formatting && instance->end_batch_formatting(instance) != 0) {
                 error("EXPORTING: cannot end batch formatting for %s", instance->config.name);
-                return 1;
+                disable_instance(instance);
+                continue;
             }
             uv_mutex_unlock(&instance->mutex);
             uv_cond_signal(&instance->cond_var);
@@ -314,8 +299,6 @@ int end_batch_formatting(struct engine *engine)
             instance->after = instance->before;
         }
     }
-
-    return 0;
 }
 
 /**
@@ -325,51 +308,39 @@ int end_batch_formatting(struct engine *engine)
  * configured rules.
  *
  * @param engine an engine data structure.
- * @return Returns 0 on success, 1 on failure.
  */
-int prepare_buffers(struct engine *engine)
+void prepare_buffers(struct engine *engine)
 {
-    if (start_batch_formatting(engine) != 0)
-        return 1;
-
     netdata_thread_disable_cancelability();
+    start_batch_formatting(engine);
+
     rrd_rdlock();
     RRDHOST *host;
     rrdhost_foreach_read(host)
     {
         rrdhost_rdlock(host);
-        if (start_host_formatting(engine, host) != 0)
-            return 1;
+        start_host_formatting(engine, host);
         RRDSET *st;
         rrdset_foreach_read(st, host)
         {
             rrdset_rdlock(st);
-            if (start_chart_formatting(engine, st) != 0)
-                return 1;
+            start_chart_formatting(engine, st);
 
             RRDDIM *rd;
             rrddim_foreach_read(rd, st)
-            {
-                if (metric_formatting(engine, rd) != 0)
-                    return 1;
-            }
+                metric_formatting(engine, rd);
 
-            if (end_chart_formatting(engine, st) != 0)
-                return 1;
+            end_chart_formatting(engine, st);
             rrdset_unlock(st);
         }
 
-        if (end_host_formatting(engine, host) != 0)
-            return 1;
+        end_host_formatting(engine, host);
         rrdhost_unlock(host);
     }
     rrd_unlock();
     netdata_thread_enable_cancelability();
 
-    if (end_batch_formatting(engine) != 0)
-        return 1;
-
-    return 0;
+    end_batch_formatting(engine);
 }
 
 /**
@@ -401,18 +372,3 @@ int simple_connector_update_buffered_bytes(struct instance *instance)
 
     return 0;
 }
-
-/**
- * Notify workers
- *
- * Notify exporting connector instance working threads that data is ready to send.
- *
- * @param engine an engine data structure.
- * @return Returns 0 on success, 1 on failure.
- */
-int notify_workers(struct engine *engine)
-{
-    (void)engine;
-
-    return 0;
-}

+ 0 - 7
exporting/tests/exporting_doubles.c

@@ -75,13 +75,6 @@ int __wrap_prepare_buffers(struct engine *engine)
     return mock_type(int);
 }
 
-int __wrap_notify_workers(struct engine *engine)
-{
-    function_called();
-    check_expected_ptr(engine);
-    return mock_type(int);
-}
-
 void __wrap_create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system)
 {
     function_called();

+ 0 - 4
exporting/tests/test_exporting_engine.c

@@ -60,10 +60,6 @@ static void test_exporting_engine(void **state)
     expect_memory(__wrap_prepare_buffers, engine, engine, sizeof(struct engine));
     will_return(__wrap_prepare_buffers, 0);
 
-    expect_function_call(__wrap_notify_workers);
-    expect_memory(__wrap_notify_workers, engine, engine, sizeof(struct engine));
-    will_return(__wrap_notify_workers, 0);
-
     expect_function_call(__wrap_send_main_rusage);
     expect_value(__wrap_send_main_rusage, st_rusage, NULL);
     expect_value(__wrap_send_main_rusage, rd_user, NULL);

+ 0 - 2
exporting/tests/test_exporting_engine.h

@@ -93,8 +93,6 @@ calculated_number __wrap_exporting_calculate_value_from_stored_data(
 int __real_prepare_buffers(struct engine *engine);
 int __wrap_prepare_buffers(struct engine *engine);
 
-int __wrap_notify_workers(struct engine *engine);
-
 void __real_create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system);
 void __wrap_create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system);