
cancel ml threads on shutdown and join them on host free (#14240)

* cancel ml threads on shutdown and join them on host free

* mark them not running before joining them
Costa Tsaousis 2 years ago
commit b00b62f0e7
4 changed files with 36 additions and 15 deletions
  1. ml/Host.cc   +20 -12
  2. ml/Host.h    +7  -2
  3. ml/ml.cc     +8  -1
  4. ml/ml.h      +1  -0

ml/Host.cc   +20 -12

@@ -213,7 +213,7 @@ void Host::train() {
     worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train");
     worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats");
 
-    service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true);
+    service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true);
 
     while (service_running(SERVICE_ML_TRAINING)) {
         auto P = TrainingQueue.pop();
@@ -293,7 +293,7 @@ void Host::detect() {
     worker_register_job_name(WORKER_JOB_DETECTION_STATS, "stats");
     worker_register_job_name(WORKER_JOB_DETECTION_RESOURCES, "resources");
 
-    service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true);
+    service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true);
 
     heartbeat_t HB;
     heartbeat_init(&HB);
@@ -332,29 +332,37 @@ void Host::startAnomalyDetectionThreads() {
     }
 
     ThreadsRunning = true;
+    ThreadsCancelled = false;
+    ThreadsJoined = false;
 
     char Tag[NETDATA_THREAD_TAG_MAX + 1];
 
     snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%s]", rrdhost_hostname(RH));
-    netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, train_main, static_cast<void *>(this));
+    netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this));
 
     snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "DETECT[%s]", rrdhost_hostname(RH));
-    netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, detect_main, static_cast<void *>(this));
+    netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this));
 }
 
-void Host::stopAnomalyDetectionThreads() {
+void Host::stopAnomalyDetectionThreads(bool join) {
     if (!ThreadsRunning) {
         error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(RH));
         return;
     }
 
-    ThreadsRunning = false;
+    if(!ThreadsCancelled) {
+        ThreadsCancelled = true;
 
-    // Signal the training queue to stop popping-items
-    TrainingQueue.signal();
-    netdata_thread_cancel(TrainingThread);
-    // netdata_thread_join(TrainingThread, nullptr);
+        // Signal the training queue to stop popping-items
+        TrainingQueue.signal();
+        netdata_thread_cancel(TrainingThread);
+        netdata_thread_cancel(DetectionThread);
+    }
 
-    netdata_thread_cancel(DetectionThread);
-    // netdata_thread_join(DetectionThread, nullptr);
+    if(join && !ThreadsJoined) {
+        ThreadsJoined = true;
+        ThreadsRunning = false;
+        netdata_thread_join(TrainingThread, nullptr);
+        netdata_thread_join(DetectionThread, nullptr);
+    }
 }
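
For context: the reworked Host::stopAnomalyDetectionThreads() splits teardown into a cancel phase (signal the training queue and cancel both now-joinable threads without blocking, used at agent shutdown) and a join phase (reap the threads and mark them not running, used when the host is freed), with the ThreadsCancelled and ThreadsJoined flags making each phase safe to invoke more than once. Below is a minimal standalone sketch of that two-phase pattern; it is not Netdata code. It substitutes std::thread plus an atomic stop flag and condition variable for netdata_thread_cancel()/netdata_thread_join() and TrainingQueue.signal(), and every name in it (MiniHost, workerMain, stop) is made up for illustration.

// Standalone illustration only -- not Netdata code. std::thread has no
// cancellation, so an atomic stop flag plus a condition variable stands in
// for netdata_thread_cancel() and TrainingQueue.signal().
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

class MiniHost {
public:
    void start() {
        Running = true;
        Cancelled = false;
        Joined = false;
        Worker = std::thread(&MiniHost::workerMain, this);
    }

    // join == false: "cancel" phase (agent shutdown) -- wake the worker and ask
    //                it to exit, but do not block waiting for it.
    // join == true:  "join" phase (host free) -- reap the thread and mark the
    //                host as no longer running its worker.
    void stop(bool join) {
        if (!Running)
            return;

        if (!Cancelled) {
            Cancelled = true;
            Cond.notify_all();              // like TrainingQueue.signal()
        }

        if (join && !Joined) {
            Joined = true;
            Running = false;                // marked not running before joining
            Worker.join();
        }
    }

private:
    void workerMain() {
        std::unique_lock<std::mutex> Lock(Mutex);
        while (!Cancelled)                  // re-check after every wakeup/timeout
            Cond.wait_for(Lock, std::chrono::milliseconds(100));
        std::puts("worker exiting");
    }

    std::thread Worker;
    std::mutex Mutex;
    std::condition_variable Cond;
    std::atomic<bool> Running{false}, Cancelled{false}, Joined{false};
};

int main() {
    MiniHost H;
    H.start();
    H.stop(false);   // shutdown path: cancel only, returns immediately
    H.stop(true);    // host free path: join the worker before destroying H
}

Calling stop(false) first and stop(true) later mirrors the new flow: the shutdown service callback only cancels, and the join happens when the host object itself is released.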

ml/Host.h   +7 -2

@@ -26,7 +26,10 @@ public:
         MLS(),
         TS(),
         HostAnomalyRate(0.0),
-        ThreadsRunning(false) {}
+        ThreadsRunning(false),
+        ThreadsCancelled(false),
+        ThreadsJoined(false)
+        {}
 
     void addChart(Chart *C);
     void removeChart(Chart *C);
@@ -36,7 +39,7 @@ public:
     void getDetectionInfoAsJson(nlohmann::json &Json) const;
 
     void startAnomalyDetectionThreads();
-    void stopAnomalyDetectionThreads();
+    void stopAnomalyDetectionThreads(bool join);
 
     void scheduleForTraining(TrainingRequest TR);
     void train();
@@ -50,6 +53,8 @@ private:
     TrainingStats TS;
     CalculatedNumber HostAnomalyRate{0.0};
     std::atomic<bool> ThreadsRunning;
+    std::atomic<bool> ThreadsCancelled;
+    std::atomic<bool> ThreadsJoined;
 
     Queue<TrainingRequest> TrainingQueue;
 

ml/ml.cc   +8 -1

@@ -156,7 +156,14 @@ void ml_start_anomaly_detection_threads(RRDHOST *RH) {
 void ml_stop_anomaly_detection_threads(RRDHOST *RH) {
     if (RH && RH->ml_host) {
         Host *H = reinterpret_cast<Host *>(RH->ml_host);
-        H->stopAnomalyDetectionThreads();
+        H->stopAnomalyDetectionThreads(true);
+    }
+}
+
+void ml_cancel_anomaly_detection_threads(RRDHOST *RH) {
+    if (RH && RH->ml_host) {
+        Host *H = reinterpret_cast<Host *>(RH->ml_host);
+        H->stopAnomalyDetectionThreads(false);
     }
 }
 

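The ml.cc change keeps ml_stop_anomaly_detection_threads() as the joining variant (used when the host is freed) and adds ml_cancel_anomaly_detection_threads() as the non-joining variant that train() and detect() now register as their shutdown callback. A hypothetical sketch of that thin delegation layer follows; MlHost, HostHandle, host_stop_ml() and host_cancel_ml() are illustrative stand-ins, not the real Netdata types, and stopThreads(bool) stands in for Host::stopAnomalyDetectionThreads(bool).

// Hypothetical sketch of the two C-style entry points added in ml.cc.
// All names here are illustrative; only the delegation pattern matches.
#include <cstdio>

struct MlHost {
    void stopThreads(bool join) {
        std::printf(join ? "cancel if needed, then join\n" : "cancel only, no join\n");
    }
};

struct HostHandle {
    void *ml_host = nullptr;   // opaque pointer, as on the real host struct
};

// joining variant: called from the host-free path
void host_stop_ml(HostHandle *RH) {
    if (RH && RH->ml_host)
        static_cast<MlHost *>(RH->ml_host)->stopThreads(true);
}

// non-joining variant: registered as the shutdown (force-quit) callback
void host_cancel_ml(HostHandle *RH) {
    if (RH && RH->ml_host)
        static_cast<MlHost *>(RH->ml_host)->stopThreads(false);
}

int main() {
    MlHost H;
    HostHandle RH;
    RH.ml_host = &H;

    host_cancel_ml(&RH);   // agent shutdown: ask the threads to exit
    host_stop_ml(&RH);     // host free: wait for them before tearing down
}
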
ml/ml.h   +1 -0

@@ -31,6 +31,7 @@ void ml_dimension_delete(RRDDIM *RD);
 
 void ml_start_anomaly_detection_threads(RRDHOST *RH);
 void ml_stop_anomaly_detection_threads(RRDHOST *RH);
+void ml_cancel_anomaly_detection_threads(RRDHOST *RH);
 
 char *ml_get_host_info(RRDHOST *RH);
 char *ml_get_host_runtime_info(RRDHOST *RH);