anomalies.chart.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. # -*- coding: utf-8 -*-
  2. # Description: anomalies netdata python.d module
  3. # Author: andrewm4894
  4. # SPDX-License-Identifier: GPL-3.0-or-later
  5. import sys
  6. import time
  7. from datetime import datetime
  8. import re
  9. import warnings
  10. import requests
  11. import numpy as np
  12. import pandas as pd
  13. from netdata_pandas.data import get_data, get_allmetrics_async
  14. from pyod.models.hbos import HBOS
  15. from pyod.models.pca import PCA
  16. from pyod.models.loda import LODA
  17. from pyod.models.iforest import IForest
  18. from pyod.models.cblof import CBLOF
  19. from pyod.models.feature_bagging import FeatureBagging
  20. from pyod.models.copod import COPOD
  21. from sklearn.preprocessing import MinMaxScaler
  22. from bases.FrameworkServices.SimpleService import SimpleService
  23. # ignore some sklearn/numpy warnings that are ok
  24. warnings.filterwarnings('ignore', r'All-NaN slice encountered')
  25. warnings.filterwarnings('ignore', r'invalid value encountered in true_divide')
  26. warnings.filterwarnings('ignore', r'divide by zero encountered in true_divide')
  27. warnings.filterwarnings('ignore', r'invalid value encountered in subtract')
  28. disabled_by_default = True
  29. ORDER = ['probability', 'anomaly']
  30. CHARTS = {
  31. 'probability': {
  32. 'options': ['probability', 'Anomaly Probability', 'probability', 'anomalies', 'anomalies.probability', 'line'],
  33. 'lines': []
  34. },
  35. 'anomaly': {
  36. 'options': ['anomaly', 'Anomaly', 'count', 'anomalies', 'anomalies.anomaly', 'stacked'],
  37. 'lines': []
  38. },
  39. }
  40. class Service(SimpleService):
  41. def __init__(self, configuration=None, name=None):
  42. SimpleService.__init__(self, configuration=configuration, name=name)
  43. self.basic_init()
  44. self.charts_init()
  45. self.custom_models_init()
  46. self.data_init()
  47. self.model_params_init()
  48. self.models_init()
  49. self.collected_dims = {'probability': set(), 'anomaly': set()}
  50. def check(self):
  51. python_version = float('{}.{}'.format(sys.version_info[0], sys.version_info[1]))
  52. if python_version < 3.6:
  53. self.error("anomalies collector only works with Python>=3.6")
  54. if len(self.host_charts_dict[self.host]) > 0:
  55. _ = get_allmetrics_async(host_charts_dict=self.host_charts_dict, protocol=self.protocol, user=self.username, pwd=self.password)
  56. return True
  57. def basic_init(self):
  58. """Perform some basic initialization.
  59. """
  60. self.order = ORDER
  61. self.definitions = CHARTS
  62. self.protocol = self.configuration.get('protocol', 'http')
  63. self.host = self.configuration.get('host', '127.0.0.1:19999')
  64. self.username = self.configuration.get('username', None)
  65. self.password = self.configuration.get('password', None)
  66. self.tls_verify = self.configuration.get('tls_verify', True)
  67. self.fitted_at = {}
  68. self.df_allmetrics = pd.DataFrame()
  69. self.last_train_at = 0
  70. self.include_average_prob = bool(self.configuration.get('include_average_prob', True))
  71. self.reinitialize_at_every_step = bool(self.configuration.get('reinitialize_at_every_step', False))
  72. def charts_init(self):
  73. """Do some initialisation of charts in scope related variables.
  74. """
  75. self.charts_regex = re.compile(self.configuration.get('charts_regex','None'))
  76. self.charts_available = [c for c in list(requests.get(f'{self.protocol}://{self.host}/api/v1/charts', verify=self.tls_verify).json().get('charts', {}).keys())]
  77. self.charts_in_scope = list(filter(self.charts_regex.match, self.charts_available))
  78. self.charts_to_exclude = self.configuration.get('charts_to_exclude', '').split(',')
  79. if len(self.charts_to_exclude) > 0:
  80. self.charts_in_scope = [c for c in self.charts_in_scope if c not in self.charts_to_exclude]
  81. def custom_models_init(self):
  82. """Perform initialization steps related to custom models.
  83. """
  84. self.custom_models = self.configuration.get('custom_models', None)
  85. self.custom_models_normalize = bool(self.configuration.get('custom_models_normalize', False))
  86. if self.custom_models:
  87. self.custom_models_names = [model['name'] for model in self.custom_models]
  88. self.custom_models_dims = [i for s in [model['dimensions'].split(',') for model in self.custom_models] for i in s]
  89. self.custom_models_dims = [dim if '::' in dim else f'{self.host}::{dim}' for dim in self.custom_models_dims]
  90. self.custom_models_charts = list(set([dim.split('|')[0].split('::')[1] for dim in self.custom_models_dims]))
  91. self.custom_models_hosts = list(set([dim.split('::')[0] for dim in self.custom_models_dims]))
  92. self.custom_models_host_charts_dict = {}
  93. for host in self.custom_models_hosts:
  94. self.custom_models_host_charts_dict[host] = list(set([dim.split('::')[1].split('|')[0] for dim in self.custom_models_dims if dim.startswith(host)]))
  95. self.custom_models_dims_renamed = [f"{model['name']}|{dim}" for model in self.custom_models for dim in model['dimensions'].split(',')]
  96. self.models_in_scope = list(set([f'{self.host}::{c}' for c in self.charts_in_scope] + self.custom_models_names))
  97. self.charts_in_scope = list(set(self.charts_in_scope + self.custom_models_charts))
  98. self.host_charts_dict = {self.host: self.charts_in_scope}
  99. for host in self.custom_models_host_charts_dict:
  100. if host not in self.host_charts_dict:
  101. self.host_charts_dict[host] = self.custom_models_host_charts_dict[host]
  102. else:
  103. for chart in self.custom_models_host_charts_dict[host]:
  104. if chart not in self.host_charts_dict[host]:
  105. self.host_charts_dict[host].extend(chart)
  106. else:
  107. self.models_in_scope = [f'{self.host}::{c}' for c in self.charts_in_scope]
  108. self.host_charts_dict = {self.host: self.charts_in_scope}
  109. self.model_display_names = {model: model.split('::')[1] if '::' in model else model for model in self.models_in_scope}
  110. #self.info(f'self.host_charts_dict (len={len(self.host_charts_dict[self.host])}): {self.host_charts_dict}')
  111. def data_init(self):
  112. """Initialize some empty data objects.
  113. """
  114. self.data_probability_latest = {f'{m}_prob': 0 for m in self.charts_in_scope}
  115. self.data_anomaly_latest = {f'{m}_anomaly': 0 for m in self.charts_in_scope}
  116. self.data_latest = {**self.data_probability_latest, **self.data_anomaly_latest}
  117. def model_params_init(self):
  118. """Model parameters initialisation.
  119. """
  120. self.train_max_n = self.configuration.get('train_max_n', 100000)
  121. self.train_n_secs = self.configuration.get('train_n_secs', 14400)
  122. self.offset_n_secs = self.configuration.get('offset_n_secs', 0)
  123. self.train_every_n = self.configuration.get('train_every_n', 1800)
  124. self.train_no_prediction_n = self.configuration.get('train_no_prediction_n', 10)
  125. self.initial_train_data_after = self.configuration.get('initial_train_data_after', 0)
  126. self.initial_train_data_before = self.configuration.get('initial_train_data_before', 0)
  127. self.contamination = self.configuration.get('contamination', 0.001)
  128. self.lags_n = {model: self.configuration.get('lags_n', 5) for model in self.models_in_scope}
  129. self.smooth_n = {model: self.configuration.get('smooth_n', 5) for model in self.models_in_scope}
  130. self.diffs_n = {model: self.configuration.get('diffs_n', 5) for model in self.models_in_scope}
  131. def models_init(self):
  132. """Models initialisation.
  133. """
  134. self.model = self.configuration.get('model', 'pca')
  135. if self.model == 'pca':
  136. self.models = {model: PCA(contamination=self.contamination) for model in self.models_in_scope}
  137. elif self.model == 'loda':
  138. self.models = {model: LODA(contamination=self.contamination) for model in self.models_in_scope}
  139. elif self.model == 'iforest':
  140. self.models = {model: IForest(n_estimators=50, bootstrap=True, behaviour='new', contamination=self.contamination) for model in self.models_in_scope}
  141. elif self.model == 'cblof':
  142. self.models = {model: CBLOF(n_clusters=3, contamination=self.contamination) for model in self.models_in_scope}
  143. elif self.model == 'feature_bagging':
  144. self.models = {model: FeatureBagging(base_estimator=PCA(contamination=self.contamination), contamination=self.contamination) for model in self.models_in_scope}
  145. elif self.model == 'copod':
  146. self.models = {model: COPOD(contamination=self.contamination) for model in self.models_in_scope}
  147. elif self.model == 'hbos':
  148. self.models = {model: HBOS(contamination=self.contamination) for model in self.models_in_scope}
  149. else:
  150. self.models = {model: HBOS(contamination=self.contamination) for model in self.models_in_scope}
  151. self.custom_model_scalers = {model: MinMaxScaler() for model in self.models_in_scope}
  152. def model_init(self, model):
  153. """Model initialisation of a single model.
  154. """
  155. if self.model == 'pca':
  156. self.models[model] = PCA(contamination=self.contamination)
  157. elif self.model == 'loda':
  158. self.models[model] = LODA(contamination=self.contamination)
  159. elif self.model == 'iforest':
  160. self.models[model] = IForest(n_estimators=50, bootstrap=True, behaviour='new', contamination=self.contamination)
  161. elif self.model == 'cblof':
  162. self.models[model] = CBLOF(n_clusters=3, contamination=self.contamination)
  163. elif self.model == 'feature_bagging':
  164. self.models[model] = FeatureBagging(base_estimator=PCA(contamination=self.contamination), contamination=self.contamination)
  165. elif self.model == 'copod':
  166. self.models[model] = COPOD(contamination=self.contamination)
  167. elif self.model == 'hbos':
  168. self.models[model] = HBOS(contamination=self.contamination)
  169. else:
  170. self.models[model] = HBOS(contamination=self.contamination)
  171. self.custom_model_scalers[model] = MinMaxScaler()
  172. def reinitialize(self):
  173. """Reinitialize charts, models and data to a beginning state.
  174. """
  175. self.charts_init()
  176. self.custom_models_init()
  177. self.data_init()
  178. self.model_params_init()
  179. self.models_init()
  180. def save_data_latest(self, data, data_probability, data_anomaly):
  181. """Save the most recent data objects to be used if needed in the future.
  182. """
  183. self.data_latest = data
  184. self.data_probability_latest = data_probability
  185. self.data_anomaly_latest = data_anomaly
  186. def validate_charts(self, chart, data, algorithm='absolute', multiplier=1, divisor=1):
  187. """If dimension not in chart then add it.
  188. """
  189. for dim in data:
  190. if dim not in self.collected_dims[chart]:
  191. self.collected_dims[chart].add(dim)
  192. self.charts[chart].add_dimension([dim, dim, algorithm, multiplier, divisor])
  193. for dim in list(self.collected_dims[chart]):
  194. if dim not in data:
  195. self.collected_dims[chart].remove(dim)
  196. self.charts[chart].del_dimension(dim, hide=False)
  197. def add_custom_models_dims(self, df):
  198. """Given a df, select columns used by custom models, add custom model name as prefix, and append to df.
  199. :param df <pd.DataFrame>: dataframe to append new renamed columns to.
  200. :return: <pd.DataFrame> dataframe with additional columns added relating to the specified custom models.
  201. """
  202. df_custom = df[self.custom_models_dims].copy()
  203. df_custom.columns = self.custom_models_dims_renamed
  204. df = df.join(df_custom)
  205. return df
  206. def make_features(self, arr, train=False, model=None):
  207. """Take in numpy array and preprocess accordingly by taking diffs, smoothing and adding lags.
  208. :param arr <np.ndarray>: numpy array we want to make features from.
  209. :param train <bool>: True if making features for training, in which case need to fit_transform scaler and maybe sample train_max_n.
  210. :param model <str>: model to make features for.
  211. :return: <np.ndarray> transformed numpy array.
  212. """
  213. def lag(arr, n):
  214. res = np.empty_like(arr)
  215. res[:n] = np.nan
  216. res[n:] = arr[:-n]
  217. return res
  218. arr = np.nan_to_num(arr)
  219. diffs_n = self.diffs_n[model]
  220. smooth_n = self.smooth_n[model]
  221. lags_n = self.lags_n[model]
  222. if self.custom_models_normalize and model in self.custom_models_names:
  223. if train:
  224. arr = self.custom_model_scalers[model].fit_transform(arr)
  225. else:
  226. arr = self.custom_model_scalers[model].transform(arr)
  227. if diffs_n > 0:
  228. arr = np.diff(arr, diffs_n, axis=0)
  229. arr = arr[~np.isnan(arr).any(axis=1)]
  230. if smooth_n > 1:
  231. arr = np.cumsum(arr, axis=0, dtype=float)
  232. arr[smooth_n:] = arr[smooth_n:] - arr[:-smooth_n]
  233. arr = arr[smooth_n - 1:] / smooth_n
  234. arr = arr[~np.isnan(arr).any(axis=1)]
  235. if lags_n > 0:
  236. arr_orig = np.copy(arr)
  237. for lag_n in range(1, lags_n + 1):
  238. arr = np.concatenate((arr, lag(arr_orig, lag_n)), axis=1)
  239. arr = arr[~np.isnan(arr).any(axis=1)]
  240. if train:
  241. if len(arr) > self.train_max_n:
  242. arr = arr[np.random.randint(arr.shape[0], size=self.train_max_n), :]
  243. arr = np.nan_to_num(arr)
  244. return arr
  245. def train(self, models_to_train=None, train_data_after=0, train_data_before=0):
  246. """Pull required training data and train a model for each specified model.
  247. :param models_to_train <list>: list of models to train on.
  248. :param train_data_after <int>: integer timestamp for start of train data.
  249. :param train_data_before <int>: integer timestamp for end of train data.
  250. """
  251. now = datetime.now().timestamp()
  252. if train_data_after > 0 and train_data_before > 0:
  253. before = train_data_before
  254. after = train_data_after
  255. else:
  256. before = int(now) - self.offset_n_secs
  257. after = before - self.train_n_secs
  258. # get training data
  259. df_train = get_data(
  260. host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', after=after, before=before,
  261. sort_cols=True, numeric_only=True, protocol=self.protocol, float_size='float32', user=self.username, pwd=self.password,
  262. verify=self.tls_verify
  263. ).ffill()
  264. if self.custom_models:
  265. df_train = self.add_custom_models_dims(df_train)
  266. # train model
  267. self.try_fit(df_train, models_to_train=models_to_train)
  268. self.info(f'training complete in {round(time.time() - now, 2)} seconds (runs_counter={self.runs_counter}, model={self.model}, train_n_secs={self.train_n_secs}, models={len(self.fitted_at)}, n_fit_success={self.n_fit_success}, n_fit_fails={self.n_fit_fail}, after={after}, before={before}).')
  269. self.last_train_at = self.runs_counter
  270. def try_fit(self, df_train, models_to_train=None):
  271. """Try fit each model and try to fallback to a default model if fit fails for any reason.
  272. :param df_train <pd.DataFrame>: data to train on.
  273. :param models_to_train <list>: list of models to train.
  274. """
  275. if models_to_train is None:
  276. models_to_train = list(self.models.keys())
  277. self.n_fit_fail, self.n_fit_success = 0, 0
  278. for model in models_to_train:
  279. if model not in self.models:
  280. self.model_init(model)
  281. X_train = self.make_features(
  282. df_train[df_train.columns[df_train.columns.str.startswith(f'{model}|')]].values,
  283. train=True, model=model)
  284. try:
  285. self.models[model].fit(X_train)
  286. self.n_fit_success += 1
  287. except Exception as e:
  288. self.n_fit_fail += 1
  289. self.info(e)
  290. self.info(f'training failed for {model} at run_counter {self.runs_counter}, defaulting to hbos model.')
  291. self.models[model] = HBOS(contamination=self.contamination)
  292. self.models[model].fit(X_train)
  293. self.fitted_at[model] = self.runs_counter
  294. def predict(self):
  295. """Get latest data, make it into a feature vector, and get predictions for each available model.
  296. :return: (<dict>,<dict>) tuple of dictionaries, one for probability scores and the other for anomaly predictions.
  297. """
  298. # get recent data to predict on
  299. df_allmetrics = get_allmetrics_async(
  300. host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', wide=True, sort_cols=True,
  301. protocol=self.protocol, numeric_only=True, float_size='float32', user=self.username, pwd=self.password
  302. )
  303. if self.custom_models:
  304. df_allmetrics = self.add_custom_models_dims(df_allmetrics)
  305. self.df_allmetrics = self.df_allmetrics.append(df_allmetrics).ffill().tail((max(self.lags_n.values()) + max(self.smooth_n.values()) + max(self.diffs_n.values())) * 2)
  306. # get predictions
  307. data_probability, data_anomaly = self.try_predict()
  308. return data_probability, data_anomaly
  309. def try_predict(self):
  310. """Try make prediction and fall back to last known prediction if fails.
  311. :return: (<dict>,<dict>) tuple of dictionaries, one for probability scores and the other for anomaly predictions.
  312. """
  313. data_probability, data_anomaly = {}, {}
  314. for model in self.fitted_at.keys():
  315. model_display_name = self.model_display_names[model]
  316. try:
  317. X_model = np.nan_to_num(
  318. self.make_features(
  319. self.df_allmetrics[self.df_allmetrics.columns[self.df_allmetrics.columns.str.startswith(f'{model}|')]].values,
  320. model=model
  321. )[-1,:].reshape(1, -1)
  322. )
  323. data_probability[model_display_name + '_prob'] = np.nan_to_num(self.models[model].predict_proba(X_model)[-1][1]) * 10000
  324. data_anomaly[model_display_name + '_anomaly'] = self.models[model].predict(X_model)[-1]
  325. except Exception as _:
  326. #self.info(e)
  327. if model_display_name + '_prob' in self.data_latest:
  328. #self.info(f'prediction failed for {model} at run_counter {self.runs_counter}, using last prediction instead.')
  329. data_probability[model_display_name + '_prob'] = self.data_latest[model_display_name + '_prob']
  330. data_anomaly[model_display_name + '_anomaly'] = self.data_latest[model_display_name + '_anomaly']
  331. else:
  332. #self.info(f'prediction failed for {model} at run_counter {self.runs_counter}, skipping as no previous prediction.')
  333. continue
  334. return data_probability, data_anomaly
  335. def get_data(self):
  336. # initialize to what's available right now
  337. if self.reinitialize_at_every_step or len(self.host_charts_dict[self.host]) == 0:
  338. self.charts_init()
  339. self.custom_models_init()
  340. self.model_params_init()
  341. # if not all models have been trained then train those we need to
  342. if len(self.fitted_at) < len(self.models_in_scope):
  343. self.train(
  344. models_to_train=[m for m in self.models_in_scope if m not in self.fitted_at],
  345. train_data_after=self.initial_train_data_after,
  346. train_data_before=self.initial_train_data_before
  347. )
  348. # retrain all models as per schedule from config
  349. elif self.train_every_n > 0 and self.runs_counter % self.train_every_n == 0:
  350. self.reinitialize()
  351. self.train()
  352. # roll forward previous predictions around a training step to avoid the possibility of having the training itself trigger an anomaly
  353. if (self.runs_counter - self.last_train_at) <= self.train_no_prediction_n:
  354. data_probability = self.data_probability_latest
  355. data_anomaly = self.data_anomaly_latest
  356. else:
  357. data_probability, data_anomaly = self.predict()
  358. if self.include_average_prob:
  359. average_prob = np.mean(list(data_probability.values()))
  360. data_probability['average_prob'] = 0 if np.isnan(average_prob) else average_prob
  361. data = {**data_probability, **data_anomaly}
  362. self.validate_charts('probability', data_probability, divisor=100)
  363. self.validate_charts('anomaly', data_anomaly)
  364. self.save_data_latest(data, data_probability, data_anomaly)
  365. #self.info(f'len(data)={len(data)}')
  366. #self.info(f'data')
  367. return data