// SPDX-License-Identifier: GPL-3.0-or-later

#include <dlib/clustering.h>

#include "ml-private.h"

#include <random>

#include "ad_charts.h"
#include "database/sqlite/sqlite3.h"

#define WORKER_TRAIN_QUEUE_POP         0
#define WORKER_TRAIN_ACQUIRE_DIMENSION 1
#define WORKER_TRAIN_QUERY             2
#define WORKER_TRAIN_KMEANS            3
#define WORKER_TRAIN_UPDATE_MODELS     4
#define WORKER_TRAIN_RELEASE_DIMENSION 5
#define WORKER_TRAIN_UPDATE_HOST       6
#define WORKER_TRAIN_FLUSH_MODELS      7

static sqlite3 *db = NULL;
static netdata_mutex_t db_mutex = NETDATA_MUTEX_INITIALIZER;
/*
 * Functions to convert enums to strings
 */

__attribute__((unused)) static const char *
ml_machine_learning_status_to_string(enum ml_machine_learning_status mls)
{
    switch (mls) {
        case MACHINE_LEARNING_STATUS_ENABLED:
            return "enabled";
        case MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART:
            return "disabled-sp";
        default:
            return "unknown";
    }
}

__attribute__((unused)) static const char *
ml_metric_type_to_string(enum ml_metric_type mt)
{
    switch (mt) {
        case METRIC_TYPE_CONSTANT:
            return "constant";
        case METRIC_TYPE_VARIABLE:
            return "variable";
        default:
            return "unknown";
    }
}

__attribute__((unused)) static const char *
ml_training_status_to_string(enum ml_training_status ts)
{
    switch (ts) {
        case TRAINING_STATUS_PENDING_WITH_MODEL:
            return "pending-with-model";
        case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
            return "pending-without-model";
        case TRAINING_STATUS_TRAINED:
            return "trained";
        case TRAINING_STATUS_UNTRAINED:
            return "untrained";
        case TRAINING_STATUS_SILENCED:
            return "silenced";
        default:
            return "unknown";
    }
}

__attribute__((unused)) static const char *
ml_training_result_to_string(enum ml_training_result tr)
{
    switch (tr) {
        case TRAINING_RESULT_OK:
            return "ok";
        case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
            return "invalid-query";
        case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
            return "missing-values";
        case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
            return "null-acquired-dim";
        case TRAINING_RESULT_CHART_UNDER_REPLICATION:
            return "chart-under-replication";
        default:
            return "unknown";
    }
}
/*
 * Features
 */

// subtract elements that are `diff_n` positions apart
static void
ml_features_diff(ml_features_t *features)
{
    if (features->diff_n == 0)
        return;

    for (size_t idx = 0; idx != (features->src_n - features->diff_n); idx++) {
        size_t high = (features->src_n - 1) - idx;
        size_t low = high - features->diff_n;

        features->dst[low] = features->src[high] - features->src[low];
    }

    size_t n = features->src_n - features->diff_n;
    memcpy(features->src, features->dst, n * sizeof(calculated_number_t));

    for (size_t idx = features->src_n - features->diff_n; idx != features->src_n; idx++)
        features->src[idx] = 0.0;
}
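// Worked example (illustrative values): with diff_n = 1 and
// src = [1, 3, 6, 10], the pass stores first-order differences and
// zero-fills the tail:
//
//     dst = [3-1, 6-3, 10-6] = [2, 3, 4]
//     src becomes [2, 3, 4, 0]
//
// Only the first src_n - diff_n slots carry meaningful data afterwards.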
// a function that computes the window average of an array inplace
static void
ml_features_smooth(ml_features_t *features)
{
    calculated_number_t sum = 0.0;

    size_t idx = 0;
    for (; idx != features->smooth_n - 1; idx++)
        sum += features->src[idx];

    for (; idx != (features->src_n - features->diff_n); idx++) {
        sum += features->src[idx];
        calculated_number_t prev_cn = features->src[idx - (features->smooth_n - 1)];
        features->src[idx - (features->smooth_n - 1)] = sum / features->smooth_n;
        sum -= prev_cn;
    }

    for (idx = 0; idx != features->smooth_n; idx++)
        features->src[(features->src_n - 1) - idx] = 0.0;
}
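// Worked example (illustrative values): continuing with smooth_n = 3 and
// src = [2, 3, 4, 0] from the differencing step, the rolling sum yields
//
//     src[0] = (2 + 3 + 4) / 3 = 3.0
//
// and the trailing smooth_n slots are zeroed because they no longer hold
// a full window's worth of data.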
// create lag'd vectors out of the preprocessed buffer
static void
ml_features_lag(ml_features_t *features)
{
    size_t n = features->src_n - features->diff_n - features->smooth_n + 1 - features->lag_n;
    features->preprocessed_features.resize(n);

    unsigned target_num_samples = Cfg.max_train_samples * Cfg.random_sampling_ratio;
    double sampling_ratio = std::min(static_cast<double>(target_num_samples) / n, 1.0);

    uint32_t max_mt = std::numeric_limits<uint32_t>::max();
    uint32_t cutoff = static_cast<double>(max_mt) * sampling_ratio;

    size_t sample_idx = 0;

    for (size_t idx = 0; idx != n; idx++) {
        DSample &DS = features->preprocessed_features[sample_idx++];

        // a sample holds the current value plus lag_n lagged values
        DS.set_size(features->lag_n + 1);

        if (Cfg.random_nums[idx] > cutoff) {
            sample_idx--;
            continue;
        }

        for (size_t feature_idx = 0; feature_idx != features->lag_n + 1; feature_idx++)
            DS(feature_idx) = features->src[idx + feature_idx];
    }

    features->preprocessed_features.resize(sample_idx);
}
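// Worked example (illustrative values): with lag_n = 5, every DSample is
// the 6-dimensional window [src[idx], src[idx+1], ..., src[idx+5]] slid
// over the differenced/smoothed buffer. Assuming max_train_samples = 14400
// and random_sampling_ratio = 0.2, at most ~2880 windows are kept: a
// window survives only when its pre-generated random number is at or
// below cutoff = UINT32_MAX * 0.2.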
static void
ml_features_preprocess(ml_features_t *features)
{
    ml_features_diff(features);
    ml_features_smooth(features);
    ml_features_lag(features);
}
/*
 * KMeans
 */

static void
ml_kmeans_init(ml_kmeans_t *kmeans)
{
    kmeans->cluster_centers.reserve(2);
    kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
    // for floating-point types, min() is the smallest positive value;
    // lowest() is the most negative and is the right seed for a maximum
    kmeans->max_dist = std::numeric_limits<calculated_number_t>::lowest();
}
static void
ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features, time_t after, time_t before)
{
    kmeans->after = (uint32_t) after;
    kmeans->before = (uint32_t) before;

    kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
    kmeans->max_dist = std::numeric_limits<calculated_number_t>::lowest();

    kmeans->cluster_centers.clear();

    dlib::pick_initial_centers(2, kmeans->cluster_centers, features->preprocessed_features);
    dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, Cfg.max_kmeans_iters);

    for (const auto &preprocessed_feature : features->preprocessed_features) {
        calculated_number_t mean_dist = 0.0;

        for (const auto &cluster_center : kmeans->cluster_centers) {
            mean_dist += dlib::length(cluster_center - preprocessed_feature);
        }

        mean_dist /= kmeans->cluster_centers.size();

        if (mean_dist < kmeans->min_dist)
            kmeans->min_dist = mean_dist;

        if (mean_dist > kmeans->max_dist)
            kmeans->max_dist = mean_dist;
    }
}
static calculated_number_t
ml_kmeans_anomaly_score(const ml_kmeans_t *kmeans, const DSample &DS)
{
    calculated_number_t mean_dist = 0.0;
    for (const auto &CC : kmeans->cluster_centers)
        mean_dist += dlib::length(CC - DS);

    mean_dist /= kmeans->cluster_centers.size();

    if (kmeans->max_dist == kmeans->min_dist)
        return 0.0;

    calculated_number_t anomaly_score = 100.0 * std::abs((mean_dist - kmeans->min_dist) / (kmeans->max_dist - kmeans->min_dist));
    return (anomaly_score > 100.0) ? 100.0 : anomaly_score;
}
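// Worked example (illustrative values): if training observed mean
// distances in [min_dist, max_dist] = [0.5, 2.5] and a new sample's mean
// distance to the two cluster centers is 2.0, then
//
//     score = 100 * |(2.0 - 0.5) / (2.5 - 0.5)| = 75.0
//
// i.e. the score locates the sample inside the distance range seen during
// training, clamped to [0, 100].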
/*
 * Queue
 */

static ml_queue_t *
ml_queue_init()
{
    ml_queue_t *q = new ml_queue_t();

    netdata_mutex_init(&q->mutex);
    pthread_cond_init(&q->cond_var, NULL);
    q->exit = false;
    return q;
}

static void
ml_queue_destroy(ml_queue_t *q)
{
    netdata_mutex_destroy(&q->mutex);
    pthread_cond_destroy(&q->cond_var);
    delete q;
}

static void
ml_queue_push(ml_queue_t *q, const ml_training_request_t req)
{
    netdata_mutex_lock(&q->mutex);
    q->internal.push(req);
    pthread_cond_signal(&q->cond_var);
    netdata_mutex_unlock(&q->mutex);
}

static ml_training_request_t
ml_queue_pop(ml_queue_t *q)
{
    netdata_mutex_lock(&q->mutex);

    ml_training_request_t req = {
        {'\0'},     // machine_guid
        NULL,       // chart id
        NULL,       // dimension id
        0,          // current time
        0,          // first entry
        0           // last entry
    };

    while (q->internal.empty()) {
        pthread_cond_wait(&q->cond_var, &q->mutex);

        if (q->exit) {
            netdata_mutex_unlock(&q->mutex);

            // We return a dummy request because the queue has been signaled
            return req;
        }
    }

    req = q->internal.front();
    q->internal.pop();

    netdata_mutex_unlock(&q->mutex);
    return req;
}

static size_t
ml_queue_size(ml_queue_t *q)
{
    netdata_mutex_lock(&q->mutex);
    size_t size = q->internal.size();
    netdata_mutex_unlock(&q->mutex);
    return size;
}

static void
ml_queue_signal(ml_queue_t *q)
{
    netdata_mutex_lock(&q->mutex);
    q->exit = true;
    pthread_cond_signal(&q->cond_var);
    netdata_mutex_unlock(&q->mutex);
}
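// Usage sketch (illustrative, using only the functions above): producers
// push requests, training threads block in pop(), and shutdown wakes the
// sleepers:
//
//     ml_queue_t *q = ml_queue_init();
//     ml_queue_push(q, req);                      // producer side
//     ml_training_request_t r = ml_queue_pop(q);  // consumer side
//     ml_queue_signal(q);                         // request shutdown
//     ml_queue_destroy(q);
//
// A popped request with a NULL chart_id is the dummy returned after
// ml_queue_signal(); ml_train_main() below uses that to detect
// cancellation.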
/*
 * Dimension
 */

static std::pair<calculated_number_t *, ml_training_response_t>
ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
{
    ml_training_response_t training_response = {};

    training_response.request_time = training_request.request_time;
    training_response.first_entry_on_request = training_request.first_entry_on_request;
    training_response.last_entry_on_request = training_request.last_entry_on_request;

    training_response.first_entry_on_response = rrddim_first_entry_s_of_tier(dim->rd, 0);
    training_response.last_entry_on_response = rrddim_last_entry_s_of_tier(dim->rd, 0);

    size_t min_n = Cfg.min_train_samples;
    size_t max_n = Cfg.max_train_samples;

    // Figure out what our time window should be.
    training_response.query_before_t = training_response.last_entry_on_response;
    training_response.query_after_t = std::max(
        training_response.query_before_t - static_cast<time_t>((max_n - 1) * dim->rd->update_every),
        training_response.first_entry_on_response
    );

    if (training_response.query_after_t >= training_response.query_before_t) {
        training_response.result = TRAINING_RESULT_INVALID_QUERY_TIME_RANGE;
        return { NULL, training_response };
    }

    if (rrdset_is_replicating(dim->rd->rrdset)) {
        training_response.result = TRAINING_RESULT_CHART_UNDER_REPLICATION;
        return { NULL, training_response };
    }

    /*
     * Execute the query
     */
    struct storage_engine_query_handle handle;

    storage_engine_query_init(dim->rd->tiers[0].backend, dim->rd->tiers[0].db_metric_handle, &handle,
                              training_response.query_after_t, training_response.query_before_t,
                              STORAGE_PRIORITY_BEST_EFFORT);

    size_t idx = 0;
    memset(training_thread->training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
    calculated_number_t last_value = std::numeric_limits<calculated_number_t>::quiet_NaN();

    while (!storage_engine_query_is_finished(&handle)) {
        if (idx == max_n)
            break;

        STORAGE_POINT sp = storage_engine_query_next_metric(&handle);

        time_t timestamp = sp.end_time_s;
        calculated_number_t value = sp.sum / sp.count;

        if (netdata_double_isnumber(value)) {
            if (!training_response.db_after_t)
                training_response.db_after_t = timestamp;
            training_response.db_before_t = timestamp;

            training_thread->training_cns[idx] = value;
            last_value = training_thread->training_cns[idx];
            training_response.collected_values++;
        } else
            training_thread->training_cns[idx] = last_value;

        idx++;
    }
    storage_engine_query_finalize(&handle);

    global_statistics_ml_query_completed(/* points_read */ idx);

    training_response.total_values = idx;
    if (training_response.collected_values < min_n) {
        training_response.result = TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES;
        return { NULL, training_response };
    }

    // Find first non-NaN value.
    for (idx = 0; std::isnan(training_thread->training_cns[idx]); idx++, training_response.total_values--) { }

    // Overwrite NaN values.
    if (idx != 0)
        memmove(training_thread->training_cns, &training_thread->training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);

    training_response.result = TRAINING_RESULT_OK;
    return { training_thread->training_cns, training_response };
}
const char *db_models_create_table =
    "CREATE TABLE IF NOT EXISTS models("
    " dim_id BLOB, after INT, before INT,"
    " min_dist REAL, max_dist REAL,"
    " c00 REAL, c01 REAL, c02 REAL, c03 REAL, c04 REAL, c05 REAL,"
    " c10 REAL, c11 REAL, c12 REAL, c13 REAL, c14 REAL, c15 REAL,"
    " PRIMARY KEY(dim_id, after)"
    ");";

const char *db_models_add_model =
    "INSERT OR REPLACE INTO models("
    " dim_id, after, before,"
    " min_dist, max_dist,"
    " c00, c01, c02, c03, c04, c05,"
    " c10, c11, c12, c13, c14, c15)"
    "VALUES("
    " @dim_id, @after, @before,"
    " @min_dist, @max_dist,"
    " @c00, @c01, @c02, @c03, @c04, @c05,"
    " @c10, @c11, @c12, @c13, @c14, @c15);";

const char *db_models_load =
    "SELECT * FROM models "
    "WHERE dim_id = @dim_id AND after >= @after ORDER BY before ASC;";

const char *db_models_delete =
    "DELETE FROM models "
    "WHERE dim_id = @dim_id AND before < @before;";
static int
ml_dimension_add_model(const uuid_t *metric_uuid, const ml_kmeans_t *km)
{
    static __thread sqlite3_stmt *res = NULL;
    int param = 0;
    int rc = 0;

    if (unlikely(!db)) {
        error_report("Database has not been initialized");
        return 1;
    }

    if (unlikely(!res)) {
        rc = prepare_statement(db, db_models_add_model, &res);
        if (unlikely(rc != SQLITE_OK)) {
            error_report("Failed to prepare statement to store model, rc = %d", rc);
            return 1;
        }
    }

    rc = sqlite3_bind_blob(res, ++param, metric_uuid, sizeof(*metric_uuid), SQLITE_STATIC);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = sqlite3_bind_int(res, ++param, (int) km->after);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = sqlite3_bind_int(res, ++param, (int) km->before);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = sqlite3_bind_double(res, ++param, km->min_dist);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = sqlite3_bind_double(res, ++param, km->max_dist);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    if (km->cluster_centers.size() != 2)
        fatal("Expected 2 cluster centers, got %zu", km->cluster_centers.size());

    for (const DSample &ds : km->cluster_centers) {
        if (ds.size() != 6)
            fatal("Expected dsample with 6 dimensions, got %ld", ds.size());

        for (long idx = 0; idx != ds.size(); idx++) {
            calculated_number_t cn = ds(idx);

            // reuse the outer rc, so that bind_fail reports the actual
            // bind error code instead of a stale value
            rc = sqlite3_bind_double(res, ++param, cn);
            if (unlikely(rc != SQLITE_OK))
                goto bind_fail;
        }
    }

    rc = execute_insert(res);
    if (unlikely(rc != SQLITE_DONE)) {
        error_report("Failed to store model, rc = %d", rc);
        return rc;
    }

    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK)) {
        error_report("Failed to reset statement when storing model, rc = %d", rc);
        return rc;
    }

    return 0;

bind_fail:
    error_report("Failed to bind parameter %d to store model, rc = %d", param, rc);

    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK))
        error_report("Failed to reset statement to store model, rc = %d", rc);

    return rc;
}
static int
ml_dimension_delete_models(const uuid_t *metric_uuid, time_t before)
{
    static __thread sqlite3_stmt *res = NULL;
    int rc = 0;
    int param = 0;

    if (unlikely(!db)) {
        error_report("Database has not been initialized");
        return 1;
    }

    if (unlikely(!res)) {
        rc = prepare_statement(db, db_models_delete, &res);
        if (unlikely(rc != SQLITE_OK)) {
            error_report("Failed to prepare statement to delete models, rc = %d", rc);
            return rc;
        }
    }

    rc = sqlite3_bind_blob(res, ++param, metric_uuid, sizeof(*metric_uuid), SQLITE_STATIC);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = sqlite3_bind_int(res, ++param, (int) before);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    rc = execute_insert(res);
    if (unlikely(rc != SQLITE_DONE)) {
        error_report("Failed to delete models, rc = %d", rc);
        return rc;
    }

    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK)) {
        error_report("Failed to reset statement when deleting models, rc = %d", rc);
        return rc;
    }

    return 0;

bind_fail:
    error_report("Failed to bind parameter %d to delete models, rc = %d", param, rc);

    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK))
        error_report("Failed to reset statement to delete models, rc = %d", rc);

    return rc;
}
int ml_dimension_load_models(RRDDIM *rd) {
    ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
    if (!dim)
        return 0;

    netdata_mutex_lock(&dim->mutex);
    bool is_empty = dim->km_contexts.empty();
    netdata_mutex_unlock(&dim->mutex);

    if (!is_empty)
        return 0;

    static __thread sqlite3_stmt *res = NULL;
    int rc = 0;
    int param = 0;

    if (unlikely(!db)) {
        error_report("Database has not been initialized");
        return 1;
    }

    if (unlikely(!res)) {
        rc = prepare_statement(db, db_models_load, &res);
        if (unlikely(rc != SQLITE_OK)) {
            error_report("Failed to prepare statement to load models, rc = %d", rc);
            return 1;
        }
    }

    rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    // `after` is stored in epoch seconds, so the cutoff must be computed
    // in seconds as well
    rc = sqlite3_bind_int(res, ++param, now_realtime_sec() - (Cfg.num_models_to_use * Cfg.max_train_samples));
    if (unlikely(rc != SQLITE_OK))
        goto bind_fail;

    netdata_mutex_lock(&dim->mutex);

    dim->km_contexts.reserve(Cfg.num_models_to_use);
    while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) {
        ml_kmeans_t km;

        // column 0 is dim_id; the timestamps follow, and the REAL
        // min_dist/max_dist columns must be read as doubles
        km.after = sqlite3_column_int(res, 1);
        km.before = sqlite3_column_int(res, 2);

        km.min_dist = sqlite3_column_double(res, 3);
        km.max_dist = sqlite3_column_double(res, 4);

        km.cluster_centers.resize(2);

        km.cluster_centers[0].set_size(Cfg.lag_n + 1);
        km.cluster_centers[0](0) = sqlite3_column_double(res, 5);
        km.cluster_centers[0](1) = sqlite3_column_double(res, 6);
        km.cluster_centers[0](2) = sqlite3_column_double(res, 7);
        km.cluster_centers[0](3) = sqlite3_column_double(res, 8);
        km.cluster_centers[0](4) = sqlite3_column_double(res, 9);
        km.cluster_centers[0](5) = sqlite3_column_double(res, 10);

        km.cluster_centers[1].set_size(Cfg.lag_n + 1);
        km.cluster_centers[1](0) = sqlite3_column_double(res, 11);
        km.cluster_centers[1](1) = sqlite3_column_double(res, 12);
        km.cluster_centers[1](2) = sqlite3_column_double(res, 13);
        km.cluster_centers[1](3) = sqlite3_column_double(res, 14);
        km.cluster_centers[1](4) = sqlite3_column_double(res, 15);
        km.cluster_centers[1](5) = sqlite3_column_double(res, 16);

        dim->km_contexts.push_back(km);
    }

    if (!dim->km_contexts.empty()) {
        dim->ts = TRAINING_STATUS_TRAINED;
    }

    netdata_mutex_unlock(&dim->mutex);

    if (unlikely(rc != SQLITE_DONE))
        error_report("Failed to load models, rc = %d", rc);

    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK))
        error_report("Failed to reset statement when loading models, rc = %d", rc);

    return 0;

bind_fail:
    error_report("Failed to bind parameter %d to load models, rc = %d", param, rc);

    rc = sqlite3_reset(res);
    if (unlikely(rc != SQLITE_OK))
        error_report("Failed to reset statement to load models, rc = %d", rc);

    return 1;
}
static enum ml_training_result
ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
{
    worker_is_busy(WORKER_TRAIN_QUERY);
    auto P = ml_dimension_calculated_numbers(training_thread, dim, training_request);
    ml_training_response_t training_response = P.second;

    if (training_response.result != TRAINING_RESULT_OK) {
        netdata_mutex_lock(&dim->mutex);

        dim->mt = METRIC_TYPE_CONSTANT;

        switch (dim->ts) {
            case TRAINING_STATUS_PENDING_WITH_MODEL:
                dim->ts = TRAINING_STATUS_TRAINED;
                break;
            case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
                dim->ts = TRAINING_STATUS_UNTRAINED;
                break;
            default:
                break;
        }

        dim->suppression_anomaly_counter = 0;
        dim->suppression_window_counter = 0;

        dim->tr = training_response;
        dim->last_training_time = training_response.last_entry_on_response;

        enum ml_training_result result = training_response.result;
        netdata_mutex_unlock(&dim->mutex);

        return result;
    }

    // compute kmeans
    worker_is_busy(WORKER_TRAIN_KMEANS);
    {
        memcpy(training_thread->scratch_training_cns, training_thread->training_cns,
               training_response.total_values * sizeof(calculated_number_t));

        ml_features_t features = {
            Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
            training_thread->scratch_training_cns, training_response.total_values,
            training_thread->training_cns, training_response.total_values,
            training_thread->training_samples
        };

        ml_features_preprocess(&features);

        ml_kmeans_init(&dim->kmeans);
        ml_kmeans_train(&dim->kmeans, &features, training_response.query_after_t, training_response.query_before_t);
    }

    // update models
    worker_is_busy(WORKER_TRAIN_UPDATE_MODELS);
    {
        netdata_mutex_lock(&dim->mutex);

        if (dim->km_contexts.size() < Cfg.num_models_to_use) {
            dim->km_contexts.push_back(std::move(dim->kmeans));
        } else {
            bool can_drop_middle_km = false;

            if (Cfg.num_models_to_use > 2) {
                const ml_kmeans_t *old_km = &dim->km_contexts[dim->km_contexts.size() - 1];
                const ml_kmeans_t *middle_km = &dim->km_contexts[dim->km_contexts.size() - 2];
                const ml_kmeans_t *new_km = &dim->kmeans;

                can_drop_middle_km = (middle_km->after < old_km->before) &&
                                     (middle_km->before > new_km->after);
            }

            if (can_drop_middle_km) {
                dim->km_contexts.back() = dim->kmeans;
            } else {
                std::rotate(std::begin(dim->km_contexts), std::begin(dim->km_contexts) + 1, std::end(dim->km_contexts));
                dim->km_contexts[dim->km_contexts.size() - 1] = std::move(dim->kmeans);
            }
        }

        dim->mt = METRIC_TYPE_CONSTANT;
        dim->ts = TRAINING_STATUS_TRAINED;

        dim->suppression_anomaly_counter = 0;
        dim->suppression_window_counter = 0;

        dim->tr = training_response;
        dim->last_training_time = rrddim_last_entry_s(dim->rd);

        // Add the newly generated model to the list of pending models to flush
        ml_model_info_t model_info;
        uuid_copy(model_info.metric_uuid, dim->rd->metric_uuid);
        model_info.kmeans = dim->km_contexts.back();
        training_thread->pending_model_info.push_back(model_info);

        netdata_mutex_unlock(&dim->mutex);
    }

    return training_response.result;
}
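// Rotation sketch (illustrative): with num_models_to_use = 3 and stored
// models [A, B, C] plus an incoming model D, C is overwritten in place
// (giving [A, B, D]) when B's window already overlaps both C's and D's;
// otherwise the oldest model rotates out (giving [B, C, D]). Either way
// at most num_models_to_use models are kept per dimension.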
static void
ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time)
{
    switch (dim->mt) {
        case METRIC_TYPE_CONSTANT:
            return;
        default:
            break;
    }

    bool schedule_for_training = false;

    switch (dim->ts) {
        case TRAINING_STATUS_PENDING_WITH_MODEL:
        case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
            schedule_for_training = false;
            break;
        case TRAINING_STATUS_UNTRAINED:
            schedule_for_training = true;
            dim->ts = TRAINING_STATUS_PENDING_WITHOUT_MODEL;
            break;
        case TRAINING_STATUS_SILENCED:
        case TRAINING_STATUS_TRAINED:
            if ((dim->last_training_time + (Cfg.train_every * dim->rd->update_every)) < curr_time) {
                schedule_for_training = true;
                dim->ts = TRAINING_STATUS_PENDING_WITH_MODEL;
            }
            break;
    }

    if (schedule_for_training) {
        ml_training_request_t req;

        memcpy(req.machine_guid, dim->rd->rrdset->rrdhost->machine_guid, GUID_LEN + 1);
        req.chart_id = string_dup(dim->rd->rrdset->id);
        req.dimension_id = string_dup(dim->rd->id);
        req.request_time = curr_time;
        req.first_entry_on_request = rrddim_first_entry_s(dim->rd);
        req.last_entry_on_request = rrddim_last_entry_s(dim->rd);

        ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
        ml_queue_push(host->training_queue, req);
    }
}
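// Scheduling example (illustrative values): with train_every = 3600 and a
// 1-second update_every, a trained or silenced dimension becomes eligible
// for retraining one hour after last_training_time; an untrained
// dimension is queued immediately and flipped to a pending status so it
// is not queued twice.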
static bool
ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t value, bool exists)
{
    // Nothing to do if ML is disabled for this dimension
    if (dim->mls != MACHINE_LEARNING_STATUS_ENABLED)
        return false;

    // Don't treat values that don't exist as anomalous
    if (!exists) {
        dim->cns.clear();
        return false;
    }

    // Save the value and return if we don't have enough values for a sample
    unsigned n = Cfg.diff_n + Cfg.smooth_n + Cfg.lag_n;
    if (dim->cns.size() < n) {
        dim->cns.push_back(value);
        return false;
    }

    // Push the value and check if it's different from the last one
    bool same_value = true;
    std::rotate(std::begin(dim->cns), std::begin(dim->cns) + 1, std::end(dim->cns));
    if (dim->cns[n - 1] != value)
        same_value = false;
    dim->cns[n - 1] = value;

    // Create the sample
    assert((n * (Cfg.lag_n + 1) <= 128) &&
           "Static buffers too small to perform prediction. "
           "This should not be possible with the default clamping of feature extraction options");

    calculated_number_t src_cns[128];
    calculated_number_t dst_cns[128];

    memset(src_cns, 0, n * (Cfg.lag_n + 1) * sizeof(calculated_number_t));
    memcpy(src_cns, dim->cns.data(), n * sizeof(calculated_number_t));
    memcpy(dst_cns, dim->cns.data(), n * sizeof(calculated_number_t));

    ml_features_t features = {
        Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
        dst_cns, n, src_cns, n,
        dim->feature
    };
    ml_features_preprocess(&features);

    /*
     * Lock to predict and possibly schedule the dimension for training
     */
    if (netdata_mutex_trylock(&dim->mutex) != 0)
        return false;

    // Mark the metric type as variable if we received different values
    if (!same_value)
        dim->mt = METRIC_TYPE_VARIABLE;

    // Decide if the dimension needs to be scheduled for training
    ml_dimension_schedule_for_training(dim, curr_time);

    // Nothing to do if we don't have a model
    switch (dim->ts) {
        case TRAINING_STATUS_UNTRAINED:
        case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
        case TRAINING_STATUS_SILENCED:
            netdata_mutex_unlock(&dim->mutex);
            return false;
        default:
            break;
    }

    dim->suppression_window_counter++;

    /*
     * Use the KMeans models to check if the value is anomalous
     */
    size_t sum = 0;
    size_t models_consulted = 0;

    for (const auto &km_ctx : dim->km_contexts) {
        models_consulted++;

        calculated_number_t anomaly_score = ml_kmeans_anomaly_score(&km_ctx, features.preprocessed_features[0]);

        // NaN compares unequal to everything, including itself, so it has
        // to be detected with std::isnan()
        if (std::isnan(anomaly_score))
            continue;

        if (anomaly_score < (100 * Cfg.dimension_anomaly_score_threshold)) {
            global_statistics_ml_models_consulted(models_consulted);
            netdata_mutex_unlock(&dim->mutex);
            return false;
        }

        sum += 1;
    }

    dim->suppression_anomaly_counter += sum ? 1 : 0;

    if ((dim->suppression_anomaly_counter >= Cfg.suppression_threshold) &&
        (dim->suppression_window_counter >= Cfg.suppression_window)) {
        dim->ts = TRAINING_STATUS_SILENCED;
    }

    netdata_mutex_unlock(&dim->mutex);

    global_statistics_ml_models_consulted(models_consulted);
    return sum;
}
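// Threshold example (illustrative values): with
// dimension_anomaly_score_threshold = 0.99, a sample is flagged only if
// every consulted model with a valid score rates it at 99 or above; a
// single model scoring lower short-circuits the loop and the sample is
// treated as normal. NaN scores are skipped.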
/*
 * Chart
 */

static bool
ml_chart_is_available_for_ml(ml_chart_t *chart)
{
    return rrdset_is_available_for_exporting_and_alarms(chart->rs);
}

void
ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_anomalous)
{
    switch (dim->mls) {
        case MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART:
            chart->mls.num_machine_learning_status_disabled_sp++;
            return;
        case MACHINE_LEARNING_STATUS_ENABLED: {
            chart->mls.num_machine_learning_status_enabled++;

            switch (dim->mt) {
                case METRIC_TYPE_CONSTANT:
                    chart->mls.num_metric_type_constant++;
                    chart->mls.num_training_status_trained++;
                    chart->mls.num_normal_dimensions++;
                    return;
                case METRIC_TYPE_VARIABLE:
                    chart->mls.num_metric_type_variable++;
                    break;
            }

            switch (dim->ts) {
                case TRAINING_STATUS_UNTRAINED:
                    chart->mls.num_training_status_untrained++;
                    return;
                case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
                    chart->mls.num_training_status_pending_without_model++;
                    return;
                case TRAINING_STATUS_TRAINED:
                    chart->mls.num_training_status_trained++;
                    chart->mls.num_anomalous_dimensions += is_anomalous;
                    chart->mls.num_normal_dimensions += !is_anomalous;
                    return;
                case TRAINING_STATUS_PENDING_WITH_MODEL:
                    chart->mls.num_training_status_pending_with_model++;
                    chart->mls.num_anomalous_dimensions += is_anomalous;
                    chart->mls.num_normal_dimensions += !is_anomalous;
                    return;
                case TRAINING_STATUS_SILENCED:
                    chart->mls.num_training_status_silenced++;
                    chart->mls.num_training_status_trained++;
                    chart->mls.num_anomalous_dimensions += is_anomalous;
                    chart->mls.num_normal_dimensions += !is_anomalous;
                    return;
            }

            return;
        }
    }
}
/*
 * Host detection & training functions
 */

#define WORKER_JOB_DETECTION_COLLECT_STATS 0
#define WORKER_JOB_DETECTION_DIM_CHART     1
#define WORKER_JOB_DETECTION_HOST_CHART    2
#define WORKER_JOB_DETECTION_STATS         3

static void
ml_host_detect_once(ml_host_t *host)
{
    worker_is_busy(WORKER_JOB_DETECTION_COLLECT_STATS);

    host->mls = {};
    ml_machine_learning_stats_t mls_copy = {};

    {
        netdata_mutex_lock(&host->mutex);

        /*
         * prediction/detection stats
         */
        void *rsp = NULL;
        rrdset_foreach_read(rsp, host->rh) {
            RRDSET *rs = static_cast<RRDSET *>(rsp);

            ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
            if (!chart)
                continue;

            if (!ml_chart_is_available_for_ml(chart))
                continue;

            ml_machine_learning_stats_t chart_mls = chart->mls;

            host->mls.num_machine_learning_status_enabled += chart_mls.num_machine_learning_status_enabled;
            host->mls.num_machine_learning_status_disabled_sp += chart_mls.num_machine_learning_status_disabled_sp;

            host->mls.num_metric_type_constant += chart_mls.num_metric_type_constant;
            host->mls.num_metric_type_variable += chart_mls.num_metric_type_variable;

            host->mls.num_training_status_untrained += chart_mls.num_training_status_untrained;
            host->mls.num_training_status_pending_without_model += chart_mls.num_training_status_pending_without_model;
            host->mls.num_training_status_trained += chart_mls.num_training_status_trained;
            host->mls.num_training_status_pending_with_model += chart_mls.num_training_status_pending_with_model;
            host->mls.num_training_status_silenced += chart_mls.num_training_status_silenced;

            host->mls.num_anomalous_dimensions += chart_mls.num_anomalous_dimensions;
            host->mls.num_normal_dimensions += chart_mls.num_normal_dimensions;
        }
        rrdset_foreach_done(rsp);

        host->host_anomaly_rate = 0.0;
        size_t NumActiveDimensions = host->mls.num_anomalous_dimensions + host->mls.num_normal_dimensions;
        if (NumActiveDimensions)
            host->host_anomaly_rate = static_cast<double>(host->mls.num_anomalous_dimensions) / NumActiveDimensions;

        mls_copy = host->mls;

        netdata_mutex_unlock(&host->mutex);
    }

    worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
    ml_update_dimensions_chart(host, mls_copy);

    worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART);
    ml_update_host_and_detection_rate_charts(host, host->host_anomaly_rate * 10000.0);
}
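// Worked example (illustrative values): with 5 anomalous and 995 normal
// dimensions, host_anomaly_rate = 5 / 1000 = 0.005, and the detection
// rate chart receives the rate scaled by 10000 (here 50, i.e.
// basis-point resolution).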
typedef struct {
    RRDHOST_ACQUIRED *acq_rh;
    RRDSET_ACQUIRED *acq_rs;
    RRDDIM_ACQUIRED *acq_rd;
    ml_dimension_t *dim;
} ml_acquired_dimension_t;

static ml_acquired_dimension_t
ml_acquired_dimension_get(char *machine_guid, STRING *chart_id, STRING *dimension_id)
{
    RRDHOST_ACQUIRED *acq_rh = NULL;
    RRDSET_ACQUIRED *acq_rs = NULL;
    RRDDIM_ACQUIRED *acq_rd = NULL;
    ml_dimension_t *dim = NULL;

    rrd_rdlock();

    acq_rh = rrdhost_find_and_acquire(machine_guid);
    if (acq_rh) {
        RRDHOST *rh = rrdhost_acquired_to_rrdhost(acq_rh);
        if (rh && !rrdhost_flag_check(rh, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_ARCHIVED)) {
            acq_rs = rrdset_find_and_acquire(rh, string2str(chart_id));
            if (acq_rs) {
                RRDSET *rs = rrdset_acquired_to_rrdset(acq_rs);
                if (rs && !rrdset_flag_check(rs, RRDSET_FLAG_ARCHIVED | RRDSET_FLAG_OBSOLETE)) {
                    acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
                    if (acq_rd) {
                        RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
                        if (rd)
                            dim = (ml_dimension_t *) rd->ml_dimension;
                    }
                }
            }
        }
    }

    rrd_unlock();

    ml_acquired_dimension_t acq_dim = {
        acq_rh, acq_rs, acq_rd, dim
    };

    return acq_dim;
}

static void
ml_acquired_dimension_release(ml_acquired_dimension_t acq_dim)
{
    if (acq_dim.acq_rd)
        rrddim_acquired_release(acq_dim.acq_rd);

    if (acq_dim.acq_rs)
        rrdset_acquired_release(acq_dim.acq_rs);

    if (acq_dim.acq_rh)
        rrdhost_acquired_release(acq_dim.acq_rh);
}

static enum ml_training_result
ml_acquired_dimension_train(ml_training_thread_t *training_thread, ml_acquired_dimension_t acq_dim, const ml_training_request_t &tr)
{
    if (!acq_dim.dim)
        return TRAINING_RESULT_NULL_ACQUIRED_DIMENSION;

    return ml_dimension_train_model(training_thread, acq_dim.dim, tr);
}
static void *
ml_detect_main(void *arg)
{
    UNUSED(arg);

    worker_register("MLDETECT");
    worker_register_job_name(WORKER_JOB_DETECTION_COLLECT_STATS, "collect stats");
    worker_register_job_name(WORKER_JOB_DETECTION_DIM_CHART, "dim chart");
    worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart");
    worker_register_job_name(WORKER_JOB_DETECTION_STATS, "training stats");

    heartbeat_t hb;
    heartbeat_init(&hb);

    while (!Cfg.detection_stop) {
        worker_is_idle();
        heartbeat_next(&hb, USEC_PER_SEC);

        RRDHOST *rh;
        rrd_rdlock();
        rrdhost_foreach_read(rh) {
            if (!rh->ml_host)
                continue;

            ml_host_detect_once((ml_host_t *) rh->ml_host);
        }
        rrd_unlock();

        if (Cfg.enable_statistics_charts) {
            // collect and update training thread stats
            for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
                ml_training_thread_t *training_thread = &Cfg.training_threads[idx];

                netdata_mutex_lock(&training_thread->nd_mutex);
                ml_training_stats_t training_stats = training_thread->training_stats;
                training_thread->training_stats = {};
                netdata_mutex_unlock(&training_thread->nd_mutex);

                // calc the avg values
                if (training_stats.num_popped_items) {
                    training_stats.queue_size /= training_stats.num_popped_items;
                    training_stats.allotted_ut /= training_stats.num_popped_items;
                    training_stats.consumed_ut /= training_stats.num_popped_items;
                    training_stats.remaining_ut /= training_stats.num_popped_items;
                } else {
                    training_stats.queue_size = ml_queue_size(training_thread->training_queue);
                    training_stats.consumed_ut = 0;
                    training_stats.remaining_ut = training_stats.allotted_ut;

                    training_stats.training_result_ok = 0;
                    training_stats.training_result_invalid_query_time_range = 0;
                    training_stats.training_result_not_enough_collected_values = 0;
                    training_stats.training_result_null_acquired_dimension = 0;
                    training_stats.training_result_chart_under_replication = 0;
                }

                ml_update_training_statistics_chart(training_thread, training_stats);
            }
        }
    }

    return NULL;
}
/*
 * Public API
 */

bool ml_capable()
{
    return true;
}

bool ml_enabled(RRDHOST *rh)
{
    if (!rh)
        return false;

    if (!Cfg.enable_anomaly_detection)
        return false;

    if (simple_pattern_matches(Cfg.sp_host_to_skip, rrdhost_hostname(rh)))
        return false;

    return true;
}

bool ml_streaming_enabled()
{
    return Cfg.stream_anomaly_detection_charts;
}

void ml_host_new(RRDHOST *rh)
{
    if (!ml_enabled(rh))
        return;

    ml_host_t *host = new ml_host_t();

    host->rh = rh;
    host->mls = ml_machine_learning_stats_t();
    //host->ts = ml_training_stats_t();

    static std::atomic<size_t> times_called(0);
    host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue;

    host->host_anomaly_rate = 0.0;

    netdata_mutex_init(&host->mutex);

    rh->ml_host = (rrd_ml_host_t *) host;
}

void ml_host_delete(RRDHOST *rh)
{
    ml_host_t *host = (ml_host_t *) rh->ml_host;
    if (!host)
        return;

    netdata_mutex_destroy(&host->mutex);

    delete host;
    rh->ml_host = NULL;
}

void ml_host_get_info(RRDHOST *rh, BUFFER *wb)
{
    ml_host_t *host = (ml_host_t *) rh->ml_host;
    if (!host) {
        buffer_json_member_add_boolean(wb, "enabled", false);
        return;
    }

    buffer_json_member_add_uint64(wb, "version", 1);

    buffer_json_member_add_boolean(wb, "enabled", Cfg.enable_anomaly_detection);

    buffer_json_member_add_uint64(wb, "min-train-samples", Cfg.min_train_samples);
    buffer_json_member_add_uint64(wb, "max-train-samples", Cfg.max_train_samples);
    buffer_json_member_add_uint64(wb, "train-every", Cfg.train_every);

    buffer_json_member_add_uint64(wb, "diff-n", Cfg.diff_n);
    buffer_json_member_add_uint64(wb, "smooth-n", Cfg.smooth_n);
    buffer_json_member_add_uint64(wb, "lag-n", Cfg.lag_n);

    buffer_json_member_add_double(wb, "random-sampling-ratio", Cfg.random_sampling_ratio);
    buffer_json_member_add_uint64(wb, "max-kmeans-iters", Cfg.max_kmeans_iters);

    buffer_json_member_add_double(wb, "dimension-anomaly-score-threshold", Cfg.dimension_anomaly_score_threshold);

    buffer_json_member_add_string(wb, "anomaly-detection-grouping-method",
                                  time_grouping_method2string(Cfg.anomaly_detection_grouping_method));

    buffer_json_member_add_int64(wb, "anomaly-detection-query-duration", Cfg.anomaly_detection_query_duration);

    buffer_json_member_add_string(wb, "hosts-to-skip", Cfg.hosts_to_skip.c_str());
    buffer_json_member_add_string(wb, "charts-to-skip", Cfg.charts_to_skip.c_str());
}
void ml_host_get_detection_info(RRDHOST *rh, BUFFER *wb)
{
    ml_host_t *host = (ml_host_t *) rh->ml_host;
    if (!host)
        return;

    netdata_mutex_lock(&host->mutex);

    buffer_json_member_add_uint64(wb, "version", 1);
    buffer_json_member_add_uint64(wb, "anomalous-dimensions", host->mls.num_anomalous_dimensions);
    buffer_json_member_add_uint64(wb, "normal-dimensions", host->mls.num_normal_dimensions);
    buffer_json_member_add_uint64(wb, "total-dimensions", host->mls.num_anomalous_dimensions +
                                                          host->mls.num_normal_dimensions);
    buffer_json_member_add_uint64(wb, "trained-dimensions", host->mls.num_training_status_trained +
                                                            host->mls.num_training_status_pending_with_model);

    netdata_mutex_unlock(&host->mutex);
}

void ml_host_get_models(RRDHOST *rh, BUFFER *wb)
{
    UNUSED(rh);
    UNUSED(wb);

    // TODO: To be implemented
    error("Fetching KMeans models is not supported yet");
}
void ml_chart_new(RRDSET *rs)
{
    ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host;
    if (!host)
        return;

    ml_chart_t *chart = new ml_chart_t();

    chart->rs = rs;
    chart->mls = ml_machine_learning_stats_t();

    netdata_mutex_init(&chart->mutex);

    rs->ml_chart = (rrd_ml_chart_t *) chart;
}

void ml_chart_delete(RRDSET *rs)
{
    ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host;
    if (!host)
        return;

    ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;

    netdata_mutex_destroy(&chart->mutex);

    delete chart;
    rs->ml_chart = NULL;
}

bool ml_chart_update_begin(RRDSET *rs)
{
    ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
    if (!chart)
        return false;

    netdata_mutex_lock(&chart->mutex);
    chart->mls = {};
    return true;
}

void ml_chart_update_end(RRDSET *rs)
{
    ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
    if (!chart)
        return;

    netdata_mutex_unlock(&chart->mutex);
}

void ml_dimension_new(RRDDIM *rd)
{
    ml_chart_t *chart = (ml_chart_t *) rd->rrdset->ml_chart;
    if (!chart)
        return;

    ml_dimension_t *dim = new ml_dimension_t();

    dim->rd = rd;

    dim->mt = METRIC_TYPE_CONSTANT;
    dim->ts = TRAINING_STATUS_UNTRAINED;

    dim->last_training_time = 0;

    ml_kmeans_init(&dim->kmeans);

    if (simple_pattern_matches(Cfg.sp_charts_to_skip, rrdset_name(rd->rrdset)))
        dim->mls = MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART;
    else
        dim->mls = MACHINE_LEARNING_STATUS_ENABLED;

    netdata_mutex_init(&dim->mutex);

    dim->km_contexts.reserve(Cfg.num_models_to_use);

    rd->ml_dimension = (rrd_ml_dimension_t *) dim;

    metaqueue_ml_load_models(rd);
}

void ml_dimension_delete(RRDDIM *rd)
{
    ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
    if (!dim)
        return;

    netdata_mutex_destroy(&dim->mutex);

    delete dim;
    rd->ml_dimension = NULL;
}

bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool exists)
{
    ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
    if (!dim)
        return false;

    ml_chart_t *chart = (ml_chart_t *) rd->rrdset->ml_chart;

    bool is_anomalous = ml_dimension_predict(dim, curr_time, value, exists);
    ml_chart_update_dimension(chart, dim, is_anomalous);

    return is_anomalous;
}
static void ml_flush_pending_models(ml_training_thread_t *training_thread) {
    int rc = db_execute(db, "BEGIN TRANSACTION;");
    int op_no = 1;

    if (!rc) {
        op_no++;

        for (const auto &pending_model : training_thread->pending_model_info) {
            if (!rc)
                rc = ml_dimension_add_model(&pending_model.metric_uuid, &pending_model.kmeans);

            if (!rc)
                rc = ml_dimension_delete_models(&pending_model.metric_uuid, pending_model.kmeans.before - (Cfg.num_models_to_use * Cfg.train_every));
        }
    }

    if (!rc) {
        op_no++;
        rc = db_execute(db, "COMMIT TRANSACTION;");
    }

    // try to rollback transaction if we got any failures
    if (rc) {
        error("Trying to rollback ML transaction because it failed with rc=%d, op_no=%d", rc, op_no);
        op_no++;
        rc = db_execute(db, "ROLLBACK;");
        if (rc)
            error("ML transaction rollback failed with rc=%d", rc);
    }

    training_thread->pending_model_info.clear();
}
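// Retention example (illustrative values): with num_models_to_use = 3 and
// train_every = 3600, flushing a model whose window ends at `before` also
// deletes any of that dimension's rows older than before - 3 * 3600,
// keeping roughly one stored model per in-memory slot.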
static void *ml_train_main(void *arg) {
    ml_training_thread_t *training_thread = (ml_training_thread_t *) arg;

    char worker_name[1024];
    snprintfz(worker_name, sizeof(worker_name) - 1, "training_thread_%zu", training_thread->id);

    worker_register("MLTRAIN");
    worker_register_job_name(WORKER_TRAIN_QUEUE_POP, "pop queue");
    worker_register_job_name(WORKER_TRAIN_ACQUIRE_DIMENSION, "acquire");
    worker_register_job_name(WORKER_TRAIN_QUERY, "query");
    worker_register_job_name(WORKER_TRAIN_KMEANS, "kmeans");
    worker_register_job_name(WORKER_TRAIN_UPDATE_MODELS, "update models");
    worker_register_job_name(WORKER_TRAIN_RELEASE_DIMENSION, "release");
    worker_register_job_name(WORKER_TRAIN_UPDATE_HOST, "update host");
    worker_register_job_name(WORKER_TRAIN_FLUSH_MODELS, "flush models");

    while (!Cfg.training_stop) {
        worker_is_busy(WORKER_TRAIN_QUEUE_POP);

        ml_training_request_t training_req = ml_queue_pop(training_thread->training_queue);

        // we know this thread has been cancelled, when the queue starts
        // returning "null" requests without blocking on queue's pop().
        if (training_req.chart_id == NULL)
            break;

        size_t queue_size = ml_queue_size(training_thread->training_queue) + 1;

        usec_t allotted_ut = (Cfg.train_every * USEC_PER_SEC) / queue_size;
        if (allotted_ut > USEC_PER_SEC)
            allotted_ut = USEC_PER_SEC;

        usec_t start_ut = now_monotonic_usec();

        enum ml_training_result training_res;
        {
            worker_is_busy(WORKER_TRAIN_ACQUIRE_DIMENSION);
            ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get(
                training_req.machine_guid,
                training_req.chart_id,
                training_req.dimension_id);

            training_res = ml_acquired_dimension_train(training_thread, acq_dim, training_req);

            string_freez(training_req.chart_id);
            string_freez(training_req.dimension_id);

            worker_is_busy(WORKER_TRAIN_RELEASE_DIMENSION);
            ml_acquired_dimension_release(acq_dim);
        }

        usec_t consumed_ut = now_monotonic_usec() - start_ut;

        usec_t remaining_ut = 0;
        if (consumed_ut < allotted_ut)
            remaining_ut = allotted_ut - consumed_ut;

        if (Cfg.enable_statistics_charts) {
            worker_is_busy(WORKER_TRAIN_UPDATE_HOST);

            netdata_mutex_lock(&training_thread->nd_mutex);

            training_thread->training_stats.queue_size += queue_size;
            training_thread->training_stats.num_popped_items += 1;

            training_thread->training_stats.allotted_ut += allotted_ut;
            training_thread->training_stats.consumed_ut += consumed_ut;
            training_thread->training_stats.remaining_ut += remaining_ut;

            switch (training_res) {
                case TRAINING_RESULT_OK:
                    training_thread->training_stats.training_result_ok += 1;
                    break;
                case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
                    training_thread->training_stats.training_result_invalid_query_time_range += 1;
                    break;
                case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
                    training_thread->training_stats.training_result_not_enough_collected_values += 1;
                    break;
                case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
                    training_thread->training_stats.training_result_null_acquired_dimension += 1;
                    break;
                case TRAINING_RESULT_CHART_UNDER_REPLICATION:
                    training_thread->training_stats.training_result_chart_under_replication += 1;
                    break;
            }

            netdata_mutex_unlock(&training_thread->nd_mutex);
        }

        if (training_thread->pending_model_info.size() >= Cfg.flush_models_batch_size) {
            worker_is_busy(WORKER_TRAIN_FLUSH_MODELS);
            netdata_mutex_lock(&db_mutex);
            ml_flush_pending_models(training_thread);
            netdata_mutex_unlock(&db_mutex);
            continue;
        }

        worker_is_idle();
        std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut});
    }

    return NULL;
}
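// Pacing example (illustrative values): with train_every = 3600 and 60
// queued requests, each request is allotted min(1s, 3600s / 60) = 1s;
// whatever is left of the allotment after training is slept away, so a
// deep queue drains at roughly one dimension per second per thread.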
void ml_init()
{
    // Read config values
    ml_config_load(&Cfg);

    if (!Cfg.enable_anomaly_detection)
        return;

    // Generate random numbers to efficiently sample the features we need
    // for KMeans clustering.
    std::random_device RD;
    std::mt19937 Gen(RD());

    Cfg.random_nums.reserve(Cfg.max_train_samples);
    for (size_t Idx = 0; Idx != Cfg.max_train_samples; Idx++)
        Cfg.random_nums.push_back(Gen());

    // init training thread-specific data
    Cfg.training_threads.resize(Cfg.num_training_threads);
    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];

        size_t max_elements_needed_for_training = Cfg.max_train_samples * (Cfg.lag_n + 1);
        training_thread->training_cns = new calculated_number_t[max_elements_needed_for_training]();
        training_thread->scratch_training_cns = new calculated_number_t[max_elements_needed_for_training]();

        training_thread->id = idx;
        training_thread->training_queue = ml_queue_init();
        training_thread->pending_model_info.reserve(Cfg.flush_models_batch_size);
        netdata_mutex_init(&training_thread->nd_mutex);
    }

    // open sqlite db
    char path[FILENAME_MAX];
    snprintfz(path, FILENAME_MAX - 1, "%s/%s", netdata_configured_cache_dir, "ml.db");
    int rc = sqlite3_open(path, &db);
    if (rc != SQLITE_OK) {
        error_report("Failed to initialize database at %s, due to \"%s\"", path, sqlite3_errstr(rc));
        sqlite3_close(db);
        db = NULL;
    }

    if (db) {
        char *err = NULL;
        int rc = sqlite3_exec(db, db_models_create_table, NULL, NULL, &err);
        if (rc != SQLITE_OK) {
            error_report("Failed to create models table (%s, %s)", sqlite3_errstr(rc), err ? err : "");
            sqlite3_close(db);
            sqlite3_free(err);
            db = NULL;
        }
    }
}
void ml_fini() {
    if (!Cfg.enable_anomaly_detection)
        return;

    int rc = sqlite3_close_v2(db);
    if (unlikely(rc != SQLITE_OK))
        error_report("Error %d while closing the SQLite database, %s", rc, sqlite3_errstr(rc));
}

void ml_start_threads() {
    if (!Cfg.enable_anomaly_detection)
        return;

    // start detection & training threads
    Cfg.detection_stop = false;
    Cfg.training_stop = false;

    char tag[NETDATA_THREAD_TAG_MAX + 1];

    snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT");
    netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL);

    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
        snprintfz(tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%zu]", training_thread->id);
        netdata_thread_create(&training_thread->nd_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_train_main, training_thread);
    }
}

void ml_stop_threads()
{
    if (!Cfg.enable_anomaly_detection)
        return;

    Cfg.detection_stop = true;
    Cfg.training_stop = true;

    netdata_thread_cancel(Cfg.detection_thread);
    netdata_thread_join(Cfg.detection_thread, NULL);

    // signal the training queue of each thread
    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];

        ml_queue_signal(training_thread->training_queue);
    }

    // cancel training threads
    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];

        netdata_thread_cancel(training_thread->nd_thread);
    }

    // join training threads
    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];

        netdata_thread_join(training_thread->nd_thread, NULL);
    }

    // clear training thread data
    for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
        ml_training_thread_t *training_thread = &Cfg.training_threads[idx];

        delete[] training_thread->training_cns;
        delete[] training_thread->scratch_training_cns;
        ml_queue_destroy(training_thread->training_queue);
        netdata_mutex_destroy(&training_thread->nd_mutex);
    }
}