
ML-related changes to address issue/discussion comments. (#12494)

* Increase training thread's max sleep time.

With this change we will only cap the allotted time when it is more than
ten seconds. The previous limit was one second, which had the effect of
scheduling dimensions near the beginning of each training window. This
was not desirable because it would cause high CPU usage on parents with
many children.
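
A minimal sketch of the new cap, assuming a hypothetical Allotted duration for the time remaining until the next dimension is due for training (the real logic lives in TrainableHost::train(), where MaxSleepFor becomes Seconds{10 * updateEvery()}):

    // Sketch only: cap how long the training thread sleeps between dimensions.
    // "Allotted" is a hypothetical name for the time left until the next
    // dimension is due; the cap is ten update_every periods instead of one.
    #include <algorithm>
    #include <chrono>
    #include <thread>

    using Duration = std::chrono::duration<double>;

    void sleepUntilNextTraining(Duration Allotted, unsigned UpdateEvery) {
        const Duration MaxSleepFor = std::chrono::seconds{10 * UpdateEvery};

        // Only clip when the allotted time exceeds the larger limit, so training
        // work spreads across the window instead of piling up near its beginning.
        Duration SleepFor = std::min(Allotted, MaxSleepFor);
        std::this_thread::sleep_for(SleepFor);
    }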

* Only exclude netdata.* charts from training.

* Use heartbeat in detection thread.

* Track rusage of prediction thread.

* Track rusage of training thread.

* Add support for random sampling of extracted features.

* Rebase

* Skip RNG when ML is disabled and fix undefined behaviour
vkalintiris 2 years ago
Parent
Commit
41a40dc3a4
9 changed files with 89 additions and 54 deletions
  1. ml/Config.cc (+4 -4)
  2. ml/Config.h (+2 -0)
  3. ml/Dimension.cc (+8 -2)
  4. ml/Dimension.h (+0 -5)
  5. ml/Host.cc (+35 -42)
  6. ml/Host.h (+13 -0)
  7. ml/kmeans/SamplesBuffer.cc (+6 -0)
  8. ml/kmeans/SamplesBuffer.h (+6 -1)
  9. ml/ml.cc (+15 -0)

+ 4 - 4
ml/Config.cc

@@ -38,6 +38,7 @@ void Config::readMLConfig(void) {
     unsigned SmoothN = config_get_number(ConfigSectionML, "num samples to smooth", 3);
     unsigned LagN = config_get_number(ConfigSectionML, "num samples to lag", 5);
 
+    double RandomSamplingRatio = config_get_float(ConfigSectionML, "random sampling ratio", 1.0 / LagN);
     unsigned MaxKMeansIters = config_get_number(ConfigSectionML, "maximum number of k-means iterations", 1000);
 
     double DimensionAnomalyScoreThreshold = config_get_float(ConfigSectionML, "dimension anomaly score threshold", 0.99);
@@ -67,6 +68,7 @@ void Config::readMLConfig(void) {
     SmoothN = clamp(SmoothN, 0u, 5u);
     LagN = clamp(LagN, 0u, 5u);
 
+    RandomSamplingRatio = clamp(RandomSamplingRatio, 0.2, 1.0);
     MaxKMeansIters = clamp(MaxKMeansIters, 500u, 1000u);
 
     DimensionAnomalyScoreThreshold = clamp(DimensionAnomalyScoreThreshold, 0.01, 5.00);
@@ -112,6 +114,7 @@ void Config::readMLConfig(void) {
     Cfg.SmoothN = SmoothN;
     Cfg.LagN = LagN;
 
+    Cfg.RandomSamplingRatio = RandomSamplingRatio;
     Cfg.MaxKMeansIters = MaxKMeansIters;
 
     Cfg.DimensionAnomalyScoreThreshold = DimensionAnomalyScoreThreshold;
@@ -128,9 +131,6 @@ void Config::readMLConfig(void) {
 
     // Always exclude anomaly_detection charts from training.
     Cfg.ChartsToSkip = "anomaly_detection.* ";
-    Cfg.ChartsToSkip += config_get(ConfigSectionML, "charts to skip from training",
-            "!system.* !cpu.* !mem.* !disk.* !disk_* "
-            "!ip.* !ipv4.* !ipv6.* !net.* !net_* !netfilter.* "
-            "!services.* !apps.* !groups.* !user.* !ebpf.* !netdata.* *");
+    Cfg.ChartsToSkip += config_get(ConfigSectionML, "charts to skip from training", "netdata.*");
     Cfg.SP_ChartsToSkip = simple_pattern_create(ChartsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT);
 }
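
For reference, with the default num samples to lag of 5 the default random sampling ratio works out to 1.0 / 5 = 0.2, i.e. roughly one in five extracted feature rows is kept, and the clamp keeps the ratio within [0.2, 1.0]. A standalone sketch of that arithmetic (illustrative values, not the netdata config API):

    // Sketch: relation between the lag setting and the default sampling ratio.
    #include <algorithm>
    #include <cstdio>

    int main() {
        unsigned LagN = 5;                        // default "num samples to lag"
        double RandomSamplingRatio = 1.0 / LagN;  // default ratio -> 0.2

        // Clamped to [0.2, 1.0]: at most every row and at least one in five
        // rows is used for k-means training.
        RandomSamplingRatio = std::clamp(RandomSamplingRatio, 0.2, 1.0);
        std::printf("sampling ratio: %.2f\n", RandomSamplingRatio);
        return 0;
    }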

+ 2 - 0
ml/Config.h

@@ -21,6 +21,7 @@ public:
     unsigned SmoothN;
     unsigned LagN;
 
+    double RandomSamplingRatio;
     unsigned MaxKMeansIters;
 
     double DimensionAnomalyScoreThreshold;
@@ -39,6 +40,7 @@ public:
     SIMPLE_PATTERN *SP_ChartsToSkip;
 
     std::string AnomalyDBPath;
+    std::vector<uint32_t> RandomNums;
 
     void readMLConfig();
 };

+ 8 - 2
ml/Dimension.cc

@@ -125,8 +125,13 @@ MLResult TrainableDimension::trainModel() {
     if (!CNs)
         return MLResult::MissingData;
 
-    SamplesBuffer SB = SamplesBuffer(CNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN);
+    unsigned TargetNumSamples = Cfg.MaxTrainSamples * Cfg.RandomSamplingRatio;
+    double SamplingRatio = std::min(static_cast<double>(TargetNumSamples) / N, 1.0);
+
+    SamplesBuffer SB = SamplesBuffer(CNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN,
+                                     SamplingRatio, Cfg.RandomNums);
     KM.train(SB, Cfg.MaxKMeansIters);
+
     Trained = true;
     ConstantModel = true;
 
@@ -162,7 +167,8 @@ std::pair<MLResult, bool> PredictableDimension::predict() {
     CalculatedNumber *TmpCNs = new CalculatedNumber[N * (Cfg.LagN + 1)]();
     std::memcpy(TmpCNs, CNs.data(), N * sizeof(CalculatedNumber));
 
-    SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN);
+    SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN,
+                                     1.0, Cfg.RandomNums);
     AnomalyScore = computeAnomalyScore(SB);
     delete[] TmpCNs;
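
A small worked sketch of how trainModel() derives the per-buffer ratio above (the numbers are hypothetical; MaxTrainSamples and RandomSamplingRatio come from the config):

    // Sketch: with MaxTrainSamples = 3600 and RandomSamplingRatio = 0.2 the
    // target is 720 samples; if the dimension only has N = 900 collected
    // values, the effective ratio becomes min(720 / 900, 1.0) = 0.8.
    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    int main() {
        unsigned MaxTrainSamples = 3600;   // hypothetical config value
        double RandomSamplingRatio = 0.2;  // hypothetical config value
        size_t N = 900;                    // collected values for this dimension

        unsigned TargetNumSamples = MaxTrainSamples * RandomSamplingRatio;
        double SamplingRatio = std::min(static_cast<double>(TargetNumSamples) / N, 1.0);

        std::printf("target=%u ratio=%.2f\n", TargetNumSamples, SamplingRatio);
        return 0;
    }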
 

+ 0 - 5
ml/Dimension.h

@@ -76,10 +76,6 @@ public:
 
     bool isTrained() const { return Trained; }
 
-    double updateTrainingDuration(double Duration) {
-        return TrainingDuration.exchange(Duration);
-    }
-
 private:
     std::pair<CalculatedNumber *, size_t> getCalculatedNumbers();
 
@@ -94,7 +90,6 @@ private:
     KMeans KM;
 
     std::atomic<bool> Trained{false};
-    std::atomic<double> TrainingDuration{0.0};
 };
 
 class PredictableDimension : public TrainableDimension {

+ 35 - 42
ml/Host.cc

@@ -184,13 +184,13 @@ static void updateEventsChart(RRDHOST *RH,
     rrdset_done(RS);
 }
 
-static void updateDetectionChart(RRDHOST *RH, collected_number PredictionDuration) {
+static void updateDetectionChart(RRDHOST *RH) {
     static thread_local RRDSET *RS = nullptr;
-    static thread_local RRDDIM *PredictiobDurationRD = nullptr;
+    static thread_local RRDDIM *UserRD, *SystemRD = nullptr;
 
     if (!RS) {
         std::string IdPrefix = "prediction_stats";
-        std::string TitlePrefix = "Time it took to run prediction for host";
+        std::string TitlePrefix = "Prediction thread CPU usage for host";
         auto IdTitlePair = getHostSpecificIdAndTitle(RH, IdPrefix, TitlePrefix);
 
         RS = rrdset_create_localhost(
@@ -200,35 +200,36 @@ static void updateDetectionChart(RRDHOST *RH, collected_number PredictionDuratio
             "prediction_stats", // family
             "anomaly_detection.prediction_stats", // ctx
             IdTitlePair.second.c_str(), // title
-            "milliseconds", // units
+            "milliseconds/s", // units
             "netdata", // plugin
             "ml", // module
             39187, // priority
             RH->rrd_update_every, // update_every
-            RRDSET_TYPE_LINE // chart_type
+            RRDSET_TYPE_STACKED // chart_type
         );
 
-        PredictiobDurationRD  = rrddim_add(RS, "duration", NULL,
-                1, 1, RRD_ALGORITHM_ABSOLUTE);
+        UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+        SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
     } else
         rrdset_next(RS);
 
-    rrddim_set_by_pointer(RS, PredictiobDurationRD, PredictionDuration);
+    struct rusage TRU;
+    getrusage(RUSAGE_THREAD, &TRU);
 
+    rrddim_set_by_pointer(RS, UserRD, TRU.ru_utime.tv_sec * 1000000ULL + TRU.ru_utime.tv_usec);
+    rrddim_set_by_pointer(RS, SystemRD, TRU.ru_stime.tv_sec * 1000000ULL + TRU.ru_stime.tv_usec);
     rrdset_done(RS);
 }
 
-static void updateTrainingChart(RRDHOST *RH,
-                                collected_number TotalTrainingDuration,
-                                collected_number MaxTrainingDuration)
+static void updateTrainingChart(RRDHOST *RH, struct rusage *TRU)
 {
     static thread_local RRDSET *RS = nullptr;
-    static thread_local RRDDIM *TotalTrainingDurationRD = nullptr;
-    static thread_local RRDDIM *MaxTrainingDurationRD = nullptr;
+    static thread_local RRDDIM *UserRD = nullptr;
+    static thread_local RRDDIM *SystemRD = nullptr;
 
     if (!RS) {
         std::string IdPrefix = "training_stats";
-        std::string TitlePrefix = "Training step statistics for host";
+        std::string TitlePrefix = "Training thread CPU usage for host";
         auto IdTitlePair = getHostSpecificIdAndTitle(RH, IdPrefix, TitlePrefix);
 
         RS = rrdset_create_localhost(
@@ -238,24 +239,21 @@ static void updateTrainingChart(RRDHOST *RH,
             "training_stats", // family
             "anomaly_detection.training_stats", // ctx
             IdTitlePair.second.c_str(), // title
-            "milliseconds", // units
+            "milliseconds/s", // units
             "netdata", // plugin
             "ml", // module
             39188, // priority
             RH->rrd_update_every, // update_every
-            RRDSET_TYPE_LINE // chart_type
+            RRDSET_TYPE_STACKED // chart_type
         );
 
-        TotalTrainingDurationRD = rrddim_add(RS, "total_training_duration", NULL,
-                1, 1, RRD_ALGORITHM_ABSOLUTE);
-        MaxTrainingDurationRD = rrddim_add(RS, "max_training_duration", NULL,
-                1, 1, RRD_ALGORITHM_ABSOLUTE);
+        UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+        SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
     } else
         rrdset_next(RS);
 
-    rrddim_set_by_pointer(RS, TotalTrainingDurationRD, TotalTrainingDuration);
-    rrddim_set_by_pointer(RS, MaxTrainingDurationRD, MaxTrainingDuration);
-
+    rrddim_set_by_pointer(RS, UserRD, TRU->ru_utime.tv_sec * 1000000ULL + TRU->ru_utime.tv_usec);
+    rrddim_set_by_pointer(RS, SystemRD, TRU->ru_stime.tv_sec * 1000000ULL + TRU->ru_stime.tv_usec);
     rrdset_done(RS);
 }
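
A brief note on the units: getrusage() reports cumulative CPU time, the values are pushed in microseconds, the dimension divisor of 1000 converts them to milliseconds, and RRD_ALGORITHM_INCREMENTAL charts the per-second delta, hence "milliseconds/s". A standalone sketch of the value fed to each dimension (plain POSIX, not netdata's API):

    // Sketch: cumulative per-thread CPU time in microseconds, as pushed above.
    #include <sys/resource.h>
    #include <cstdint>
    #include <cstdio>

    int main() {
        struct rusage TRU;
    #ifdef RUSAGE_THREAD      // Linux-specific per-thread usage
        getrusage(RUSAGE_THREAD, &TRU);
    #else
        getrusage(RUSAGE_SELF, &TRU);
    #endif

        uint64_t UserUsec = TRU.ru_utime.tv_sec * 1000000ULL + TRU.ru_utime.tv_usec;
        uint64_t SystemUsec = TRU.ru_stime.tv_sec * 1000000ULL + TRU.ru_stime.tv_usec;
        std::printf("user=%llu us, system=%llu us\n",
                    (unsigned long long) UserUsec, (unsigned long long) SystemUsec);
        return 0;
    }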
 
@@ -307,6 +305,7 @@ void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
     Json["smooth-n"] = Cfg.SmoothN;
     Json["lag-n"] = Cfg.LagN;
 
+    Json["random-sampling-ratio"] = Cfg.RandomSamplingRatio;
     Json["max-kmeans-iters"] = Cfg.MaxKMeansIters;
 
     Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold;
@@ -345,11 +344,7 @@ void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
         return;
 
     D->LastTrainedAt = NowTP + Seconds{D->updateEvery()};
-
-    TimePoint StartTP = SteadyClock::now();
     D->trainModel();
-    Duration<double> Duration = SteadyClock::now() - StartTP;
-    D->updateTrainingDuration(Duration.count());
 
     {
         std::lock_guard<std::mutex> Lock(Mutex);
@@ -358,9 +353,11 @@ void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
 }
 
 void TrainableHost::train() {
-    Duration<double> MaxSleepFor = Seconds{updateEvery()};
+    Duration<double> MaxSleepFor = Seconds{10 * updateEvery()};
 
     while (!netdata_exit) {
+        updateResourceUsage();
+
         TimePoint NowTP = SteadyClock::now();
 
         auto P = findDimensionToTrain(NowTP);
@@ -393,9 +390,6 @@ void DetectableHost::detectOnce() {
     size_t NumNormalDimensions = 0;
     size_t NumTrainedDimensions = 0;
 
-    double TotalTrainingDuration = 0.0;
-    double MaxTrainingDuration = 0.0;
-
     bool CollectAnomalyRates = (++AnomalyRateTimer == Cfg.DBEngineAnomalyRateEvery);
     if (CollectAnomalyRates)
         rrdset_next(AnomalyRateRS);
@@ -414,10 +408,6 @@ void DetectableHost::detectOnce() {
 
             NumTrainedDimensions += D->isTrained();
 
-            double DimTrainingDuration = D->updateTrainingDuration(0.0);
-            MaxTrainingDuration = std::max(MaxTrainingDuration, DimTrainingDuration);
-            TotalTrainingDuration += DimTrainingDuration;
-
             if (IsAnomalous)
                 NumAnomalousDimensions += 1;
 
@@ -448,7 +438,10 @@ void DetectableHost::detectOnce() {
     updateRateChart(getRH(), WindowAnomalyRate * 10000.0);
     updateWindowLengthChart(getRH(), WindowLength);
     updateEventsChart(getRH(), P, ResetBitCounter, NewAnomalyEvent);
-    updateTrainingChart(getRH(), TotalTrainingDuration * 1000.0, MaxTrainingDuration * 1000.0);
+
+    struct rusage TRU;
+    getResourceUsage(&TRU);
+    updateTrainingChart(getRH(), &TRU);
 
     if (!NewAnomalyEvent || (DimsOverThreshold.size() == 0))
         return;
@@ -477,15 +470,15 @@ void DetectableHost::detectOnce() {
 void DetectableHost::detect() {
     std::this_thread::sleep_for(Seconds{10});
 
+    heartbeat_t HB;
+    heartbeat_init(&HB);
+
     while (!netdata_exit) {
-        TimePoint StartTP = SteadyClock::now();
-        detectOnce();
-        TimePoint EndTP = SteadyClock::now();
+        heartbeat_next(&HB, updateEvery() * USEC_PER_SEC);
 
-        Duration<double> Dur = EndTP - StartTP;
-        updateDetectionChart(getRH(), Dur.count() * 1000);
+        detectOnce();
 
-        std::this_thread::sleep_for(Seconds{updateEvery()});
+        updateDetectionChart(getRH());
     }
 }
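
heartbeat_next() blocks until the next updateEvery() boundary, so the loop runs at a fixed cadence no matter how long detectOnce() takes, whereas the old sleep_for() let the period drift by the detection duration. A minimal standalone sketch of the same fixed-cadence idea using only std::chrono (the real code uses netdata's heartbeat API shown above):

    // Sketch: sleep until an absolute deadline so iterations stay aligned to
    // the update interval even when the work takes a variable amount of time.
    #include <chrono>
    #include <thread>

    void detectionLoop(unsigned UpdateEvery, volatile bool &Exit) {
        using Clock = std::chrono::steady_clock;
        auto Next = Clock::now();

        while (!Exit) {
            Next += std::chrono::seconds{UpdateEvery};
            std::this_thread::sleep_until(Next);  // no drift from work duration

            // detectOnce(); // the work's runtime does not shift the cadence
        }
    }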
 

+ 13 - 0
ml/Host.h

@@ -70,9 +70,22 @@ public:
 
     void train();
 
+    void updateResourceUsage() {
+        std::lock_guard<std::mutex> Lock(ResourceUsageMutex);
+        getrusage(RUSAGE_THREAD, &ResourceUsage);
+    }
+
+    void getResourceUsage(struct rusage *RU) {
+        std::lock_guard<std::mutex> Lock(ResourceUsageMutex);
+        memcpy(RU, &ResourceUsage, sizeof(struct rusage));
+    }
+
 private:
     std::pair<Dimension *, Duration<double>> findDimensionToTrain(const TimePoint &NowTP);
     void trainDimension(Dimension *D, const TimePoint &NowTP);
+
+    std::mutex ResourceUsageMutex;
+    struct rusage ResourceUsage;
 };
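
The training thread refreshes ResourceUsage at the top of each train() iteration via updateResourceUsage(), and the detection thread copies it out with getResourceUsage() when rendering the training chart, so the mutex only guards a small snapshot. A hedged standalone sketch of the same pattern:

    // Sketch: one thread publishes its rusage snapshot, another reads it for
    // charting. Mirrors updateResourceUsage()/getResourceUsage() above.
    #include <sys/resource.h>
    #include <cstring>
    #include <mutex>

    class ResourceUsageSnapshot {
    public:
        // Called from the training thread on each loop iteration.
        void update() {
            std::lock_guard<std::mutex> Lock(Mutex);
    #ifdef RUSAGE_THREAD
            getrusage(RUSAGE_THREAD, &Usage);
    #else
            getrusage(RUSAGE_SELF, &Usage);
    #endif
        }

        // Called from the detection thread before updating the chart.
        void get(struct rusage *Out) {
            std::lock_guard<std::mutex> Lock(Mutex);
            std::memcpy(Out, &Usage, sizeof(struct rusage));
        }

    private:
        std::mutex Mutex;
        struct rusage Usage{};
    };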
 
 class DetectableHost : public TrainableHost {

+ 6 - 0
ml/kmeans/SamplesBuffer.cc

@@ -130,7 +130,13 @@ std::vector<DSample> SamplesBuffer::preprocess() {
     DSamples.reserve(OutN);
     Preprocessed = true;
 
+    uint32_t MaxMT = std::numeric_limits<uint32_t>::max();
+    uint32_t CutOff = static_cast<double>(MaxMT) * SamplingRatio;
+
     for (size_t Idx = NumSamples - OutN; Idx != NumSamples; Idx++) {
+        if (RandNums[Idx] > CutOff)
+            continue;
+
         DSample DS;
         DS.set_size(NumDimsPerSample * (LagN + 1));
 

+ 6 - 1
ml/kmeans/SamplesBuffer.h

@@ -80,9 +80,11 @@ class SamplesBuffer {
 public:
     SamplesBuffer(CalculatedNumber *CNs,
                   size_t NumSamples, size_t NumDimsPerSample,
-                  size_t DiffN = 1, size_t SmoothN = 3, size_t LagN = 3) :
+                  size_t DiffN, size_t SmoothN, size_t LagN,
+                  double SamplingRatio, std::vector<uint32_t> &RandNums) :
         CNs(CNs), NumSamples(NumSamples), NumDimsPerSample(NumDimsPerSample),
         DiffN(DiffN), SmoothN(SmoothN), LagN(LagN),
+        SamplingRatio(SamplingRatio), RandNums(RandNums),
         BytesPerSample(NumDimsPerSample * sizeof(CalculatedNumber)),
         Preprocessed(false) {};
 
@@ -129,6 +131,9 @@ private:
     size_t DiffN;
     size_t SmoothN;
     size_t LagN;
+    double SamplingRatio;
+    std::vector<uint32_t> &RandNums;
+
     size_t BytesPerSample;
     bool Preprocessed;
 };
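
Putting the sampling pieces together: ml_init() pre-generates MaxTrainSamples random 32-bit values once, and preprocess() keeps a feature row only when the corresponding value is at most CutOff = UINT32_MAX * SamplingRatio, so each row is kept with probability roughly equal to the ratio without touching the RNG on the training path. A minimal standalone sketch of that filter (sizes and ratio are illustrative):

    // Sketch: Bernoulli-style row sampling with pre-generated random numbers.
    // Row Idx is kept iff RandNums[Idx] <= CutOff, i.e. with probability ~Ratio.
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <limits>
    #include <random>
    #include <vector>

    int main() {
        const size_t MaxTrainSamples = 1000;  // illustrative size
        const double SamplingRatio = 0.2;     // illustrative ratio

        // Generated once at startup (cf. ml_init()), not on every training run.
        std::mt19937 Gen{std::random_device{}()};
        std::vector<uint32_t> RandNums;
        RandNums.reserve(MaxTrainSamples);
        for (size_t Idx = 0; Idx != MaxTrainSamples; Idx++)
            RandNums.push_back(Gen());

        const uint32_t CutOff =
            static_cast<double>(std::numeric_limits<uint32_t>::max()) * SamplingRatio;

        size_t Kept = 0;
        for (size_t Idx = 0; Idx != MaxTrainSamples; Idx++)
            if (RandNums[Idx] <= CutOff)
                Kept++;  // this row would be fed to k-means

        std::printf("kept %zu of %zu rows\n", Kept, MaxTrainSamples);
        return 0;
    }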

+ 15 - 0
ml/ml.cc

@@ -4,6 +4,8 @@
 #include "Dimension.h"
 #include "Host.h"
 
+#include <random>
+
 using namespace ml;
 
 bool ml_capable() {
@@ -27,7 +29,20 @@ bool ml_enabled(RRDHOST *RH) {
  */
 
 void ml_init(void) {
+    // Read config values
     Cfg.readMLConfig();
+
+    if (!Cfg.EnableAnomalyDetection)
+        return;
+
+    // Generate random numbers to efficiently sample the features we need
+    // for KMeans clustering.
+    std::random_device RD;
+    std::mt19937 Gen(RD());
+
+    Cfg.RandomNums.reserve(Cfg.MaxTrainSamples);
+    for (size_t Idx = 0; Idx != Cfg.MaxTrainSamples; Idx++)
+        Cfg.RandomNums.push_back(Gen());
 }
 
 void ml_new_host(RRDHOST *RH) {