// SPDX-License-Identifier: GPL-3.0-or-later

#include <dlib/clustering.h>

#include "ml-private.h"

#include <random>

#include "ad_charts.h"
#include "database/sqlite/sqlite3.h"

#define ML_METADATA_VERSION 2

#define WORKER_TRAIN_QUEUE_POP         0
#define WORKER_TRAIN_ACQUIRE_DIMENSION 1
#define WORKER_TRAIN_QUERY             2
#define WORKER_TRAIN_KMEANS            3
#define WORKER_TRAIN_UPDATE_MODELS     4
#define WORKER_TRAIN_RELEASE_DIMENSION 5
#define WORKER_TRAIN_UPDATE_HOST       6
#define WORKER_TRAIN_FLUSH_MODELS      7

static sqlite3 *db = NULL;
static netdata_mutex_t db_mutex = NETDATA_MUTEX_INITIALIZER;
/*
 * Functions to convert enums to strings
 */

__attribute__((unused)) static const char *
ml_machine_learning_status_to_string(enum ml_machine_learning_status mls)
{
    switch (mls) {
        case MACHINE_LEARNING_STATUS_ENABLED:
            return "enabled";
        case MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART:
            return "disabled-sp";
        default:
            return "unknown";
    }
}

__attribute__((unused)) static const char *
ml_metric_type_to_string(enum ml_metric_type mt)
{
    switch (mt) {
        case METRIC_TYPE_CONSTANT:
            return "constant";
        case METRIC_TYPE_VARIABLE:
            return "variable";
        default:
            return "unknown";
    }
}

__attribute__((unused)) static const char *
ml_training_status_to_string(enum ml_training_status ts)
{
    switch (ts) {
        case TRAINING_STATUS_PENDING_WITH_MODEL:
            return "pending-with-model";
        case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
            return "pending-without-model";
        case TRAINING_STATUS_TRAINED:
            return "trained";
        case TRAINING_STATUS_UNTRAINED:
            return "untrained";
        case TRAINING_STATUS_SILENCED:
            return "silenced";
        default:
            return "unknown";
    }
}

__attribute__((unused)) static const char *
ml_training_result_to_string(enum ml_training_result tr)
{
    switch (tr) {
        case TRAINING_RESULT_OK:
            return "ok";
        case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
            return "invalid-query";
        case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
            return "missing-values";
        case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
            return "null-acquired-dim";
        case TRAINING_RESULT_CHART_UNDER_REPLICATION:
            return "chart-under-replication";
        default:
            return "unknown";
    }
}
/*
 * Features
 */

// subtract elements that are `diff_n` positions apart
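// For example, with diff_n = 1 the buffer [1, 3, 6, 10] becomes [2, 3, 4, 0]:
// each element is replaced by the difference with the element diff_n positions
// ahead of it, and the tail that has no counterpart is zero-filled.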
static void
ml_features_diff(ml_features_t *features)
{
    if (features->diff_n == 0)
        return;

    for (size_t idx = 0; idx != (features->src_n - features->diff_n); idx++) {
        size_t high = (features->src_n - 1) - idx;
        size_t low = high - features->diff_n;

        features->dst[low] = features->src[high] - features->src[low];
    }

    size_t n = features->src_n - features->diff_n;
    memcpy(features->src, features->dst, n * sizeof(calculated_number_t));

    for (size_t idx = features->src_n - features->diff_n; idx != features->src_n; idx++)
        features->src[idx] = 0.0;
}
// compute the rolling window average of an array in place
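// For example, with smooth_n = 3 the value written at position i is the mean
// of src[i], src[i + 1] and src[i + 2]; the trailing positions that lack a
// full window are zero-filled.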
static void
ml_features_smooth(ml_features_t *features)
{
    calculated_number_t sum = 0.0;

    size_t idx = 0;
    for (; idx != features->smooth_n - 1; idx++)
        sum += features->src[idx];

    for (; idx != (features->src_n - features->diff_n); idx++) {
        sum += features->src[idx];

        calculated_number_t prev_cn = features->src[idx - (features->smooth_n - 1)];
        features->src[idx - (features->smooth_n - 1)] = sum / features->smooth_n;

        sum -= prev_cn;
    }

    for (idx = 0; idx != features->smooth_n; idx++)
        features->src[(features->src_n - 1) - idx] = 0.0;
}
// create lagged vectors out of the preprocessed buffer
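// Each sample is a window of lag_n + 1 consecutive values. To bound training
// cost, candidate windows are randomly subsampled: a precomputed random uint32
// is compared against a cutoff so that, on average, only about
// max_train_samples * random_sampling_ratio samples survive.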
static void
ml_features_lag(ml_features_t *features)
{
    size_t n = features->src_n - features->diff_n - features->smooth_n + 1 - features->lag_n;
    features->preprocessed_features.resize(n);

    unsigned target_num_samples = Cfg.max_train_samples * Cfg.random_sampling_ratio;
    double sampling_ratio = std::min(static_cast<double>(target_num_samples) / n, 1.0);

    uint32_t max_mt = std::numeric_limits<uint32_t>::max();
    uint32_t cutoff = static_cast<double>(max_mt) * sampling_ratio;

    size_t sample_idx = 0;

    for (size_t idx = 0; idx != n; idx++) {
        DSample &DS = features->preprocessed_features[sample_idx++];
        DS.set_size(features->lag_n + 1);

        if (Cfg.random_nums[idx] > cutoff) {
            sample_idx--;
            continue;
        }

        for (size_t feature_idx = 0; feature_idx != features->lag_n + 1; feature_idx++)
            DS(feature_idx) = features->src[idx + feature_idx];
    }

    features->preprocessed_features.resize(sample_idx);
}

static void
ml_features_preprocess(ml_features_t *features)
{
    ml_features_diff(features);
    ml_features_smooth(features);
    ml_features_lag(features);
}
/*
 * KMeans
 */
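// Each model is a 2-center KMeans clustering of the preprocessed feature
// vectors. Alongside the centers we keep the minimum and maximum mean distance
// seen during training; these bounds are later used to normalize a sample's
// distance into a 0-100 anomaly score.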
static void
ml_kmeans_init(ml_kmeans_t *kmeans)
{
    kmeans->cluster_centers.reserve(2);
    kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
    kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();
}

static void
ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features, time_t after, time_t before)
{
    kmeans->after = (uint32_t) after;
    kmeans->before = (uint32_t) before;

    kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
    kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();

    kmeans->cluster_centers.clear();

    dlib::pick_initial_centers(2, kmeans->cluster_centers, features->preprocessed_features);
    dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, Cfg.max_kmeans_iters);

    for (const auto &preprocessed_feature : features->preprocessed_features) {
        calculated_number_t mean_dist = 0.0;

        for (const auto &cluster_center : kmeans->cluster_centers) {
            mean_dist += dlib::length(cluster_center - preprocessed_feature);
        }

        mean_dist /= kmeans->cluster_centers.size();

        if (mean_dist < kmeans->min_dist)
            kmeans->min_dist = mean_dist;

        if (mean_dist > kmeans->max_dist)
            kmeans->max_dist = mean_dist;
    }
}
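// A sample's anomaly score is its mean distance to the two centers, normalized
// by the [min_dist, max_dist] range observed during training. For example,
// with min_dist = 1 and max_dist = 5, a mean distance of 3 scores 50, and
// anything beyond 5 is clamped to 100.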
static calculated_number_t
ml_kmeans_anomaly_score(const ml_kmeans_t *kmeans, const DSample &DS)
{
    calculated_number_t mean_dist = 0.0;
    for (const auto &CC: kmeans->cluster_centers)
        mean_dist += dlib::length(CC - DS);

    mean_dist /= kmeans->cluster_centers.size();

    if (kmeans->max_dist == kmeans->min_dist)
        return 0.0;

    calculated_number_t anomaly_score = 100.0 * std::abs((mean_dist - kmeans->min_dist) / (kmeans->max_dist - kmeans->min_dist));
    return (anomaly_score > 100.0) ? 100.0 : anomaly_score;
}
/*
 * Queue
 */
static ml_queue_t *
ml_queue_init()
{
    ml_queue_t *q = new ml_queue_t();

    netdata_mutex_init(&q->mutex);
    pthread_cond_init(&q->cond_var, NULL);
    q->exit = false;
    return q;
}

static void
ml_queue_destroy(ml_queue_t *q)
{
    netdata_mutex_destroy(&q->mutex);
    pthread_cond_destroy(&q->cond_var);
    delete q;
}

static void
ml_queue_push(ml_queue_t *q, const ml_training_request_t req)
{
    netdata_mutex_lock(&q->mutex);
    q->internal.push(req);
    pthread_cond_signal(&q->cond_var);
    netdata_mutex_unlock(&q->mutex);
}

static ml_training_request_t
ml_queue_pop(ml_queue_t *q)
{
    netdata_mutex_lock(&q->mutex);

    ml_training_request_t req = {
        {'\0'}, // machine_guid
        NULL,   // chart id
        NULL,   // dimension id
        0,      // current time
        0,      // first entry
        0       // last entry
    };

    while (q->internal.empty()) {
        pthread_cond_wait(&q->cond_var, &q->mutex);

        if (q->exit) {
            netdata_mutex_unlock(&q->mutex);

            // We return a dummy request because the queue has been signaled
            return req;
        }
    }

    req = q->internal.front();
    q->internal.pop();

    netdata_mutex_unlock(&q->mutex);
    return req;
}

static size_t
ml_queue_size(ml_queue_t *q)
{
    netdata_mutex_lock(&q->mutex);
    size_t size = q->internal.size();
    netdata_mutex_unlock(&q->mutex);
    return size;
}

static void
ml_queue_signal(ml_queue_t *q)
{
    netdata_mutex_lock(&q->mutex);
    q->exit = true;
    pthread_cond_signal(&q->cond_var);
    netdata_mutex_unlock(&q->mutex);
}
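// Shutdown protocol: ml_queue_signal() sets the exit flag and wakes any
// blocked consumer; ml_queue_pop() then returns a zero-initialized dummy
// request (chart_id == NULL), which the training threads interpret as a
// request to stop.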
/*
 * Dimension
 */
static std::pair<calculated_number_t *, ml_training_response_t>
ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
{
    ml_training_response_t training_response = {};

    training_response.request_time = training_request.request_time;
    training_response.first_entry_on_request = training_request.first_entry_on_request;
    training_response.last_entry_on_request = training_request.last_entry_on_request;

    training_response.first_entry_on_response = rrddim_first_entry_s_of_tier(dim->rd, 0);
    training_response.last_entry_on_response = rrddim_last_entry_s_of_tier(dim->rd, 0);

    size_t min_n = Cfg.min_train_samples;
    size_t max_n = Cfg.max_train_samples;

    // Figure out what our time window should be.
    training_response.query_before_t = training_response.last_entry_on_response;
    training_response.query_after_t = std::max(
        training_response.query_before_t - static_cast<time_t>((max_n - 1) * dim->rd->rrdset->update_every),
        training_response.first_entry_on_response
    );

    if (training_response.query_after_t >= training_response.query_before_t) {
        training_response.result = TRAINING_RESULT_INVALID_QUERY_TIME_RANGE;
        return { NULL, training_response };
    }

    if (rrdset_is_replicating(dim->rd->rrdset)) {
        training_response.result = TRAINING_RESULT_CHART_UNDER_REPLICATION;
        return { NULL, training_response };
    }

    /*
     * Execute the query
     */
    struct storage_engine_query_handle handle;

    storage_engine_query_init(dim->rd->tiers[0].backend, dim->rd->tiers[0].db_metric_handle, &handle,
                              training_response.query_after_t, training_response.query_before_t,
                              STORAGE_PRIORITY_BEST_EFFORT);

    size_t idx = 0;
    memset(training_thread->training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
    calculated_number_t last_value = std::numeric_limits<calculated_number_t>::quiet_NaN();

    while (!storage_engine_query_is_finished(&handle)) {
        if (idx == max_n)
            break;

        STORAGE_POINT sp = storage_engine_query_next_metric(&handle);

        time_t timestamp = sp.end_time_s;
        calculated_number_t value = sp.sum / sp.count;

        if (netdata_double_isnumber(value)) {
            if (!training_response.db_after_t)
                training_response.db_after_t = timestamp;
            training_response.db_before_t = timestamp;

            training_thread->training_cns[idx] = value;
            last_value = training_thread->training_cns[idx];
            training_response.collected_values++;
        } else {
            // fill gaps with the last collected value
            training_thread->training_cns[idx] = last_value;
        }

        idx++;
    }
    storage_engine_query_finalize(&handle);

    global_statistics_ml_query_completed(/* points_read */ idx);

    training_response.total_values = idx;
    if (training_response.collected_values < min_n) {
        training_response.result = TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES;
        return { NULL, training_response };
    }

    // Find first non-NaN value.
    for (idx = 0; std::isnan(training_thread->training_cns[idx]); idx++, training_response.total_values--) { }

    // Overwrite NaN values.
    if (idx != 0)
        memmove(training_thread->training_cns, &training_thread->training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);

    training_response.result = TRAINING_RESULT_OK;
    return { training_thread->training_cns, training_response };
}
const char *db_models_create_table =
    "CREATE TABLE IF NOT EXISTS models("
    " dim_id BLOB, after INT, before INT,"
    " min_dist REAL, max_dist REAL,"
    " c00 REAL, c01 REAL, c02 REAL, c03 REAL, c04 REAL, c05 REAL,"
    " c10 REAL, c11 REAL, c12 REAL, c13 REAL, c14 REAL, c15 REAL,"
    " PRIMARY KEY(dim_id, after)"
    ");";
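// Each row of the models table holds one KMeans model for one dimension: two
// cluster centers (c0x and c1x) of lag_n + 1 = 6 coefficients each, the
// [min_dist, max_dist] normalization bounds, and the [after, before] time
// range the model was trained on.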
const char *db_models_add_model =
    "INSERT OR REPLACE INTO models("
    " dim_id, after, before,"
    " min_dist, max_dist,"
    " c00, c01, c02, c03, c04, c05,"
    " c10, c11, c12, c13, c14, c15)"
    "VALUES("
    " @dim_id, @after, @before,"
    " @min_dist, @max_dist,"
    " @c00, @c01, @c02, @c03, @c04, @c05,"
    " @c10, @c11, @c12, @c13, @c14, @c15);";

const char *db_models_load =
    "SELECT * FROM models "
    "WHERE dim_id = @dim_id AND after >= @after ORDER BY before ASC;";

const char *db_models_delete =
    "DELETE FROM models "
    "WHERE dim_id = @dim_id AND before < @before;";

const char *db_models_prune =
    "DELETE FROM models "
    "WHERE after < @after LIMIT @n;";
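// Note that db_models_prune uses the DELETE ... LIMIT syntax, which assumes
// SQLite is built with SQLITE_ENABLE_UPDATE_DELETE_LIMIT. The helpers below
// keep one prepared statement per thread (static __thread) and reset it after
// every use instead of re-preparing it.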
static int
ml_dimension_add_model(const uuid_t *metric_uuid, const ml_kmeans_t *km)
{
    static __thread sqlite3_stmt *res = NULL;
    int param = 0;
    int rc = 0;

    if (unlikely(!db)) {
        error_report("Database has not been initialized");
        return 1;
    }

    if (unlikely(!res)) {
        rc = prepare_statement(db, db_models_add_model, &res);
        if (unlikely(rc != SQLITE_OK)) {
            error_report("Failed to prepare statement to store model, rc = %d", rc);
            return 1;
        }
    }

    rc = sqlite3_bind_blob(res, ++param, metric_uuid, sizeof(*metric_uuid), SQLITE_STATIC);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = sqlite3_bind_int(res, ++param, (int) km->after);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = sqlite3_bind_int(res, ++param, (int) km->before);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = sqlite3_bind_double(res, ++param, km->min_dist);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = sqlite3_bind_double(res, ++param, km->max_dist);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    if (km->cluster_centers.size() != 2)
        fatal("Expected 2 cluster centers, got %zu", km->cluster_centers.size());

    for (const DSample &ds : km->cluster_centers) {
        if (ds.size() != 6)
            fatal("Expected dsample with 6 dimensions, got %ld", ds.size());

        for (long idx = 0; idx != ds.size(); idx++) {
            calculated_number_t cn = ds(idx);
            // assign the outer rc, so that bind_fail reports the real error code
            rc = sqlite3_bind_double(res, ++param, cn);
            if (unlikely(rc != SQLITE_OK))
                goto bind_fail;
        }
    }

    rc = execute_insert(res);
    if (unlikely(rc != SQLITE_DONE)) {
        error_report("Failed to store model, rc = %d", rc);
        return rc;
    }

    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK)) {
        error_report("Failed to reset statement when storing model, rc = %d", rc);
        return rc;
    }

    return 0;

bind_fail:
    error_report("Failed to bind parameter %d to store model, rc = %d", param, rc);
    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK))
        error_report("Failed to reset statement to store model, rc = %d", rc);
    return rc;
}
static int
ml_dimension_delete_models(const uuid_t *metric_uuid, time_t before)
{
    static __thread sqlite3_stmt *res = NULL;
    int rc = 0;
    int param = 0;

    if (unlikely(!db)) {
        error_report("Database has not been initialized");
        return 1;
    }

    if (unlikely(!res)) {
        rc = prepare_statement(db, db_models_delete, &res);
        if (unlikely(rc != SQLITE_OK)) {
            error_report("Failed to prepare statement to delete models, rc = %d", rc);
            return rc;
        }
    }

    rc = sqlite3_bind_blob(res, ++param, metric_uuid, sizeof(*metric_uuid), SQLITE_STATIC);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = sqlite3_bind_int(res, ++param, (int) before);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = execute_insert(res);
    if (unlikely(rc != SQLITE_DONE)) {
        error_report("Failed to delete models, rc = %d", rc);
        return rc;
    }

    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK)) {
        error_report("Failed to reset statement when deleting models, rc = %d", rc);
        return rc;
    }

    return 0;

bind_fail:
    error_report("Failed to bind parameter %d to delete models, rc = %d", param, rc);
    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK))
        error_report("Failed to reset statement to delete models, rc = %d", rc);
    return rc;
}

static int
ml_prune_old_models(size_t num_models_to_prune)
{
    static __thread sqlite3_stmt *res = NULL;
    int rc = 0;
    int param = 0;

    if (unlikely(!db)) {
        error_report("Database has not been initialized");
        return 1;
    }

    if (unlikely(!res)) {
        rc = prepare_statement(db, db_models_prune, &res);
        if (unlikely(rc != SQLITE_OK)) {
            error_report("Failed to prepare statement to prune models, rc = %d", rc);
            return rc;
        }
    }

    int after = (int) (now_realtime_sec() - Cfg.delete_models_older_than);

    rc = sqlite3_bind_int(res, ++param, after);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = sqlite3_bind_int(res, ++param, num_models_to_prune);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = execute_insert(res);
    if (unlikely(rc != SQLITE_DONE)) {
        error_report("Failed to prune old models, rc = %d", rc);
        return rc;
    }

    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK)) {
        error_report("Failed to reset statement when pruning old models, rc = %d", rc);
        return rc;
    }

    return 0;

bind_fail:
    error_report("Failed to bind parameter %d to prune old models, rc = %d", param, rc);
    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK))
        error_report("Failed to reset statement to prune old models, rc = %d", rc);
    return rc;
}
int ml_dimension_load_models(RRDDIM *rd) {
    ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
    if (!dim)
        return 0;

    spinlock_lock(&dim->slock);
    bool is_empty = dim->km_contexts.empty();
    spinlock_unlock(&dim->slock);

    if (!is_empty)
        return 0;

    std::vector<ml_kmeans_t> V;

    static __thread sqlite3_stmt *res = NULL;
    int rc = 0;
    int param = 0;

    if (unlikely(!db)) {
        error_report("Database has not been initialized");
        return 1;
    }

    if (unlikely(!res)) {
        rc = prepare_statement(db, db_models_load, &res);
        if (unlikely(rc != SQLITE_OK)) {
            error_report("Failed to prepare statement to load models, rc = %d", rc);
            return 1;
        }
    }

    rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    // the `after` column stores seconds, so the cutoff must be in seconds too
    rc = sqlite3_bind_int(res, ++param, now_realtime_sec() - (Cfg.num_models_to_use * Cfg.max_train_samples));
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    spinlock_lock(&dim->slock);

    dim->km_contexts.reserve(Cfg.num_models_to_use);
    while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) {
        ml_kmeans_t km;

        km.after = sqlite3_column_int(res, 2);
        km.before = sqlite3_column_int(res, 3);

        // min_dist and max_dist are REAL columns
        km.min_dist = sqlite3_column_double(res, 4);
        km.max_dist = sqlite3_column_double(res, 5);

        km.cluster_centers.resize(2);

        km.cluster_centers[0].set_size(Cfg.lag_n + 1);
        km.cluster_centers[0](0) = sqlite3_column_double(res, 6);
        km.cluster_centers[0](1) = sqlite3_column_double(res, 7);
        km.cluster_centers[0](2) = sqlite3_column_double(res, 8);
        km.cluster_centers[0](3) = sqlite3_column_double(res, 9);
        km.cluster_centers[0](4) = sqlite3_column_double(res, 10);
        km.cluster_centers[0](5) = sqlite3_column_double(res, 11);

        km.cluster_centers[1].set_size(Cfg.lag_n + 1);
        km.cluster_centers[1](0) = sqlite3_column_double(res, 12);
        km.cluster_centers[1](1) = sqlite3_column_double(res, 13);
        km.cluster_centers[1](2) = sqlite3_column_double(res, 14);
        km.cluster_centers[1](3) = sqlite3_column_double(res, 15);
        km.cluster_centers[1](4) = sqlite3_column_double(res, 16);
        km.cluster_centers[1](5) = sqlite3_column_double(res, 17);

        dim->km_contexts.push_back(km);
    }

    if (!dim->km_contexts.empty()) {
        dim->ts = TRAINING_STATUS_TRAINED;
    }

    spinlock_unlock(&dim->slock);

    if (unlikely(rc != SQLITE_DONE))
        error_report("Failed to load models, rc = %d", rc);

    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK))
        error_report("Failed to reset statement when loading models, rc = %d", rc);

    return 0;

bind_fail:
    error_report("Failed to bind parameter %d to load models, rc = %d", param, rc);
    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK))
        error_report("Failed to reset statement to load models, rc = %d", rc);
    return 1;
}
static enum ml_training_result
ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
{
    worker_is_busy(WORKER_TRAIN_QUERY);

    auto P = ml_dimension_calculated_numbers(training_thread, dim, training_request);
    ml_training_response_t training_response = P.second;

    if (training_response.result != TRAINING_RESULT_OK) {
        spinlock_lock(&dim->slock);

        dim->mt = METRIC_TYPE_CONSTANT;

        switch (dim->ts) {
            case TRAINING_STATUS_PENDING_WITH_MODEL:
                dim->ts = TRAINING_STATUS_TRAINED;
                break;
            case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
                dim->ts = TRAINING_STATUS_UNTRAINED;
                break;
            default:
                break;
        }

        dim->suppression_anomaly_counter = 0;
        dim->suppression_window_counter = 0;

        dim->tr = training_response;

        dim->last_training_time = training_response.last_entry_on_response;
        enum ml_training_result result = training_response.result;
        spinlock_unlock(&dim->slock);

        return result;
    }

    // compute kmeans
    worker_is_busy(WORKER_TRAIN_KMEANS);
    {
        memcpy(training_thread->scratch_training_cns, training_thread->training_cns,
               training_response.total_values * sizeof(calculated_number_t));

        ml_features_t features = {
            Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
            training_thread->scratch_training_cns, training_response.total_values,
            training_thread->training_cns, training_response.total_values,
            training_thread->training_samples
        };
        ml_features_preprocess(&features);

        ml_kmeans_init(&dim->kmeans);
        ml_kmeans_train(&dim->kmeans, &features, training_response.query_after_t, training_response.query_before_t);
    }

    // update models
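    // km_contexts holds a sliding window of the most recent models, oldest
    // first. Once full, the incoming model either overwrites the newest slot
    // (when, presumably, the neighboring models already cover its time range)
    // or the window is rotated left so the oldest model drops off.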
    worker_is_busy(WORKER_TRAIN_UPDATE_MODELS);
    {
        spinlock_lock(&dim->slock);

        if (dim->km_contexts.size() < Cfg.num_models_to_use) {
            dim->km_contexts.push_back(std::move(dim->kmeans));
        } else {
            bool can_drop_middle_km = false;

            if (Cfg.num_models_to_use > 2) {
                const ml_kmeans_t *old_km = &dim->km_contexts[dim->km_contexts.size() - 1];
                const ml_kmeans_t *middle_km = &dim->km_contexts[dim->km_contexts.size() - 2];
                const ml_kmeans_t *new_km = &dim->kmeans;

                can_drop_middle_km = (middle_km->after < old_km->before) &&
                                     (middle_km->before > new_km->after);
            }

            if (can_drop_middle_km) {
                dim->km_contexts.back() = dim->kmeans;
            } else {
                std::rotate(std::begin(dim->km_contexts), std::begin(dim->km_contexts) + 1, std::end(dim->km_contexts));
                dim->km_contexts[dim->km_contexts.size() - 1] = std::move(dim->kmeans);
            }
        }

        dim->mt = METRIC_TYPE_CONSTANT;
        dim->ts = TRAINING_STATUS_TRAINED;

        dim->suppression_anomaly_counter = 0;
        dim->suppression_window_counter = 0;

        dim->tr = training_response;
        dim->last_training_time = rrddim_last_entry_s(dim->rd);

        // Add the newly generated model to the list of pending models to flush
        ml_model_info_t model_info;
        uuid_copy(model_info.metric_uuid, dim->rd->metric_uuid);
        model_info.kmeans = dim->km_contexts.back();
        training_thread->pending_model_info.push_back(model_info);

        spinlock_unlock(&dim->slock);
    }

    return training_response.result;
}
static void
ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time)
{
    switch (dim->mt) {
        case METRIC_TYPE_CONSTANT:
            return;
        default:
            break;
    }

    bool schedule_for_training = false;

    switch (dim->ts) {
        case TRAINING_STATUS_PENDING_WITH_MODEL:
        case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
            schedule_for_training = false;
            break;
        case TRAINING_STATUS_UNTRAINED:
            schedule_for_training = true;
            dim->ts = TRAINING_STATUS_PENDING_WITHOUT_MODEL;
            break;
        case TRAINING_STATUS_SILENCED:
        case TRAINING_STATUS_TRAINED:
            if ((dim->last_training_time + (Cfg.train_every * dim->rd->rrdset->update_every)) < curr_time) {
                schedule_for_training = true;
                dim->ts = TRAINING_STATUS_PENDING_WITH_MODEL;
            }
            break;
    }

    if (schedule_for_training) {
        ml_training_request_t req;

        memcpy(req.machine_guid, dim->rd->rrdset->rrdhost->machine_guid, GUID_LEN + 1);
        req.chart_id = string_dup(dim->rd->rrdset->id);
        req.dimension_id = string_dup(dim->rd->id);
        req.request_time = curr_time;
        req.first_entry_on_request = rrddim_first_entry_s(dim->rd);
        req.last_entry_on_request = rrddim_last_entry_s(dim->rd);

        ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
        ml_queue_push(host->training_queue, req);
    }
}
static bool
ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t value, bool exists)
{
    // Nothing to do if ML is disabled for this dimension
    if (dim->mls != MACHINE_LEARNING_STATUS_ENABLED)
        return false;

    // Don't treat values that don't exist as anomalous
    if (!exists) {
        dim->cns.clear();
        return false;
    }

    // Save the value and return if we don't have enough values for a sample
    unsigned n = Cfg.diff_n + Cfg.smooth_n + Cfg.lag_n;
    if (dim->cns.size() < n) {
        dim->cns.push_back(value);
        return false;
    }

    // Push the value and check if it's different from the last one
    bool same_value = true;
    std::rotate(std::begin(dim->cns), std::begin(dim->cns) + 1, std::end(dim->cns));
    if (dim->cns[n - 1] != value)
        same_value = false;
    dim->cns[n - 1] = value;

    // Create the sample
    assert((n * (Cfg.lag_n + 1) <= 128) &&
           "Static buffers too small to perform prediction. "
           "This should not be possible with the default clamping of feature extraction options");

    calculated_number_t src_cns[128];
    calculated_number_t dst_cns[128];

    memset(src_cns, 0, n * (Cfg.lag_n + 1) * sizeof(calculated_number_t));
    memcpy(src_cns, dim->cns.data(), n * sizeof(calculated_number_t));
    memcpy(dst_cns, dim->cns.data(), n * sizeof(calculated_number_t));

    ml_features_t features = {
        Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
        dst_cns, n, src_cns, n,
        dim->feature
    };
    ml_features_preprocess(&features);

    /*
     * Lock to predict and possibly schedule the dimension for training
     */
    if (spinlock_trylock(&dim->slock) == 0)
        return false;

    // Mark the metric type as variable if we received different values
    if (!same_value)
        dim->mt = METRIC_TYPE_VARIABLE;

    // Decide if the dimension needs to be scheduled for training
    ml_dimension_schedule_for_training(dim, curr_time);

    // Nothing to do if we don't have a model
    switch (dim->ts) {
        case TRAINING_STATUS_UNTRAINED:
        case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
        case TRAINING_STATUS_SILENCED:
            spinlock_unlock(&dim->slock);
            return false;
        default:
            break;
    }

    dim->suppression_window_counter++;

    /*
     * Use the KMeans models to check if the value is anomalous
     */
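    // A value is flagged anomalous only if every model agrees: the loop below
    // bails out with "not anomalous" as soon as one model scores the sample
    // under the threshold. A dimension that stays anomalous for at least
    // suppression_threshold samples within a suppression_window is silenced
    // until its next training pass.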
    size_t sum = 0;
    size_t models_consulted = 0;

    for (const auto &km_ctx : dim->km_contexts) {
        models_consulted++;

        calculated_number_t anomaly_score = ml_kmeans_anomaly_score(&km_ctx, features.preprocessed_features[0]);
        // skip models that produce a NaN score
        if (std::isnan(anomaly_score))
            continue;

        if (anomaly_score < (100 * Cfg.dimension_anomaly_score_threshold)) {
            global_statistics_ml_models_consulted(models_consulted);
            spinlock_unlock(&dim->slock);
            return false;
        }

        sum += 1;
    }

    dim->suppression_anomaly_counter += sum ? 1 : 0;

    if ((dim->suppression_anomaly_counter >= Cfg.suppression_threshold) &&
        (dim->suppression_window_counter >= Cfg.suppression_window)) {
        dim->ts = TRAINING_STATUS_SILENCED;
    }

    spinlock_unlock(&dim->slock);

    global_statistics_ml_models_consulted(models_consulted);
    return sum;
}
/*
 * Chart
 */

static bool
ml_chart_is_available_for_ml(ml_chart_t *chart)
{
    return rrdset_is_available_for_exporting_and_alarms(chart->rs);
}

void
ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_anomalous)
{
    switch (dim->mls) {
        case MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART:
            chart->mls.num_machine_learning_status_disabled_sp++;
            return;
        case MACHINE_LEARNING_STATUS_ENABLED: {
            chart->mls.num_machine_learning_status_enabled++;

            switch (dim->mt) {
                case METRIC_TYPE_CONSTANT:
                    chart->mls.num_metric_type_constant++;
                    chart->mls.num_training_status_trained++;
                    chart->mls.num_normal_dimensions++;
                    return;
                case METRIC_TYPE_VARIABLE:
                    chart->mls.num_metric_type_variable++;
                    break;
            }

            switch (dim->ts) {
                case TRAINING_STATUS_UNTRAINED:
                    chart->mls.num_training_status_untrained++;
                    return;
                case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
                    chart->mls.num_training_status_pending_without_model++;
                    return;
                case TRAINING_STATUS_TRAINED:
                    chart->mls.num_training_status_trained++;
                    chart->mls.num_anomalous_dimensions += is_anomalous;
                    chart->mls.num_normal_dimensions += !is_anomalous;
                    return;
                case TRAINING_STATUS_PENDING_WITH_MODEL:
                    chart->mls.num_training_status_pending_with_model++;
                    chart->mls.num_anomalous_dimensions += is_anomalous;
                    chart->mls.num_normal_dimensions += !is_anomalous;
                    return;
                case TRAINING_STATUS_SILENCED:
                    chart->mls.num_training_status_silenced++;
                    chart->mls.num_training_status_trained++;
                    chart->mls.num_anomalous_dimensions += is_anomalous;
                    chart->mls.num_normal_dimensions += !is_anomalous;
                    return;
            }

            return;
        }
    }
}
/*
 * Host detection & training functions
 */

#define WORKER_JOB_DETECTION_COLLECT_STATS 0
#define WORKER_JOB_DETECTION_DIM_CHART 1
#define WORKER_JOB_DETECTION_HOST_CHART 2
#define WORKER_JOB_DETECTION_STATS 3
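// Runs once per second for every host (driven by ml_detect_main): it
// aggregates the per-chart ML stats into host-level totals and computes the
// host anomaly rate as anomalous_dimensions / (anomalous + normal dimensions).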
static void
ml_host_detect_once(ml_host_t *host)
{
    worker_is_busy(WORKER_JOB_DETECTION_COLLECT_STATS);

    host->mls = {};
    ml_machine_learning_stats_t mls_copy = {};

    if (host->ml_running) {
        netdata_mutex_lock(&host->mutex);

        /*
         * prediction/detection stats
         */
        void *rsp = NULL;
        rrdset_foreach_read(rsp, host->rh) {
            RRDSET *rs = static_cast<RRDSET *>(rsp);

            ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
            if (!chart)
                continue;

            if (!ml_chart_is_available_for_ml(chart))
                continue;

            ml_machine_learning_stats_t chart_mls = chart->mls;

            host->mls.num_machine_learning_status_enabled += chart_mls.num_machine_learning_status_enabled;
            host->mls.num_machine_learning_status_disabled_sp += chart_mls.num_machine_learning_status_disabled_sp;

            host->mls.num_metric_type_constant += chart_mls.num_metric_type_constant;
            host->mls.num_metric_type_variable += chart_mls.num_metric_type_variable;

            host->mls.num_training_status_untrained += chart_mls.num_training_status_untrained;
            host->mls.num_training_status_pending_without_model += chart_mls.num_training_status_pending_without_model;
            host->mls.num_training_status_trained += chart_mls.num_training_status_trained;
            host->mls.num_training_status_pending_with_model += chart_mls.num_training_status_pending_with_model;
            host->mls.num_training_status_silenced += chart_mls.num_training_status_silenced;

            host->mls.num_anomalous_dimensions += chart_mls.num_anomalous_dimensions;
            host->mls.num_normal_dimensions += chart_mls.num_normal_dimensions;

            STRING *key = rs->parts.type;
            auto &um = host->type_anomaly_rate;
            auto it = um.find(key);
            if (it == um.end()) {
                um[key] = ml_type_anomaly_rate_t {
                    .rd = NULL,
                    .normal_dimensions = 0,
                    .anomalous_dimensions = 0
                };
                it = um.find(key);
            }

            it->second.anomalous_dimensions += chart_mls.num_anomalous_dimensions;
            it->second.normal_dimensions += chart_mls.num_normal_dimensions;
        }
        rrdset_foreach_done(rsp);

        host->host_anomaly_rate = 0.0;
        size_t NumActiveDimensions = host->mls.num_anomalous_dimensions + host->mls.num_normal_dimensions;
        if (NumActiveDimensions)
            host->host_anomaly_rate = static_cast<double>(host->mls.num_anomalous_dimensions) / NumActiveDimensions;

        mls_copy = host->mls;

        netdata_mutex_unlock(&host->mutex);
    } else {
        host->host_anomaly_rate = 0.0;

        auto &um = host->type_anomaly_rate;
        for (auto &entry: um) {
            entry.second = ml_type_anomaly_rate_t {
                .rd = NULL,
                .normal_dimensions = 0,
                .anomalous_dimensions = 0
            };
        }
    }

    worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
    ml_update_dimensions_chart(host, mls_copy);

    worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART);
    ml_update_host_and_detection_rate_charts(host, host->host_anomaly_rate * 10000.0);
}
typedef struct {
    RRDHOST_ACQUIRED *acq_rh;
    RRDSET_ACQUIRED *acq_rs;
    RRDDIM_ACQUIRED *acq_rd;
    ml_dimension_t *dim;
} ml_acquired_dimension_t;

static ml_acquired_dimension_t
ml_acquired_dimension_get(char *machine_guid, STRING *chart_id, STRING *dimension_id)
{
    RRDHOST_ACQUIRED *acq_rh = NULL;
    RRDSET_ACQUIRED *acq_rs = NULL;
    RRDDIM_ACQUIRED *acq_rd = NULL;
    ml_dimension_t *dim = NULL;

    rrd_rdlock();

    acq_rh = rrdhost_find_and_acquire(machine_guid);
    if (acq_rh) {
        RRDHOST *rh = rrdhost_acquired_to_rrdhost(acq_rh);
        if (rh && !rrdhost_flag_check(rh, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_ARCHIVED)) {
            acq_rs = rrdset_find_and_acquire(rh, string2str(chart_id));
            if (acq_rs) {
                RRDSET *rs = rrdset_acquired_to_rrdset(acq_rs);
                if (rs && !rrdset_flag_check(rs, RRDSET_FLAG_OBSOLETE)) {
                    acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
                    if (acq_rd) {
                        RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
                        if (rd)
                            dim = (ml_dimension_t *) rd->ml_dimension;
                    }
                }
            }
        }
    }

    rrd_unlock();

    ml_acquired_dimension_t acq_dim = {
        acq_rh, acq_rs, acq_rd, dim
    };

    return acq_dim;
}

static void
ml_acquired_dimension_release(ml_acquired_dimension_t acq_dim)
{
    if (acq_dim.acq_rd)
        rrddim_acquired_release(acq_dim.acq_rd);

    if (acq_dim.acq_rs)
        rrdset_acquired_release(acq_dim.acq_rs);

    if (acq_dim.acq_rh)
        rrdhost_acquired_release(acq_dim.acq_rh);
}

static enum ml_training_result
ml_acquired_dimension_train(ml_training_thread_t *training_thread, ml_acquired_dimension_t acq_dim, const ml_training_request_t &tr)
{
    if (!acq_dim.dim)
        return TRAINING_RESULT_NULL_ACQUIRED_DIMENSION;

    return ml_dimension_train_model(training_thread, acq_dim.dim, tr);
}

static void *
ml_detect_main(void *arg)
{
    UNUSED(arg);

    worker_register("MLDETECT");
    worker_register_job_name(WORKER_JOB_DETECTION_COLLECT_STATS, "collect stats");
    worker_register_job_name(WORKER_JOB_DETECTION_DIM_CHART, "dim chart");
    worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart");
    worker_register_job_name(WORKER_JOB_DETECTION_STATS, "training stats");

    heartbeat_t hb;
    heartbeat_init(&hb);

    while (!Cfg.detection_stop) {
        worker_is_idle();
        heartbeat_next(&hb, USEC_PER_SEC);

        RRDHOST *rh;
        rrd_rdlock();
        rrdhost_foreach_read(rh) {
            if (!rh->ml_host)
                continue;

            ml_host_detect_once((ml_host_t *) rh->ml_host);
        }
        rrd_unlock();

        if (Cfg.enable_statistics_charts) {
            // collect and update training thread stats
            for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
                ml_training_thread_t *training_thread = &Cfg.training_threads[idx];

                netdata_mutex_lock(&training_thread->nd_mutex);
                ml_training_stats_t training_stats = training_thread->training_stats;
                training_thread->training_stats = {};
                netdata_mutex_unlock(&training_thread->nd_mutex);

                // calc the avg values
                if (training_stats.num_popped_items) {
                    training_stats.queue_size /= training_stats.num_popped_items;
                    training_stats.allotted_ut /= training_stats.num_popped_items;
                    training_stats.consumed_ut /= training_stats.num_popped_items;
                    training_stats.remaining_ut /= training_stats.num_popped_items;
                } else {
                    training_stats.queue_size = ml_queue_size(training_thread->training_queue);
                    training_stats.consumed_ut = 0;
                    training_stats.remaining_ut = training_stats.allotted_ut;

                    training_stats.training_result_ok = 0;
                    training_stats.training_result_invalid_query_time_range = 0;
                    training_stats.training_result_not_enough_collected_values = 0;
                    training_stats.training_result_null_acquired_dimension = 0;
                    training_stats.training_result_chart_under_replication = 0;
                }

                ml_update_training_statistics_chart(training_thread, training_stats);
            }
        }
    }

    return NULL;
}
/*
 * Public API
 */

bool ml_capable()
{
    return true;
}

bool ml_enabled(RRDHOST *rh)
{
    if (!rh)
        return false;

    if (!Cfg.enable_anomaly_detection)
        return false;

    if (simple_pattern_matches(Cfg.sp_host_to_skip, rrdhost_hostname(rh)))
        return false;

    return true;
}

bool ml_streaming_enabled()
{
    return Cfg.stream_anomaly_detection_charts;
}

void ml_host_new(RRDHOST *rh)
{
    if (!ml_enabled(rh))
        return;

    ml_host_t *host = new ml_host_t();

    host->rh = rh;
    host->mls = ml_machine_learning_stats_t();
    host->host_anomaly_rate = 0.0;
    host->anomaly_rate_rs = NULL;

    static std::atomic<size_t> times_called(0);
    host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue;

    netdata_mutex_init(&host->mutex);

    host->ml_running = true;
    rh->ml_host = (rrd_ml_host_t *) host;
}

void ml_host_delete(RRDHOST *rh)
{
    ml_host_t *host = (ml_host_t *) rh->ml_host;
    if (!host)
        return;

    netdata_mutex_destroy(&host->mutex);

    delete host;
    rh->ml_host = NULL;
}

void ml_host_start(RRDHOST *rh) {
    ml_host_t *host = (ml_host_t *) rh->ml_host;
    if (!host)
        return;

    host->ml_running = true;
}

void ml_host_stop(RRDHOST *rh) {
    ml_host_t *host = (ml_host_t *) rh->ml_host;
    if (!host || !host->ml_running)
        return;

    netdata_mutex_lock(&host->mutex);

    // reset host stats
    host->mls = ml_machine_learning_stats_t();

    // reset charts/dims
    void *rsp = NULL;
    rrdset_foreach_read(rsp, host->rh) {
        RRDSET *rs = static_cast<RRDSET *>(rsp);

        ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
        if (!chart)
            continue;

        // reset chart
        chart->mls = ml_machine_learning_stats_t();

        void *rdp = NULL;
        rrddim_foreach_read(rdp, rs) {
            RRDDIM *rd = static_cast<RRDDIM *>(rdp);

            ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
            if (!dim)
                continue;

            spinlock_lock(&dim->slock);

            // reset dim
            // TODO: should we drop in-mem models, or mark them as stale? Is it
            // okay to resume training straight away?
            dim->mt = METRIC_TYPE_CONSTANT;
            dim->ts = TRAINING_STATUS_UNTRAINED;
            dim->last_training_time = 0;
            dim->suppression_anomaly_counter = 0;
            dim->suppression_window_counter = 0;
            dim->cns.clear();

            ml_kmeans_init(&dim->kmeans);

            spinlock_unlock(&dim->slock);
        }
        rrddim_foreach_done(rdp);
    }
    rrdset_foreach_done(rsp);

    netdata_mutex_unlock(&host->mutex);

    host->ml_running = false;
}
void ml_host_get_info(RRDHOST *rh, BUFFER *wb)
{
    ml_host_t *host = (ml_host_t *) rh->ml_host;
    if (!host) {
        buffer_json_member_add_boolean(wb, "enabled", false);
        return;
    }

    buffer_json_member_add_uint64(wb, "version", 1);

    buffer_json_member_add_boolean(wb, "enabled", Cfg.enable_anomaly_detection);

    buffer_json_member_add_uint64(wb, "min-train-samples", Cfg.min_train_samples);
    buffer_json_member_add_uint64(wb, "max-train-samples", Cfg.max_train_samples);
    buffer_json_member_add_uint64(wb, "train-every", Cfg.train_every);

    buffer_json_member_add_uint64(wb, "diff-n", Cfg.diff_n);
    buffer_json_member_add_uint64(wb, "smooth-n", Cfg.smooth_n);
    buffer_json_member_add_uint64(wb, "lag-n", Cfg.lag_n);

    buffer_json_member_add_double(wb, "random-sampling-ratio", Cfg.random_sampling_ratio);
    buffer_json_member_add_uint64(wb, "max-kmeans-iters", Cfg.max_kmeans_iters);

    buffer_json_member_add_double(wb, "dimension-anomaly-score-threshold", Cfg.dimension_anomaly_score_threshold);

    buffer_json_member_add_string(wb, "anomaly-detection-grouping-method",
                                  time_grouping_method2string(Cfg.anomaly_detection_grouping_method));

    buffer_json_member_add_int64(wb, "anomaly-detection-query-duration", Cfg.anomaly_detection_query_duration);

    buffer_json_member_add_string(wb, "hosts-to-skip", Cfg.hosts_to_skip.c_str());
    buffer_json_member_add_string(wb, "charts-to-skip", Cfg.charts_to_skip.c_str());
}
void ml_host_get_detection_info(RRDHOST *rh, BUFFER *wb)
{
    ml_host_t *host = (ml_host_t *) rh->ml_host;
    if (!host)
        return;

    netdata_mutex_lock(&host->mutex);

    buffer_json_member_add_uint64(wb, "version", 2);
    buffer_json_member_add_uint64(wb, "ml-running", host->ml_running);
    buffer_json_member_add_uint64(wb, "anomalous-dimensions", host->mls.num_anomalous_dimensions);
    buffer_json_member_add_uint64(wb, "normal-dimensions", host->mls.num_normal_dimensions);
    buffer_json_member_add_uint64(wb, "total-dimensions", host->mls.num_anomalous_dimensions +
                                                          host->mls.num_normal_dimensions);
    buffer_json_member_add_uint64(wb, "trained-dimensions", host->mls.num_training_status_trained +
                                                            host->mls.num_training_status_pending_with_model);

    netdata_mutex_unlock(&host->mutex);
}

bool ml_host_get_host_status(RRDHOST *rh, struct ml_metrics_statistics *mlm) {
    ml_host_t *host = (ml_host_t *) rh->ml_host;
    if (!host) {
        memset(mlm, 0, sizeof(*mlm));
        return false;
    }

    netdata_mutex_lock(&host->mutex);

    mlm->anomalous = host->mls.num_anomalous_dimensions;
    mlm->normal = host->mls.num_normal_dimensions;
    mlm->trained = host->mls.num_training_status_trained + host->mls.num_training_status_pending_with_model;
    mlm->pending = host->mls.num_training_status_untrained + host->mls.num_training_status_pending_without_model;
    mlm->silenced = host->mls.num_training_status_silenced;

    netdata_mutex_unlock(&host->mutex);

    return true;
}

bool ml_host_running(RRDHOST *rh) {
    ml_host_t *host = (ml_host_t *) rh->ml_host;
    if (!host)
        return false;

    return true;
}

void ml_host_get_models(RRDHOST *rh, BUFFER *wb)
{
    UNUSED(rh);
    UNUSED(wb);

    // TODO: To be implemented
    netdata_log_error("Fetching KMeans models is not supported yet");
}

void ml_chart_new(RRDSET *rs)
{
    ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host;
    if (!host)
        return;

    ml_chart_t *chart = new ml_chart_t();

    chart->rs = rs;
    chart->mls = ml_machine_learning_stats_t();

    rs->ml_chart = (rrd_ml_chart_t *) chart;
}

void ml_chart_delete(RRDSET *rs)
{
    ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host;
    if (!host)
        return;

    ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
    delete chart;

    rs->ml_chart = NULL;
}

bool ml_chart_update_begin(RRDSET *rs)
{
    ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
    if (!chart)
        return false;

    chart->mls = {};
    return true;
}

void ml_chart_update_end(RRDSET *rs)
{
    ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
    if (!chart)
        return;
}

void ml_dimension_new(RRDDIM *rd)
{
    ml_chart_t *chart = (ml_chart_t *) rd->rrdset->ml_chart;
    if (!chart)
        return;

    ml_dimension_t *dim = new ml_dimension_t();

    dim->rd = rd;

    dim->mt = METRIC_TYPE_CONSTANT;
    dim->ts = TRAINING_STATUS_UNTRAINED;

    dim->last_training_time = 0;
    dim->suppression_anomaly_counter = 0;
    dim->suppression_window_counter = 0;

    ml_kmeans_init(&dim->kmeans);

    if (simple_pattern_matches(Cfg.sp_charts_to_skip, rrdset_name(rd->rrdset)))
        dim->mls = MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART;
    else
        dim->mls = MACHINE_LEARNING_STATUS_ENABLED;

    spinlock_init(&dim->slock);

    dim->km_contexts.reserve(Cfg.num_models_to_use);

    rd->ml_dimension = (rrd_ml_dimension_t *) dim;

    metaqueue_ml_load_models(rd);
}

void ml_dimension_delete(RRDDIM *rd)
{
    ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
    if (!dim)
        return;

    delete dim;
    rd->ml_dimension = NULL;
}

bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool exists)
{
    ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
    if (!dim)
        return false;

    ml_host_t *host = (ml_host_t *) rd->rrdset->rrdhost->ml_host;
    if (!host->ml_running)
        return false;

    ml_chart_t *chart = (ml_chart_t *) rd->rrdset->ml_chart;

    bool is_anomalous = ml_dimension_predict(dim, curr_time, value, exists);
    ml_chart_update_dimension(chart, dim, is_anomalous);

    return is_anomalous;
}
static void ml_flush_pending_models(ml_training_thread_t *training_thread) {
    int op_no = 1;

    // begin transaction
    int rc = db_execute(db, "BEGIN TRANSACTION;");

    // add/delete models
    if (!rc) {
        op_no++;

        for (const auto &pending_model: training_thread->pending_model_info) {
            if (!rc)
                rc = ml_dimension_add_model(&pending_model.metric_uuid, &pending_model.kmeans);

            if (!rc)
                rc = ml_dimension_delete_models(&pending_model.metric_uuid, pending_model.kmeans.before - (Cfg.num_models_to_use * Cfg.train_every));
        }
    }

    // prune old models
    if (!rc) {
        if ((training_thread->num_db_transactions % 64) == 0) {
            rc = ml_prune_old_models(training_thread->num_models_to_prune);
            if (!rc)
                training_thread->num_models_to_prune = 0;
        }
    }

    // commit transaction
    if (!rc) {
        op_no++;
        rc = db_execute(db, "COMMIT TRANSACTION;");
    }

    // rollback transaction on failure
    if (rc) {
        netdata_log_error("Trying to rollback ML transaction because it failed with rc=%d, op_no=%d", rc, op_no);
        op_no++;

        rc = db_execute(db, "ROLLBACK;");
        if (rc)
            netdata_log_error("ML transaction rollback failed with rc=%d", rc);
    }

    if (!rc) {
        training_thread->num_db_transactions++;
        training_thread->num_models_to_prune += training_thread->pending_model_info.size();
    }

    vacuum_database(db, "ML", 0, 0);

    training_thread->pending_model_info.clear();
}

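// Training thread main loop. The thread self-paces so that a full queue
// drains in roughly one train_every period: each popped request is allotted
//
//     allotted_ut = min((train_every * USEC_PER_SEC) / queue_size, USEC_PER_SEC)
//
// and whatever the training step does not consume is slept off at the bottom
// of the loop. For example, with train_every = 60s and 30 queued items the
// raw slice is 2s per item, which the cap reduces to 1s; a queue of 120
// items gets 0.5s per item.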
static void *ml_train_main(void *arg) {
    ml_training_thread_t *training_thread = (ml_training_thread_t *) arg;

    char worker_name[1024];
    snprintfz(worker_name, 1024, "training_thread_%zu", training_thread->id);
    worker_register("MLTRAIN");

    worker_register_job_name(WORKER_TRAIN_QUEUE_POP, "pop queue");
    worker_register_job_name(WORKER_TRAIN_ACQUIRE_DIMENSION, "acquire");
    worker_register_job_name(WORKER_TRAIN_QUERY, "query");
    worker_register_job_name(WORKER_TRAIN_KMEANS, "kmeans");
    worker_register_job_name(WORKER_TRAIN_UPDATE_MODELS, "update models");
    worker_register_job_name(WORKER_TRAIN_RELEASE_DIMENSION, "release");
    worker_register_job_name(WORKER_TRAIN_UPDATE_HOST, "update host");
    worker_register_job_name(WORKER_TRAIN_FLUSH_MODELS, "flush models");

    while (!Cfg.training_stop) {
        worker_is_busy(WORKER_TRAIN_QUEUE_POP);
        ml_training_request_t training_req = ml_queue_pop(training_thread->training_queue);

        // we know this thread has been cancelled when the queue starts
        // returning "null" requests without blocking on the queue's pop().
        if (training_req.chart_id == NULL)
            break;

        size_t queue_size = ml_queue_size(training_thread->training_queue) + 1;

        usec_t allotted_ut = (Cfg.train_every * USEC_PER_SEC) / queue_size;
        if (allotted_ut > USEC_PER_SEC)
            allotted_ut = USEC_PER_SEC;

        usec_t start_ut = now_monotonic_usec();

        enum ml_training_result training_res;
        {
            worker_is_busy(WORKER_TRAIN_ACQUIRE_DIMENSION);
            ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get(
                training_req.machine_guid,
                training_req.chart_id,
                training_req.dimension_id);

            training_res = ml_acquired_dimension_train(training_thread, acq_dim, training_req);

            string_freez(training_req.chart_id);
            string_freez(training_req.dimension_id);

            worker_is_busy(WORKER_TRAIN_RELEASE_DIMENSION);
            ml_acquired_dimension_release(acq_dim);
        }

        usec_t consumed_ut = now_monotonic_usec() - start_ut;

        usec_t remaining_ut = 0;
        if (consumed_ut < allotted_ut)
            remaining_ut = allotted_ut - consumed_ut;

        if (Cfg.enable_statistics_charts) {
            worker_is_busy(WORKER_TRAIN_UPDATE_HOST);

            netdata_mutex_lock(&training_thread->nd_mutex);

            training_thread->training_stats.queue_size += queue_size;
            training_thread->training_stats.num_popped_items += 1;

            training_thread->training_stats.allotted_ut += allotted_ut;
            training_thread->training_stats.consumed_ut += consumed_ut;
            training_thread->training_stats.remaining_ut += remaining_ut;

            switch (training_res) {
                case TRAINING_RESULT_OK:
                    training_thread->training_stats.training_result_ok += 1;
                    break;
                case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
                    training_thread->training_stats.training_result_invalid_query_time_range += 1;
                    break;
                case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
                    training_thread->training_stats.training_result_not_enough_collected_values += 1;
                    break;
                case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
                    training_thread->training_stats.training_result_null_acquired_dimension += 1;
                    break;
                case TRAINING_RESULT_CHART_UNDER_REPLICATION:
                    training_thread->training_stats.training_result_chart_under_replication += 1;
                    break;
            }

            netdata_mutex_unlock(&training_thread->nd_mutex);
        }

        if (training_thread->pending_model_info.size() >= Cfg.flush_models_batch_size) {
            worker_is_busy(WORKER_TRAIN_FLUSH_MODELS);
            netdata_mutex_lock(&db_mutex);
            ml_flush_pending_models(training_thread);
            netdata_mutex_unlock(&db_mutex);
            continue;
        }

        worker_is_idle();
        std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut});
    }

    return NULL;
}

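// One-time initialization: load the ML section of the configuration,
// pre-generate the random numbers used to sample training windows, size each
// training thread's scratch buffers (max_train_samples values times lag_n + 1
// lagged copies per sample), and open/migrate the ml.db SQLite database under
// the cache directory. Database failures are non-fatal: db is left NULL and
// the agent keeps running without model persistence.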
void ml_init()
{
    // Read config values
    ml_config_load(&Cfg);

    if (!Cfg.enable_anomaly_detection)
        return;

    // Generate random numbers to efficiently sample the features we need
    // for KMeans clustering.
    std::random_device RD;
    std::mt19937 Gen(RD());

    Cfg.random_nums.reserve(Cfg.max_train_samples);
    for (size_t Idx = 0; Idx != Cfg.max_train_samples; Idx++)
        Cfg.random_nums.push_back(Gen());

    // init training thread-specific data
    Cfg.training_threads.resize(Cfg.num_training_threads);

    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];

        size_t max_elements_needed_for_training = (size_t) Cfg.max_train_samples * (size_t) (Cfg.lag_n + 1);
        training_thread->training_cns = new calculated_number_t[max_elements_needed_for_training]();
        training_thread->scratch_training_cns = new calculated_number_t[max_elements_needed_for_training]();

        training_thread->id = idx;
        training_thread->training_queue = ml_queue_init();
        training_thread->pending_model_info.reserve(Cfg.flush_models_batch_size);
        netdata_mutex_init(&training_thread->nd_mutex);
    }

    // open sqlite db
    char path[FILENAME_MAX];
    snprintfz(path, FILENAME_MAX - 1, "%s/%s", netdata_configured_cache_dir, "ml.db");
    int rc = sqlite3_open(path, &db);
    if (rc != SQLITE_OK) {
        error_report("Failed to initialize database at %s, due to \"%s\"", path, sqlite3_errstr(rc));
        sqlite3_close(db);
        db = NULL;
    }

    // create table
    if (db) {
        int target_version = perform_ml_database_migration(db, ML_METADATA_VERSION);
        if (configure_sqlite_database(db, target_version)) {
            error_report("Failed to setup ML database");
            sqlite3_close(db);
            db = NULL;
        }
        else {
            char *err = NULL;
            int rc = sqlite3_exec(db, db_models_create_table, NULL, NULL, &err);
            if (rc != SQLITE_OK) {
                error_report("Failed to create models table (%s, %s)", sqlite3_errstr(rc), err ? err : "");
                sqlite3_close(db);
                sqlite3_free(err);
                db = NULL;
            }
        }
    }
}

void ml_fini() {
    if (!Cfg.enable_anomaly_detection)
        return;

    int rc = sqlite3_close_v2(db);
    if (unlikely(rc != SQLITE_OK))
        error_report("Error %d while closing the SQLite database, %s", rc, sqlite3_errstr(rc));
}

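// One joinable "PREDICT" detection thread plus num_training_threads joinable
// "TRAIN[i]" threads; all of them honor the detection_stop/training_stop
// flags that ml_stop_threads() raises.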
void ml_start_threads() {
    if (!Cfg.enable_anomaly_detection)
        return;

    // start detection & training threads
    Cfg.detection_stop = false;
    Cfg.training_stop = false;

    char tag[NETDATA_THREAD_TAG_MAX + 1];

    snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT");
    netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL);

    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
        snprintfz(tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%zu]", training_thread->id);
        netdata_thread_create(&training_thread->nd_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_train_main, training_thread);
    }
}

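// Shutdown is ordered to avoid waking a destroyed queue: raise the stop
// flags, cancel and join the detection thread, signal every training queue
// so blocked ml_queue_pop() calls return their "null" request, then cancel
// and join each training thread, and only then release the per-thread
// buffers, queues and mutexes.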
void ml_stop_threads()
{
    if (!Cfg.enable_anomaly_detection)
        return;

    Cfg.detection_stop = true;
    Cfg.training_stop = true;

    if (!Cfg.detection_thread)
        return;

    netdata_thread_cancel(Cfg.detection_thread);
    netdata_thread_join(Cfg.detection_thread, NULL);

    // signal the training queue of each thread
    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
        ml_queue_signal(training_thread->training_queue);
    }

    // cancel training threads
    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
        netdata_thread_cancel(training_thread->nd_thread);
    }

    // join training threads
    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
        netdata_thread_join(training_thread->nd_thread, NULL);
    }

    // clear training thread data
    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
        delete[] training_thread->training_cns;
        delete[] training_thread->scratch_training_cns;
        ml_queue_destroy(training_thread->training_queue);
        netdata_mutex_destroy(&training_thread->nd_mutex);
    }
}