Dimension.cc 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "Config.h"
  3. #include "Dimension.h"
  4. #include "Query.h"
  5. #include "Host.h"
  6. using namespace ml;
  7. static const char *mls2str(MachineLearningStatus MLS) {
  8. switch (MLS) {
  9. case ml::MachineLearningStatus::Enabled:
  10. return "enabled";
  11. case ml::MachineLearningStatus::DisabledDueToUniqueUpdateEvery:
  12. return "disabled-ue";
  13. case ml::MachineLearningStatus::DisabledDueToExcludedChart:
  14. return "disabled-sp";
  15. default:
  16. return "unknown";
  17. }
  18. }
  19. static const char *mt2str(MetricType MT) {
  20. switch (MT) {
  21. case ml::MetricType::Constant:
  22. return "constant";
  23. case ml::MetricType::Variable:
  24. return "variable";
  25. default:
  26. return "unknown";
  27. }
  28. }
  29. static const char *ts2str(TrainingStatus TS) {
  30. switch (TS) {
  31. case ml::TrainingStatus::PendingWithModel:
  32. return "pending-with-model";
  33. case ml::TrainingStatus::PendingWithoutModel:
  34. return "pending-without-model";
  35. case ml::TrainingStatus::Trained:
  36. return "trained";
  37. case ml::TrainingStatus::Untrained:
  38. return "untrained";
  39. default:
  40. return "unknown";
  41. }
  42. }
  43. static const char *tr2str(TrainingResult TR) {
  44. switch (TR) {
  45. case ml::TrainingResult::Ok:
  46. return "ok";
  47. case ml::TrainingResult::InvalidQueryTimeRange:
  48. return "invalid-query";
  49. case ml::TrainingResult::NotEnoughCollectedValues:
  50. return "missing-values";
  51. case ml::TrainingResult::NullAcquiredDimension:
  52. return "null-acquired-dim";
  53. case ml::TrainingResult::ChartUnderReplication:
  54. return "chart-under-replication";
  55. default:
  56. return "unknown";
  57. }
  58. }
  59. std::pair<CalculatedNumber *, TrainingResponse> Dimension::getCalculatedNumbers(const TrainingRequest &TrainingReq) {
  60. TrainingResponse TrainingResp = {};
  61. TrainingResp.RequestTime = TrainingReq.RequestTime;
  62. TrainingResp.FirstEntryOnRequest = TrainingReq.FirstEntryOnRequest;
  63. TrainingResp.LastEntryOnRequest = TrainingReq.LastEntryOnRequest;
  64. TrainingResp.FirstEntryOnResponse = rrddim_first_entry_s_of_tier(RD, 0);
  65. TrainingResp.LastEntryOnResponse = rrddim_last_entry_s_of_tier(RD, 0);
  66. size_t MinN = Cfg.MinTrainSamples;
  67. size_t MaxN = Cfg.MaxTrainSamples;
  68. // Figure out what our time window should be.
  69. TrainingResp.QueryBeforeT = TrainingResp.LastEntryOnResponse;
  70. TrainingResp.QueryAfterT = std::max(
  71. TrainingResp.QueryBeforeT - static_cast<time_t>((MaxN - 1) * updateEvery()),
  72. TrainingResp.FirstEntryOnResponse
  73. );
  74. if (TrainingResp.QueryAfterT >= TrainingResp.QueryBeforeT) {
  75. TrainingResp.Result = TrainingResult::InvalidQueryTimeRange;
  76. return { nullptr, TrainingResp };
  77. }
  78. if (rrdset_is_replicating(RD->rrdset)) {
  79. TrainingResp.Result = TrainingResult::ChartUnderReplication;
  80. return { nullptr, TrainingResp };
  81. }
  82. CalculatedNumber *CNs = new CalculatedNumber[MaxN * (Cfg.LagN + 1)]();
  83. // Start the query.
  84. size_t Idx = 0;
  85. CalculatedNumber LastValue = std::numeric_limits<CalculatedNumber>::quiet_NaN();
  86. Query Q = Query(getRD());
  87. Q.init(TrainingResp.QueryAfterT, TrainingResp.QueryBeforeT);
  88. while (!Q.isFinished()) {
  89. if (Idx == MaxN)
  90. break;
  91. auto P = Q.nextMetric();
  92. CalculatedNumber Value = P.second;
  93. if (netdata_double_isnumber(Value)) {
  94. if (!TrainingResp.DbAfterT)
  95. TrainingResp.DbAfterT = P.first;
  96. TrainingResp.DbBeforeT = P.first;
  97. CNs[Idx] = Value;
  98. LastValue = CNs[Idx];
  99. TrainingResp.CollectedValues++;
  100. } else
  101. CNs[Idx] = LastValue;
  102. Idx++;
  103. }
  104. TrainingResp.TotalValues = Idx;
  105. if (TrainingResp.CollectedValues < MinN) {
  106. TrainingResp.Result = TrainingResult::NotEnoughCollectedValues;
  107. delete[] CNs;
  108. return { nullptr, TrainingResp };
  109. }
  110. // Find first non-NaN value.
  111. for (Idx = 0; std::isnan(CNs[Idx]); Idx++, TrainingResp.TotalValues--) { }
  112. // Overwrite NaN values.
  113. if (Idx != 0)
  114. memmove(CNs, &CNs[Idx], sizeof(CalculatedNumber) * TrainingResp.TotalValues);
  115. TrainingResp.Result = TrainingResult::Ok;
  116. return { CNs, TrainingResp };
  117. }
  118. TrainingResult Dimension::trainModel(const TrainingRequest &TrainingReq) {
  119. auto P = getCalculatedNumbers(TrainingReq);
  120. CalculatedNumber *CNs = P.first;
  121. TrainingResponse TrainingResp = P.second;
  122. if (TrainingResp.Result != TrainingResult::Ok) {
  123. std::lock_guard<Mutex> L(M);
  124. MT = MetricType::Constant;
  125. switch (TS) {
  126. case TrainingStatus::PendingWithModel:
  127. TS = TrainingStatus::Trained;
  128. break;
  129. case TrainingStatus::PendingWithoutModel:
  130. TS = TrainingStatus::Untrained;
  131. break;
  132. default:
  133. break;
  134. }
  135. TR = TrainingResp;
  136. LastTrainingTime = TrainingResp.LastEntryOnResponse;
  137. return TrainingResp.Result;
  138. }
  139. unsigned N = TrainingResp.TotalValues;
  140. unsigned TargetNumSamples = Cfg.MaxTrainSamples * Cfg.RandomSamplingRatio;
  141. double SamplingRatio = std::min(static_cast<double>(TargetNumSamples) / N, 1.0);
  142. SamplesBuffer SB = SamplesBuffer(CNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN,
  143. SamplingRatio, Cfg.RandomNums);
  144. std::vector<DSample> Samples;
  145. SB.preprocess(Samples);
  146. KMeans KM;
  147. KM.train(Samples, Cfg.MaxKMeansIters);
  148. {
  149. std::lock_guard<Mutex> L(M);
  150. if (Models.size() < Cfg.NumModelsToUse) {
  151. Models.push_back(std::move(KM));
  152. } else {
  153. std::rotate(std::begin(Models), std::begin(Models) + 1, std::end(Models));
  154. Models[Models.size() - 1] = std::move(KM);
  155. }
  156. MT = MetricType::Constant;
  157. TS = TrainingStatus::Trained;
  158. TR = TrainingResp;
  159. LastTrainingTime = rrddim_last_entry_s(RD);
  160. }
  161. delete[] CNs;
  162. return TrainingResp.Result;
  163. }
  164. void Dimension::scheduleForTraining(time_t CurrT) {
  165. switch (MT) {
  166. case MetricType::Constant: {
  167. return;
  168. } default:
  169. break;
  170. }
  171. switch (TS) {
  172. case TrainingStatus::PendingWithModel:
  173. case TrainingStatus::PendingWithoutModel:
  174. break;
  175. case TrainingStatus::Untrained: {
  176. Host *H = reinterpret_cast<Host *>(RD->rrdset->rrdhost->ml_host);
  177. TS = TrainingStatus::PendingWithoutModel;
  178. H->scheduleForTraining(getTrainingRequest(CurrT));
  179. break;
  180. }
  181. case TrainingStatus::Trained: {
  182. bool NeedsTraining = (time_t)(LastTrainingTime + (Cfg.TrainEvery * updateEvery())) < CurrT;
  183. if (NeedsTraining) {
  184. Host *H = reinterpret_cast<Host *>(RD->rrdset->rrdhost->ml_host);
  185. TS = TrainingStatus::PendingWithModel;
  186. H->scheduleForTraining(getTrainingRequest(CurrT));
  187. }
  188. break;
  189. }
  190. }
  191. }
  192. bool Dimension::predict(time_t CurrT, CalculatedNumber Value, bool Exists) {
  193. // Nothing to do if ML is disabled for this dimension
  194. if (MLS != MachineLearningStatus::Enabled)
  195. return false;
  196. // Don't treat values that don't exist as anomalous
  197. if (!Exists) {
  198. CNs.clear();
  199. return false;
  200. }
  201. // Save the value and return if we don't have enough values for a sample
  202. unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN;
  203. if (CNs.size() < N) {
  204. CNs.push_back(Value);
  205. return false;
  206. }
  207. // Push the value and check if it's different from the last one
  208. bool SameValue = true;
  209. std::rotate(std::begin(CNs), std::begin(CNs) + 1, std::end(CNs));
  210. if (CNs[N - 1] != Value)
  211. SameValue = false;
  212. CNs[N - 1] = Value;
  213. // Create the sample
  214. CalculatedNumber TmpCNs[N * (Cfg.LagN + 1)];
  215. memset(TmpCNs, 0, N * (Cfg.LagN + 1) * sizeof(CalculatedNumber));
  216. std::memcpy(TmpCNs, CNs.data(), N * sizeof(CalculatedNumber));
  217. SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1,
  218. Cfg.DiffN, Cfg.SmoothN, Cfg.LagN,
  219. 1.0, Cfg.RandomNums);
  220. SB.preprocess(Feature);
  221. /*
  222. * Lock to predict and possibly schedule the dimension for training
  223. */
  224. std::unique_lock<Mutex> L(M, std::defer_lock);
  225. if (!L.try_lock()) {
  226. return false;
  227. }
  228. // Mark the metric time as variable if we received different values
  229. if (!SameValue)
  230. MT = MetricType::Variable;
  231. // Decide if the dimension needs to be scheduled for training
  232. scheduleForTraining(CurrT);
  233. // Nothing to do if we don't have a model
  234. switch (TS) {
  235. case TrainingStatus::Untrained:
  236. case TrainingStatus::PendingWithoutModel:
  237. return false;
  238. default:
  239. break;
  240. }
  241. /*
  242. * Use the KMeans models to check if the value is anomalous
  243. */
  244. size_t ModelsConsulted = 0;
  245. size_t Sum = 0;
  246. for (const auto &KM : Models) {
  247. ModelsConsulted++;
  248. double AnomalyScore = KM.anomalyScore(Feature);
  249. if (AnomalyScore == std::numeric_limits<CalculatedNumber>::quiet_NaN())
  250. continue;
  251. if (AnomalyScore < (100 * Cfg.DimensionAnomalyScoreThreshold)) {
  252. global_statistics_ml_models_consulted(ModelsConsulted);
  253. return false;
  254. }
  255. Sum += 1;
  256. }
  257. global_statistics_ml_models_consulted(ModelsConsulted);
  258. return Sum;
  259. }
  260. std::vector<KMeans> Dimension::getModels() {
  261. std::unique_lock<Mutex> L(M);
  262. return Models;
  263. }
  264. void Dimension::dump() const {
  265. const char *ChartId = rrdset_id(RD->rrdset);
  266. const char *DimensionId = rrddim_id(RD);
  267. const char *MLS_Str = mls2str(MLS);
  268. const char *MT_Str = mt2str(MT);
  269. const char *TS_Str = ts2str(TS);
  270. const char *TR_Str = tr2str(TR.Result);
  271. const char *fmt =
  272. "[ML] %s.%s: MLS=%s, MT=%s, TS=%s, Result=%s, "
  273. "ReqTime=%ld, FEOReq=%ld, LEOReq=%ld, "
  274. "FEOResp=%ld, LEOResp=%ld, QTR=<%ld, %ld>, DBTR=<%ld, %ld>, Collected=%zu, Total=%zu";
  275. error(fmt,
  276. ChartId, DimensionId, MLS_Str, MT_Str, TS_Str, TR_Str,
  277. TR.RequestTime, TR.FirstEntryOnRequest, TR.LastEntryOnRequest,
  278. TR.FirstEntryOnResponse, TR.LastEntryOnResponse,
  279. TR.QueryAfterT, TR.QueryBeforeT, TR.DbAfterT, TR.DbBeforeT, TR.CollectedValues, TR.TotalValues
  280. );
  281. }