anomalies.chart.py

# -*- coding: utf-8 -*-
# Description: anomalies netdata python.d module
# Author: andrewm4894
# SPDX-License-Identifier: GPL-3.0-or-later

import time
from datetime import datetime
import re
import warnings

import requests
import numpy as np
import pandas as pd
from netdata_pandas.data import get_data, get_allmetrics_async
from pyod.models.hbos import HBOS
from pyod.models.pca import PCA
from pyod.models.loda import LODA
from pyod.models.iforest import IForest
from pyod.models.cblof import CBLOF
from pyod.models.feature_bagging import FeatureBagging
from pyod.models.copod import COPOD
from sklearn.preprocessing import MinMaxScaler

from bases.FrameworkServices.SimpleService import SimpleService

# ignore some sklearn/numpy warnings that are ok
warnings.filterwarnings('ignore', r'All-NaN slice encountered')
warnings.filterwarnings('ignore', r'invalid value encountered in true_divide')
warnings.filterwarnings('ignore', r'divide by zero encountered in true_divide')
warnings.filterwarnings('ignore', r'invalid value encountered in subtract')

disabled_by_default = True

ORDER = ['probability', 'anomaly']

CHARTS = {
    'probability': {
        'options': ['probability', 'Anomaly Probability', 'probability', 'anomalies', 'anomalies.probability', 'line'],
        'lines': []
    },
    'anomaly': {
        'options': ['anomaly', 'Anomaly', 'count', 'anomalies', 'anomalies.anomaly', 'stacked'],
        'lines': []
    },
}
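
# For reference, the python.d framework reads each chart's 'options' list as
# [name, title, units, family, context, chart_type]. Both charts start with no
# lines; dimensions are added at runtime by validate_charts() below.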


class Service(SimpleService):
    def __init__(self, configuration=None, name=None):
        SimpleService.__init__(self, configuration=configuration, name=name)
        self.basic_init()
        self.charts_init()
        self.custom_models_init()
        self.model_params_init()
        self.models_init()

    def check(self):
        # make sure we can pull data from all hosts and charts in scope
        _ = get_allmetrics_async(
            host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', wide=True, sort_cols=True,
            protocol=self.protocol, numeric_only=True, float_size='float32', user=self.username, pwd=self.password
        )
        return True

    def basic_init(self):
        """Perform some basic initialization.
        """
        self.order = ORDER
        self.definitions = CHARTS
        self.protocol = self.configuration.get('protocol', 'http')
        self.host = self.configuration.get('host', '127.0.0.1:19999')
        self.username = self.configuration.get('username', None)
        self.password = self.configuration.get('password', None)
        self.fitted_at = {}
        self.df_allmetrics = pd.DataFrame()
        self.data_latest = {}
        self.last_train_at = 0
        self.include_average_prob = bool(self.configuration.get('include_average_prob', True))

    def charts_init(self):
        """Do some initialisation of charts in scope related variables.
        """
        self.charts_regex = re.compile(self.configuration.get('charts_regex', 'None'))
        self.charts_available = list(requests.get(f'{self.protocol}://{self.host}/api/v1/charts').json().get('charts', {}).keys())
        self.charts_in_scope = list(filter(self.charts_regex.match, self.charts_available))
        # strip whitespace and drop empty entries so an unset 'charts_to_exclude'
        # does not become [''] after the split
        self.charts_to_exclude = [c.strip() for c in self.configuration.get('charts_to_exclude', '').split(',') if c.strip()]
        if len(self.charts_to_exclude) > 0:
            self.charts_in_scope = [c for c in self.charts_in_scope if c not in self.charts_to_exclude]
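
    # Illustrative anomalies.conf snippet for the options above (chart names here
    # are examples, not necessarily the shipped defaults):
    #   charts_regex: 'system\..*'
    #   charts_to_exclude: 'system.uptime,system.entropy'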

    def custom_models_init(self):
        """Perform initialization steps related to custom models.
        """
        self.custom_models = self.configuration.get('custom_models', None)
        self.custom_models_normalize = bool(self.configuration.get('custom_models_normalize', False))
        if self.custom_models:
            self.custom_models_names = [model['name'] for model in self.custom_models]
            self.custom_models_dims = [i for s in [model['dimensions'].split(',') for model in self.custom_models] for i in s]
            self.custom_models_dims = [dim if '::' in dim else f'{self.host}::{dim}' for dim in self.custom_models_dims]
            self.custom_models_charts = list(set([dim.split('|')[0].split('::')[1] for dim in self.custom_models_dims]))
            self.custom_models_hosts = list(set([dim.split('::')[0] for dim in self.custom_models_dims]))
            self.custom_models_host_charts_dict = {}
            for host in self.custom_models_hosts:
                self.custom_models_host_charts_dict[host] = list(set([dim.split('::')[1].split('|')[0] for dim in self.custom_models_dims if dim.startswith(host)]))
            self.custom_models_dims_renamed = [f"{model['name']}|{dim}" for model in self.custom_models for dim in model['dimensions'].split(',')]
            self.models_in_scope = list(set([f'{self.host}::{c}' for c in self.charts_in_scope] + self.custom_models_names))
            self.charts_in_scope = list(set(self.charts_in_scope + self.custom_models_charts))
            self.host_charts_dict = {self.host: self.charts_in_scope}
            for host in self.custom_models_host_charts_dict:
                if host not in self.host_charts_dict:
                    self.host_charts_dict[host] = self.custom_models_host_charts_dict[host]
                else:
                    for chart in self.custom_models_host_charts_dict[host]:
                        if chart not in self.host_charts_dict[host]:
                            # append the chart name; extend() would add it character by character
                            self.host_charts_dict[host].append(chart)
        else:
            self.models_in_scope = [f'{self.host}::{c}' for c in self.charts_in_scope]
            self.host_charts_dict = {self.host: self.charts_in_scope}
        self.model_display_names = {model: model.split('::')[1] if '::' in model else model for model in self.models_in_scope}
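
    # Illustrative 'custom_models' configuration, inferred from the parsing above
    # (model and chart names are made up). Each dimension is '<chart>|<dimension>',
    # optionally prefixed with '<host>::' to reference another host:
    #   custom_models:
    #     - name: 'cpu_and_load'
    #       dimensions: 'system.cpu|user,system.cpu|system,system.load|load1'
    #     - name: 'child_cpu'
    #       dimensions: 'child-host:19999::system.cpu|user'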

    def model_params_init(self):
        """Model parameters initialisation.
        """
        self.train_max_n = self.configuration.get('train_max_n', 100000)
        self.train_n_secs = self.configuration.get('train_n_secs', 14400)
        self.offset_n_secs = self.configuration.get('offset_n_secs', 0)
        self.train_every_n = self.configuration.get('train_every_n', 1800)
        self.train_no_prediction_n = self.configuration.get('train_no_prediction_n', 10)
        self.initial_train_data_after = self.configuration.get('initial_train_data_after', 0)
        self.initial_train_data_before = self.configuration.get('initial_train_data_before', 0)
        self.contamination = self.configuration.get('contamination', 0.001)
        self.lags_n = {model: self.configuration.get('lags_n', 5) for model in self.models_in_scope}
        self.smooth_n = {model: self.configuration.get('smooth_n', 5) for model in self.models_in_scope}
        self.diffs_n = {model: self.configuration.get('diffs_n', 5) for model in self.models_in_scope}
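
    # Worked example of the defaults above, assuming the module runs once per second
    # (update_every=1): each model trains on the last 14400s (4 hours) of data,
    # retrains every 1800 runs (roughly every 30 minutes), and rolls forward its
    # previous prediction for the 10 runs following each training step.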

    def models_init(self):
        """Models initialisation.
        """
        self.model = self.configuration.get('model', 'pca')
        if self.model == 'pca':
            self.models = {model: PCA(contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'loda':
            self.models = {model: LODA(contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'iforest':
            self.models = {model: IForest(n_estimators=50, bootstrap=True, behaviour='new', contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'cblof':
            self.models = {model: CBLOF(n_clusters=3, contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'feature_bagging':
            self.models = {model: FeatureBagging(base_estimator=PCA(contamination=self.contamination), contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'copod':
            self.models = {model: COPOD(contamination=self.contamination) for model in self.models_in_scope}
        elif self.model == 'hbos':
            self.models = {model: HBOS(contamination=self.contamination) for model in self.models_in_scope}
        else:
            # default to hbos for any unrecognised model type
            self.models = {model: HBOS(contamination=self.contamination) for model in self.models_in_scope}
        self.custom_model_scalers = {model: MinMaxScaler() for model in self.models_in_scope}

    def validate_charts(self, name, data, algorithm='absolute', multiplier=1, divisor=1):
        """If dimension not in chart then add it.
        """
        for dim in data:
            if dim not in self.charts[name]:
                self.charts[name].add_dimension([dim, dim, algorithm, multiplier, divisor])
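
    # For reference, add_dimension() takes [id, name, algorithm, multiplier, divisor],
    # so each model ends up as one dimension per chart, keyed by its display name
    # plus a '_prob' or '_anomaly' suffix.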

    def add_custom_models_dims(self, df):
        """Given a df, select columns used by custom models, add custom model name as prefix, and append to df.

        :param df <pd.DataFrame>: dataframe to append new renamed columns to.
        :return: <pd.DataFrame> dataframe with additional columns added relating to the specified custom models.
        """
        df_custom = df[self.custom_models_dims].copy()
        df_custom.columns = self.custom_models_dims_renamed
        df = df.join(df_custom)
        return df

    def make_features(self, arr, train=False, model=None):
        """Take in numpy array and preprocess accordingly by taking diffs, smoothing and adding lags.

        :param arr <np.ndarray>: numpy array we want to make features from.
        :param train <bool>: True if making features for training, in which case need to fit_transform scaler and maybe sample train_max_n.
        :param model <str>: model to make features for.
        :return: <np.ndarray> transformed numpy array.
        """

        def lag(arr, n):
            # shift the array down by n rows, padding the top with NaNs
            res = np.empty_like(arr)
            res[:n] = np.nan
            res[n:] = arr[:-n]
            return res

        arr = np.nan_to_num(arr)

        diffs_n = self.diffs_n[model]
        smooth_n = self.smooth_n[model]
        lags_n = self.lags_n[model]

        if self.custom_models_normalize and model in self.custom_models_names:
            if train:
                arr = self.custom_model_scalers[model].fit_transform(arr)
            else:
                arr = self.custom_model_scalers[model].transform(arr)

        # take nth order differences so the model sees changes rather than levels
        if diffs_n > 0:
            arr = np.diff(arr, diffs_n, axis=0)
            arr = arr[~np.isnan(arr).any(axis=1)]

        # smooth via a moving average, computed with the cumulative sum trick:
        # cumsum[i] - cumsum[i - smooth_n] is the sum of the trailing smooth_n rows
        if smooth_n > 1:
            arr = np.cumsum(arr, axis=0, dtype=float)
            arr[smooth_n:] = arr[smooth_n:] - arr[:-smooth_n]
            arr = arr[smooth_n - 1:] / smooth_n
            arr = arr[~np.isnan(arr).any(axis=1)]

        # append lagged copies of each column as additional features
        if lags_n > 0:
            arr_orig = np.copy(arr)
            for lag_n in range(1, lags_n + 1):
                arr = np.concatenate((arr, lag(arr_orig, lag_n)), axis=1)
            arr = arr[~np.isnan(arr).any(axis=1)]

        # when training, sample down to at most train_max_n rows
        if train:
            if len(arr) > self.train_max_n:
                arr = arr[np.random.randint(arr.shape[0], size=self.train_max_n), :]

        arr = np.nan_to_num(arr)

        return arr
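
    # Worked example of the shape arithmetic above, assuming the defaults
    # (diffs_n=5, smooth_n=5, lags_n=5) and a hypothetical chart with 4 dimensions:
    # an input of shape (N, 4) loses diffs_n rows to differencing and smooth_n - 1
    # rows to smoothing, then lagging multiplies the columns by (lags_n + 1) and
    # drops the first lags_n rows, leaving a feature matrix of shape (N - 14, 24).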

    def train(self, models_to_train=None, train_data_after=0, train_data_before=0):
        """Pull required training data and train a model for each specified model.

        :param models_to_train <list>: list of models to train on.
        :param train_data_after <int>: integer timestamp for start of train data.
        :param train_data_before <int>: integer timestamp for end of train data.
        """
        now = datetime.now().timestamp()
        if train_data_after > 0 and train_data_before > 0:
            before = train_data_before
            after = train_data_after
        else:
            before = int(now) - self.offset_n_secs
            after = before - self.train_n_secs

        # get training data
        df_train = get_data(
            host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', after=after, before=before,
            sort_cols=True, numeric_only=True, protocol=self.protocol, float_size='float32', user=self.username,
            pwd=self.password
        ).ffill()
        if self.custom_models:
            df_train = self.add_custom_models_dims(df_train)

        # train model
        self.try_fit(df_train, models_to_train=models_to_train)
        self.info(f'training complete in {round(time.time() - now, 2)} seconds '
                  f'(runs_counter={self.runs_counter}, model={self.model}, train_n_secs={self.train_n_secs}, '
                  f'models={len(self.fitted_at)}, n_fit_success={self.n_fit_success}, n_fit_fails={self.n_fit_fail}, '
                  f'after={after}, before={before}).')
        self.last_train_at = self.runs_counter

    def try_fit(self, df_train, models_to_train=None):
        """Try fit each model and try to fallback to a default model if fit fails for any reason.

        :param df_train <pd.DataFrame>: data to train on.
        :param models_to_train <list>: list of models to train.
        """
        if models_to_train is None:
            models_to_train = list(self.models.keys())
        self.n_fit_fail, self.n_fit_success = 0, 0
        for model in models_to_train:
            X_train = self.make_features(
                df_train[df_train.columns[df_train.columns.str.startswith(f'{model}|')]].values,
                train=True, model=model)
            try:
                self.models[model].fit(X_train)
                self.n_fit_success += 1
            except Exception as e:
                self.n_fit_fail += 1
                self.info(e)
                self.info(f'training failed for {model} at run_counter {self.runs_counter}, defaulting to hbos model.')
                self.models[model] = HBOS(contamination=self.contamination)
                self.models[model].fit(X_train)
            self.fitted_at[model] = self.runs_counter

    def predict(self):
        """Get latest data, make it into a feature vector, and get predictions for each available model.

        :return: (<dict>,<dict>) tuple of dictionaries, one for probability scores and the other for anomaly predictions.
        """
        # get recent data to predict on
        df_allmetrics = get_allmetrics_async(
            host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', wide=True, sort_cols=True,
            protocol=self.protocol, numeric_only=True, float_size='float32', user=self.username, pwd=self.password
        )
        if self.custom_models:
            df_allmetrics = self.add_custom_models_dims(df_allmetrics)
        # keep just enough recent rows to compute diffs, smoothing and lags
        # (pd.concat replaces DataFrame.append, which was removed in pandas 2.0)
        self.df_allmetrics = pd.concat([self.df_allmetrics, df_allmetrics]).ffill().tail(
            (max(self.lags_n.values()) + max(self.smooth_n.values()) + max(self.diffs_n.values())) * 2)

        # get predictions
        data_probability, data_anomaly = self.try_predict()

        return data_probability, data_anomaly

    def try_predict(self):
        """Try make prediction and fall back to last known prediction if fails.

        :return: (<dict>,<dict>) tuple of dictionaries, one for probability scores and the other for anomaly predictions.
        """
        data_probability, data_anomaly = {}, {}
        for model in self.fitted_at.keys():
            model_display_name = self.model_display_names[model]
            X_model = np.nan_to_num(self.make_features(
                self.df_allmetrics[self.df_allmetrics.columns[self.df_allmetrics.columns.str.startswith(f'{model}|')]].values,
                model=model)[-1, :].reshape(1, -1))
            try:
                data_probability[model_display_name + '_prob'] = np.nan_to_num(self.models[model].predict_proba(X_model)[-1][1]) * 10000
                data_anomaly[model_display_name + '_anomaly'] = self.models[model].predict(X_model)[-1]
            except Exception:
                # prediction failed; fall back to the last known prediction if one
                # exists, otherwise skip this model for this run
                if model_display_name + '_prob' in self.data_latest:
                    data_probability[model_display_name + '_prob'] = self.data_latest[model_display_name + '_prob']
                    data_anomaly[model_display_name + '_anomaly'] = self.data_latest[model_display_name + '_anomaly']
                else:
                    continue
        return data_probability, data_anomaly
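
    # Illustrative return value of try_predict(), assuming system.cpu and system.load
    # are in scope (values are made up). Probabilities are scaled by 10000 here and
    # the chart divides by 100, so they render as percentages:
    #   data_probability = {'system.cpu_prob': 123.0, 'system.load_prob': 4567.0}
    #   data_anomaly = {'system.cpu_anomaly': 0, 'system.load_anomaly': 1}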

    def get_data(self):
        # if not all models have been trained then train those we need to
        if len(self.fitted_at) < len(self.models):
            self.train(
                models_to_train=[m for m in self.models if m not in self.fitted_at],
                train_data_after=self.initial_train_data_after,
                train_data_before=self.initial_train_data_before)

        # retrain all models as per schedule from config
        elif self.train_every_n > 0 and self.runs_counter % self.train_every_n == 0:
            self.train()

        # roll forward previous predictions around a training step to avoid the
        # possibility of having the training itself trigger an anomaly
        if (self.runs_counter - self.last_train_at) <= self.train_no_prediction_n:
            data = self.data_latest
        else:
            data_probability, data_anomaly = self.predict()
            if self.include_average_prob:
                data_probability['average_prob'] = np.mean(list(data_probability.values()))
            data = {**data_probability, **data_anomaly}
            self.validate_charts('probability', data_probability, divisor=100)
            self.validate_charts('anomaly', data_anomaly)

        self.data_latest = data

        return data
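
# Lifecycle note (my understanding of the python.d framework, not stated in this
# file): check() is called once at startup and get_data() once per collection
# interval; runs_counter is maintained by SimpleService and increments each run,
# which is what drives the train_every_n retrain schedule and the
# train_no_prediction_n grace period above.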