123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 |
- // SPDX-License-Identifier: GPL-3.0-or-later
- #include "Config.h"
- #include "Host.h"
- #include "Queue.h"
- #include "ADCharts.h"
- #include "json/single_include/nlohmann/json.hpp"
- using namespace ml;
- void Host::addChart(Chart *C) {
- std::lock_guard<Mutex> L(M);
- Charts[C->getRS()] = C;
- }
- void Host::removeChart(Chart *C) {
- std::lock_guard<Mutex> L(M);
- Charts.erase(C->getRS());
- }
- void Host::getConfigAsJson(nlohmann::json &Json) const {
- Json["version"] = 1;
- Json["enabled"] = Cfg.EnableAnomalyDetection;
- Json["min-train-samples"] = Cfg.MinTrainSamples;
- Json["max-train-samples"] = Cfg.MaxTrainSamples;
- Json["train-every"] = Cfg.TrainEvery;
- Json["diff-n"] = Cfg.DiffN;
- Json["smooth-n"] = Cfg.SmoothN;
- Json["lag-n"] = Cfg.LagN;
- Json["random-sampling-ratio"] = Cfg.RandomSamplingRatio;
- Json["max-kmeans-iters"] = Cfg.MaxKMeansIters;
- Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold;
- Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold;
- Json["anomaly-detection-grouping-method"] = group_method2string(Cfg.AnomalyDetectionGroupingMethod);
- Json["anomaly-detection-query-duration"] = Cfg.AnomalyDetectionQueryDuration;
- Json["hosts-to-skip"] = Cfg.HostsToSkip;
- Json["charts-to-skip"] = Cfg.ChartsToSkip;
- }
- void Host::getModelsAsJson(nlohmann::json &Json) {
- std::lock_guard<Mutex> L(M);
- for (auto &CP : Charts) {
- Chart *C = CP.second;
- C->getModelsAsJson(Json);
- }
- }
- #define WORKER_JOB_DETECTION_PREP 0
- #define WORKER_JOB_DETECTION_DIM_CHART 1
- #define WORKER_JOB_DETECTION_HOST_CHART 2
- #define WORKER_JOB_DETECTION_STATS 3
- #define WORKER_JOB_DETECTION_RESOURCES 4
- void Host::detectOnce() {
- worker_is_busy(WORKER_JOB_DETECTION_PREP);
- MLS = {};
- MachineLearningStats MLSCopy = {};
- TrainingStats TSCopy = {};
- {
- std::lock_guard<Mutex> L(M);
- /*
- * prediction/detection stats
- */
- for (auto &CP : Charts) {
- Chart *C = CP.second;
- if (!C->isAvailableForML())
- continue;
- MachineLearningStats ChartMLS = C->getMLS();
- MLS.NumMachineLearningStatusEnabled += ChartMLS.NumMachineLearningStatusEnabled;
- MLS.NumMachineLearningStatusDisabledUE += ChartMLS.NumMachineLearningStatusDisabledUE;
- MLS.NumMachineLearningStatusDisabledSP += ChartMLS.NumMachineLearningStatusDisabledSP;
- MLS.NumMetricTypeConstant += ChartMLS.NumMetricTypeConstant;
- MLS.NumMetricTypeVariable += ChartMLS.NumMetricTypeVariable;
- MLS.NumTrainingStatusUntrained += ChartMLS.NumTrainingStatusUntrained;
- MLS.NumTrainingStatusPendingWithoutModel += ChartMLS.NumTrainingStatusPendingWithoutModel;
- MLS.NumTrainingStatusTrained += ChartMLS.NumTrainingStatusTrained;
- MLS.NumTrainingStatusPendingWithModel += ChartMLS.NumTrainingStatusPendingWithModel;
- MLS.NumAnomalousDimensions += ChartMLS.NumAnomalousDimensions;
- MLS.NumNormalDimensions += ChartMLS.NumNormalDimensions;
- }
- HostAnomalyRate = 0.0;
- size_t NumActiveDimensions = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions;
- if (NumActiveDimensions)
- HostAnomalyRate = static_cast<double>(MLS.NumAnomalousDimensions) / NumActiveDimensions;
- MLSCopy = MLS;
- /*
- * training stats
- */
- TSCopy = TS;
- TS.QueueSize = 0;
- TS.NumPoppedItems = 0;
- TS.AllottedUT = 0;
- TS.ConsumedUT = 0;
- TS.RemainingUT = 0;
- TS.TrainingResultOk = 0;
- TS.TrainingResultInvalidQueryTimeRange = 0;
- TS.TrainingResultNotEnoughCollectedValues = 0;
- TS.TrainingResultNullAcquiredDimension = 0;
- TS.TrainingResultChartUnderReplication = 0;
- }
- // Calc the avg values
- if (TSCopy.NumPoppedItems) {
- TSCopy.QueueSize /= TSCopy.NumPoppedItems;
- TSCopy.AllottedUT /= TSCopy.NumPoppedItems;
- TSCopy.ConsumedUT /= TSCopy.NumPoppedItems;
- TSCopy.RemainingUT /= TSCopy.NumPoppedItems;
- TSCopy.TrainingResultOk /= TSCopy.NumPoppedItems;
- TSCopy.TrainingResultInvalidQueryTimeRange /= TSCopy.NumPoppedItems;
- TSCopy.TrainingResultNotEnoughCollectedValues /= TSCopy.NumPoppedItems;
- TSCopy.TrainingResultNullAcquiredDimension /= TSCopy.NumPoppedItems;
- TSCopy.TrainingResultChartUnderReplication /= TSCopy.NumPoppedItems;
- } else {
- TSCopy.QueueSize = 0;
- TSCopy.AllottedUT = 0;
- TSCopy.ConsumedUT = 0;
- TSCopy.RemainingUT = 0;
- }
- if(!RH)
- return;
- worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
- updateDimensionsChart(RH, MLSCopy);
- worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART);
- updateHostAndDetectionRateCharts(RH, HostAnomalyRate * 10000.0);
- #ifdef NETDATA_ML_RESOURCE_CHARTS
- worker_is_busy(WORKER_JOB_DETECTION_RESOURCES);
- struct rusage PredictionRU;
- getrusage(RUSAGE_THREAD, &PredictionRU);
- updateResourceUsageCharts(RH, PredictionRU, TSCopy.TrainingRU);
- #endif
- worker_is_busy(WORKER_JOB_DETECTION_STATS);
- updateTrainingStatisticsChart(RH, TSCopy);
- }
- class AcquiredDimension {
- public:
- static AcquiredDimension find(RRDHOST *RH, STRING *ChartId, STRING *DimensionId) {
- RRDDIM_ACQUIRED *AcqRD = nullptr;
- Dimension *D = nullptr;
- RRDSET *RS = rrdset_find(RH, string2str(ChartId));
- if (RS) {
- AcqRD = rrddim_find_and_acquire(RS, string2str(DimensionId));
- if (AcqRD) {
- RRDDIM *RD = rrddim_acquired_to_rrddim(AcqRD);
- if (RD)
- D = reinterpret_cast<Dimension *>(RD->ml_dimension);
- }
- }
- return AcquiredDimension(AcqRD, D);
- }
- private:
- AcquiredDimension(RRDDIM_ACQUIRED *AcqRD, Dimension *D) : AcqRD(AcqRD), D(D) {}
- public:
- TrainingResult train(const TrainingRequest &TR) {
- if (!D)
- return TrainingResult::NullAcquiredDimension;
- return D->trainModel(TR);
- }
- ~AcquiredDimension() {
- if (AcqRD)
- rrddim_acquired_release(AcqRD);
- }
- private:
- RRDDIM_ACQUIRED *AcqRD;
- Dimension *D;
- };
- void Host::scheduleForTraining(TrainingRequest TR) {
- TrainingQueue.push(TR);
- }
- #define WORKER_JOB_TRAINING_FIND 0
- #define WORKER_JOB_TRAINING_TRAIN 1
- #define WORKER_JOB_TRAINING_STATS 2
- void Host::train() {
- worker_register("MLTRAIN");
- worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find");
- worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train");
- worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats");
- service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true);
- while (service_running(SERVICE_ML_TRAINING)) {
- auto P = TrainingQueue.pop();
- TrainingRequest TrainingReq = P.first;
- size_t Size = P.second;
- if (ThreadsCancelled) {
- info("Stopping training thread because it was cancelled.");
- break;
- }
- usec_t AllottedUT = (Cfg.TrainEvery * RH->rrd_update_every * USEC_PER_SEC) / Size;
- if (AllottedUT > USEC_PER_SEC)
- AllottedUT = USEC_PER_SEC;
- usec_t StartUT = now_monotonic_usec();
- TrainingResult TrainingRes;
- {
- worker_is_busy(WORKER_JOB_TRAINING_FIND);
- AcquiredDimension AcqDim = AcquiredDimension::find(RH, TrainingReq.ChartId, TrainingReq.DimensionId);
- worker_is_busy(WORKER_JOB_TRAINING_TRAIN);
- TrainingRes = AcqDim.train(TrainingReq);
- string_freez(TrainingReq.ChartId);
- string_freez(TrainingReq.DimensionId);
- }
- usec_t ConsumedUT = now_monotonic_usec() - StartUT;
- worker_is_busy(WORKER_JOB_TRAINING_STATS);
- usec_t RemainingUT = 0;
- if (ConsumedUT < AllottedUT)
- RemainingUT = AllottedUT - ConsumedUT;
- {
- std::lock_guard<Mutex> L(M);
- if (TS.AllottedUT == 0) {
- struct rusage TRU;
- getrusage(RUSAGE_THREAD, &TRU);
- TS.TrainingRU = TRU;
- }
- TS.QueueSize += Size;
- TS.NumPoppedItems += 1;
- TS.AllottedUT += AllottedUT;
- TS.ConsumedUT += ConsumedUT;
- TS.RemainingUT += RemainingUT;
- switch (TrainingRes) {
- case TrainingResult::Ok:
- TS.TrainingResultOk += 1;
- break;
- case TrainingResult::InvalidQueryTimeRange:
- TS.TrainingResultInvalidQueryTimeRange += 1;
- break;
- case TrainingResult::NotEnoughCollectedValues:
- TS.TrainingResultNotEnoughCollectedValues += 1;
- break;
- case TrainingResult::NullAcquiredDimension:
- TS.TrainingResultNullAcquiredDimension += 1;
- break;
- case TrainingResult::ChartUnderReplication:
- TS.TrainingResultChartUnderReplication += 1;
- break;
- }
- }
- worker_is_idle();
- std::this_thread::sleep_for(std::chrono::microseconds{RemainingUT});
- worker_is_busy(0);
- }
- }
- void Host::detect() {
- worker_register("MLDETECT");
- worker_register_job_name(WORKER_JOB_DETECTION_PREP, "prep");
- worker_register_job_name(WORKER_JOB_DETECTION_DIM_CHART, "dim chart");
- worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart");
- worker_register_job_name(WORKER_JOB_DETECTION_STATS, "stats");
- worker_register_job_name(WORKER_JOB_DETECTION_RESOURCES, "resources");
- service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true);
- heartbeat_t HB;
- heartbeat_init(&HB);
- while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) {
- worker_is_idle();
- heartbeat_next(&HB, (RH ? RH->rrd_update_every : default_rrd_update_every) * USEC_PER_SEC);
- detectOnce();
- }
- }
- void Host::getDetectionInfoAsJson(nlohmann::json &Json) const {
- Json["version"] = 1;
- Json["anomalous-dimensions"] = MLS.NumAnomalousDimensions;
- Json["normal-dimensions"] = MLS.NumNormalDimensions;
- Json["total-dimensions"] = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions;
- Json["trained-dimensions"] = MLS.NumTrainingStatusTrained + MLS.NumTrainingStatusPendingWithModel;
- }
- void *train_main(void *Arg) {
- Host *H = reinterpret_cast<Host *>(Arg);
- H->train();
- return nullptr;
- }
- void *detect_main(void *Arg) {
- Host *H = reinterpret_cast<Host *>(Arg);
- H->detect();
- return nullptr;
- }
- void Host::startAnomalyDetectionThreads() {
- if (ThreadsRunning) {
- error("Anomaly detections threads for host %s are already-up and running.", rrdhost_hostname(RH));
- return;
- }
- ThreadsRunning = true;
- ThreadsCancelled = false;
- ThreadsJoined = false;
- char Tag[NETDATA_THREAD_TAG_MAX + 1];
- // #define ML_DISABLE_JOINING
- snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(RH));
- netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this));
- snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLDT[%s]", rrdhost_hostname(RH));
- netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this));
- }
- void Host::stopAnomalyDetectionThreads(bool join) {
- if (!ThreadsRunning) {
- error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(RH));
- return;
- }
- if(!ThreadsCancelled) {
- ThreadsCancelled = true;
- // Signal the training queue to stop popping-items
- TrainingQueue.signal();
- netdata_thread_cancel(TrainingThread);
- netdata_thread_cancel(DetectionThread);
- }
- if (join && !ThreadsJoined) {
- ThreadsJoined = true;
- ThreadsRunning = false;
- // these fail on alpine linux and our CI hangs forever
- // failing to compile static builds
- // commenting them, until we find a solution
- // to enable again:
- // NETDATA_THREAD_OPTION_DEFAULT needs to become NETDATA_THREAD_OPTION_JOINABLE
- netdata_thread_join(TrainingThread, nullptr);
- netdata_thread_join(DetectionThread, nullptr);
- }
- }
|