Browse Source

Add two functions that allow someone to start/stop ML. (#15185)

* Add two functions that allow someone to start/stop ML.

* Shutdown ML after stopping collector services

* Remove unnecessary mutex from ml charts.

There's already a spinlock that protects the
chart when a someone calls rrdset_done().

* Use a lightweight spinlock instead of a mutext for ML dimensions.
vkalintiris 1 year ago
parent
commit
c76538e2f0
8 changed files with 203 additions and 83 deletions
  1. 9 8
      collectors/all.h
  2. 5 5
      daemon/main.c
  3. 0 1
      database/sqlite/sqlite_metadata.c
  4. 81 38
      ml/ad_charts.cc
  5. 8 0
      ml/ml-dummy.c
  6. 6 3
      ml/ml-private.h
  7. 91 28
      ml/ml.cc
  8. 3 0
      ml/ml.h

+ 9 - 8
collectors/all.h

@@ -395,16 +395,17 @@
 #define ML_CHART_PRIO_DETECTOR_EVENTS   39183
 
 // [netdata.ml] charts
-#define NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS 890001
-#define NETDATA_ML_CHART_PRIO_METRIC_TYPES            890002
-#define NETDATA_ML_CHART_PRIO_TRAINING_STATUS         890003
+#define NETDATA_ML_CHART_RUNNING                      890001
+#define NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS 890002
+#define NETDATA_ML_CHART_PRIO_METRIC_TYPES            890003
+#define NETDATA_ML_CHART_PRIO_TRAINING_STATUS         890004
 
-#define NETDATA_ML_CHART_PRIO_PREDICTION_USAGE        890004
-#define NETDATA_ML_CHART_PRIO_TRAINING_USAGE          890005
+#define NETDATA_ML_CHART_PRIO_PREDICTION_USAGE        890005
+#define NETDATA_ML_CHART_PRIO_TRAINING_USAGE          890006
 
-#define NETDATA_ML_CHART_PRIO_QUEUE_STATS             890006
-#define NETDATA_ML_CHART_PRIO_TRAINING_TIME_STATS     890007
-#define NETDATA_ML_CHART_PRIO_TRAINING_RESULTS        890008
+#define NETDATA_ML_CHART_PRIO_QUEUE_STATS             890007
+#define NETDATA_ML_CHART_PRIO_TRAINING_TIME_STATS     890008
+#define NETDATA_ML_CHART_PRIO_TRAINING_RESULTS        890009
 
 #define NETDATA_ML_CHART_FAMILY "machine learning"
 #define NETDATA_ML_PLUGIN "ml.plugin"

+ 5 - 5
daemon/main.c

@@ -344,11 +344,6 @@ void netdata_cleanup_and_exit(int ret) {
 
     webrtc_close_all_connections();
 
-    delta_shutdown_time("disable ML detection and training threads");
-
-    ml_stop_threads();
-    ml_fini();
-
     delta_shutdown_time("disable maintenance, new queries, new web requests, new streaming connections and aclk");
 
     service_signal_exit(
@@ -377,6 +372,11 @@ void netdata_cleanup_and_exit(int ret) {
             | SERVICE_STREAMING
             , 3 * USEC_PER_SEC);
 
+    delta_shutdown_time("disable ML detection and training threads");
+
+    ml_stop_threads();
+    ml_fini();
+
     delta_shutdown_time("stop context thread");
 
     timeout = !service_wait_exit(

+ 0 - 1
database/sqlite/sqlite_metadata.c

@@ -1485,7 +1485,6 @@ static inline void queue_metadata_cmd(enum metadata_opcode opcode, const void *p
     cmd.param[1] = param1;
     cmd.completion = NULL;
     metadata_enq_cmd(&metasync_worker, &cmd);
-
 }
 
 // Public

+ 81 - 38
ml/ad_charts.cc

@@ -183,6 +183,41 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
 
         rrdset_done(host->dimensions_rs);
     }
+
+    // ML running
+    {
+        if (!host->ml_running_rs) {
+            char id_buf[1024];
+            char name_buf[1024];
+
+            snprintfz(id_buf, 1024, "ml_running_on_%s", localhost->machine_guid);
+            snprintfz(name_buf, 1024, "ml_running_on_%s", rrdhost_hostname(localhost));
+
+            host->ml_running_rs = rrdset_create(
+                    host->rh,
+                    "anomaly_detection", // type
+                    id_buf, // id
+                    name_buf, // name
+                    "anomaly_detection", // family
+                    "anomaly_detection.ml_running", // ctx
+                    "ML running", // title
+                    "boolean", // units
+                    NETDATA_ML_PLUGIN, // plugin
+                    NETDATA_ML_MODULE_DETECTION, // module
+                    NETDATA_ML_CHART_RUNNING, // priority
+                    localhost->rrd_update_every, // update_every
+                    RRDSET_TYPE_LINE // chart_type
+            );
+            rrdset_flag_set(host->ml_running_rs, RRDSET_FLAG_ANOMALY_DETECTION);
+
+            host->ml_running_rd =
+                rrddim_add(host->ml_running_rs, "ml_running", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+        }
+
+        rrddim_set_by_pointer(host->ml_running_rs,
+                              host->ml_running_rd, host->ml_running);
+        rrdset_done(host->ml_running_rs);
+    }
 }
 
 void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number AnomalyRate) {
@@ -260,47 +295,55 @@ void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number
         /*
          * Compute the values of the dimensions based on the host rate chart
         */
-        ONEWAYALLOC *OWA = onewayalloc_create(0);
-        time_t Now = now_realtime_sec();
-        time_t Before = Now - host->rh->rrd_update_every;
-        time_t After = Before - Cfg.anomaly_detection_query_duration;
-        RRDR_OPTIONS Options = static_cast<RRDR_OPTIONS>(0x00000000);
-
-        RRDR *R = rrd2rrdr_legacy(
-                OWA,
-                host->anomaly_rate_rs,
-                1 /* points wanted */,
-                After,
-                Before,
-                Cfg.anomaly_detection_grouping_method,
-                0 /* resampling time */,
-                Options, "anomaly_rate",
-                NULL /* group options */,
-                0, /* timeout */
-                0, /* tier */
-                QUERY_SOURCE_ML,
-                STORAGE_PRIORITY_SYNCHRONOUS
-        );
-
-        if (R) {
-            if (R->d == 1 && R->n == 1 && R->rows == 1) {
-                static thread_local bool prev_above_threshold = false;
-                bool above_threshold = R->v[0] >= Cfg.host_anomaly_rate_threshold;
-                bool new_anomaly_event = above_threshold && !prev_above_threshold;
-                prev_above_threshold = above_threshold;
-
-                rrddim_set_by_pointer(host->detector_events_rs,
-                                      host->detector_events_above_threshold_rd, above_threshold);
-                rrddim_set_by_pointer(host->detector_events_rs,
-                                      host->detector_events_new_anomaly_event_rd, new_anomaly_event);
-
-                rrdset_done(host->detector_events_rs);
+        if (host->ml_running) {
+            ONEWAYALLOC *OWA = onewayalloc_create(0);
+            time_t Now = now_realtime_sec();
+            time_t Before = Now - host->rh->rrd_update_every;
+            time_t After = Before - Cfg.anomaly_detection_query_duration;
+            RRDR_OPTIONS Options = static_cast<RRDR_OPTIONS>(0x00000000);
+
+            RRDR *R = rrd2rrdr_legacy(
+                    OWA,
+                    host->anomaly_rate_rs,
+                    1 /* points wanted */,
+                    After,
+                    Before,
+                    Cfg.anomaly_detection_grouping_method,
+                    0 /* resampling time */,
+                    Options, "anomaly_rate",
+                    NULL /* group options */,
+                    0, /* timeout */
+                    0, /* tier */
+                    QUERY_SOURCE_ML,
+                    STORAGE_PRIORITY_SYNCHRONOUS
+            );
+
+            if (R) {
+                if (R->d == 1 && R->n == 1 && R->rows == 1) {
+                    static thread_local bool prev_above_threshold = false;
+                    bool above_threshold = R->v[0] >= Cfg.host_anomaly_rate_threshold;
+                    bool new_anomaly_event = above_threshold && !prev_above_threshold;
+                    prev_above_threshold = above_threshold;
+
+                    rrddim_set_by_pointer(host->detector_events_rs,
+                                          host->detector_events_above_threshold_rd, above_threshold);
+                    rrddim_set_by_pointer(host->detector_events_rs,
+                                          host->detector_events_new_anomaly_event_rd, new_anomaly_event);
+
+                    rrdset_done(host->detector_events_rs);
+                }
+
+                rrdr_free(OWA, R);
             }
 
-            rrdr_free(OWA, R);
+            onewayalloc_destroy(OWA);
+        } else {
+            rrddim_set_by_pointer(host->detector_events_rs,
+                                  host->detector_events_above_threshold_rd, 0);
+            rrddim_set_by_pointer(host->detector_events_rs,
+                                  host->detector_events_new_anomaly_event_rd, 0);
+            rrdset_done(host->detector_events_rs);
         }
-
-        onewayalloc_destroy(OWA);
     }
 }
 

+ 8 - 0
ml/ml-dummy.c

@@ -33,6 +33,14 @@ void ml_host_delete(RRDHOST *rh) {
     UNUSED(rh);
 }
 
+void ml_host_start(RRDHOST *rh) {
+    UNUSED(rh);
+}
+
+void ml_host_stop(RRDHOST *rh) {
+    UNUSED(rh);
+}
+
 void ml_host_start_training_thread(RRDHOST *rh) {
     UNUSED(rh);
 }

+ 6 - 3
ml/ml-private.h

@@ -195,7 +195,7 @@ typedef struct {
     std::vector<calculated_number_t> cns;
 
     std::vector<ml_kmeans_t> km_contexts;
-    netdata_mutex_t mutex;
+    SPINLOCK slock;
     ml_kmeans_t kmeans;
     std::vector<DSample> feature;
 
@@ -206,8 +206,6 @@ typedef struct {
 typedef struct {
     RRDSET *rs;
     ml_machine_learning_stats_t mls;
-
-    netdata_mutex_t mutex;
 } ml_chart_t;
 
 void ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_anomalous);
@@ -215,6 +213,8 @@ void ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_a
 typedef struct {
     RRDHOST *rh;
 
+    std::atomic<bool> ml_running;
+
     ml_machine_learning_stats_t mls;
 
     calculated_number_t host_anomaly_rate;
@@ -227,6 +227,9 @@ typedef struct {
      * bookkeeping for anomaly detection charts
     */
 
+    RRDSET *ml_running_rs;
+    RRDDIM *ml_running_rd;
+
     RRDSET *machine_learning_status_rs;
     RRDDIM *machine_learning_status_enabled_rd;
     RRDDIM *machine_learning_status_disabled_sp_rd;

+ 91 - 28
ml/ml.cc

@@ -568,9 +568,9 @@ int ml_dimension_load_models(RRDDIM *rd) {
     if (!dim)
         return 0;
 
-    netdata_mutex_lock(&dim->mutex);
+    netdata_spinlock_lock(&dim->slock);
     bool is_empty = dim->km_contexts.empty();
-    netdata_mutex_unlock(&dim->mutex);
+    netdata_spinlock_unlock(&dim->slock);
 
     if (!is_empty)
         return 0;
@@ -602,7 +602,7 @@ int ml_dimension_load_models(RRDDIM *rd) {
     if (unlikely(rc != SQLITE_OK))
         goto bind_fail;
 
-    netdata_mutex_lock(&dim->mutex);
+    netdata_spinlock_lock(&dim->slock);
 
     dim->km_contexts.reserve(Cfg.num_models_to_use);
     while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) {
@@ -639,7 +639,7 @@ int ml_dimension_load_models(RRDDIM *rd) {
         dim->ts = TRAINING_STATUS_TRAINED;
     }
 
-    netdata_mutex_unlock(&dim->mutex);
+    netdata_spinlock_unlock(&dim->slock);
 
     if (unlikely(rc != SQLITE_DONE))
         error_report("Failed to load models, rc = %d", rc);
@@ -666,7 +666,7 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *
     ml_training_response_t training_response = P.second;
 
     if (training_response.result != TRAINING_RESULT_OK) {
-        netdata_mutex_lock(&dim->mutex);
+        netdata_spinlock_lock(&dim->slock);
 
         dim->mt = METRIC_TYPE_CONSTANT;
 
@@ -687,7 +687,8 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *
 
         dim->last_training_time = training_response.last_entry_on_response;
         enum ml_training_result result = training_response.result;
-        netdata_mutex_unlock(&dim->mutex);
+
+        netdata_spinlock_unlock(&dim->slock);
 
         return result;
     }
@@ -713,7 +714,7 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *
     // update models
     worker_is_busy(WORKER_TRAIN_UPDATE_MODELS);
     {
-        netdata_mutex_lock(&dim->mutex);
+        netdata_spinlock_lock(&dim->slock);
 
         if (dim->km_contexts.size() < Cfg.num_models_to_use) {
             dim->km_contexts.push_back(std::move(dim->kmeans));
@@ -752,7 +753,7 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *
         model_info.kmeans = dim->km_contexts.back();
         training_thread->pending_model_info.push_back(model_info);
 
-        netdata_mutex_unlock(&dim->mutex);
+        netdata_spinlock_unlock(&dim->slock);
     }
 
     return training_response.result;
@@ -851,7 +852,7 @@ ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t
     /*
      * Lock to predict and possibly schedule the dimension for training
     */
-    if (netdata_mutex_trylock(&dim->mutex) != 0)
+    if (netdata_spinlock_trylock(&dim->slock) == 0)
         return false;
 
     // Mark the metric time as variable if we received different values
@@ -866,7 +867,7 @@ ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t
         case TRAINING_STATUS_UNTRAINED:
         case TRAINING_STATUS_PENDING_WITHOUT_MODEL: {
         case TRAINING_STATUS_SILENCED:
-            netdata_mutex_unlock(&dim->mutex);
+            netdata_spinlock_unlock(&dim->slock);
             return false;
         }
         default:
@@ -891,7 +892,7 @@ ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t
 
         if (anomaly_score < (100 * Cfg.dimension_anomaly_score_threshold)) {
             global_statistics_ml_models_consulted(models_consulted);
-            netdata_mutex_unlock(&dim->mutex);
+            netdata_spinlock_unlock(&dim->slock);
             return false;
         }
 
@@ -905,7 +906,7 @@ ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t
         dim->ts = TRAINING_STATUS_SILENCED;
     }
 
-    netdata_mutex_unlock(&dim->mutex);
+    netdata_spinlock_unlock(&dim->slock);
 
     global_statistics_ml_models_consulted(models_consulted);
     return sum;
@@ -992,7 +993,7 @@ ml_host_detect_once(ml_host_t *host)
     host->mls = {};
     ml_machine_learning_stats_t mls_copy = {};
 
-    {
+    if (host->ml_running) {
         netdata_mutex_lock(&host->mutex);
 
         /*
@@ -1036,6 +1037,8 @@ ml_host_detect_once(ml_host_t *host)
         mls_copy = host->mls;
 
         netdata_mutex_unlock(&host->mutex);
+    } else {
+        host->host_anomaly_rate = 0.0;
     }
 
     worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
@@ -1213,15 +1216,14 @@ void ml_host_new(RRDHOST *rh)
 
     host->rh = rh;
     host->mls = ml_machine_learning_stats_t();
-    //host->ts = ml_training_stats_t();
+    host->host_anomaly_rate = 0.0;
 
     static std::atomic<size_t> times_called(0);
     host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue;
 
-    host->host_anomaly_rate = 0.0;
-
     netdata_mutex_init(&host->mutex);
 
+    host->ml_running = true;
     rh->ml_host = (rrd_ml_host_t *) host;
 }
 
@@ -1237,6 +1239,70 @@ void ml_host_delete(RRDHOST *rh)
     rh->ml_host = NULL;
 }
 
+void ml_host_start(RRDHOST *rh) {
+    ml_host_t *host = (ml_host_t *) rh->ml_host;
+    if (!host)
+        return;
+
+    host->ml_running = true;
+}
+
+void ml_host_stop(RRDHOST *rh) {
+    ml_host_t *host = (ml_host_t *) rh->ml_host;
+    if (!host || !host->ml_running)
+        return;
+
+    netdata_mutex_lock(&host->mutex);
+
+    // reset host stats
+    host->mls = ml_machine_learning_stats_t();
+
+    // reset charts/dims
+    void *rsp = NULL;
+    rrdset_foreach_read(rsp, host->rh) {
+        RRDSET *rs = static_cast<RRDSET *>(rsp);
+
+        ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
+        if (!chart)
+            continue;
+
+        // reset chart
+        chart->mls = ml_machine_learning_stats_t();
+
+        void *rdp = NULL;
+        rrddim_foreach_read(rdp, rs) {
+            RRDDIM *rd = static_cast<RRDDIM *>(rdp);
+
+            ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
+            if (!dim)
+                continue;
+
+            netdata_spinlock_lock(&dim->slock);
+
+            // reset dim
+            // TODO: should we drop in-mem models, or mark them as stale? Is it
+            // okay to resume training straight away?
+
+            dim->mt = METRIC_TYPE_CONSTANT;
+            dim->ts = TRAINING_STATUS_UNTRAINED;
+            dim->last_training_time = 0;
+            dim->suppression_anomaly_counter = 0;
+            dim->suppression_window_counter = 0;
+            dim->cns.clear();
+
+            ml_kmeans_init(&dim->kmeans);
+
+            netdata_spinlock_unlock(&dim->slock);
+        }
+        rrddim_foreach_done(rdp);
+    }
+    rrdset_foreach_done(rsp);
+
+    netdata_mutex_unlock(&host->mutex);
+
+    host->ml_running = false;
+}
+
 void ml_host_get_info(RRDHOST *rh, BUFFER *wb)
 {
     ml_host_t *host = (ml_host_t *) rh->ml_host;
@@ -1279,7 +1345,8 @@ void ml_host_get_detection_info(RRDHOST *rh, BUFFER *wb)
 
     netdata_mutex_lock(&host->mutex);
 
-    buffer_json_member_add_uint64(wb, "version", 1);
+    buffer_json_member_add_uint64(wb, "version", 2);
+    buffer_json_member_add_uint64(wb, "ml-running", host->ml_running);
     buffer_json_member_add_uint64(wb, "anomalous-dimensions", host->mls.num_anomalous_dimensions);
     buffer_json_member_add_uint64(wb, "normal-dimensions", host->mls.num_normal_dimensions);
     buffer_json_member_add_uint64(wb, "total-dimensions", host->mls.num_anomalous_dimensions +
@@ -1309,8 +1376,6 @@ void ml_chart_new(RRDSET *rs)
     chart->rs = rs;
     chart->mls = ml_machine_learning_stats_t();
 
-    netdata_mutex_init(&chart->mutex);
-
     rs->ml_chart = (rrd_ml_chart_t *) chart;
 }
 
@@ -1322,8 +1387,6 @@ void ml_chart_delete(RRDSET *rs)
 
     ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
 
-    netdata_mutex_destroy(&chart->mutex);
-
     delete chart;
     rs->ml_chart = NULL;
 }
@@ -1334,7 +1397,6 @@ bool ml_chart_update_begin(RRDSET *rs)
     if (!chart)
         return false;
 
-    netdata_mutex_lock(&chart->mutex);
     chart->mls = {};
     return true;
 }
@@ -1344,8 +1406,6 @@ void ml_chart_update_end(RRDSET *rs)
     ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
     if (!chart)
         return;
-
-    netdata_mutex_unlock(&chart->mutex);
 }
 
 void ml_dimension_new(RRDDIM *rd)
@@ -1360,8 +1420,9 @@ void ml_dimension_new(RRDDIM *rd)
 
     dim->mt = METRIC_TYPE_CONSTANT;
     dim->ts = TRAINING_STATUS_UNTRAINED;
-
     dim->last_training_time = 0;
+    dim->suppression_anomaly_counter = 0;
+    dim->suppression_window_counter = 0;
 
     ml_kmeans_init(&dim->kmeans);
 
@@ -1370,7 +1431,7 @@ void ml_dimension_new(RRDDIM *rd)
     else
         dim->mls = MACHINE_LEARNING_STATUS_ENABLED;
 
-    netdata_mutex_init(&dim->mutex);
+    netdata_spinlock_init(&dim->slock);
 
     dim->km_contexts.reserve(Cfg.num_models_to_use);
 
@@ -1385,8 +1446,6 @@ void ml_dimension_delete(RRDDIM *rd)
     if (!dim)
         return;
 
-    netdata_mutex_destroy(&dim->mutex);
-
     delete dim;
     rd->ml_dimension = NULL;
 }
@@ -1397,6 +1456,10 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool
     if (!dim)
         return false;
 
+    ml_host_t *host = (ml_host_t *) rd->rrdset->rrdhost->ml_host;
+    if (!host->ml_running)
+        return false;
+
     ml_chart_t *chart = (ml_chart_t *) rd->rrdset->ml_chart;
 
     bool is_anomalous = ml_dimension_predict(dim, curr_time, value, exists);

+ 3 - 0
ml/ml.h

@@ -23,6 +23,9 @@ void ml_stop_threads(void);
 void ml_host_new(RRDHOST *rh);
 void ml_host_delete(RRDHOST *rh);
 
+void ml_host_start(RRDHOST *RH);
+void ml_host_stop(RRDHOST *RH);
+
 void ml_host_get_info(RRDHOST *RH, BUFFER *wb);
 void ml_host_get_detection_info(RRDHOST *RH, BUFFER *wb);
 void ml_host_get_models(RRDHOST *RH, BUFFER *wb);