anomalies.chart.py

# -*- coding: utf-8 -*-
# Description: anomalies netdata python.d module
# Author: andrewm4894
# SPDX-License-Identifier: GPL-3.0-or-later

import time
from datetime import datetime
import re
import warnings

import requests
import numpy as np
import pandas as pd
from netdata_pandas.data import get_data, get_allmetrics_async
from pyod.models.hbos import HBOS
from pyod.models.pca import PCA
from pyod.models.loda import LODA
from pyod.models.iforest import IForest
from pyod.models.cblof import CBLOF
from pyod.models.feature_bagging import FeatureBagging
from pyod.models.copod import COPOD
from sklearn.preprocessing import MinMaxScaler

from bases.FrameworkServices.SimpleService import SimpleService

# ignore some sklearn/numpy warnings that are ok
warnings.filterwarnings('ignore', r'All-NaN slice encountered')
warnings.filterwarnings('ignore', r'invalid value encountered in true_divide')

disabled_by_default = True

ORDER = ['probability', 'anomaly']
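
# Each chart definition follows the python.d framework convention:
# 'options' is [name, title, units, family, context, chart_type], and
# 'lines' starts empty because dimensions are added dynamically at
# runtime (see validate_charts() below).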
CHARTS = {
    'probability': {
        'options': ['probability', 'Anomaly Probability', 'probability', 'anomalies', 'anomalies.probability', 'line'],
        'lines': []
    },
    'anomaly': {
        'options': ['anomaly', 'Anomaly', 'count', 'anomalies', 'anomalies.anomaly', 'stacked'],
        'lines': []
    },
}


class Service(SimpleService):
    def __init__(self, configuration=None, name=None):
        SimpleService.__init__(self, configuration=configuration, name=name)
        self.basic_init()
        self.charts_init()
        self.custom_models_init()
        self.model_params_init()
        self.models_init()
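
    # check() is run by the plugin before the job is scheduled; a successful
    # allmetrics pull is treated as confirmation that the agent is reachable.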
    def check(self):
        _ = get_allmetrics_async(
            host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', wide=True, sort_cols=True,
            protocol=self.protocol, numeric_only=True, float_size='float32', user=self.username, pwd=self.password
        )
        return True

    def basic_init(self):
        """Perform some basic initialization.
        """
        self.order = ORDER
        self.definitions = CHARTS
        self.protocol = self.configuration.get('protocol', 'http')
        self.host = self.configuration.get('host', '127.0.0.1:19999')
        self.username = self.configuration.get('username', None)
        self.password = self.configuration.get('password', None)
        self.fitted_at = {}
        self.df_allmetrics = pd.DataFrame()
        self.data_latest = {}
        self.expected_cols = []
        self.last_train_at = 0
        self.include_average_prob = bool(self.configuration.get('include_average_prob', True))
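
    # charts_regex from the config controls which charts feed the models,
    # e.g. a value like 'system\..*' (illustrative) would put every system.*
    # chart in scope; charts_to_exclude then removes specific charts by name.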
    def charts_init(self):
        """Initialize the charts-in-scope related variables.
        """
        self.charts_regex = re.compile(self.configuration.get('charts_regex', 'None'))
        self.charts_available = list(requests.get(f'{self.protocol}://{self.host}/api/v1/charts').json().get('charts', {}).keys())
        self.charts_in_scope = list(filter(self.charts_regex.match, self.charts_available))
        self.charts_to_exclude = self.configuration.get('charts_to_exclude', '').split(',')
        if len(self.charts_to_exclude) > 0:
            self.charts_in_scope = [c for c in self.charts_in_scope if c not in self.charts_to_exclude]
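
    # custom_models in the config is expected to be a list of dicts, each with
    # a 'name' and a comma-separated 'dimensions' string of 'chart|dimension'
    # entries, optionally prefixed with 'host::' to pull from another host.
    # An illustrative example:
    #   custom_models:
    #     - name: 'cpu_model'
    #       dimensions: 'system.cpu|user,system.cpu|system'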
    def custom_models_init(self):
        """Perform initialization steps related to custom models.
        """
        self.custom_models = self.configuration.get('custom_models', None)
        self.custom_models_normalize = bool(self.configuration.get('custom_models_normalize', False))
        if self.custom_models:
            self.custom_models_names = [model['name'] for model in self.custom_models]
            self.custom_models_dims = [i for s in [model['dimensions'].split(',') for model in self.custom_models] for i in s]
            self.custom_models_dims = [dim if '::' in dim else f'{self.host}::{dim}' for dim in self.custom_models_dims]
            self.custom_models_charts = list(set([dim.split('|')[0].split('::')[1] for dim in self.custom_models_dims]))
            self.custom_models_hosts = list(set([dim.split('::')[0] for dim in self.custom_models_dims]))
            self.custom_models_host_charts_dict = {}
            for host in self.custom_models_hosts:
                self.custom_models_host_charts_dict[host] = list(set([dim.split('::')[1].split('|')[0] for dim in self.custom_models_dims if dim.startswith(host)]))
            self.custom_models_dims_renamed = [f"{model['name']}.{dim}" for model in self.custom_models for dim in model['dimensions'].split(',')]
            self.models_in_scope = list(set([f'{self.host}::{c}' for c in self.charts_in_scope] + self.custom_models_names))
            self.charts_in_scope = list(set(self.charts_in_scope + self.custom_models_charts))
            self.host_charts_dict = {self.host: self.charts_in_scope}
            for host in self.custom_models_host_charts_dict:
                if host not in self.host_charts_dict:
                    self.host_charts_dict[host] = self.custom_models_host_charts_dict[host]
                else:
                    for chart in self.custom_models_host_charts_dict[host]:
                        if chart not in self.host_charts_dict[host]:
                            # append, not extend: chart is a string, and extend
                            # would add it character by character
                            self.host_charts_dict[host].append(chart)
        else:
            self.models_in_scope = [f'{self.host}::{c}' for c in self.charts_in_scope]
            self.host_charts_dict = {self.host: self.charts_in_scope}
        self.model_display_names = {model: model.split('::')[1] if '::' in model else model for model in self.models_in_scope}
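
    # lags_n, smooth_n and diffs_n are stored per model so that, in principle,
    # each model could use different feature-engineering settings; here the
    # same configured value is applied to every model in scope.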
    def model_params_init(self):
        """Model parameters initialization.
        """
        self.train_max_n = self.configuration.get('train_max_n', 100000)
        self.train_n_secs = self.configuration.get('train_n_secs', 14400)
        self.offset_n_secs = self.configuration.get('offset_n_secs', 0)
        self.train_every_n = self.configuration.get('train_every_n', 1800)
        self.train_no_prediction_n = self.configuration.get('train_no_prediction_n', 10)
        self.initial_train_data_after = self.configuration.get('initial_train_data_after', 0)
        self.initial_train_data_before = self.configuration.get('initial_train_data_before', 0)
        self.contamination = self.configuration.get('contamination', 0.001)
        self.lags_n = {model: self.configuration.get('lags_n', 5) for model in self.models_in_scope}
        self.smooth_n = {model: self.configuration.get('smooth_n', 5) for model in self.models_in_scope}
        self.diffs_n = {model: self.configuration.get('diffs_n', 5) for model in self.models_in_scope}
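
    # one PyOD detector is created per model in scope; unrecognized 'model'
    # values fall back to HBOS, which is also the fallback detector when a
    # fit fails (see try_fit() below).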
    def models_init(self):
        """Models initialization.
        """
        self.model = self.configuration.get('model', 'pca')
        if self.model == 'pca':
            self.models = {model: PCA(contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'loda':
            self.models = {model: LODA(contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'iforest':
            self.models = {model: IForest(n_estimators=50, bootstrap=True, behaviour='new', contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'cblof':
            self.models = {model: CBLOF(n_clusters=3, contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'feature_bagging':
            self.models = {model: FeatureBagging(base_estimator=PCA(contamination=self.contamination), contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'copod':
            self.models = {model: COPOD(contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'hbos':
            self.models = {model: HBOS(contamination=self.contamination) for model in self.models_in_scope}
        else:
            self.models = {model: HBOS(contamination=self.contamination) for model in self.models_in_scope}
        self.custom_model_scalers = {model: MinMaxScaler() for model in self.models_in_scope}

    def validate_charts(self, name, data, algorithm='absolute', multiplier=1, divisor=1):
        """Add any dimensions in data that are not already on the chart.
        """
        for dim in data:
            if dim not in self.charts[name]:
                self.charts[name].add_dimension([dim, dim, algorithm, multiplier, divisor])
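
    # e.g. the default-host column '127.0.0.1:19999::system.cpu|user' used by a
    # custom model named 'cpu_model' (illustrative) gets a renamed copy prefixed
    # with the model name, so each model can select its own feature columns.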
    def add_custom_models_dims(self, df):
        """Given a df, select columns used by custom models, add custom model name as prefix, and append to df.
        :param df <pd.DataFrame>: dataframe to append new renamed columns to.
        :return: <pd.DataFrame> dataframe with additional columns added relating to the specified custom models.
        """
        df_custom = df[self.custom_models_dims].copy()
        df_custom.columns = self.custom_models_dims_renamed
        df = df.join(df_custom)
        return df
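
    # make_features turns the raw (n_samples, n_dims) array into the model's
    # feature matrix: difference diffs_n times, take a smooth_n rolling mean
    # (via the cumulative-sum trick), then append lags_n lagged copies of every
    # column. For example, with diffs_n=1, smooth_n=3 and lags_n=2, an (N, d)
    # input becomes an (N - 5, d * 3) output once NaN rows are dropped.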
    def make_features(self, arr, train=False, model=None):
        """Take in a numpy array and preprocess it by taking diffs, smoothing and adding lags.
        :param arr <np.ndarray>: numpy array we want to make features from.
        :param train <bool>: True if making features for training, in which case we need to fit_transform the scaler and maybe sample down to train_max_n.
        :param model <str>: model to make features for.
        :return: <np.ndarray> transformed numpy array.
        """

        def lag(arr, n):
            res = np.empty_like(arr)
            res[:n] = np.nan
            res[n:] = arr[:-n]
            return res

        arr = np.nan_to_num(arr)

        diffs_n = self.diffs_n[model]
        smooth_n = self.smooth_n[model]
        lags_n = self.lags_n[model]

        if self.custom_models_normalize and model in self.custom_models_names:
            if train:
                arr = self.custom_model_scalers[model].fit_transform(arr)
            else:
                arr = self.custom_model_scalers[model].transform(arr)

        if diffs_n > 0:
            arr = np.diff(arr, diffs_n, axis=0)
            arr = arr[~np.isnan(arr).any(axis=1)]

        if smooth_n > 1:
            # rolling mean computed via cumulative sums
            arr = np.cumsum(arr, axis=0, dtype=float)
            arr[smooth_n:] = arr[smooth_n:] - arr[:-smooth_n]
            arr = arr[smooth_n - 1:] / smooth_n
            arr = arr[~np.isnan(arr).any(axis=1)]

        if lags_n > 0:
            arr_orig = np.copy(arr)
            for lag_n in range(1, lags_n + 1):
                arr = np.concatenate((arr, lag(arr_orig, lag_n)), axis=1)
            arr = arr[~np.isnan(arr).any(axis=1)]

        if train:
            if len(arr) > self.train_max_n:
                arr = arr[np.random.randint(arr.shape[0], size=self.train_max_n), :]

        arr = np.nan_to_num(arr)

        return arr
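
    # by default training pulls the last train_n_secs of data (optionally
    # shifted back in time by offset_n_secs); passing explicit timestamps
    # instead pins the training window, which is how the initial_train_data_*
    # settings are applied on the first run (see get_data() below).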
    def train(self, models_to_train=None, train_data_after=0, train_data_before=0):
        """Pull required training data and train a model for each specified model.
        :param models_to_train <list>: list of models to train on.
        :param train_data_after <int>: integer timestamp for start of train data.
        :param train_data_before <int>: integer timestamp for end of train data.
        """
        now = datetime.now().timestamp()
        if train_data_after > 0 and train_data_before > 0:
            before = train_data_before
            after = train_data_after
        else:
            before = int(now) - self.offset_n_secs
            after = before - self.train_n_secs

        # get training data
        df_train = get_data(
            host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', after=after, before=before,
            sort_cols=True, numeric_only=True, protocol=self.protocol, float_size='float32', user=self.username,
            pwd=self.password
        ).ffill()
        self.expected_cols = list(df_train.columns)
        if self.custom_models:
            df_train = self.add_custom_models_dims(df_train)

        # train model
        self.try_fit(df_train, models_to_train=models_to_train)
        self.info(f'training complete in {round(time.time() - now, 2)} seconds (runs_counter={self.runs_counter}, model={self.model}, train_n_secs={self.train_n_secs}, models={len(self.fitted_at)}, n_fit_success={self.n_fit_success}, n_fit_fails={self.n_fit_fail}, after={after}, before={before}).')
        self.last_train_at = self.runs_counter

    def try_fit(self, df_train, models_to_train=None):
        """Try to fit each model, falling back to a default model if the fit fails for any reason.
        :param df_train <pd.DataFrame>: data to train on.
        :param models_to_train <list>: list of models to train.
        """
        if models_to_train is None:
            models_to_train = list(self.models.keys())
        self.n_fit_fail, self.n_fit_success = 0, 0
        for model in models_to_train:
            X_train = self.make_features(
                df_train[df_train.columns[df_train.columns.str.startswith(f'{model}|')]].values,
                train=True, model=model)
            try:
                self.models[model].fit(X_train)
                self.n_fit_success += 1
            except Exception as e:
                self.n_fit_fail += 1
                self.info(e)
                self.info(f'training failed for {model} at run_counter {self.runs_counter}, defaulting to hbos model.')
                self.models[model] = HBOS(contamination=self.contamination)
                self.models[model].fit(X_train)
            self.fitted_at[model] = self.runs_counter
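
    # the rolling df_allmetrics buffer below is kept at twice the combined
    # feature window (max lags + smooth + diffs across models), so that
    # make_features() always has enough history to produce at least one
    # complete, NaN-free feature row at prediction time.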
    def predict(self):
        """Get latest data, make it into a feature vector, and get predictions for each available model.
        :return: (<dict>,<dict>) tuple of dictionaries, one for probability scores and the other for anomaly predictions.
        """
        # get recent data to predict on
        df_allmetrics = get_allmetrics_async(
            host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', wide=True, sort_cols=True,
            protocol=self.protocol, numeric_only=True, float_size='float32', user=self.username, pwd=self.password
        )[self.expected_cols]
        if self.custom_models:
            df_allmetrics = self.add_custom_models_dims(df_allmetrics)
        # pd.concat() rather than DataFrame.append(), which is removed in pandas 2.x
        self.df_allmetrics = pd.concat([self.df_allmetrics, df_allmetrics]).ffill().tail(
            (max(self.lags_n.values()) + max(self.smooth_n.values()) + max(self.diffs_n.values())) * 2)

        # get predictions
        data_probability, data_anomaly = self.try_predict()

        return data_probability, data_anomaly

    def try_predict(self):
        """Try to make a prediction and fall back to the last known prediction if it fails.
        :return: (<dict>,<dict>) tuple of dictionaries, one for probability scores and the other for anomaly predictions.
        """
        data_probability, data_anomaly = {}, {}
        for model in self.fitted_at.keys():
            model_display_name = self.model_display_names[model]
            X_model = np.nan_to_num(self.make_features(
                self.df_allmetrics[self.df_allmetrics.columns[self.df_allmetrics.columns.str.startswith(f'{model}|')]].values,
                model=model)[-1, :].reshape(1, -1))
            try:
                data_probability[model_display_name + '_prob'] = np.nan_to_num(self.models[model].predict_proba(X_model)[-1][1]) * 10000
                data_anomaly[model_display_name + '_anomaly'] = self.models[model].predict(X_model)[-1]
            except Exception:
                # self.info(e)
                if model_display_name + '_prob' in self.data_latest:
                    # self.info(f'prediction failed for {model} at run_counter {self.runs_counter}, using last prediction instead.')
                    data_probability[model_display_name + '_prob'] = self.data_latest[model_display_name + '_prob']
                    data_anomaly[model_display_name + '_anomaly'] = self.data_latest[model_display_name + '_anomaly']
                else:
                    # self.info(f'prediction failed for {model} at run_counter {self.runs_counter}, skipping as no previous prediction.')
                    continue
        return data_probability, data_anomaly
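
    # probabilities are scaled by 10000 in try_predict() and the probability
    # chart uses divisor=100, so the dashboard shows a percentage with two
    # decimal places; anomaly flags are plain 0/1 values.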
    def get_data(self):

        # if not all models have been trained then train those we need to
        if len(self.fitted_at) < len(self.models):
            self.train(
                models_to_train=[m for m in self.models if m not in self.fitted_at],
                train_data_after=self.initial_train_data_after,
                train_data_before=self.initial_train_data_before)

        # retrain all models as per schedule from config
        elif self.train_every_n > 0 and self.runs_counter % self.train_every_n == 0:
            self.train()

        # roll forward previous predictions around a training step to avoid the possibility
        # of having the training itself trigger an anomaly
        if (self.runs_counter - self.last_train_at) <= self.train_no_prediction_n:
            data = self.data_latest
        else:
            data_probability, data_anomaly = self.predict()
            if self.include_average_prob:
                data_probability['average_prob'] = np.mean(list(data_probability.values()))
            data = {**data_probability, **data_anomaly}
            self.validate_charts('probability', data_probability, divisor=100)
            self.validate_charts('anomaly', data_anomaly)

        self.data_latest = data

        return data