Host.cc 12 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "Config.h"
  3. #include "Host.h"
  4. #include "Queue.h"
  5. #include "ADCharts.h"
  6. #include "json/single_include/nlohmann/json.hpp"
  7. using namespace ml;
  8. void Host::addChart(Chart *C) {
  9. std::lock_guard<Mutex> L(M);
  10. Charts[C->getRS()] = C;
  11. }
  12. void Host::removeChart(Chart *C) {
  13. std::lock_guard<Mutex> L(M);
  14. Charts.erase(C->getRS());
  15. }
  16. void Host::getConfigAsJson(nlohmann::json &Json) const {
  17. Json["version"] = 1;
  18. Json["enabled"] = Cfg.EnableAnomalyDetection;
  19. Json["min-train-samples"] = Cfg.MinTrainSamples;
  20. Json["max-train-samples"] = Cfg.MaxTrainSamples;
  21. Json["train-every"] = Cfg.TrainEvery;
  22. Json["diff-n"] = Cfg.DiffN;
  23. Json["smooth-n"] = Cfg.SmoothN;
  24. Json["lag-n"] = Cfg.LagN;
  25. Json["random-sampling-ratio"] = Cfg.RandomSamplingRatio;
  26. Json["max-kmeans-iters"] = Cfg.MaxKMeansIters;
  27. Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold;
  28. Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold;
  29. Json["anomaly-detection-grouping-method"] = group_method2string(Cfg.AnomalyDetectionGroupingMethod);
  30. Json["anomaly-detection-query-duration"] = Cfg.AnomalyDetectionQueryDuration;
  31. Json["hosts-to-skip"] = Cfg.HostsToSkip;
  32. Json["charts-to-skip"] = Cfg.ChartsToSkip;
  33. }
  34. void Host::getModelsAsJson(nlohmann::json &Json) {
  35. std::lock_guard<Mutex> L(M);
  36. for (auto &CP : Charts) {
  37. Chart *C = CP.second;
  38. C->getModelsAsJson(Json);
  39. }
  40. }
  41. #define WORKER_JOB_DETECTION_PREP 0
  42. #define WORKER_JOB_DETECTION_DIM_CHART 1
  43. #define WORKER_JOB_DETECTION_HOST_CHART 2
  44. #define WORKER_JOB_DETECTION_STATS 3
  45. #define WORKER_JOB_DETECTION_RESOURCES 4
  46. void Host::detectOnce() {
  47. worker_is_busy(WORKER_JOB_DETECTION_PREP);
  48. MLS = {};
  49. MachineLearningStats MLSCopy = {};
  50. TrainingStats TSCopy = {};
  51. {
  52. std::lock_guard<Mutex> L(M);
  53. /*
  54. * prediction/detection stats
  55. */
  56. for (auto &CP : Charts) {
  57. Chart *C = CP.second;
  58. if (!C->isAvailableForML())
  59. continue;
  60. MachineLearningStats ChartMLS = C->getMLS();
  61. MLS.NumMachineLearningStatusEnabled += ChartMLS.NumMachineLearningStatusEnabled;
  62. MLS.NumMachineLearningStatusDisabledUE += ChartMLS.NumMachineLearningStatusDisabledUE;
  63. MLS.NumMachineLearningStatusDisabledSP += ChartMLS.NumMachineLearningStatusDisabledSP;
  64. MLS.NumMetricTypeConstant += ChartMLS.NumMetricTypeConstant;
  65. MLS.NumMetricTypeVariable += ChartMLS.NumMetricTypeVariable;
  66. MLS.NumTrainingStatusUntrained += ChartMLS.NumTrainingStatusUntrained;
  67. MLS.NumTrainingStatusPendingWithoutModel += ChartMLS.NumTrainingStatusPendingWithoutModel;
  68. MLS.NumTrainingStatusTrained += ChartMLS.NumTrainingStatusTrained;
  69. MLS.NumTrainingStatusPendingWithModel += ChartMLS.NumTrainingStatusPendingWithModel;
  70. MLS.NumAnomalousDimensions += ChartMLS.NumAnomalousDimensions;
  71. MLS.NumNormalDimensions += ChartMLS.NumNormalDimensions;
  72. }
  73. HostAnomalyRate = 0.0;
  74. size_t NumActiveDimensions = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions;
  75. if (NumActiveDimensions)
  76. HostAnomalyRate = static_cast<double>(MLS.NumAnomalousDimensions) / NumActiveDimensions;
  77. MLSCopy = MLS;
  78. /*
  79. * training stats
  80. */
  81. TSCopy = TS;
  82. TS.QueueSize = 0;
  83. TS.NumPoppedItems = 0;
  84. TS.AllottedUT = 0;
  85. TS.ConsumedUT = 0;
  86. TS.RemainingUT = 0;
  87. TS.TrainingResultOk = 0;
  88. TS.TrainingResultInvalidQueryTimeRange = 0;
  89. TS.TrainingResultNotEnoughCollectedValues = 0;
  90. TS.TrainingResultNullAcquiredDimension = 0;
  91. TS.TrainingResultChartUnderReplication = 0;
  92. }
  93. // Calc the avg values
  94. if (TSCopy.NumPoppedItems) {
  95. TSCopy.QueueSize /= TSCopy.NumPoppedItems;
  96. TSCopy.AllottedUT /= TSCopy.NumPoppedItems;
  97. TSCopy.ConsumedUT /= TSCopy.NumPoppedItems;
  98. TSCopy.RemainingUT /= TSCopy.NumPoppedItems;
  99. TSCopy.TrainingResultOk /= TSCopy.NumPoppedItems;
  100. TSCopy.TrainingResultInvalidQueryTimeRange /= TSCopy.NumPoppedItems;
  101. TSCopy.TrainingResultNotEnoughCollectedValues /= TSCopy.NumPoppedItems;
  102. TSCopy.TrainingResultNullAcquiredDimension /= TSCopy.NumPoppedItems;
  103. TSCopy.TrainingResultChartUnderReplication /= TSCopy.NumPoppedItems;
  104. } else {
  105. TSCopy.QueueSize = 0;
  106. TSCopy.AllottedUT = 0;
  107. TSCopy.ConsumedUT = 0;
  108. TSCopy.RemainingUT = 0;
  109. }
  110. if(!RH)
  111. return;
  112. worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
  113. updateDimensionsChart(RH, MLSCopy);
  114. worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART);
  115. updateHostAndDetectionRateCharts(RH, HostAnomalyRate * 10000.0);
  116. #ifdef NETDATA_ML_RESOURCE_CHARTS
  117. worker_is_busy(WORKER_JOB_DETECTION_RESOURCES);
  118. struct rusage PredictionRU;
  119. getrusage(RUSAGE_THREAD, &PredictionRU);
  120. updateResourceUsageCharts(RH, PredictionRU, TSCopy.TrainingRU);
  121. #endif
  122. worker_is_busy(WORKER_JOB_DETECTION_STATS);
  123. updateTrainingStatisticsChart(RH, TSCopy);
  124. }
  125. class AcquiredDimension {
  126. public:
  127. static AcquiredDimension find(RRDHOST *RH, STRING *ChartId, STRING *DimensionId) {
  128. RRDDIM_ACQUIRED *AcqRD = nullptr;
  129. Dimension *D = nullptr;
  130. RRDSET *RS = rrdset_find(RH, string2str(ChartId));
  131. if (RS) {
  132. AcqRD = rrddim_find_and_acquire(RS, string2str(DimensionId));
  133. if (AcqRD) {
  134. RRDDIM *RD = rrddim_acquired_to_rrddim(AcqRD);
  135. if (RD)
  136. D = reinterpret_cast<Dimension *>(RD->ml_dimension);
  137. }
  138. }
  139. return AcquiredDimension(AcqRD, D);
  140. }
  141. private:
  142. AcquiredDimension(RRDDIM_ACQUIRED *AcqRD, Dimension *D) : AcqRD(AcqRD), D(D) {}
  143. public:
  144. TrainingResult train(const TrainingRequest &TR) {
  145. if (!D)
  146. return TrainingResult::NullAcquiredDimension;
  147. return D->trainModel(TR);
  148. }
  149. ~AcquiredDimension() {
  150. if (AcqRD)
  151. rrddim_acquired_release(AcqRD);
  152. }
  153. private:
  154. RRDDIM_ACQUIRED *AcqRD;
  155. Dimension *D;
  156. };
  157. void Host::scheduleForTraining(TrainingRequest TR) {
  158. TrainingQueue.push(TR);
  159. }
  160. #define WORKER_JOB_TRAINING_FIND 0
  161. #define WORKER_JOB_TRAINING_TRAIN 1
  162. #define WORKER_JOB_TRAINING_STATS 2
  163. void Host::train() {
  164. worker_register("MLTRAIN");
  165. worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find");
  166. worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train");
  167. worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats");
  168. service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true);
  169. while (service_running(SERVICE_ML_TRAINING)) {
  170. auto P = TrainingQueue.pop();
  171. TrainingRequest TrainingReq = P.first;
  172. size_t Size = P.second;
  173. if (ThreadsCancelled) {
  174. info("Stopping training thread because it was cancelled.");
  175. break;
  176. }
  177. usec_t AllottedUT = (Cfg.TrainEvery * RH->rrd_update_every * USEC_PER_SEC) / Size;
  178. if (AllottedUT > USEC_PER_SEC)
  179. AllottedUT = USEC_PER_SEC;
  180. usec_t StartUT = now_monotonic_usec();
  181. TrainingResult TrainingRes;
  182. {
  183. worker_is_busy(WORKER_JOB_TRAINING_FIND);
  184. AcquiredDimension AcqDim = AcquiredDimension::find(RH, TrainingReq.ChartId, TrainingReq.DimensionId);
  185. worker_is_busy(WORKER_JOB_TRAINING_TRAIN);
  186. TrainingRes = AcqDim.train(TrainingReq);
  187. string_freez(TrainingReq.ChartId);
  188. string_freez(TrainingReq.DimensionId);
  189. }
  190. usec_t ConsumedUT = now_monotonic_usec() - StartUT;
  191. worker_is_busy(WORKER_JOB_TRAINING_STATS);
  192. usec_t RemainingUT = 0;
  193. if (ConsumedUT < AllottedUT)
  194. RemainingUT = AllottedUT - ConsumedUT;
  195. {
  196. std::lock_guard<Mutex> L(M);
  197. if (TS.AllottedUT == 0) {
  198. struct rusage TRU;
  199. getrusage(RUSAGE_THREAD, &TRU);
  200. TS.TrainingRU = TRU;
  201. }
  202. TS.QueueSize += Size;
  203. TS.NumPoppedItems += 1;
  204. TS.AllottedUT += AllottedUT;
  205. TS.ConsumedUT += ConsumedUT;
  206. TS.RemainingUT += RemainingUT;
  207. switch (TrainingRes) {
  208. case TrainingResult::Ok:
  209. TS.TrainingResultOk += 1;
  210. break;
  211. case TrainingResult::InvalidQueryTimeRange:
  212. TS.TrainingResultInvalidQueryTimeRange += 1;
  213. break;
  214. case TrainingResult::NotEnoughCollectedValues:
  215. TS.TrainingResultNotEnoughCollectedValues += 1;
  216. break;
  217. case TrainingResult::NullAcquiredDimension:
  218. TS.TrainingResultNullAcquiredDimension += 1;
  219. break;
  220. case TrainingResult::ChartUnderReplication:
  221. TS.TrainingResultChartUnderReplication += 1;
  222. break;
  223. }
  224. }
  225. worker_is_idle();
  226. std::this_thread::sleep_for(std::chrono::microseconds{RemainingUT});
  227. worker_is_busy(0);
  228. }
  229. }
  230. void Host::detect() {
  231. worker_register("MLDETECT");
  232. worker_register_job_name(WORKER_JOB_DETECTION_PREP, "prep");
  233. worker_register_job_name(WORKER_JOB_DETECTION_DIM_CHART, "dim chart");
  234. worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart");
  235. worker_register_job_name(WORKER_JOB_DETECTION_STATS, "stats");
  236. worker_register_job_name(WORKER_JOB_DETECTION_RESOURCES, "resources");
  237. service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true);
  238. heartbeat_t HB;
  239. heartbeat_init(&HB);
  240. while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) {
  241. worker_is_idle();
  242. heartbeat_next(&HB, (RH ? RH->rrd_update_every : default_rrd_update_every) * USEC_PER_SEC);
  243. detectOnce();
  244. }
  245. }
  246. void Host::getDetectionInfoAsJson(nlohmann::json &Json) const {
  247. Json["version"] = 1;
  248. Json["anomalous-dimensions"] = MLS.NumAnomalousDimensions;
  249. Json["normal-dimensions"] = MLS.NumNormalDimensions;
  250. Json["total-dimensions"] = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions;
  251. Json["trained-dimensions"] = MLS.NumTrainingStatusTrained + MLS.NumTrainingStatusPendingWithModel;
  252. }
  253. void *train_main(void *Arg) {
  254. Host *H = reinterpret_cast<Host *>(Arg);
  255. H->train();
  256. return nullptr;
  257. }
  258. void *detect_main(void *Arg) {
  259. Host *H = reinterpret_cast<Host *>(Arg);
  260. H->detect();
  261. return nullptr;
  262. }
  263. void Host::startAnomalyDetectionThreads() {
  264. if (ThreadsRunning) {
  265. error("Anomaly detections threads for host %s are already-up and running.", rrdhost_hostname(RH));
  266. return;
  267. }
  268. ThreadsRunning = true;
  269. ThreadsCancelled = false;
  270. ThreadsJoined = false;
  271. char Tag[NETDATA_THREAD_TAG_MAX + 1];
  272. // #define ML_DISABLE_JOINING
  273. snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(RH));
  274. netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this));
  275. snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLDT[%s]", rrdhost_hostname(RH));
  276. netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this));
  277. }
  278. void Host::stopAnomalyDetectionThreads(bool join) {
  279. if (!ThreadsRunning) {
  280. error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(RH));
  281. return;
  282. }
  283. if(!ThreadsCancelled) {
  284. ThreadsCancelled = true;
  285. // Signal the training queue to stop popping-items
  286. TrainingQueue.signal();
  287. netdata_thread_cancel(TrainingThread);
  288. netdata_thread_cancel(DetectionThread);
  289. }
  290. if (join && !ThreadsJoined) {
  291. ThreadsJoined = true;
  292. ThreadsRunning = false;
  293. // these fail on alpine linux and our CI hangs forever
  294. // failing to compile static builds
  295. // commenting them, until we find a solution
  296. // to enable again:
  297. // NETDATA_THREAD_OPTION_DEFAULT needs to become NETDATA_THREAD_OPTION_JOINABLE
  298. netdata_thread_join(TrainingThread, nullptr);
  299. netdata_thread_join(DetectionThread, nullptr);
  300. }
  301. }