Host.cc 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include <dlib/statistics.h>
  3. #include "Config.h"
  4. #include "Host.h"
  5. #include "json/single_include/nlohmann/json.hpp"
  6. using namespace ml;
  7. static void updateDimensionsChart(RRDHOST *RH,
  8. collected_number NumTrainedDimensions,
  9. collected_number NumNormalDimensions,
  10. collected_number NumAnomalousDimensions) {
  11. static thread_local RRDSET *RS = nullptr;
  12. static thread_local RRDDIM *NumTotalDimensionsRD = nullptr;
  13. static thread_local RRDDIM *NumTrainedDimensionsRD = nullptr;
  14. static thread_local RRDDIM *NumNormalDimensionsRD = nullptr;
  15. static thread_local RRDDIM *NumAnomalousDimensionsRD = nullptr;
  16. if (!RS) {
  17. std::stringstream IdSS, NameSS;
  18. IdSS << "dimensions_on_" << localhost->machine_guid;
  19. NameSS << "dimensions_on_" << localhost->hostname;
  20. RS = rrdset_create(
  21. RH,
  22. "anomaly_detection", // type
  23. IdSS.str().c_str(), // id
  24. NameSS.str().c_str(), // name
  25. "dimensions", // family
  26. "anomaly_detection.dimensions", // ctx
  27. "Anomaly detection dimensions", // title
  28. "dimensions", // units
  29. "netdata", // plugin
  30. "ml", // module
  31. 39183, // priority
  32. RH->rrd_update_every, // update_every
  33. RRDSET_TYPE_LINE // chart_type
  34. );
  35. rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
  36. NumTotalDimensionsRD = rrddim_add(RS, "total", NULL,
  37. 1, 1, RRD_ALGORITHM_ABSOLUTE);
  38. NumTrainedDimensionsRD = rrddim_add(RS, "trained", NULL,
  39. 1, 1, RRD_ALGORITHM_ABSOLUTE);
  40. NumNormalDimensionsRD = rrddim_add(RS, "normal", NULL,
  41. 1, 1, RRD_ALGORITHM_ABSOLUTE);
  42. NumAnomalousDimensionsRD = rrddim_add(RS, "anomalous", NULL,
  43. 1, 1, RRD_ALGORITHM_ABSOLUTE);
  44. } else
  45. rrdset_next(RS);
  46. rrddim_set_by_pointer(RS, NumTotalDimensionsRD, NumNormalDimensions + NumAnomalousDimensions);
  47. rrddim_set_by_pointer(RS, NumTrainedDimensionsRD, NumTrainedDimensions);
  48. rrddim_set_by_pointer(RS, NumNormalDimensionsRD, NumNormalDimensions);
  49. rrddim_set_by_pointer(RS, NumAnomalousDimensionsRD, NumAnomalousDimensions);
  50. rrdset_done(RS);
  51. }
  52. static void updateRateChart(RRDHOST *RH, collected_number AnomalyRate) {
  53. static thread_local RRDSET *RS = nullptr;
  54. static thread_local RRDDIM *AnomalyRateRD = nullptr;
  55. if (!RS) {
  56. std::stringstream IdSS, NameSS;
  57. IdSS << "anomaly_rate_on_" << localhost->machine_guid;
  58. NameSS << "anomaly_rate_on_" << localhost->hostname;
  59. RS = rrdset_create(
  60. RH,
  61. "anomaly_detection", // type
  62. IdSS.str().c_str(), // id
  63. NameSS.str().c_str(), // name
  64. "anomaly_rate", // family
  65. "anomaly_detection.anomaly_rate", // ctx
  66. "Percentage of anomalous dimensions", // title
  67. "percentage", // units
  68. "netdata", // plugin
  69. "ml", // module
  70. 39184, // priority
  71. RH->rrd_update_every, // update_every
  72. RRDSET_TYPE_LINE // chart_type
  73. );
  74. rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
  75. AnomalyRateRD = rrddim_add(RS, "anomaly_rate", NULL,
  76. 1, 100, RRD_ALGORITHM_ABSOLUTE);
  77. } else
  78. rrdset_next(RS);
  79. rrddim_set_by_pointer(RS, AnomalyRateRD, AnomalyRate);
  80. rrdset_done(RS);
  81. }
  82. static void updateWindowLengthChart(RRDHOST *RH, collected_number WindowLength) {
  83. static thread_local RRDSET *RS = nullptr;
  84. static thread_local RRDDIM *WindowLengthRD = nullptr;
  85. if (!RS) {
  86. std::stringstream IdSS, NameSS;
  87. IdSS << "detector_window_on_" << localhost->machine_guid;
  88. NameSS << "detector_window_on_" << localhost->hostname;
  89. RS = rrdset_create(
  90. RH,
  91. "anomaly_detection", // type
  92. IdSS.str().c_str(), // id
  93. NameSS.str().c_str(), // name
  94. "detector_window", // family
  95. "anomaly_detection.detector_window", // ctx
  96. "Anomaly detector window length", // title
  97. "seconds", // units
  98. "netdata", // plugin
  99. "ml", // module
  100. 39185, // priority
  101. RH->rrd_update_every, // update_every
  102. RRDSET_TYPE_LINE // chart_type
  103. );
  104. rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
  105. WindowLengthRD = rrddim_add(RS, "duration", NULL,
  106. 1, 1, RRD_ALGORITHM_ABSOLUTE);
  107. } else
  108. rrdset_next(RS);
  109. rrddim_set_by_pointer(RS, WindowLengthRD, WindowLength * RH->rrd_update_every);
  110. rrdset_done(RS);
  111. }
  112. static void updateEventsChart(RRDHOST *RH,
  113. std::pair<BitRateWindow::Edge, size_t> P,
  114. bool ResetBitCounter,
  115. bool NewAnomalyEvent) {
  116. static thread_local RRDSET *RS = nullptr;
  117. static thread_local RRDDIM *AboveThresholdRD = nullptr;
  118. static thread_local RRDDIM *ResetBitCounterRD = nullptr;
  119. static thread_local RRDDIM *NewAnomalyEventRD = nullptr;
  120. if (!RS) {
  121. std::stringstream IdSS, NameSS;
  122. IdSS << "detector_events_on_" << localhost->machine_guid;
  123. NameSS << "detector_events_on_" << localhost->hostname;
  124. RS = rrdset_create(
  125. RH,
  126. "anomaly_detection", // type
  127. IdSS.str().c_str(), // id
  128. NameSS.str().c_str(), // name
  129. "detector_events", // family
  130. "anomaly_detection.detector_events", // ctx
  131. "Anomaly events triggered", // title
  132. "boolean", // units
  133. "netdata", // plugin
  134. "ml", // module
  135. 39186, // priority
  136. RH->rrd_update_every, // update_every
  137. RRDSET_TYPE_LINE // chart_type
  138. );
  139. rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
  140. AboveThresholdRD = rrddim_add(RS, "above_threshold", NULL,
  141. 1, 1, RRD_ALGORITHM_ABSOLUTE);
  142. ResetBitCounterRD = rrddim_add(RS, "reset_bit_counter", NULL,
  143. 1, 1, RRD_ALGORITHM_ABSOLUTE);
  144. NewAnomalyEventRD = rrddim_add(RS, "new_anomaly_event", NULL,
  145. 1, 1, RRD_ALGORITHM_ABSOLUTE);
  146. } else
  147. rrdset_next(RS);
  148. BitRateWindow::Edge E = P.first;
  149. bool AboveThreshold = E.second == BitRateWindow::State::AboveThreshold;
  150. rrddim_set_by_pointer(RS, AboveThresholdRD, AboveThreshold);
  151. rrddim_set_by_pointer(RS, ResetBitCounterRD, ResetBitCounter);
  152. rrddim_set_by_pointer(RS, NewAnomalyEventRD, NewAnomalyEvent);
  153. rrdset_done(RS);
  154. }
  155. static void updateDetectionChart(RRDHOST *RH) {
  156. static thread_local RRDSET *RS = nullptr;
  157. static thread_local RRDDIM *UserRD, *SystemRD = nullptr;
  158. if (!RS) {
  159. std::stringstream IdSS, NameSS;
  160. IdSS << "prediction_stats_" << RH->machine_guid;
  161. NameSS << "prediction_stats_for_" << RH->hostname;
  162. RS = rrdset_create_localhost(
  163. "netdata", // type
  164. IdSS.str().c_str(), // id
  165. NameSS.str().c_str(), // name
  166. "ml", // family
  167. "netdata.prediction_stats", // ctx
  168. "Prediction thread CPU usage", // title
  169. "milliseconds/s", // units
  170. "netdata", // plugin
  171. "ml", // module
  172. 136000, // priority
  173. RH->rrd_update_every, // update_every
  174. RRDSET_TYPE_STACKED // chart_type
  175. );
  176. UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
  177. SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
  178. } else
  179. rrdset_next(RS);
  180. struct rusage TRU;
  181. getrusage(RUSAGE_THREAD, &TRU);
  182. rrddim_set_by_pointer(RS, UserRD, TRU.ru_utime.tv_sec * 1000000ULL + TRU.ru_utime.tv_usec);
  183. rrddim_set_by_pointer(RS, SystemRD, TRU.ru_stime.tv_sec * 1000000ULL + TRU.ru_stime.tv_usec);
  184. rrdset_done(RS);
  185. }
  186. static void updateTrainingChart(RRDHOST *RH, struct rusage *TRU)
  187. {
  188. static thread_local RRDSET *RS = nullptr;
  189. static thread_local RRDDIM *UserRD = nullptr;
  190. static thread_local RRDDIM *SystemRD = nullptr;
  191. if (!RS) {
  192. std::stringstream IdSS, NameSS;
  193. IdSS << "training_stats_" << RH->machine_guid;
  194. NameSS << "training_stats_for_" << RH->hostname;
  195. RS = rrdset_create_localhost(
  196. "netdata", // type
  197. IdSS.str().c_str(), // id
  198. NameSS.str().c_str(), // name
  199. "ml", // family
  200. "netdata.training_stats", // ctx
  201. "Training thread CPU usage", // title
  202. "milliseconds/s", // units
  203. "netdata", // plugin
  204. "ml", // module
  205. 136001, // priority
  206. RH->rrd_update_every, // update_every
  207. RRDSET_TYPE_STACKED // chart_type
  208. );
  209. UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
  210. SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
  211. } else
  212. rrdset_next(RS);
  213. rrddim_set_by_pointer(RS, UserRD, TRU->ru_utime.tv_sec * 1000000ULL + TRU->ru_utime.tv_usec);
  214. rrddim_set_by_pointer(RS, SystemRD, TRU->ru_stime.tv_sec * 1000000ULL + TRU->ru_stime.tv_usec);
  215. rrdset_done(RS);
  216. }
  217. void RrdHost::addDimension(Dimension *D) {
  218. RRDDIM *AnomalyRateRD = rrddim_add(AnomalyRateRS, D->getID().c_str(), NULL,
  219. 1, 1000, RRD_ALGORITHM_ABSOLUTE);
  220. D->setAnomalyRateRD(AnomalyRateRD);
  221. {
  222. std::lock_guard<std::mutex> Lock(Mutex);
  223. DimensionsMap[D->getRD()] = D;
  224. // Default construct mutex for dimension
  225. LocksMap[D];
  226. }
  227. }
  228. void RrdHost::removeDimension(Dimension *D) {
  229. // Remove the dimension from the hosts map.
  230. {
  231. std::lock_guard<std::mutex> Lock(Mutex);
  232. DimensionsMap.erase(D->getRD());
  233. }
  234. // Delete the dimension by locking the mutex that protects it.
  235. {
  236. std::lock_guard<std::mutex> Lock(LocksMap[D]);
  237. delete D;
  238. }
  239. // Remove the lock entry for the deleted dimension.
  240. {
  241. std::lock_guard<std::mutex> Lock(Mutex);
  242. LocksMap.erase(D);
  243. }
  244. }
  245. void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
  246. Json["version"] = 1;
  247. Json["enabled"] = Cfg.EnableAnomalyDetection;
  248. Json["min-train-samples"] = Cfg.MinTrainSamples;
  249. Json["max-train-samples"] = Cfg.MaxTrainSamples;
  250. Json["train-every"] = Cfg.TrainEvery;
  251. Json["diff-n"] = Cfg.DiffN;
  252. Json["smooth-n"] = Cfg.SmoothN;
  253. Json["lag-n"] = Cfg.LagN;
  254. Json["random-sampling-ratio"] = Cfg.RandomSamplingRatio;
  255. Json["max-kmeans-iters"] = Cfg.MaxKMeansIters;
  256. Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold;
  257. Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold;
  258. Json["min-window-size"] = Cfg.ADMinWindowSize;
  259. Json["max-window-size"] = Cfg.ADMaxWindowSize;
  260. Json["idle-window-size"] = Cfg.ADIdleWindowSize;
  261. Json["window-rate-threshold"] = Cfg.ADWindowRateThreshold;
  262. Json["dimension-rate-threshold"] = Cfg.ADDimensionRateThreshold;
  263. Json["hosts-to-skip"] = Cfg.HostsToSkip;
  264. Json["charts-to-skip"] = Cfg.ChartsToSkip;
  265. }
  266. std::pair<Dimension *, Duration<double>>
  267. TrainableHost::findDimensionToTrain(const TimePoint &NowTP) {
  268. std::lock_guard<std::mutex> Lock(Mutex);
  269. Duration<double> AllottedDuration = Duration<double>{Cfg.TrainEvery * updateEvery()} / (DimensionsMap.size() + 1);
  270. for (auto &DP : DimensionsMap) {
  271. Dimension *D = DP.second;
  272. if (D->shouldTrain(NowTP)) {
  273. LocksMap[D].lock();
  274. return { D, AllottedDuration };
  275. }
  276. }
  277. return { nullptr, AllottedDuration };
  278. }
  279. void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
  280. if (D == nullptr)
  281. return;
  282. D->LastTrainedAt = NowTP + Seconds{D->updateEvery()};
  283. D->trainModel();
  284. {
  285. std::lock_guard<std::mutex> Lock(Mutex);
  286. LocksMap[D].unlock();
  287. }
  288. }
  289. void TrainableHost::train() {
  290. Duration<double> MaxSleepFor = Seconds{10 * updateEvery()};
  291. worker_register("MLTRAIN");
  292. worker_register_job_name(0, "dimensions");
  293. worker_is_busy(0);
  294. while (!netdata_exit) {
  295. netdata_thread_testcancel();
  296. netdata_thread_disable_cancelability();
  297. updateResourceUsage();
  298. TimePoint NowTP = SteadyClock::now();
  299. auto P = findDimensionToTrain(NowTP);
  300. trainDimension(P.first, NowTP);
  301. netdata_thread_enable_cancelability();
  302. Duration<double> AllottedDuration = P.second;
  303. Duration<double> RealDuration = SteadyClock::now() - NowTP;
  304. Duration<double> SleepFor;
  305. if (RealDuration >= AllottedDuration)
  306. continue;
  307. worker_is_idle();
  308. SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor);
  309. std::this_thread::sleep_for(SleepFor);
  310. worker_is_busy(0);
  311. }
  312. }
  313. #define WORKER_JOB_DETECT_DIMENSION 0
  314. #define WORKER_JOB_UPDATE_DETECTION_CHART 1
  315. #define WORKER_JOB_UPDATE_ANOMALY_RATES 2
  316. #define WORKER_JOB_UPDATE_CHARTS 3
  317. #define WORKER_JOB_SAVE_ANOMALY_EVENT 4
  318. #if WORKER_UTILIZATION_MAX_JOB_TYPES < 5
  319. #error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 5
  320. #endif
  321. void DetectableHost::detectOnce() {
  322. auto P = BRW.insert(WindowAnomalyRate >= Cfg.HostAnomalyRateThreshold);
  323. BitRateWindow::Edge Edge = P.first;
  324. size_t WindowLength = P.second;
  325. bool ResetBitCounter = (Edge.first != BitRateWindow::State::AboveThreshold);
  326. bool NewAnomalyEvent = (Edge.first == BitRateWindow::State::AboveThreshold) &&
  327. (Edge.second == BitRateWindow::State::Idle);
  328. std::vector<std::pair<double, std::string>> DimsOverThreshold;
  329. size_t NumAnomalousDimensions = 0;
  330. size_t NumNormalDimensions = 0;
  331. size_t NumTrainedDimensions = 0;
  332. size_t NumActiveDimensions = 0;
  333. bool CollectAnomalyRates = (++AnomalyRateTimer == Cfg.DBEngineAnomalyRateEvery);
  334. if (CollectAnomalyRates)
  335. rrdset_next(AnomalyRateRS);
  336. {
  337. std::lock_guard<std::mutex> Lock(Mutex);
  338. DimsOverThreshold.reserve(DimensionsMap.size());
  339. for (auto &DP : DimensionsMap) {
  340. worker_is_busy(WORKER_JOB_DETECT_DIMENSION);
  341. Dimension *D = DP.second;
  342. if (!D->isActive()) {
  343. D->updateAnomalyBitCounter(AnomalyRateRS, AnomalyRateTimer, false);
  344. continue;
  345. }
  346. NumActiveDimensions++;
  347. auto P = D->detect(WindowLength, ResetBitCounter);
  348. bool IsAnomalous = P.first;
  349. double AnomalyScore = P.second;
  350. NumTrainedDimensions += D->isTrained();
  351. if (IsAnomalous)
  352. NumAnomalousDimensions += 1;
  353. if (NewAnomalyEvent && (AnomalyScore >= Cfg.ADDimensionRateThreshold))
  354. DimsOverThreshold.push_back({ AnomalyScore, D->getID() });
  355. D->updateAnomalyBitCounter(AnomalyRateRS, AnomalyRateTimer, IsAnomalous);
  356. }
  357. if (NumAnomalousDimensions)
  358. WindowAnomalyRate = static_cast<double>(NumAnomalousDimensions) / NumActiveDimensions;
  359. else
  360. WindowAnomalyRate = 0.0;
  361. NumNormalDimensions = NumActiveDimensions - NumAnomalousDimensions;
  362. }
  363. if (CollectAnomalyRates) {
  364. worker_is_busy(WORKER_JOB_UPDATE_ANOMALY_RATES);
  365. AnomalyRateTimer = 0;
  366. rrdset_done(AnomalyRateRS);
  367. }
  368. this->NumAnomalousDimensions = NumAnomalousDimensions;
  369. this->NumNormalDimensions = NumNormalDimensions;
  370. this->NumTrainedDimensions = NumTrainedDimensions;
  371. this->NumActiveDimensions = NumActiveDimensions;
  372. worker_is_busy(WORKER_JOB_UPDATE_CHARTS);
  373. updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions);
  374. updateRateChart(getRH(), WindowAnomalyRate * 10000.0);
  375. updateWindowLengthChart(getRH(), WindowLength);
  376. updateEventsChart(getRH(), P, ResetBitCounter, NewAnomalyEvent);
  377. struct rusage TRU;
  378. getResourceUsage(&TRU);
  379. updateTrainingChart(getRH(), &TRU);
  380. if (!NewAnomalyEvent || (DimsOverThreshold.size() == 0))
  381. return;
  382. worker_is_busy(WORKER_JOB_SAVE_ANOMALY_EVENT);
  383. std::sort(DimsOverThreshold.begin(), DimsOverThreshold.end());
  384. std::reverse(DimsOverThreshold.begin(), DimsOverThreshold.end());
  385. // Make sure the JSON response won't grow beyond a specific number
  386. // of dimensions. Log an error message if this happens, because it
  387. // most likely means that the user specified a very-low anomaly rate
  388. // threshold.
  389. size_t NumMaxDimsOverThreshold = 2000;
  390. if (DimsOverThreshold.size() > NumMaxDimsOverThreshold) {
  391. error("Found %zu dimensions over threshold. Reducing JSON result to %zu dimensions.",
  392. DimsOverThreshold.size(), NumMaxDimsOverThreshold);
  393. DimsOverThreshold.resize(NumMaxDimsOverThreshold);
  394. }
  395. nlohmann::json JsonResult = DimsOverThreshold;
  396. time_t Before = now_realtime_sec();
  397. time_t After = Before - (WindowLength * updateEvery());
  398. DB.insertAnomaly("AD1", 1, getUUID(), After, Before, JsonResult.dump(4));
  399. }
  400. void DetectableHost::detect() {
  401. worker_register("MLDETECT");
  402. worker_register_job_name(WORKER_JOB_DETECT_DIMENSION, "dimensions");
  403. worker_register_job_name(WORKER_JOB_UPDATE_DETECTION_CHART, "detection chart");
  404. worker_register_job_name(WORKER_JOB_UPDATE_ANOMALY_RATES, "anomaly rates");
  405. worker_register_job_name(WORKER_JOB_UPDATE_CHARTS, "charts");
  406. worker_register_job_name(WORKER_JOB_SAVE_ANOMALY_EVENT, "anomaly event");
  407. std::this_thread::sleep_for(Seconds{10});
  408. heartbeat_t HB;
  409. heartbeat_init(&HB);
  410. while (!netdata_exit) {
  411. netdata_thread_testcancel();
  412. worker_is_idle();
  413. heartbeat_next(&HB, updateEvery() * USEC_PER_SEC);
  414. netdata_thread_disable_cancelability();
  415. detectOnce();
  416. worker_is_busy(WORKER_JOB_UPDATE_DETECTION_CHART);
  417. updateDetectionChart(getRH());
  418. netdata_thread_enable_cancelability();
  419. }
  420. }
  421. void DetectableHost::getDetectionInfoAsJson(nlohmann::json &Json) const {
  422. Json["version"] = 1;
  423. Json["anomalous-dimensions"] = NumAnomalousDimensions;
  424. Json["normal-dimensions"] = NumNormalDimensions;
  425. Json["total-dimensions"] = NumAnomalousDimensions + NumNormalDimensions;
  426. Json["trained-dimensions"] = NumTrainedDimensions;
  427. }
  428. void DetectableHost::startAnomalyDetectionThreads() {
  429. TrainingThread = std::thread(&TrainableHost::train, this);
  430. DetectionThread = std::thread(&DetectableHost::detect, this);
  431. }
  432. void DetectableHost::stopAnomalyDetectionThreads() {
  433. netdata_thread_cancel(TrainingThread.native_handle());
  434. netdata_thread_cancel(DetectionThread.native_handle());
  435. TrainingThread.join();
  436. DetectionThread.join();
  437. }