Browse Source

Save and load ML models (#14810)

* Revert "Revert "Use static thread-pool for training. (#14702)" (#14782)"

This reverts commit 5321ca8d1ef8d974a6a2b2128ca8804de6acb693.

* Model I/O.

* Minor changes

Meant to make debugging a crash issues easier
on cloud VMs:

  - Less verbose logging
  - Higher logging history
  - Modify installer to use debug info by default

* Fix ML initialization order.

* read lock hosts when running detection.

* Revert debugging changes.

* Update ml/Config.cc

Co-authored-by: Andrew Maguire <andrewm4894@gmail.com>

---------

Co-authored-by: Andrew Maguire <andrewm4894@gmail.com>
vkalintiris 1 year ago
parent
commit
003df5f2b7
10 changed files with 767 additions and 427 deletions
  1. 1 27
      daemon/global_statistics.c
  2. 9 9
      daemon/main.c
  3. 9 11
      daemon/main.h
  4. 0 3
      database/rrdhost.c
  5. 11 1
      ml/Config.cc
  6. 98 69
      ml/ad_charts.cc
  7. 1 1
      ml/ad_charts.h
  8. 10 0
      ml/ml-dummy.c
  9. 29 13
      ml/ml-private.h
  10. 599 293
      ml/ml.cc

+ 1 - 27
daemon/global_statistics.c

@@ -827,33 +827,7 @@ static void global_statistics_charts(void) {
         rrdset_done(st_points_stored);
     }
 
-    {
-        static RRDSET *st = NULL;
-        static RRDDIM *rd = NULL;
-
-        if (unlikely(!st)) {
-            st = rrdset_create_localhost(
-                    "netdata" // type
-                    , "ml_models_consulted" // id
-                    , NULL // name
-                    , NETDATA_ML_CHART_FAMILY // family
-                    , NULL // context
-                    , "KMeans models used for prediction" // title
-                    , "models" // units
-                    , NETDATA_ML_PLUGIN // plugin
-                    , NETDATA_ML_MODULE_DETECTION // module
-                    , NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS // priority
-                    , localhost->rrd_update_every // update_every
-                    , RRDSET_TYPE_AREA // chart_type
-            );
-
-            rd = rrddim_add(st, "num_models_consulted", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
-        }
-
-        rrddim_set_by_pointer(st, rd, (collected_number) gs.ml_models_consulted);
-
-        rrdset_done(st);
-    }
+    ml_update_global_statistics_charts(gs.ml_models_consulted);
 }
 
 // ----------------------------------------------------------------------------

+ 9 - 9
daemon/main.c

@@ -148,10 +148,6 @@ static void service_to_buffer(BUFFER *wb, SERVICE_TYPE service) {
         buffer_strcat(wb, "MAINTENANCE ");
     if(service & SERVICE_COLLECTORS)
         buffer_strcat(wb, "COLLECTORS ");
-    if(service & SERVICE_ML_TRAINING)
-        buffer_strcat(wb, "ML_TRAINING ");
-    if(service & SERVICE_ML_PREDICTION)
-        buffer_strcat(wb, "ML_PREDICTION ");
     if(service & SERVICE_REPLICATION)
         buffer_strcat(wb, "REPLICATION ");
     if(service & ABILITY_DATA_QUERIES)
@@ -340,6 +336,11 @@ void netdata_cleanup_and_exit(int ret) {
     }
 #endif
 
+    delta_shutdown_time("disable ML detection and training threads");
+
+    ml_stop_threads();
+    ml_fini();
+
     delta_shutdown_time("disable maintenance, new queries, new web requests, new streaming connections and aclk");
 
     service_signal_exit(
@@ -351,12 +352,11 @@ void netdata_cleanup_and_exit(int ret) {
             | SERVICE_ACLKSYNC
             );
 
-    delta_shutdown_time("stop replication, exporters, ML training, health and web servers threads");
+    delta_shutdown_time("stop replication, exporters, health and web servers threads");
 
     timeout = !service_wait_exit(
             SERVICE_REPLICATION
             | SERVICE_EXPORTERS
-            | SERVICE_ML_TRAINING
             | SERVICE_HEALTH
             | SERVICE_WEB_SERVER
             , 3 * USEC_PER_SEC);
@@ -368,11 +368,10 @@ void netdata_cleanup_and_exit(int ret) {
             | SERVICE_STREAMING
             , 3 * USEC_PER_SEC);
 
-    delta_shutdown_time("stop ML prediction and context threads");
+    delta_shutdown_time("stop context thread");
 
     timeout = !service_wait_exit(
-            SERVICE_ML_PREDICTION
-            | SERVICE_CONTEXT
+            SERVICE_CONTEXT
             , 3 * USEC_PER_SEC);
 
     delta_shutdown_time("stop maintenance thread");
@@ -2085,6 +2084,7 @@ int main(int argc, char **argv) {
         }
         else debug(D_SYSTEM, "Not starting thread %s.", st->name);
     }
+    ml_start_threads();
 
     // ------------------------------------------------------------------------
     // Initialize netdata agent command serving from cli and signals

+ 9 - 11
daemon/main.h

@@ -33,17 +33,15 @@ typedef enum {
     ABILITY_STREAMING_CONNECTIONS = (1 << 2),
     SERVICE_MAINTENANCE           = (1 << 3),
     SERVICE_COLLECTORS            = (1 << 4),
-    SERVICE_ML_TRAINING           = (1 << 5),
-    SERVICE_ML_PREDICTION         = (1 << 6),
-    SERVICE_REPLICATION           = (1 << 7),
-    SERVICE_WEB_SERVER            = (1 << 8),
-    SERVICE_ACLK                  = (1 << 9),
-    SERVICE_HEALTH                = (1 << 10),
-    SERVICE_STREAMING             = (1 << 11),
-    SERVICE_CONTEXT               = (1 << 12),
-    SERVICE_ANALYTICS             = (1 << 13),
-    SERVICE_EXPORTERS             = (1 << 14),
-    SERVICE_ACLKSYNC              = (1 << 15)
+    SERVICE_REPLICATION           = (1 << 5),
+    SERVICE_WEB_SERVER            = (1 << 6),
+    SERVICE_ACLK                  = (1 << 7),
+    SERVICE_HEALTH                = (1 << 8),
+    SERVICE_STREAMING             = (1 << 9),
+    SERVICE_CONTEXT               = (1 << 10),
+    SERVICE_ANALYTICS             = (1 << 11),
+    SERVICE_EXPORTERS             = (1 << 12),
+    SERVICE_ACLKSYNC              = (1 << 13)
 } SERVICE_TYPE;
 
 typedef enum {

+ 0 - 3
database/rrdhost.c

@@ -524,7 +524,6 @@ int is_legacy = 1;
         rrdhost_load_rrdcontext_data(host);
 //        rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
         ml_host_new(host);
-        ml_host_start_training_thread(host);
     } else
         rrdhost_flag_set(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD | RRDHOST_FLAG_ARCHIVED | RRDHOST_FLAG_ORPHAN);
 
@@ -641,7 +640,6 @@ static void rrdhost_update(RRDHOST *host
         host->rrdpush_replication_step = rrdpush_replication_step;
 
         ml_host_new(host);
-        ml_host_start_training_thread(host);
         
         rrdhost_load_rrdcontext_data(host);
         info("Host %s is not in archived mode anymore", rrdhost_hostname(host));
@@ -1143,7 +1141,6 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) {
     rrdcalctemplate_index_destroy(host);
 
     // cleanup ML resources
-    ml_host_stop_training_thread(host);
     ml_host_delete(host);
 
     freez(host->exporting_flags);

+ 11 - 1
ml/Config.cc

@@ -34,7 +34,7 @@ void ml_config_load(ml_config_t *cfg) {
     unsigned smooth_n = config_get_number(config_section_ml, "num samples to smooth", 3);
     unsigned lag_n = config_get_number(config_section_ml, "num samples to lag", 5);
 
-    double random_sampling_ratio = config_get_float(config_section_ml, "random sampling ratio", 1.0 / lag_n);
+    double random_sampling_ratio = config_get_float(config_section_ml, "random sampling ratio", 1.0 / 5.0 /* default lag_n */);
     unsigned max_kmeans_iters = config_get_number(config_section_ml, "maximum number of k-means iterations", 1000);
 
     double dimension_anomaly_rate_threshold = config_get_float(config_section_ml, "dimension anomaly score threshold", 0.99);
@@ -43,6 +43,10 @@ void ml_config_load(ml_config_t *cfg) {
     std::string anomaly_detection_grouping_method = config_get(config_section_ml, "anomaly detection grouping method", "average");
     time_t anomaly_detection_query_duration = config_get_number(config_section_ml, "anomaly detection grouping duration", 5 * 60);
 
+    size_t num_training_threads = config_get_number(config_section_ml, "num training threads", 4);
+
+    bool enable_statistics_charts = config_get_boolean(config_section_ml, "enable statistics charts", false);
+
     /*
      * Clamp
      */
@@ -64,6 +68,8 @@ void ml_config_load(ml_config_t *cfg) {
     host_anomaly_rate_threshold = clamp(host_anomaly_rate_threshold, 0.1, 10.0);
     anomaly_detection_query_duration = clamp<time_t>(anomaly_detection_query_duration, 60, 15 * 60);
 
+    num_training_threads = clamp<size_t>(num_training_threads, 1, 128);
+
     /*
      * Validate
      */
@@ -109,4 +115,8 @@ void ml_config_load(ml_config_t *cfg) {
     cfg->sp_charts_to_skip = simple_pattern_create(cfg->charts_to_skip.c_str(), NULL, SIMPLE_PATTERN_EXACT, true);
 
     cfg->stream_anomaly_detection_charts = config_get_boolean(config_section_ml, "stream anomaly detection charts", true);
+
+    cfg->num_training_threads = num_training_threads;
+
+    cfg->enable_statistics_charts = enable_statistics_charts;
 }

+ 98 - 69
ml/ad_charts.cc

@@ -6,7 +6,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
     /*
      * Machine learning status
     */
-    {
+    if (Cfg.enable_statistics_charts) {
         if (!host->machine_learning_status_rs) {
             char id_buf[1024];
             char name_buf[1024];
@@ -48,7 +48,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
     /*
      * Metric type
     */
-    {
+    if (Cfg.enable_statistics_charts) {
         if (!host->metric_type_rs) {
             char id_buf[1024];
             char name_buf[1024];
@@ -90,7 +90,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
     /*
      * Training status
     */
-    {
+    if (Cfg.enable_statistics_charts) {
         if (!host->training_status_rs) {
             char id_buf[1024];
             char name_buf[1024];
@@ -179,7 +179,6 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
 
         rrdset_done(host->dimensions_rs);
     }
-
 }
 
 void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number AnomalyRate) {
@@ -301,20 +300,20 @@ void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number
     }
 }
 
-void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts) {
+void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, const ml_training_stats_t &ts) {
     /*
      * queue stats
     */
     {
-        if (!host->queue_stats_rs) {
+        if (!training_thread->queue_stats_rs) {
             char id_buf[1024];
             char name_buf[1024];
 
-            snprintfz(id_buf, 1024, "queue_stats_on_%s", localhost->machine_guid);
-            snprintfz(name_buf, 1024, "queue_stats_on_%s", rrdhost_hostname(localhost));
+            snprintfz(id_buf, 1024, "training_queue_%zu_stats", training_thread->id);
+            snprintfz(name_buf, 1024, "training_queue_%zu_stats", training_thread->id);
 
-            host->queue_stats_rs = rrdset_create(
-                    host->rh,
+            training_thread->queue_stats_rs = rrdset_create(
+                    localhost,
                     "netdata", // type
                     id_buf, // id
                     name_buf, // name
@@ -328,35 +327,35 @@ void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stat
                     localhost->rrd_update_every, // update_every
                     RRDSET_TYPE_LINE// chart_type
             );
-            rrdset_flag_set(host->queue_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION);
+            rrdset_flag_set(training_thread->queue_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION);
 
-            host->queue_stats_queue_size_rd =
-                rrddim_add(host->queue_stats_rs, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
-            host->queue_stats_popped_items_rd =
-                rrddim_add(host->queue_stats_rs, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+            training_thread->queue_stats_queue_size_rd =
+                rrddim_add(training_thread->queue_stats_rs, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+            training_thread->queue_stats_popped_items_rd =
+                rrddim_add(training_thread->queue_stats_rs, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
         }
 
-        rrddim_set_by_pointer(host->queue_stats_rs,
-                              host->queue_stats_queue_size_rd, ts.queue_size);
-        rrddim_set_by_pointer(host->queue_stats_rs,
-                              host->queue_stats_popped_items_rd, ts.num_popped_items);
+        rrddim_set_by_pointer(training_thread->queue_stats_rs,
+                              training_thread->queue_stats_queue_size_rd, ts.queue_size);
+        rrddim_set_by_pointer(training_thread->queue_stats_rs,
+                              training_thread->queue_stats_popped_items_rd, ts.num_popped_items);
 
-        rrdset_done(host->queue_stats_rs);
+        rrdset_done(training_thread->queue_stats_rs);
     }
 
     /*
      * training stats
     */
     {
-        if (!host->training_time_stats_rs) {
+        if (!training_thread->training_time_stats_rs) {
             char id_buf[1024];
             char name_buf[1024];
 
-            snprintfz(id_buf, 1024, "training_time_stats_on_%s", localhost->machine_guid);
-            snprintfz(name_buf, 1024, "training_time_stats_on_%s", rrdhost_hostname(localhost));
+            snprintfz(id_buf, 1024, "training_queue_%zu_time_stats", training_thread->id);
+            snprintfz(name_buf, 1024, "training_queue_%zu_time_stats", training_thread->id);
 
-            host->training_time_stats_rs = rrdset_create(
-                    host->rh,
+            training_thread->training_time_stats_rs = rrdset_create(
+                    localhost,
                     "netdata", // type
                     id_buf, // id
                     name_buf, // name
@@ -370,39 +369,39 @@ void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stat
                     localhost->rrd_update_every, // update_every
                     RRDSET_TYPE_LINE// chart_type
             );
-            rrdset_flag_set(host->training_time_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION);
-
-            host->training_time_stats_allotted_rd =
-                rrddim_add(host->training_time_stats_rs, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
-            host->training_time_stats_consumed_rd =
-                rrddim_add(host->training_time_stats_rs, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
-            host->training_time_stats_remaining_rd =
-                rrddim_add(host->training_time_stats_rs, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
+            rrdset_flag_set(training_thread->training_time_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION);
+
+            training_thread->training_time_stats_allotted_rd =
+                rrddim_add(training_thread->training_time_stats_rs, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
+            training_thread->training_time_stats_consumed_rd =
+                rrddim_add(training_thread->training_time_stats_rs, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
+            training_thread->training_time_stats_remaining_rd =
+                rrddim_add(training_thread->training_time_stats_rs, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
         }
 
-        rrddim_set_by_pointer(host->training_time_stats_rs,
-                              host->training_time_stats_allotted_rd, ts.allotted_ut);
-        rrddim_set_by_pointer(host->training_time_stats_rs,
-                              host->training_time_stats_consumed_rd, ts.consumed_ut);
-        rrddim_set_by_pointer(host->training_time_stats_rs,
-                              host->training_time_stats_remaining_rd, ts.remaining_ut);
+        rrddim_set_by_pointer(training_thread->training_time_stats_rs,
+                              training_thread->training_time_stats_allotted_rd, ts.allotted_ut);
+        rrddim_set_by_pointer(training_thread->training_time_stats_rs,
+                              training_thread->training_time_stats_consumed_rd, ts.consumed_ut);
+        rrddim_set_by_pointer(training_thread->training_time_stats_rs,
+                              training_thread->training_time_stats_remaining_rd, ts.remaining_ut);
 
-        rrdset_done(host->training_time_stats_rs);
+        rrdset_done(training_thread->training_time_stats_rs);
     }
 
     /*
      * training result stats
     */
     {
-        if (!host->training_results_rs) {
+        if (!training_thread->training_results_rs) {
             char id_buf[1024];
             char name_buf[1024];
 
-            snprintfz(id_buf, 1024, "training_results_on_%s", localhost->machine_guid);
-            snprintfz(name_buf, 1024, "training_results_on_%s", rrdhost_hostname(localhost));
+            snprintfz(id_buf, 1024, "training_queue_%zu_results", training_thread->id);
+            snprintfz(name_buf, 1024, "training_queue_%zu_results", training_thread->id);
 
-            host->training_results_rs = rrdset_create(
-                    host->rh,
+            training_thread->training_results_rs = rrdset_create(
+                    localhost,
                     "netdata", // type
                     id_buf, // id
                     name_buf, // name
@@ -416,31 +415,61 @@ void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stat
                     localhost->rrd_update_every, // update_every
                     RRDSET_TYPE_LINE// chart_type
             );
-            rrdset_flag_set(host->training_results_rs, RRDSET_FLAG_ANOMALY_DETECTION);
-
-            host->training_results_ok_rd =
-                rrddim_add(host->training_results_rs, "ok", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
-            host->training_results_invalid_query_time_range_rd =
-                rrddim_add(host->training_results_rs, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
-            host->training_results_not_enough_collected_values_rd =
-                rrddim_add(host->training_results_rs, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
-            host->training_results_null_acquired_dimension_rd =
-                rrddim_add(host->training_results_rs, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
-            host->training_results_chart_under_replication_rd =
-                rrddim_add(host->training_results_rs, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+            rrdset_flag_set(training_thread->training_results_rs, RRDSET_FLAG_ANOMALY_DETECTION);
+
+            training_thread->training_results_ok_rd =
+                rrddim_add(training_thread->training_results_rs, "ok", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+            training_thread->training_results_invalid_query_time_range_rd =
+                rrddim_add(training_thread->training_results_rs, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+            training_thread->training_results_not_enough_collected_values_rd =
+                rrddim_add(training_thread->training_results_rs, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+            training_thread->training_results_null_acquired_dimension_rd =
+                rrddim_add(training_thread->training_results_rs, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+            training_thread->training_results_chart_under_replication_rd =
+                rrddim_add(training_thread->training_results_rs, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
         }
 
-        rrddim_set_by_pointer(host->training_results_rs,
-                              host->training_results_ok_rd, ts.training_result_ok);
-        rrddim_set_by_pointer(host->training_results_rs,
-                              host->training_results_invalid_query_time_range_rd, ts.training_result_invalid_query_time_range);
-        rrddim_set_by_pointer(host->training_results_rs,
-                              host->training_results_not_enough_collected_values_rd, ts.training_result_not_enough_collected_values);
-        rrddim_set_by_pointer(host->training_results_rs,
-                              host->training_results_null_acquired_dimension_rd, ts.training_result_null_acquired_dimension);
-        rrddim_set_by_pointer(host->training_results_rs,
-                              host->training_results_chart_under_replication_rd, ts.training_result_chart_under_replication);
-
-        rrdset_done(host->training_results_rs);
+        rrddim_set_by_pointer(training_thread->training_results_rs,
+                              training_thread->training_results_ok_rd, ts.training_result_ok);
+        rrddim_set_by_pointer(training_thread->training_results_rs,
+                              training_thread->training_results_invalid_query_time_range_rd, ts.training_result_invalid_query_time_range);
+        rrddim_set_by_pointer(training_thread->training_results_rs,
+                              training_thread->training_results_not_enough_collected_values_rd, ts.training_result_not_enough_collected_values);
+        rrddim_set_by_pointer(training_thread->training_results_rs,
+                              training_thread->training_results_null_acquired_dimension_rd, ts.training_result_null_acquired_dimension);
+        rrddim_set_by_pointer(training_thread->training_results_rs,
+                              training_thread->training_results_chart_under_replication_rd, ts.training_result_chart_under_replication);
+
+        rrdset_done(training_thread->training_results_rs);
+    }
+}
+
+void ml_update_global_statistics_charts(uint64_t models_consulted) {
+    if (Cfg.enable_statistics_charts) {
+        static RRDSET *st = NULL;
+        static RRDDIM *rd = NULL;
+
+        if (unlikely(!st)) {
+            st = rrdset_create_localhost(
+                    "netdata" // type
+                    , "ml_models_consulted" // id
+                    , NULL // name
+                    , NETDATA_ML_CHART_FAMILY // family
+                    , NULL // context
+                    , "KMeans models used for prediction" // title
+                    , "models" // units
+                    , NETDATA_ML_PLUGIN // plugin
+                    , NETDATA_ML_MODULE_DETECTION // module
+                    , NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS // priority
+                    , localhost->rrd_update_every // update_every
+                    , RRDSET_TYPE_AREA // chart_type
+            );
+
+            rd = rrddim_add(st, "num_models_consulted", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+        }
+
+        rrddim_set_by_pointer(st, rd, (collected_number) models_consulted);
+
+        rrdset_done(st);
     }
 }

+ 1 - 1
ml/ad_charts.h

@@ -9,6 +9,6 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
 
 void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number anomaly_rate);
 
-void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts);
+void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, const ml_training_stats_t &ts);
 
 #endif /* ML_ADCHARTS_H */

+ 10 - 0
ml/ml-dummy.c

@@ -19,6 +19,12 @@ bool ml_streaming_enabled() {
 
 void ml_init(void) {}
 
+void ml_fini(void) {}
+
+void ml_start_threads(void) {}
+
+void ml_stop_threads(void) {}
+
 void ml_host_new(RRDHOST *rh) {
     UNUSED(rh);
 }
@@ -86,4 +92,8 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool
     return false;
 }
 
+void ml_update_global_statistics_charts(uint64_t models_consulted) {
+    UNUSED(models_consulted);
+}
+
 #endif

+ 29 - 13
ml/ml-private.h

@@ -33,14 +33,15 @@ typedef struct {
 /*
  * KMeans
  */
-typedef struct {
-    size_t num_clusters;
-    size_t max_iterations;
 
+typedef struct {
     std::vector<DSample> cluster_centers;
 
     calculated_number_t min_dist;
     calculated_number_t max_dist;
+
+    uint32_t after;
+    uint32_t before;
 } ml_kmeans_t;
 
 typedef struct machine_learning_stats_t {
@@ -123,6 +124,7 @@ enum ml_training_result {
 
 typedef struct {
     // Chart/dimension we want to train
+    STRING *host_id;
     STRING *chart_id;
     STRING *dimension_id;
 
@@ -168,6 +170,7 @@ typedef struct {
 /*
  * Queue
 */
+
 typedef struct {
     std::queue<ml_training_request_t> internal;
     netdata_mutex_t mutex;
@@ -175,7 +178,6 @@ typedef struct {
     std::atomic<bool> exit;
 } ml_queue_t;
 
-
 typedef struct {
     RRDDIM *rd;
 
@@ -207,19 +209,12 @@ typedef struct {
     RRDHOST *rh;
 
     ml_machine_learning_stats_t mls;
-    ml_training_stats_t ts;
 
     calculated_number_t host_anomaly_rate;
 
-    std::atomic<bool> threads_running;
-    std::atomic<bool> threads_cancelled;
-    std::atomic<bool> threads_joined;
-
-    ml_queue_t *training_queue;
-
     netdata_mutex_t mutex;
 
-    netdata_thread_t training_thread;
+    ml_queue_t *training_queue;
 
     /*
      * bookkeeping for anomaly detection charts
@@ -249,6 +244,19 @@ typedef struct {
     RRDSET *detector_events_rs;
     RRDDIM *detector_events_above_threshold_rd;
     RRDDIM *detector_events_new_anomaly_event_rd;
+} ml_host_t;
+
+typedef struct {
+    size_t id;
+    netdata_thread_t nd_thread;
+    netdata_mutex_t nd_mutex;
+
+    ml_queue_t *training_queue;
+    ml_training_stats_t training_stats;
+
+    calculated_number_t *training_cns;
+    calculated_number_t *scratch_training_cns;
+    std::vector<DSample> training_samples;
 
     RRDSET *queue_stats_rs;
     RRDDIM *queue_stats_queue_size_rd;
@@ -265,7 +273,7 @@ typedef struct {
     RRDDIM *training_results_not_enough_collected_values_rd;
     RRDDIM *training_results_null_acquired_dimension_rd;
     RRDDIM *training_results_chart_under_replication_rd;
-} ml_host_t;
+} ml_training_thread_t;
 
 typedef struct {
     bool enable_anomaly_detection;
@@ -302,6 +310,14 @@ typedef struct {
     std::vector<uint32_t> random_nums;
 
     netdata_thread_t detection_thread;
+    std::atomic<bool> detection_stop;
+
+    size_t num_training_threads;
+
+    std::vector<ml_training_thread_t> training_threads;
+    std::atomic<bool> training_stop;
+
+    bool enable_statistics_charts;
 } ml_config_t;
 
 void ml_config_load(ml_config_t *cfg);

+ 599 - 293
ml/ml.cc

@@ -7,15 +7,18 @@
 #include <random>
 
 #include "ad_charts.h"
+#include "database/sqlite/sqlite3.h"
 
-typedef struct {
-    calculated_number_t *training_cns;
-    calculated_number_t *scratch_training_cns;
-
-    std::vector<DSample> training_samples;
-} ml_tls_data_t;
+#define WORKER_TRAIN_QUEUE_POP         0
+#define WORKER_TRAIN_ACQUIRE_DIMENSION 1
+#define WORKER_TRAIN_QUERY             2
+#define WORKER_TRAIN_KMEANS            3
+#define WORKER_TRAIN_UPDATE_MODELS     4
+#define WORKER_TRAIN_RELEASE_DIMENSION 5
+#define WORKER_TRAIN_UPDATE_HOST       6
+#define WORKER_TRAIN_LOAD_MODELS       7
 
-static thread_local ml_tls_data_t tls_data;
+static sqlite3 *db = NULL;
 
 /*
  * Functions to convert enums to strings
@@ -173,26 +176,26 @@ ml_features_preprocess(ml_features_t *features)
 */
 
 static void
-ml_kmeans_init(ml_kmeans_t *kmeans, size_t num_clusters, size_t max_iterations)
+ml_kmeans_init(ml_kmeans_t *kmeans)
 {
-    kmeans->num_clusters = num_clusters;
-    kmeans->max_iterations = max_iterations;
-
-    kmeans->cluster_centers.reserve(kmeans->num_clusters);
+    kmeans->cluster_centers.reserve(2);
     kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
     kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();
 }
 
 static void
-ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features)
+ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features, time_t after, time_t before)
 {
+    kmeans->after = (uint32_t) after;
+    kmeans->before = (uint32_t) before;
+
     kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
     kmeans->max_dist  = std::numeric_limits<calculated_number_t>::min();
 
     kmeans->cluster_centers.clear();
 
-    dlib::pick_initial_centers(kmeans->num_clusters, kmeans->cluster_centers, features->preprocessed_features);
-    dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, kmeans->max_iterations);
+    dlib::pick_initial_centers(2, kmeans->cluster_centers, features->preprocessed_features);
+    dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, Cfg.max_kmeans_iters);
 
     for (const auto &preprocessed_feature : features->preprocessed_features) {
         calculated_number_t mean_dist = 0.0;
@@ -201,7 +204,7 @@ ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features)
             mean_dist += dlib::length(cluster_center - preprocessed_feature);
         }
 
-        mean_dist /= kmeans->num_clusters;
+        mean_dist /= kmeans->cluster_centers.size();
 
         if (mean_dist < kmeans->min_dist)
             kmeans->min_dist = mean_dist;
@@ -218,7 +221,7 @@ ml_kmeans_anomaly_score(const ml_kmeans_t *kmeans, const DSample &DS)
     for (const auto &CC: kmeans->cluster_centers)
         mean_dist += dlib::length(CC - DS);
 
-    mean_dist /= kmeans->num_clusters;
+    mean_dist /= kmeans->cluster_centers.size();
 
     if (kmeans->max_dist == kmeans->min_dist)
         return 0.0;
@@ -264,7 +267,14 @@ ml_queue_pop(ml_queue_t *q)
 {
     netdata_mutex_lock(&q->mutex);
 
-    ml_training_request_t req = { NULL, NULL, 0, 0, 0 };
+    ml_training_request_t req = {
+        NULL, // host_id
+        NULL, // chart id
+        NULL, // dimension id
+        0, // current time
+        0, // first entry
+        0  // last entry
+    };
 
     while (q->internal.empty()) {
         pthread_cond_wait(&q->cond_var, &q->mutex);
@@ -307,7 +317,7 @@ ml_queue_signal(ml_queue_t *q)
 */
 
 static std::pair<calculated_number_t *, ml_training_response_t>
-ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t &training_request)
+ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
 {
     ml_training_response_t training_response = {};
 
@@ -348,7 +358,7 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t
               STORAGE_PRIORITY_BEST_EFFORT);
 
     size_t idx = 0;
-    memset(tls_data.training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
+    memset(training_thread->training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
     calculated_number_t last_value = std::numeric_limits<calculated_number_t>::quiet_NaN();
 
     while (!storage_engine_query_is_finished(&handle)) {
@@ -365,11 +375,11 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t
                 training_response.db_after_t = timestamp;
             training_response.db_before_t = timestamp;
 
-            tls_data.training_cns[idx] = value;
-            last_value = tls_data.training_cns[idx];
+            training_thread->training_cns[idx] = value;
+            last_value = training_thread->training_cns[idx];
             training_response.collected_values++;
         } else
-            tls_data.training_cns[idx] = last_value;
+            training_thread->training_cns[idx] = last_value;
 
         idx++;
     }
@@ -384,20 +394,270 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t
     }
 
     // Find first non-NaN value.
-    for (idx = 0; std::isnan(tls_data.training_cns[idx]); idx++, training_response.total_values--) { }
+    for (idx = 0; std::isnan(training_thread->training_cns[idx]); idx++, training_response.total_values--) { }
 
     // Overwrite NaN values.
     if (idx != 0)
-        memmove(tls_data.training_cns, &tls_data.training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
+        memmove(training_thread->training_cns, &training_thread->training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
 
     training_response.result = TRAINING_RESULT_OK;
-    return { tls_data.training_cns, training_response };
+    return { training_thread->training_cns, training_response };
+}
+
+const char *db_models_create_table =
+    "CREATE TABLE IF NOT EXISTS models("
+    "    dim_id BLOB, dim_str TEXT, after INT, before INT,"
+    "    min_dist REAL, max_dist REAL,"
+    "    c00 REAL, c01 REAL, c02 REAL, c03 REAL, c04 REAL, c05 REAL,"
+    "    c10 REAL, c11 REAL, c12 REAL, c13 REAL, c14 REAL, c15 REAL,"
+    "    PRIMARY KEY(dim_id, after)"
+    ");";
+
+const char *db_models_add_model =
+    "INSERT OR REPLACE INTO models("
+    "    dim_id, dim_str, after, before,"
+    "    min_dist, max_dist,"
+    "    c00, c01, c02, c03, c04, c05,"
+    "    c10, c11, c12, c13, c14, c15)"
+    "VALUES("
+    "    @dim_id, @dim_str, @after, @before,"
+    "    @min_dist, @max_dist,"
+    "    @c00, @c01, @c02, @c03, @c04, @c05,"
+    "    @c10, @c11, @c12, @c13, @c14, @c15);";
+
+const char *db_models_load =
+    "SELECT * FROM models "
+    "WHERE dim_id == @dim_id AND after >= @after ORDER BY before ASC;";
+
+const char *db_models_delete =
+    "DELETE FROM models "
+    "WHERE dim_id = @dim_id AND before < @before;";
+
+static int
+ml_dimension_add_model(ml_dimension_t *dim)
+{
+    static __thread sqlite3_stmt *res = NULL;
+    int param = 0;
+    int rc = 0;
+
+    if (unlikely(!db)) {
+        error_report("Database has not been initialized");
+        return 1;
+    }
+
+    if (unlikely(!res)) {
+        rc = prepare_statement(db, db_models_add_model, &res);
+        if (unlikely(rc != SQLITE_OK)) {
+            error_report("Failed to prepare statement to store model, rc = %d", rc);
+            return 1;
+        }
+    }
+
+    rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
+    if (unlikely(rc != SQLITE_OK))
+        goto bind_fail;
+
+    char id[1024];
+    snprintfz(id, 1024 - 1, "%s.%s", rrdset_id(dim->rd->rrdset), rrddim_id(dim->rd));
+    rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC);
+    if (unlikely(rc != SQLITE_OK))
+        goto bind_fail;
+
+    rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.after);
+    if (unlikely(rc != SQLITE_OK))
+        goto bind_fail;
+
+    rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.before);
+    if (unlikely(rc != SQLITE_OK))
+        goto bind_fail;
+
+    rc = sqlite3_bind_double(res, ++param, dim->kmeans.min_dist);
+    if (unlikely(rc != SQLITE_OK))
+        goto bind_fail;
+
+    rc = sqlite3_bind_double(res, ++param, dim->kmeans.max_dist);
+    if (unlikely(rc != SQLITE_OK))
+        goto bind_fail;
+
+    if (dim->kmeans.cluster_centers.size() != 2)
+        fatal("Expected 2 cluster centers, got %zu", dim->kmeans.cluster_centers.size());
+
+    for (const DSample &ds : dim->kmeans.cluster_centers) {
+        if (ds.size() != 6)
+            fatal("Expected dsample with 6 dimensions, got %ld", ds.size());
+
+        for (long idx = 0; idx != ds.size(); idx++) {
+            calculated_number_t cn = ds(idx);
+            int rc = sqlite3_bind_double(res, ++param, cn);
+            if (unlikely(rc != SQLITE_OK))
+                goto bind_fail;
+        }
+    }
+
+    rc = execute_insert(res);
+    if (unlikely(rc != SQLITE_DONE))
+        error_report("Failed to store model, rc = %d", rc);
+
+    rc = sqlite3_reset(res);
+    if (unlikely(rc != SQLITE_OK))
+        error_report("Failed to reset statement when storing model, rc = %d", rc);
+
+    return 0;
+
+bind_fail:
+    error_report("Failed to bind parameter %d to store model, rc = %d", param, rc);
+    rc = sqlite3_reset(res);
+    if (unlikely(rc != SQLITE_OK))
+        error_report("Failed to reset statement to store model, rc = %d", rc);
+    return 1;
+}
+
+static int
+ml_dimension_delete_models(ml_dimension_t *dim)
+{
+    static __thread sqlite3_stmt *res = NULL;
+    int rc = 0;
+    int param = 0;
+
+    if (unlikely(!db)) {
+        error_report("Database has not been initialized");
+        return 1;
+    }
+
+    if (unlikely(!res)) {
+        rc = prepare_statement(db, db_models_delete, &res);
+        if (unlikely(rc != SQLITE_OK)) {
+            error_report("Failed to prepare statement to delete models, rc = %d", rc);
+            return 1;
+        }
+    }
+
+    rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
+    if (unlikely(rc != SQLITE_OK))
+        goto bind_fail;
+
+    rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.before - (Cfg.num_models_to_use * Cfg.train_every));
+    if (unlikely(rc != SQLITE_OK))
+        goto bind_fail;
+
+    rc = execute_insert(res);
+    if (unlikely(rc != SQLITE_DONE))
+        error_report("Failed to delete models, rc = %d", rc);
+
+    rc = sqlite3_reset(res);
+    if (unlikely(rc != SQLITE_OK))
+        error_report("Failed to reset statement when deleting models, rc = %d", rc);
+
+    return 0;
+
+bind_fail:
+    error_report("Failed to bind parameter %d to delete models, rc = %d", param, rc);
+    rc = sqlite3_reset(res);
+    if (unlikely(rc != SQLITE_OK))
+        error_report("Failed to reset statement to delete models, rc = %d", rc);
+    return 1;
+}
+
+static int
+ml_dimension_load_models(ml_dimension_t *dim) {
+    std::vector<ml_kmeans_t> V;
+
+    static __thread sqlite3_stmt *res = NULL;
+    int rc = 0;
+    int param = 0;
+
+    if (unlikely(!db)) {
+        error_report("Database has not been initialized");
+        return 1;
+    }
+
+    if (unlikely(!res)) {
+        rc = prepare_statement(db, db_models_load, &res);
+        if (unlikely(rc != SQLITE_OK)) {
+            error_report("Failed to prepare statement to load models, rc = %d", rc);
+            return 1;
+        }
+    }
+
+    rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
+    if (unlikely(rc != SQLITE_OK))
+        goto bind_fail;
+
+    rc = sqlite3_bind_int(res, ++param, now_realtime_usec() - (Cfg.num_models_to_use * Cfg.max_train_samples));
+    if (unlikely(rc != SQLITE_OK))
+        goto bind_fail;
+
+    dim->km_contexts.reserve(Cfg.num_models_to_use);
+    while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) {
+        ml_kmeans_t km;
+
+        km.after = sqlite3_column_int(res, 2);
+        km.before = sqlite3_column_int(res, 3);
+
+        km.min_dist = sqlite3_column_int(res, 4);
+        km.max_dist = sqlite3_column_int(res, 5);
+
+        km.cluster_centers.resize(2);
+
+        km.cluster_centers[0].set_size(Cfg.lag_n + 1);
+        km.cluster_centers[0](0) = sqlite3_column_double(res, 6);
+        km.cluster_centers[0](1) = sqlite3_column_double(res, 7);
+        km.cluster_centers[0](2) = sqlite3_column_double(res, 8);
+        km.cluster_centers[0](3) = sqlite3_column_double(res, 9);
+        km.cluster_centers[0](4) = sqlite3_column_double(res, 10);
+        km.cluster_centers[0](5) = sqlite3_column_double(res, 11);
+
+        km.cluster_centers[1].set_size(Cfg.lag_n + 1);
+        km.cluster_centers[1](0) = sqlite3_column_double(res, 12);
+        km.cluster_centers[1](1) = sqlite3_column_double(res, 13);
+        km.cluster_centers[1](2) = sqlite3_column_double(res, 14);
+        km.cluster_centers[1](3) = sqlite3_column_double(res, 15);
+        km.cluster_centers[1](4) = sqlite3_column_double(res, 16);
+        km.cluster_centers[1](5) = sqlite3_column_double(res, 17);
+
+        dim->km_contexts.push_back(km);
+    }
+
+    if (unlikely(rc != SQLITE_DONE))
+        error_report("Failed to load models, rc = %d", rc);
+
+    rc = sqlite3_reset(res);
+    if (unlikely(rc != SQLITE_OK))
+        error_report("Failed to reset statement when loading models, rc = %d", rc);
+
+    return 0;
+
+bind_fail:
+    error_report("Failed to bind parameter %d to load models, rc = %d", param, rc);
+    rc = sqlite3_reset(res);
+    if (unlikely(rc != SQLITE_OK))
+        error_report("Failed to reset statement to load models, rc = %d", rc);
+    return 1;
+}
+
+static int
+ml_dimension_update_models(ml_dimension_t *dim)
+{
+    int rc;
+
+    if (dim->km_contexts.empty()) {
+        rc = ml_dimension_load_models(dim);
+        if (rc)
+            return rc;
+    }
+
+    rc = ml_dimension_add_model(dim);
+    if (rc)
+        return rc;
+
+    return ml_dimension_delete_models(dim);
 }
 
 static enum ml_training_result
-ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &training_request)
+ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
 {
-    auto P = ml_dimension_calculated_numbers(dim, training_request);
+    worker_is_busy(WORKER_TRAIN_QUERY);
+    auto P = ml_dimension_calculated_numbers(training_thread, dim, training_request);
     ml_training_response_t training_response = P.second;
 
     if (training_response.result != TRAINING_RESULT_OK) {
@@ -426,31 +686,56 @@ ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &train
     }
 
     // compute kmeans
+    worker_is_busy(WORKER_TRAIN_KMEANS);
     {
-        memcpy(tls_data.scratch_training_cns, tls_data.training_cns,
+        memcpy(training_thread->scratch_training_cns, training_thread->training_cns,
                training_response.total_values * sizeof(calculated_number_t));
 
         ml_features_t features = {
             Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
-            tls_data.scratch_training_cns, training_response.total_values,
-            tls_data.training_cns, training_response.total_values,
-            tls_data.training_samples
+            training_thread->scratch_training_cns, training_response.total_values,
+            training_thread->training_cns, training_response.total_values,
+            training_thread->training_samples
         };
         ml_features_preprocess(&features);
 
-        ml_kmeans_init(&dim->kmeans, 2, 1000);
-        ml_kmeans_train(&dim->kmeans, &features);
+        ml_kmeans_init(&dim->kmeans);
+        ml_kmeans_train(&dim->kmeans, &features, training_response.query_after_t, training_response.query_before_t);
     }
 
-    // update kmeans models
+    // update models
     {
         netdata_mutex_lock(&dim->mutex);
 
+        worker_is_busy(WORKER_TRAIN_LOAD_MODELS);
+
+        int rc = ml_dimension_update_models(dim);
+        if (rc) {
+            error("Failed to update models for %s [%u, %u]", rrddim_id(dim->rd), dim->kmeans.after, dim->kmeans.before);
+        }
+
+        worker_is_busy(WORKER_TRAIN_UPDATE_MODELS);
+
         if (dim->km_contexts.size() < Cfg.num_models_to_use) {
             dim->km_contexts.push_back(std::move(dim->kmeans));
         } else {
-            std::rotate(std::begin(dim->km_contexts), std::begin(dim->km_contexts) + 1, std::end(dim->km_contexts));
-            dim->km_contexts[dim->km_contexts.size() - 1] = std::move(dim->kmeans);
+            bool can_drop_middle_km = false;
+
+            if (Cfg.num_models_to_use > 2) {
+                const ml_kmeans_t *old_km = &dim->km_contexts[dim->km_contexts.size() - 1];
+                const ml_kmeans_t *middle_km = &dim->km_contexts[dim->km_contexts.size() - 2];
+                const ml_kmeans_t *new_km = &dim->kmeans;
+
+                can_drop_middle_km = (middle_km->after < old_km->before) &&
+                                     (middle_km->before > new_km->after);
+            }
+
+            if (can_drop_middle_km) {
+                dim->km_contexts.back() = dim->kmeans;
+            } else {
+                std::rotate(std::begin(dim->km_contexts), std::begin(dim->km_contexts) + 1, std::end(dim->km_contexts));
+                dim->km_contexts[dim->km_contexts.size() - 1] = std::move(dim->kmeans);
+            }
         }
 
         dim->mt = METRIC_TYPE_CONSTANT;
@@ -494,11 +779,16 @@ ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time)
     }
 
     if (schedule_for_training) {
-        ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
         ml_training_request_t req = {
-            string_dup(dim->rd->rrdset->id), string_dup(dim->rd->id),
-            curr_time, rrddim_first_entry_s(dim->rd), rrddim_last_entry_s(dim->rd),
+            string_dup(dim->rd->rrdset->rrdhost->hostname),
+            string_dup(dim->rd->rrdset->id),
+            string_dup(dim->rd->id),
+            curr_time,
+            rrddim_first_entry_s(dim->rd),
+            rrddim_last_entry_s(dim->rd),
         };
+
+        ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
         ml_queue_push(host->training_queue, req);
     }
 }
@@ -674,7 +964,6 @@ ml_host_detect_once(ml_host_t *host)
 
     host->mls = {};
     ml_machine_learning_stats_t mls_copy = {};
-    ml_training_stats_t ts_copy = {};
 
     {
         netdata_mutex_lock(&host->mutex);
@@ -718,54 +1007,14 @@ ml_host_detect_once(ml_host_t *host)
 
         mls_copy = host->mls;
 
-        /*
-         * training stats
-        */
-        ts_copy = host->ts;
-
-        host->ts.queue_size = 0;
-        host->ts.num_popped_items = 0;
-
-        host->ts.allotted_ut = 0;
-        host->ts.consumed_ut = 0;
-        host->ts.remaining_ut = 0;
-
-        host->ts.training_result_ok = 0;
-        host->ts.training_result_invalid_query_time_range = 0;
-        host->ts.training_result_not_enough_collected_values = 0;
-        host->ts.training_result_null_acquired_dimension = 0;
-        host->ts.training_result_chart_under_replication = 0;
-
         netdata_mutex_unlock(&host->mutex);
     }
 
-    // Calc the avg values
-    if (ts_copy.num_popped_items) {
-        ts_copy.queue_size /= ts_copy.num_popped_items;
-        ts_copy.allotted_ut /= ts_copy.num_popped_items;
-        ts_copy.consumed_ut /= ts_copy.num_popped_items;
-        ts_copy.remaining_ut /= ts_copy.num_popped_items;
-
-        ts_copy.training_result_ok /= ts_copy.num_popped_items;
-        ts_copy.training_result_invalid_query_time_range /= ts_copy.num_popped_items;
-        ts_copy.training_result_not_enough_collected_values /= ts_copy.num_popped_items;
-        ts_copy.training_result_null_acquired_dimension /= ts_copy.num_popped_items;
-        ts_copy.training_result_chart_under_replication /= ts_copy.num_popped_items;
-    } else {
-        ts_copy.queue_size = 0;
-        ts_copy.allotted_ut = 0;
-        ts_copy.consumed_ut = 0;
-        ts_copy.remaining_ut = 0;
-    }
-
     worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
     ml_update_dimensions_chart(host, mls_copy);
 
     worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART);
     ml_update_host_and_detection_rate_charts(host, host->host_anomaly_rate * 10000.0);
-
-    worker_is_busy(WORKER_JOB_DETECTION_STATS);
-    ml_update_training_statistics_chart(host, ts_copy);
 }
 
 typedef struct {
@@ -774,18 +1023,21 @@ typedef struct {
 } ml_acquired_dimension_t;
 
 static ml_acquired_dimension_t
-ml_acquired_dimension_get(RRDHOST *rh, STRING *chart_id, STRING *dimension_id)
+ml_acquired_dimension_get(STRING *host_id, STRING *chart_id, STRING *dimension_id)
 {
     RRDDIM_ACQUIRED *acq_rd = NULL;
     ml_dimension_t *dim = NULL;
 
-    RRDSET *rs = rrdset_find(rh, string2str(chart_id));
-    if (rs) {
-        acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
-        if (acq_rd) {
-            RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
-            if (rd)
-                dim = (ml_dimension_t *) rd->ml_dimension;
+    RRDHOST *rh = rrdhost_find_by_hostname(string2str(host_id));
+    if (rh) {
+        RRDSET *rs = rrdset_find(rh, string2str(chart_id));
+        if (rs) {
+            acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
+            if (acq_rd) {
+                RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
+                if (rd)
+                    dim = (ml_dimension_t *) rd->ml_dimension;
+            }
         }
     }
 
@@ -806,110 +1058,12 @@ ml_acquired_dimension_release(ml_acquired_dimension_t acq_dim)
 }
 
 static enum ml_training_result
-ml_acquired_dimension_train(ml_acquired_dimension_t acq_dim, const ml_training_request_t &TR)
+ml_acquired_dimension_train(ml_training_thread_t *training_thread, ml_acquired_dimension_t acq_dim, const ml_training_request_t &tr)
 {
     if (!acq_dim.dim)
         return TRAINING_RESULT_NULL_ACQUIRED_DIMENSION;
 
-    return ml_dimension_train_model(acq_dim.dim, TR);
-}
-
-#define WORKER_JOB_TRAINING_FIND 0
-#define WORKER_JOB_TRAINING_TRAIN 1
-#define WORKER_JOB_TRAINING_STATS 2
-
-static void
-ml_host_train(ml_host_t *host)
-{
-    worker_register("MLTRAIN");
-    worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find");
-    worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train");
-    worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats");
-
-    service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t ) ml_host_cancel_training_thread, host->rh, true);
-
-    while (service_running(SERVICE_ML_TRAINING)) {
-        ml_training_request_t training_req = ml_queue_pop(host->training_queue);
-        size_t queue_size = ml_queue_size(host->training_queue) + 1;
-
-        if (host->threads_cancelled) {
-            info("Stopping training thread for host %s because it was cancelled", rrdhost_hostname(host->rh));
-            break;
-        }
-
-        usec_t allotted_ut = (Cfg.train_every * host->rh->rrd_update_every * USEC_PER_SEC) / queue_size;
-        if (allotted_ut > USEC_PER_SEC)
-            allotted_ut = USEC_PER_SEC;
-
-        usec_t start_ut = now_monotonic_usec();
-        enum ml_training_result training_res;
-        {
-            worker_is_busy(WORKER_JOB_TRAINING_FIND);
-            ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get(host->rh, training_req.chart_id, training_req.dimension_id);
-
-            worker_is_busy(WORKER_JOB_TRAINING_TRAIN);
-            training_res = ml_acquired_dimension_train(acq_dim, training_req);
-
-            string_freez(training_req.chart_id);
-            string_freez(training_req.dimension_id);
-
-            ml_acquired_dimension_release(acq_dim);
-        }
-        usec_t consumed_ut = now_monotonic_usec() - start_ut;
-
-        worker_is_busy(WORKER_JOB_TRAINING_STATS);
-
-        usec_t remaining_ut = 0;
-        if (consumed_ut < allotted_ut)
-            remaining_ut = allotted_ut - consumed_ut;
-
-        {
-            netdata_mutex_lock(&host->mutex);
-
-            host->ts.queue_size += queue_size;
-            host->ts.num_popped_items += 1;
-
-            host->ts.allotted_ut += allotted_ut;
-            host->ts.consumed_ut += consumed_ut;
-            host->ts.remaining_ut += remaining_ut;
-
-            switch (training_res) {
-                case TRAINING_RESULT_OK:
-                    host->ts.training_result_ok += 1;
-                    break;
-                case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
-                    host->ts.training_result_invalid_query_time_range += 1;
-                    break;
-                case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
-                    host->ts.training_result_not_enough_collected_values += 1;
-                    break;
-                case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
-                    host->ts.training_result_null_acquired_dimension += 1;
-                    break;
-                case TRAINING_RESULT_CHART_UNDER_REPLICATION:
-                    host->ts.training_result_chart_under_replication += 1;
-                    break;
-            }
-
-            netdata_mutex_unlock(&host->mutex);
-        }
-
-        worker_is_idle();
-        std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut});
-        worker_is_busy(0);
-    }
-}
-
-static void *
-train_main(void *arg)
-{
-    size_t max_elements_needed_for_training = Cfg.max_train_samples * (Cfg.lag_n + 1);
-    tls_data.training_cns = new calculated_number_t[max_elements_needed_for_training]();
-    tls_data.scratch_training_cns = new calculated_number_t[max_elements_needed_for_training]();
-
-    ml_host_t *host = (ml_host_t *) arg;
-    ml_host_train(host);
-    return NULL;
+    return ml_dimension_train_model(training_thread, acq_dim.dim, tr);
 }
 
 static void *
@@ -923,25 +1077,55 @@ ml_detect_main(void *arg)
     worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart");
     worker_register_job_name(WORKER_JOB_DETECTION_STATS, "training stats");
 
-    service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, NULL, NULL, true);
-
     heartbeat_t hb;
     heartbeat_init(&hb);
 
-    while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) {
+    while (!Cfg.detection_stop) {
         worker_is_idle();
         heartbeat_next(&hb, USEC_PER_SEC);
 
-        void *rhp;
-        dfe_start_reentrant(rrdhost_root_index, rhp) {
-            RRDHOST *rh = (RRDHOST *) rhp;
-
+        RRDHOST *rh;
+        rrd_rdlock();
+        rrdhost_foreach_read(rh) {
             if (!rh->ml_host)
                 continue;
 
             ml_host_detect_once((ml_host_t *) rh->ml_host);
         }
-        dfe_done(rhp);
+        rrd_unlock();
+
+        if (Cfg.enable_statistics_charts) {
+            // collect and update training thread stats
+            for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+                ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+                netdata_mutex_lock(&training_thread->nd_mutex);
+                ml_training_stats_t training_stats = training_thread->training_stats;
+                training_thread->training_stats = {};
+                netdata_mutex_unlock(&training_thread->nd_mutex);
+
+                // calc the avg values
+                if (training_stats.num_popped_items) {
+                    training_stats.queue_size /= training_stats.num_popped_items;
+                    training_stats.allotted_ut /= training_stats.num_popped_items;
+                    training_stats.consumed_ut /= training_stats.num_popped_items;
+                    training_stats.remaining_ut /= training_stats.num_popped_items;
+                } else {
+                    training_stats.queue_size = 0;
+                    training_stats.allotted_ut = 0;
+                    training_stats.consumed_ut = 0;
+                    training_stats.remaining_ut = 0;
+
+                    training_stats.training_result_ok = 0;
+                    training_stats.training_result_invalid_query_time_range = 0;
+                    training_stats.training_result_not_enough_collected_values = 0;
+                    training_stats.training_result_null_acquired_dimension = 0;
+                    training_stats.training_result_chart_under_replication = 0;
+                }
+
+                ml_update_training_statistics_chart(training_thread, training_stats);
+            }
+        }
     }
 
     return NULL;
@@ -975,31 +1159,6 @@ bool ml_streaming_enabled()
     return Cfg.stream_anomaly_detection_charts;
 }
 
-void ml_init()
-{
-    // Read config values
-    ml_config_load(&Cfg);
-
-    if (!Cfg.enable_anomaly_detection)
-        return;
-
-    // Generate random numbers to efficiently sample the features we need
-    // for KMeans clustering.
-    std::random_device RD;
-    std::mt19937 Gen(RD());
-
-    Cfg.random_nums.reserve(Cfg.max_train_samples);
-    for (size_t Idx = 0; Idx != Cfg.max_train_samples; Idx++)
-        Cfg.random_nums.push_back(Gen());
-
-
-    // start detection & training threads
-    char tag[NETDATA_THREAD_TAG_MAX + 1];
-
-    snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT");
-    netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL);
-}
-
 void ml_host_new(RRDHOST *rh)
 {
     if (!ml_enabled(rh))
@@ -1009,14 +1168,12 @@ void ml_host_new(RRDHOST *rh)
 
     host->rh = rh;
     host->mls = ml_machine_learning_stats_t();
-    host->ts = ml_training_stats_t();
+    //host->ts = ml_training_stats_t();
 
-    host->host_anomaly_rate = 0.0;
-    host->threads_running = false;
-    host->threads_cancelled = false;
-    host->threads_joined = false;
+    static std::atomic<size_t> times_called(0);
+    host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue;
 
-    host->training_queue = ml_queue_init();
+    host->host_anomaly_rate = 0.0;
 
     netdata_mutex_init(&host->mutex);
 
@@ -1030,7 +1187,6 @@ void ml_host_delete(RRDHOST *rh)
         return;
 
     netdata_mutex_destroy(&host->mutex);
-    ml_queue_destroy(host->training_queue);
 
     delete host;
     rh->ml_host = NULL;
@@ -1097,69 +1253,6 @@ void ml_host_get_models(RRDHOST *rh, BUFFER *wb)
     error("Fetching KMeans models is not supported yet");
 }
 
-void ml_host_start_training_thread(RRDHOST *rh)
-{
-    if (!rh || !rh->ml_host)
-        return;
-
-    ml_host_t *host = (ml_host_t *) rh->ml_host;
-
-    if (host->threads_running) {
-        error("Anomaly detections threads for host %s are already-up and running.", rrdhost_hostname(host->rh));
-        return;
-    }
-
-    host->threads_running = true;
-    host->threads_cancelled = false;
-    host->threads_joined = false;
-
-    char tag[NETDATA_THREAD_TAG_MAX + 1];
-
-    snprintfz(tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(host->rh));
-    netdata_thread_create(&host->training_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(host));
-}
-
-void ml_host_cancel_training_thread(RRDHOST *rh)
-{
-    if (!rh || !rh->ml_host)
-        return;
-
-    ml_host_t *host = (ml_host_t *) rh->ml_host;
-
-    if (!host->threads_running) {
-        error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(host->rh));
-        return;
-    }
-
-    if (!host->threads_cancelled) {
-        host->threads_cancelled = true;
-
-        // Signal the training queue to stop popping-items
-        ml_queue_signal(host->training_queue);
-        netdata_thread_cancel(host->training_thread);
-    }
-}
-
-void ml_host_stop_training_thread(RRDHOST *rh)
-{
-    if (!rh || !rh->ml_host)
-        return;
-
-    ml_host_cancel_training_thread(rh);
-
-    ml_host_t *host = (ml_host_t *) rh->ml_host;
-
-    if (!host->threads_joined) {
-        host->threads_joined = true;
-        host->threads_running = false;
-
-        delete[] tls_data.training_cns;
-        delete[] tls_data.scratch_training_cns;
-
-        netdata_thread_join(host->training_thread, NULL);
-    }
-}
-
 void ml_chart_new(RRDSET *rs)
 {
     ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host;
@@ -1225,7 +1318,7 @@ void ml_dimension_new(RRDDIM *rd)
 
     dim->last_training_time = 0;
 
-    ml_kmeans_init(&dim->kmeans, 2, 1000);
+    ml_kmeans_init(&dim->kmeans);
 
     if (simple_pattern_matches(Cfg.sp_charts_to_skip, rrdset_name(rd->rrdset)))
         dim->mls = MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART;
@@ -1264,3 +1357,216 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool
 
     return is_anomalous;
 }
+
+static void *ml_train_main(void *arg) {
+    ml_training_thread_t *training_thread = (ml_training_thread_t *) arg;
+
+    char worker_name[1024];
+    snprintfz(worker_name, 1024, "training_thread_%zu", training_thread->id);
+    worker_register("MLTRAIN");
+
+    worker_register_job_name(WORKER_TRAIN_QUEUE_POP, "pop queue");
+    worker_register_job_name(WORKER_TRAIN_ACQUIRE_DIMENSION, "acquire");
+    worker_register_job_name(WORKER_TRAIN_QUERY, "query");
+    worker_register_job_name(WORKER_TRAIN_KMEANS, "kmeans");
+    worker_register_job_name(WORKER_TRAIN_UPDATE_MODELS, "update models");
+    worker_register_job_name(WORKER_TRAIN_LOAD_MODELS, "load models");
+    worker_register_job_name(WORKER_TRAIN_RELEASE_DIMENSION, "release");
+    worker_register_job_name(WORKER_TRAIN_UPDATE_HOST, "update host");
+
+    while (!Cfg.training_stop) {
+        worker_is_busy(WORKER_TRAIN_QUEUE_POP);
+
+        ml_training_request_t training_req = ml_queue_pop(training_thread->training_queue);
+
+        // we know this thread has been cancelled, when the queue starts
+        // returning "null" requests without blocking on queue's pop().
+        if (training_req.host_id == NULL)
+            break;
+
+        size_t queue_size = ml_queue_size(training_thread->training_queue) + 1;
+
+        usec_t allotted_ut = (Cfg.train_every * USEC_PER_SEC) / queue_size;
+        if (allotted_ut > USEC_PER_SEC)
+            allotted_ut = USEC_PER_SEC;
+
+        usec_t start_ut = now_monotonic_usec();
+
+        enum ml_training_result training_res;
+        {
+            worker_is_busy(WORKER_TRAIN_ACQUIRE_DIMENSION);
+            ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get(
+                training_req.host_id,
+                training_req.chart_id,
+                training_req.dimension_id);
+
+            training_res = ml_acquired_dimension_train(training_thread, acq_dim, training_req);
+
+            string_freez(training_req.host_id);
+            string_freez(training_req.chart_id);
+            string_freez(training_req.dimension_id);
+
+            worker_is_busy(WORKER_TRAIN_RELEASE_DIMENSION);
+            ml_acquired_dimension_release(acq_dim);
+        }
+
+        usec_t consumed_ut = now_monotonic_usec() - start_ut;
+
+        usec_t remaining_ut = 0;
+        if (consumed_ut < allotted_ut)
+            remaining_ut = allotted_ut - consumed_ut;
+
+        if (Cfg.enable_statistics_charts) {
+            worker_is_busy(WORKER_TRAIN_UPDATE_HOST);
+
+            netdata_mutex_lock(&training_thread->nd_mutex);
+
+            training_thread->training_stats.queue_size += queue_size;
+            training_thread->training_stats.num_popped_items += 1;
+
+            training_thread->training_stats.allotted_ut += allotted_ut;
+            training_thread->training_stats.consumed_ut += consumed_ut;
+            training_thread->training_stats.remaining_ut += remaining_ut;
+
+            switch (training_res) {
+                case TRAINING_RESULT_OK:
+                    training_thread->training_stats.training_result_ok += 1;
+                    break;
+                case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
+                    training_thread->training_stats.training_result_invalid_query_time_range += 1;
+                    break;
+                case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
+                    training_thread->training_stats.training_result_not_enough_collected_values += 1;
+                    break;
+                case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
+                    training_thread->training_stats.training_result_null_acquired_dimension += 1;
+                    break;
+                case TRAINING_RESULT_CHART_UNDER_REPLICATION:
+                    training_thread->training_stats.training_result_chart_under_replication += 1;
+                    break;
+            }
+
+            netdata_mutex_unlock(&training_thread->nd_mutex);
+        }
+
+        worker_is_idle();
+        std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut});
+    }
+
+    return NULL;
+}
+
+void ml_init()
+{
+    // Read config values
+    ml_config_load(&Cfg);
+
+    if (!Cfg.enable_anomaly_detection)
+        return;
+
+    // Generate random numbers to efficiently sample the features we need
+    // for KMeans clustering.
+    std::random_device RD;
+    std::mt19937 Gen(RD());
+
+    Cfg.random_nums.reserve(Cfg.max_train_samples);
+    for (size_t Idx = 0; Idx != Cfg.max_train_samples; Idx++)
+        Cfg.random_nums.push_back(Gen());
+
+    // init training thread-specific data
+    Cfg.training_threads.resize(Cfg.num_training_threads);
+    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+        size_t max_elements_needed_for_training = Cfg.max_train_samples * (Cfg.lag_n + 1);
+        training_thread->training_cns = new calculated_number_t[max_elements_needed_for_training]();
+        training_thread->scratch_training_cns = new calculated_number_t[max_elements_needed_for_training]();
+
+        training_thread->id = idx;
+        training_thread->training_queue = ml_queue_init();
+        netdata_mutex_init(&training_thread->nd_mutex);
+    }
+
+    // open sqlite db
+    char path[FILENAME_MAX];
+    snprintfz(path, FILENAME_MAX - 1, "%s/%s", netdata_configured_cache_dir, "ml.db");
+    int rc = sqlite3_open(path, &db);
+    if (rc != SQLITE_OK) {
+        error_report("Failed to initialize database at %s, due to \"%s\"", path, sqlite3_errstr(rc));
+        sqlite3_close(db);
+        db = NULL;
+    }
+
+    if (db) {
+        char *err = NULL;
+        int rc = sqlite3_exec(db, db_models_create_table, NULL, NULL, &err);
+        if (rc != SQLITE_OK) {
+            error_report("Failed to create models table (%s, %s)", sqlite3_errstr(rc), err ? err : "");
+            sqlite3_close(db);
+            db = NULL;
+        }
+    }
+}
+
+void ml_fini() {
+    int rc = sqlite3_close_v2(db);
+    if (unlikely(rc != SQLITE_OK))
+        error_report("Error %d while closing the SQLite database, %s", rc, sqlite3_errstr(rc));
+}
+
+void ml_start_threads() {
+    // start detection & training threads
+    Cfg.detection_stop = false;
+    Cfg.training_stop = false;
+
+    char tag[NETDATA_THREAD_TAG_MAX + 1];
+
+    snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT");
+    netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL);
+
+    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+        snprintfz(tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%zu]", training_thread->id);
+        netdata_thread_create(&training_thread->nd_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_train_main, training_thread);
+    }
+}
+
+void ml_stop_threads()
+{
+    Cfg.detection_stop = true;
+    Cfg.training_stop = true;
+
+    netdata_thread_cancel(Cfg.detection_thread);
+    netdata_thread_join(Cfg.detection_thread, NULL);
+
+    // signal the training queue of each thread
+    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+        ml_queue_signal(training_thread->training_queue);
+    }
+
+    // cancel training threads
+    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+        netdata_thread_cancel(training_thread->nd_thread);
+    }
+
+    // join training threads
+    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+        netdata_thread_join(training_thread->nd_thread, NULL);
+    }
+
+    // clear training thread data
+    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+        delete[] training_thread->training_cns;
+        delete[] training_thread->scratch_training_cns;
+        ml_queue_destroy(training_thread->training_queue);
+        netdata_mutex_destroy(&training_thread->nd_mutex);
+    }
+}

Some files were not shown because too many files changed in this diff