python.d.plugin.in 14 KB


  1. #!/usr/bin/env bash
  2. '''':; exec "$(command -v python || command -v python3 || command -v python2 ||
  3. echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
  4. # -*- coding: utf-8 -*-
  5. # Description:
  6. # Author: Pawel Krupa (paulfantom)
  7. # Author: Ilya Mashchenko (l2isbad)
  8. # SPDX-License-Identifier: GPL-3.0-or-later
  9. import gc
  10. import os
  11. import sys
  12. import threading
  13. from re import sub
  14. from sys import version_info, argv
  15. from time import sleep
  16. GC_RUN = True
  17. GC_COLLECT_EVERY = 300
  18. PY_VERSION = version_info[:2]
  19. USER_CONFIG_DIR = os.getenv('NETDATA_USER_CONFIG_DIR', '@configdir_POST@')
  20. STOCK_CONFIG_DIR = os.getenv('NETDATA_STOCK_CONFIG_DIR', '@libconfigdir_POST@')
  21. PLUGINS_USER_CONFIG_DIR = os.path.join(USER_CONFIG_DIR, 'python.d')
  22. PLUGINS_STOCK_CONFIG_DIR = os.path.join(STOCK_CONFIG_DIR, 'python.d')
  23. PLUGINS_DIR = os.path.abspath(os.getenv(
  24. 'NETDATA_PLUGINS_DIR',
  25. os.path.dirname(__file__)) + '/../python.d')
  26. PYTHON_MODULES_DIR = os.path.join(PLUGINS_DIR, 'python_modules')
  27. sys.path.append(PYTHON_MODULES_DIR)
  28. from bases.loaders import ModuleAndConfigLoader # noqa: E402
  29. from bases.loggers import PythonDLogger # noqa: E402
  30. from bases.collection import setdefault_values, run_and_exit # noqa: E402
  31. try:
  32. from collections import OrderedDict
  33. except ImportError:
  34. from third_party.ordereddict import OrderedDict
  35. BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
  36. 'retries': 60,
  37. 'priority': 60000,
  38. 'autodetection_retry': 0,
  39. 'chart_cleanup': 10,
  40. 'name': str()}
  41. MODULE_EXTENSION = '.chart.py'
  42. OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log', 'cpufreq']
  43. def module_ok(m):
  44. return m.endswith(MODULE_EXTENSION) and m[:-len(MODULE_EXTENSION)] not in OBSOLETE_MODULES
  45. ALL_MODULES = [m for m in sorted(os.listdir(PLUGINS_DIR)) if module_ok(m)]
  46. def parse_cmd():
  47. debug = 'debug' in argv[1:]
  48. trace = 'trace' in argv[1:]
  49. override_update_every = next((arg for arg in argv[1:] if arg.isdigit() and int(arg) > 1), False)
  50. modules = [''.join([m, MODULE_EXTENSION]) for m in argv[1:] if ''.join([m, MODULE_EXTENSION]) in ALL_MODULES]
  51. return debug, trace, override_update_every, modules or ALL_MODULES
  52. def multi_job_check(config):
  53. return next((True for key in config if isinstance(config[key], dict)), False)
  54. class RawModule:
  55. def __init__(self, name, path, explicitly_enabled=True):
  56. self.name = name
  57. self.path = path
  58. self.explicitly_enabled = explicitly_enabled
  59. class Job(object):
  60. def __init__(self, initialized_job, job_id):
  61. """
  62. :param initialized_job: instance of <Class Service>
  63. :param job_id: <str>
  64. """
  65. self.job = initialized_job
  66. self.id = job_id # key in Modules.jobs()
  67. self.module_name = self.job.__module__ # used in Plugin.delete_job()
  68. self.recheck_every = self.job.configuration.pop('autodetection_retry')
  69. self.checked = False # used in Plugin.check_job()
  70. self.created = False # used in Plugin.create_job_charts()
  71. if self.job.update_every < int(OVERRIDE_UPDATE_EVERY):
  72. self.job.update_every = int(OVERRIDE_UPDATE_EVERY)
  73. def __getattr__(self, item):
  74. return getattr(self.job, item)
  75. def __repr__(self):
  76. return self.job.__repr__()
  77. def is_dead(self):
  78. return bool(self.ident) and not self.is_alive()
  79. def not_launched(self):
  80. return not bool(self.ident)
  81. def is_autodetect(self):
  82. return self.recheck_every
  83. class Module(object):
  84. def __init__(self, service, config):
  85. """
  86. :param service: <Module>
  87. :param config: <dict>
  88. """
  89. self.service = service
  90. self.name = service.__name__
  91. self.config = self.jobs_configurations_builder(config)
  92. self.jobs = OrderedDict()
  93. self.counter = 1
  94. self.initialize_jobs()
  95. def __repr__(self):
  96. return "<Class Module '{name}'>".format(name=self.name)
  97. def __iter__(self):
  98. return iter(OrderedDict(self.jobs).values())
  99. def __getitem__(self, item):
  100. return self.jobs[item]
  101. def __delitem__(self, key):
  102. del self.jobs[key]
  103. def __len__(self):
  104. return len(self.jobs)
  105. def __bool__(self):
  106. return bool(self.jobs)
  107. def __nonzero__(self):
  108. return self.__bool__()
  109. def jobs_configurations_builder(self, config):
  110. """
  111. :param config: <dict>
  112. :return:
  113. """
  114. counter = 0
  115. job_base_config = dict()
  116. for attr in BASE_CONFIG:
  117. job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr]))
  118. if not config:
  119. config = {str(): dict()}
  120. elif not multi_job_check(config):
  121. config = {str(): config}
  122. for job_name in config:
  123. if not isinstance(config[job_name], dict):
  124. continue
  125. job_config = setdefault_values(config[job_name], base_dict=job_base_config)
  126. job_name = sub(r'\s+', '_', job_name)
  127. config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name'])
  128. counter += 1
  129. job_id = 'job' + str(counter).zfill(3)
  130. yield job_id, job_name, job_config
  131. def initialize_jobs(self):
  132. """
  133. :return:
  134. """
  135. for job_id, job_name, job_config in self.config:
  136. job_config['job_name'] = job_name
  137. job_config['override_name'] = job_config.pop('name')
  138. try:
  139. initialized_job = self.service.Service(configuration=job_config)
  140. except Exception as error:
  141. Logger.error("job initialization: '{module_name} {job_name}' "
  142. "=> ['FAILED'] ({error})".format(module_name=self.name,
  143. job_name=job_name,
  144. error=error))
  145. continue
  146. else:
  147. Logger.debug("job initialization: '{module_name} {job_name}' "
  148. "=> ['OK']".format(module_name=self.name,
  149. job_name=job_name or self.name))
  150. self.jobs[job_id] = Job(initialized_job=initialized_job,
  151. job_id=job_id)
  152. del self.config
  153. del self.service
  154. class Plugin(object):
  155. def __init__(self):
  156. self.loader = ModuleAndConfigLoader()
  157. self.modules = OrderedDict()
  158. self.sleep_time = 1
  159. self.runs_counter = 0
  160. user_config = os.path.join(USER_CONFIG_DIR, 'python.d.conf')
  161. stock_config = os.path.join(STOCK_CONFIG_DIR, 'python.d.conf')
  162. Logger.debug("loading '{0}'".format(user_config))
  163. self.config, error = self.loader.load_config_from_file(user_config)
  164. if error:
  165. Logger.error("cannot load '{0}': {1}. Will try stock version.".format(user_config, error))
  166. Logger.debug("loading '{0}'".format(stock_config))
  167. self.config, error = self.loader.load_config_from_file(stock_config)
  168. if error:
  169. Logger.error("cannot load '{0}': {1}".format(stock_config, error))
  170. self.do_gc = self.config.get("gc_run", GC_RUN)
  171. self.gc_interval = self.config.get("gc_interval", GC_COLLECT_EVERY)
  172. if not self.config.get('enabled', True):
  173. run_and_exit(Logger.info)('DISABLED in configuration file.')
  174. self.load_and_initialize_modules()
  175. if not self.modules:
  176. run_and_exit(Logger.info)('No modules to run. Exit...')
  177. def __iter__(self):
  178. return iter(OrderedDict(self.modules).values())
  179. @property
  180. def jobs(self):
  181. return (job for mod in self for job in mod)
  182. @property
  183. def dead_jobs(self):
  184. return (job for job in self.jobs if job.is_dead())
  185. @property
  186. def autodetect_jobs(self):
  187. return [job for job in self.jobs if job.not_launched()]
  188. def enabled_modules(self):
  189. for mod in MODULES_TO_RUN:
  190. mod_name = mod[:-len(MODULE_EXTENSION)]
  191. mod_path = os.path.join(PLUGINS_DIR, mod)
  192. if any(
  193. [
  194. self.config.get('default_run', True) and self.config.get(mod_name, True),
  195. (not self.config.get('default_run')) and self.config.get(mod_name),
  196. ]
  197. ):
  198. yield RawModule(
  199. name=mod_name,
  200. path=mod_path,
  201. explicitly_enabled=self.config.get(mod_name),
  202. )
  203. def load_and_initialize_modules(self):
  204. for mod in self.enabled_modules():
  205. # Load module from file ------------------------------------------------------------
  206. loaded_module, error = self.loader.load_module_from_file(mod.name, mod.path)
  207. log = Logger.error if error else Logger.debug
  208. log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK',
  209. module_name=mod.name))
  210. if error:
  211. Logger.error("load source error : {0}".format(error))
  212. continue
  213. # Load module config from file ------------------------------------------------------
  214. user_config = os.path.join(PLUGINS_USER_CONFIG_DIR, mod.name + '.conf')
  215. stock_config = os.path.join(PLUGINS_STOCK_CONFIG_DIR, mod.name + '.conf')
  216. Logger.debug("loading '{0}'".format(user_config))
  217. loaded_config, error = self.loader.load_config_from_file(user_config)
  218. if error:
  219. Logger.error("cannot load '{0}' : {1}. Will try stock version.".format(user_config, error))
  220. Logger.debug("loading '{0}'".format(stock_config))
  221. loaded_config, error = self.loader.load_config_from_file(stock_config)
  222. if error:
  223. Logger.error("cannot load '{0}': {1}".format(stock_config, error))
  224. # Skip disabled modules
  225. if getattr(loaded_module, 'disabled_by_default', False) and not mod.explicitly_enabled:
  226. Logger.info("module '{0}' disabled by default".format(loaded_module.__name__))
  227. continue
  228. # Module initialization ---------------------------------------------------
  229. initialized_module = Module(service=loaded_module, config=loaded_config)
  230. Logger.debug("module status: '{module_name}' => [{status}] "
  231. "(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED',
  232. module_name=initialized_module.name,
  233. jobs_number=len(initialized_module)))
  234. if initialized_module:
  235. self.modules[initialized_module.name] = initialized_module
  236. @staticmethod
  237. def check_job(job):
  238. """
  239. :param job: <Job>
  240. :return:
  241. """
  242. try:
  243. check_ok = bool(job.check())
  244. except Exception as error:
  245. job.error('check() unhandled exception: {error}'.format(error=error))
  246. return None
  247. else:
  248. return check_ok
  249. @staticmethod
  250. def create_job_charts(job):
  251. """
  252. :param job: <Job>
  253. :return:
  254. """
  255. try:
  256. create_ok = job.create()
  257. except Exception as error:
  258. job.error('create() unhandled exception: {error}'.format(error=error))
  259. return False
  260. else:
  261. return create_ok
  262. def delete_job(self, job):
  263. """
  264. :param job: <Job>
  265. :return:
  266. """
  267. del self.modules[job.module_name][job.id]
  268. def run_check(self):
  269. checked = list()
  270. for job in self.jobs:
  271. if job.name in checked:
  272. job.info('check() => [DROPPED] (already served by another job)')
  273. self.delete_job(job)
  274. continue
  275. ok = self.check_job(job)
  276. if ok:
  277. job.info('check() => [OK]')
  278. checked.append(job.name)
  279. job.checked = True
  280. continue
  281. if not job.is_autodetect() or ok is None:
  282. job.info('check() => [FAILED]')
  283. self.delete_job(job)
  284. else:
  285. job.info('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every))
  286. def run_create(self):
  287. for job in self.jobs:
  288. if not job.checked:
  289. # skip autodetection_retry jobs
  290. continue
  291. ok = self.create_job_charts(job)
  292. if ok:
  293. job.debug('create() => [OK] (charts: {0})'.format(len(job.charts)))
  294. job.created = True
  295. continue
  296. job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts)))
  297. self.delete_job(job)
  298. def start(self):
  299. self.run_check()
  300. self.run_create()
  301. for job in self.jobs:
  302. if job.created:
  303. job.start()
  304. while True:
  305. if threading.active_count() <= 1 and not self.autodetect_jobs:
  306. run_and_exit(Logger.info)('FINISHED')
  307. sleep(self.sleep_time)
  308. self.cleanup()
  309. self.autodetect_retry()
  310. # FIXME: https://github.com/netdata/netdata/issues/3817
  311. if self.do_gc and self.runs_counter % self.gc_interval == 0:
  312. v = gc.collect()
  313. Logger.debug("GC full collection run result: {0}".format(v))
  314. def cleanup(self):
  315. for job in self.dead_jobs:
  316. self.delete_job(job)
  317. for mod in self:
  318. if not mod:
  319. del self.modules[mod.name]
  320. def autodetect_retry(self):
  321. self.runs_counter += self.sleep_time
  322. for job in self.autodetect_jobs:
  323. if self.runs_counter % job.recheck_every == 0:
  324. checked = self.check_job(job)
  325. if checked:
  326. created = self.create_job_charts(job)
  327. if not created:
  328. self.delete_job(job)
  329. continue
  330. job.start()
  331. if __name__ == '__main__':
  332. DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd()
  333. Logger = PythonDLogger()
  334. if DEBUG:
  335. Logger.logger.severity = 'DEBUG'
  336. if TRACE:
  337. Logger.log_traceback = True
  338. Logger.info('Using python {version}'.format(version=PY_VERSION[0]))
  339. plugin = Plugin()
  340. plugin.start()