# -*- coding: utf-8 -*-
# Description: anomalies netdata python.d module
# Author: andrewm4894
# SPDX-License-Identifier: GPL-3.0-or-later

import sys
import time
from datetime import datetime
import re
import warnings

import requests
import numpy as np
import pandas as pd
from netdata_pandas.data import get_data, get_allmetrics_async
from pyod.models.hbos import HBOS
from pyod.models.pca import PCA
from pyod.models.loda import LODA
from pyod.models.iforest import IForest
from pyod.models.cblof import CBLOF
from pyod.models.feature_bagging import FeatureBagging
from pyod.models.copod import COPOD
from sklearn.preprocessing import MinMaxScaler

from bases.FrameworkServices.SimpleService import SimpleService

# ignore some sklearn/numpy warnings that are ok
warnings.filterwarnings('ignore', r'All-NaN slice encountered')
warnings.filterwarnings('ignore', r'invalid value encountered in true_divide')
warnings.filterwarnings('ignore', r'divide by zero encountered in true_divide')
warnings.filterwarnings('ignore', r'invalid value encountered in subtract')

disabled_by_default = True

ORDER = ['probability', 'anomaly']

CHARTS = {
    'probability': {
        'options': ['probability', 'Anomaly Probability', 'probability', 'anomalies', 'anomalies.probability', 'line'],
        'lines': []
    },
    'anomaly': {
        'options': ['anomaly', 'Anomaly', 'count', 'anomalies', 'anomalies.anomaly', 'stacked'],
        'lines': []
    },
}
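# Note: in the python.d framework, each 'options' list above is, in order,
# [name, title, units, family, context, chart_type]. Dimensions are added
# dynamically at runtime via validate_charts() below, hence the empty 'lines'.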


class Service(SimpleService):
    def __init__(self, configuration=None, name=None):
        SimpleService.__init__(self, configuration=configuration, name=name)
        self.basic_init()
        self.charts_init()
        self.custom_models_init()
        self.data_init()
        self.model_params_init()
        self.models_init()
        self.collected_dims = {'probability': set(), 'anomaly': set()}

    def check(self):
        # abort the check on unsupported python versions (tuple comparison also
        # behaves correctly for hypothetical future major versions)
        if sys.version_info < (3, 6):
            self.error("anomalies collector only works with Python>=3.6")
            return False
        if len(self.host_charts_dict[self.host]) > 0:
            _ = get_allmetrics_async(host_charts_dict=self.host_charts_dict, protocol=self.protocol, user=self.username, pwd=self.password)
        return True

    def basic_init(self):
        """Perform some basic initialization.
        """
        self.order = ORDER
        self.definitions = CHARTS
        self.protocol = self.configuration.get('protocol', 'http')
        self.host = self.configuration.get('host', '127.0.0.1:19999')
        self.username = self.configuration.get('username', None)
        self.password = self.configuration.get('password', None)
        self.tls_verify = self.configuration.get('tls_verify', True)
        self.fitted_at = {}
        self.df_allmetrics = pd.DataFrame()
        self.last_train_at = 0
        self.include_average_prob = bool(self.configuration.get('include_average_prob', True))
        self.reinitialize_at_every_step = bool(self.configuration.get('reinitialize_at_every_step', False))

    def charts_init(self):
        """Do some initialisation of charts in scope related variables.
        """
        self.charts_regex = re.compile(self.configuration.get('charts_regex', 'None'))
        self.charts_available = [c for c in list(requests.get(f'{self.protocol}://{self.host}/api/v1/charts', verify=self.tls_verify).json().get('charts', {}).keys())]
        self.charts_in_scope = list(filter(self.charts_regex.match, self.charts_available))
        self.charts_to_exclude = self.configuration.get('charts_to_exclude', '').split(',')
        if len(self.charts_to_exclude) > 0:
            self.charts_in_scope = [c for c in self.charts_in_scope if c not in self.charts_to_exclude]
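    # For example (values purely illustrative), a conf like:
    #   charts_regex: 'system\..*'
    #   charts_to_exclude: 'system.uptime,system.entropy'
    # would train one model per system.* chart, minus the two excluded charts.
    # Note that ''.split(',') yields [''], so the exclusion list is never
    # empty; this is harmless since no chart is named ''.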

    def custom_models_init(self):
        """Perform initialization steps related to custom models.
        """
        self.custom_models = self.configuration.get('custom_models', None)
        self.custom_models_normalize = bool(self.configuration.get('custom_models_normalize', False))
        if self.custom_models:
            self.custom_models_names = [model['name'] for model in self.custom_models]
            self.custom_models_dims = [i for s in [model['dimensions'].split(',') for model in self.custom_models] for i in s]
            self.custom_models_dims = [dim if '::' in dim else f'{self.host}::{dim}' for dim in self.custom_models_dims]
            self.custom_models_charts = list(set([dim.split('|')[0].split('::')[1] for dim in self.custom_models_dims]))
            self.custom_models_hosts = list(set([dim.split('::')[0] for dim in self.custom_models_dims]))
            self.custom_models_host_charts_dict = {}
            for host in self.custom_models_hosts:
                self.custom_models_host_charts_dict[host] = list(set([dim.split('::')[1].split('|')[0] for dim in self.custom_models_dims if dim.startswith(host)]))
            self.custom_models_dims_renamed = [f"{model['name']}|{dim}" for model in self.custom_models for dim in model['dimensions'].split(',')]
            self.models_in_scope = list(set([f'{self.host}::{c}' for c in self.charts_in_scope] + self.custom_models_names))
            self.charts_in_scope = list(set(self.charts_in_scope + self.custom_models_charts))
            self.host_charts_dict = {self.host: self.charts_in_scope}
            for host in self.custom_models_host_charts_dict:
                if host not in self.host_charts_dict:
                    self.host_charts_dict[host] = self.custom_models_host_charts_dict[host]
                else:
                    for chart in self.custom_models_host_charts_dict[host]:
                        if chart not in self.host_charts_dict[host]:
                            # append (not extend) the chart name; extend() on a
                            # string would add it character by character
                            self.host_charts_dict[host].append(chart)
        else:
            self.models_in_scope = [f'{self.host}::{c}' for c in self.charts_in_scope]
            self.host_charts_dict = {self.host: self.charts_in_scope}
        self.model_display_names = {model: model.split('::')[1] if '::' in model else model for model in self.models_in_scope}
        #self.info(f'self.host_charts_dict (len={len(self.host_charts_dict[self.host])}): {self.host_charts_dict}')
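    # A minimal sketch of a custom model entry in the conf file (names and
    # dimensions below are purely illustrative); each dimension is 'chart|dim'
    # and may optionally carry a 'host::' prefix to pull from another node:
    #
    #   custom_models:
    #     - name: 'cpu_model'
    #       dimensions: 'system.cpu|user,system.cpu|system'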

    def data_init(self):
        """Initialize some empty data objects.
        """
        self.data_probability_latest = {f'{m}_prob': 0 for m in self.charts_in_scope}
        self.data_anomaly_latest = {f'{m}_anomaly': 0 for m in self.charts_in_scope}
        self.data_latest = {**self.data_probability_latest, **self.data_anomaly_latest}

    def model_params_init(self):
        """Model parameters initialisation.
        """
        self.train_max_n = self.configuration.get('train_max_n', 100000)
        self.train_n_secs = self.configuration.get('train_n_secs', 14400)
        self.offset_n_secs = self.configuration.get('offset_n_secs', 0)
        self.train_every_n = self.configuration.get('train_every_n', 1800)
        self.train_no_prediction_n = self.configuration.get('train_no_prediction_n', 10)
        self.initial_train_data_after = self.configuration.get('initial_train_data_after', 0)
        self.initial_train_data_before = self.configuration.get('initial_train_data_before', 0)
        self.contamination = self.configuration.get('contamination', 0.001)
        self.lags_n = {model: self.configuration.get('lags_n', 5) for model in self.models_in_scope}
        self.smooth_n = {model: self.configuration.get('smooth_n', 5) for model in self.models_in_scope}
        self.diffs_n = {model: self.configuration.get('diffs_n', 5) for model in self.models_in_scope}
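    # With the defaults above: train on the most recent 4 hours of data
    # (train_n_secs=14400), retrain every 1800 collection runs, and suppress
    # fresh predictions for 10 runs after each training. A hedged conf sketch
    # using only the keys read above:
    #
    #   train_n_secs: 14400
    #   train_every_n: 1800
    #   train_no_prediction_n: 10
    #   contamination: 0.001
    #   lags_n: 5
    #   smooth_n: 5
    #   diffs_n: 5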

    def models_init(self):
        """Models initialisation.
        """
        self.model = self.configuration.get('model', 'pca')
        if self.model == 'pca':
            self.models = {model: PCA(contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'loda':
            self.models = {model: LODA(contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'iforest':
            self.models = {model: IForest(n_estimators=50, bootstrap=True, behaviour='new', contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'cblof':
            self.models = {model: CBLOF(n_clusters=3, contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'feature_bagging':
            self.models = {model: FeatureBagging(base_estimator=PCA(contamination=self.contamination), contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'copod':
            self.models = {model: COPOD(contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'hbos':
            self.models = {model: HBOS(contamination=self.contamination) for model in self.models_in_scope}
        else:
            self.models = {model: HBOS(contamination=self.contamination) for model in self.models_in_scope}
        self.custom_model_scalers = {model: MinMaxScaler() for model in self.models_in_scope}
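    # All of the PyOD estimators above share a common API (fit(), predict()
    # for 0/1 anomaly labels, predict_proba() for outlier probabilities),
    # which is what lets try_fit() and try_predict() below treat whichever
    # model type is configured uniformly, with HBOS as the fallback.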

    def model_init(self, model):
        """Model initialisation of a single model.
        """
        if self.model == 'pca':
            self.models[model] = PCA(contamination=self.contamination)
        elif self.model == 'loda':
            self.models[model] = LODA(contamination=self.contamination)
        elif self.model == 'iforest':
            self.models[model] = IForest(n_estimators=50, bootstrap=True, behaviour='new', contamination=self.contamination)
        elif self.model == 'cblof':
            self.models[model] = CBLOF(n_clusters=3, contamination=self.contamination)
        elif self.model == 'feature_bagging':
            self.models[model] = FeatureBagging(base_estimator=PCA(contamination=self.contamination), contamination=self.contamination)
        elif self.model == 'copod':
            self.models[model] = COPOD(contamination=self.contamination)
        elif self.model == 'hbos':
            self.models[model] = HBOS(contamination=self.contamination)
        else:
            self.models[model] = HBOS(contamination=self.contamination)
        self.custom_model_scalers[model] = MinMaxScaler()

    def reinitialize(self):
        """Reinitialize charts, models and data to a beginning state.
        """
        self.charts_init()
        self.custom_models_init()
        self.data_init()
        self.model_params_init()
        self.models_init()

    def save_data_latest(self, data, data_probability, data_anomaly):
        """Save the most recent data objects to be used if needed in the future.
        """
        self.data_latest = data
        self.data_probability_latest = data_probability
        self.data_anomaly_latest = data_anomaly

    def validate_charts(self, chart, data, algorithm='absolute', multiplier=1, divisor=1):
        """If dimension not in chart then add it.
        """
        for dim in data:
            if dim not in self.collected_dims[chart]:
                self.collected_dims[chart].add(dim)
                self.charts[chart].add_dimension([dim, dim, algorithm, multiplier, divisor])
        for dim in list(self.collected_dims[chart]):
            if dim not in data:
                self.collected_dims[chart].remove(dim)
                self.charts[chart].del_dimension(dim, hide=False)

    def add_custom_models_dims(self, df):
        """Given a df, select columns used by custom models, add custom model name as prefix, and append to df.

        :param df <pd.DataFrame>: dataframe to append new renamed columns to.
        :return: <pd.DataFrame> dataframe with additional columns added relating to the specified custom models.
        """
        df_custom = df[self.custom_models_dims].copy()
        df_custom.columns = self.custom_models_dims_renamed
        df = df.join(df_custom)
        return df
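    # Worked example (names purely illustrative): for a custom model
    # {'name': 'cpu_model', 'dimensions': 'system.cpu|user,system.cpu|system'},
    # the host-prefixed columns '<host>::system.cpu|user' and
    # '<host>::system.cpu|system' are copied and re-added to df as
    # 'cpu_model|system.cpu|user' and 'cpu_model|system.cpu|system', so that
    # train()/predict() can select them via the 'cpu_model|' column prefix.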

    def make_features(self, arr, train=False, model=None):
        """Take in numpy array and preprocess accordingly by taking diffs, smoothing and adding lags.

        :param arr <np.ndarray>: numpy array we want to make features from.
        :param train <bool>: True if making features for training, in which case need to fit_transform scaler and maybe sample train_max_n.
        :param model <str>: model to make features for.
        :return: <np.ndarray> transformed numpy array.
        """

        def lag(arr, n):
            res = np.empty_like(arr)
            res[:n] = np.nan
            res[n:] = arr[:-n]
            return res

        arr = np.nan_to_num(arr)

        diffs_n = self.diffs_n[model]
        smooth_n = self.smooth_n[model]
        lags_n = self.lags_n[model]

        if self.custom_models_normalize and model in self.custom_models_names:
            if train:
                arr = self.custom_model_scalers[model].fit_transform(arr)
            else:
                arr = self.custom_model_scalers[model].transform(arr)

        if diffs_n > 0:
            arr = np.diff(arr, diffs_n, axis=0)
            arr = arr[~np.isnan(arr).any(axis=1)]

        if smooth_n > 1:
            arr = np.cumsum(arr, axis=0, dtype=float)
            arr[smooth_n:] = arr[smooth_n:] - arr[:-smooth_n]
            arr = arr[smooth_n - 1:] / smooth_n
            arr = arr[~np.isnan(arr).any(axis=1)]

        if lags_n > 0:
            arr_orig = np.copy(arr)
            for lag_n in range(1, lags_n + 1):
                arr = np.concatenate((arr, lag(arr_orig, lag_n)), axis=1)
            arr = arr[~np.isnan(arr).any(axis=1)]

        if train:
            if len(arr) > self.train_max_n:
                arr = arr[np.random.randint(arr.shape[0], size=self.train_max_n), :]

        arr = np.nan_to_num(arr)

        return arr
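    # A rough shape walk-through (numbers purely illustrative): starting from a
    # (1000, 4) array with diffs_n=1, smooth_n=3, lags_n=5:
    #   np.diff along axis 0           -> (999, 4)
    #   rolling mean via cumsum trick  -> (997, 4)
    #   concat 5 lagged copies         -> (997, 24)
    #   drop NaN-padded first 5 rows   -> (992, 24)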

    def train(self, models_to_train=None, train_data_after=0, train_data_before=0):
        """Pull required training data and train a model for each specified model.

        :param models_to_train <list>: list of models to train on.
        :param train_data_after <int>: integer timestamp for start of train data.
        :param train_data_before <int>: integer timestamp for end of train data.
        """
        now = datetime.now().timestamp()
        if train_data_after > 0 and train_data_before > 0:
            before = train_data_before
            after = train_data_after
        else:
            before = int(now) - self.offset_n_secs
            after = before - self.train_n_secs

        # get training data
        df_train = get_data(
            host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', after=after, before=before,
            sort_cols=True, numeric_only=True, protocol=self.protocol, float_size='float32', user=self.username,
            pwd=self.password, verify=self.tls_verify
        ).ffill()
        if self.custom_models:
            df_train = self.add_custom_models_dims(df_train)

        # train model
        self.try_fit(df_train, models_to_train=models_to_train)
        self.info(f'training complete in {round(time.time() - now, 2)} seconds (runs_counter={self.runs_counter}, model={self.model}, train_n_secs={self.train_n_secs}, models={len(self.fitted_at)}, n_fit_success={self.n_fit_success}, n_fit_fails={self.n_fit_fail}, after={after}, before={before}).')
        self.last_train_at = self.runs_counter

    def try_fit(self, df_train, models_to_train=None):
        """Try fit each model and try to fallback to a default model if fit fails for any reason.

        :param df_train <pd.DataFrame>: data to train on.
        :param models_to_train <list>: list of models to train.
        """
        if models_to_train is None:
            models_to_train = list(self.models.keys())
        self.n_fit_fail, self.n_fit_success = 0, 0
        for model in models_to_train:
            if model not in self.models:
                self.model_init(model)
            X_train = self.make_features(
                df_train[df_train.columns[df_train.columns.str.startswith(f'{model}|')]].values,
                train=True, model=model)
            try:
                self.models[model].fit(X_train)
                self.n_fit_success += 1
            except Exception as e:
                self.n_fit_fail += 1
                self.info(e)
                self.info(f'training failed for {model} at run_counter {self.runs_counter}, defaulting to hbos model.')
                self.models[model] = HBOS(contamination=self.contamination)
                self.models[model].fit(X_train)
            self.fitted_at[model] = self.runs_counter

    def predict(self):
        """Get latest data, make it into a feature vector, and get predictions for each available model.

        :return: (<dict>,<dict>) tuple of dictionaries, one for probability scores and the other for anomaly predictions.
        """
        # get recent data to predict on
        df_allmetrics = get_allmetrics_async(
            host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', wide=True, sort_cols=True,
            protocol=self.protocol, numeric_only=True, float_size='float32', user=self.username, pwd=self.password
        )
        if self.custom_models:
            df_allmetrics = self.add_custom_models_dims(df_allmetrics)
        # DataFrame.append() was removed in pandas 2.0, so use pd.concat() instead
        self.df_allmetrics = pd.concat([self.df_allmetrics, df_allmetrics]).ffill().tail(
            (max(self.lags_n.values()) + max(self.smooth_n.values()) + max(self.diffs_n.values())) * 2)

        # get predictions
        data_probability, data_anomaly = self.try_predict()

        return data_probability, data_anomaly

    def try_predict(self):
        """Try make prediction and fall back to last known prediction if fails.

        :return: (<dict>,<dict>) tuple of dictionaries, one for probability scores and the other for anomaly predictions.
        """
        data_probability, data_anomaly = {}, {}
        for model in self.fitted_at.keys():
            model_display_name = self.model_display_names[model]
            try:
                X_model = np.nan_to_num(
                    self.make_features(
                        self.df_allmetrics[self.df_allmetrics.columns[self.df_allmetrics.columns.str.startswith(f'{model}|')]].values,
                        model=model
                    )[-1, :].reshape(1, -1)
                )
                data_probability[model_display_name + '_prob'] = np.nan_to_num(self.models[model].predict_proba(X_model)[-1][1]) * 10000
                data_anomaly[model_display_name + '_anomaly'] = self.models[model].predict(X_model)[-1]
            except Exception as _:
                #self.info(e)
                if model_display_name + '_prob' in self.data_latest:
                    #self.info(f'prediction failed for {model} at run_counter {self.runs_counter}, using last prediction instead.')
                    data_probability[model_display_name + '_prob'] = self.data_latest[model_display_name + '_prob']
                    data_anomaly[model_display_name + '_anomaly'] = self.data_latest[model_display_name + '_anomaly']
                else:
                    #self.info(f'prediction failed for {model} at run_counter {self.runs_counter}, skipping as no previous prediction.')
                    continue

        return data_probability, data_anomaly
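    # predict_proba() returns [P(normal), P(anomaly)] per row, so the [1]
    # element is the anomaly probability in [0, 1]. It is scaled by 10000 here
    # and the probability chart uses divisor=100 (see get_data() below), so
    # the value netdata ultimately displays is on a 0-100 scale while still
    # being handed to the framework as an integer.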

    def get_data(self):

        # initialize to what's available right now
        if self.reinitialize_at_every_step or len(self.host_charts_dict[self.host]) == 0:
            self.charts_init()
            self.custom_models_init()
            self.model_params_init()

        # if not all models have been trained then train those we need to
        if len(self.fitted_at) < len(self.models_in_scope):
            self.train(
                models_to_train=[m for m in self.models_in_scope if m not in self.fitted_at],
                train_data_after=self.initial_train_data_after,
                train_data_before=self.initial_train_data_before
            )

        # retrain all models as per schedule from config
        elif self.train_every_n > 0 and self.runs_counter % self.train_every_n == 0:
            self.reinitialize()
            self.train()

        # roll forward previous predictions around a training step to avoid the possibility of having the
        # training itself trigger an anomaly
        if (self.runs_counter - self.last_train_at) <= self.train_no_prediction_n:
            data_probability = self.data_probability_latest
            data_anomaly = self.data_anomaly_latest
        else:
            data_probability, data_anomaly = self.predict()
            if self.include_average_prob:
                average_prob = np.mean(list(data_probability.values()))
                data_probability['average_prob'] = 0 if np.isnan(average_prob) else average_prob

        data = {**data_probability, **data_anomaly}

        self.validate_charts('probability', data_probability, divisor=100)
        self.validate_charts('anomaly', data_anomaly)

        self.save_data_latest(data, data_probability, data_anomaly)

        #self.info(f'len(data)={len(data)}')
        #self.info(f'data')

        return data
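

# A sketch of the collector lifecycle, assumed from python.d framework
# conventions rather than anything in this file: the plugin instantiates
# Service, calls check() once at startup, then calls get_data() once per
# update interval; self.runs_counter is maintained by the framework and
# increments on each run. Since the module sets disabled_by_default = True,
# it is assumed to be enabled explicitly via `anomalies: yes` in python.d.conf.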