  1. #!/usr/bin/env bash
  2. '''':; exec "$(command -v python || command -v python3 || command -v python2 ||
  3. echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
  4. # -*- coding: utf-8 -*-
  5. # Description:
  6. # Author: Pawel Krupa (paulfantom)
  7. # Author: Ilya Mashchenko (l2isbad)
  8. # SPDX-License-Identifier: GPL-3.0-or-later
  9. import collections
  10. import copy
  11. import gc
  12. import multiprocessing
  13. import os
  14. import re
  15. import sys
  16. import time
  17. import threading
  18. import types
  19. PY_VERSION = sys.version_info[:2]
  20. if PY_VERSION > (3, 1):
  21. from importlib.machinery import SourceFileLoader
  22. else:
  23. from imp import load_source as SourceFileLoader
  24. ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
  25. ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
  26. ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
  27. ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
  28. def dirs():
  29. user_config = os.getenv(
  30. ENV_NETDATA_USER_CONFIG_DIR,
  31. '@configdir_POST@',
  32. )
  33. stock_config = os.getenv(
  34. ENV_NETDATA_STOCK_CONFIG_DIR,
  35. '@libconfigdir_POST@',
  36. )
  37. modules_user_config = os.path.join(user_config, 'python.d')
  38. modules_stock_config = os.path.join(stock_config, 'python.d')
  39. modules = os.path.abspath(
  40. os.getenv(
  41. ENV_NETDATA_PLUGINS_DIR,
  42. os.path.dirname(__file__),
  43. ) + '/../python.d'
  44. )
  45. pythond_packages = os.path.join(modules, 'python_modules')
  46. return collections.namedtuple(
  47. 'Dirs',
  48. [
  49. 'user_config',
  50. 'stock_config',
  51. 'modules_user_config',
  52. 'modules_stock_config',
  53. 'modules',
  54. 'pythond_packages',
  55. ]
  56. )(
  57. user_config,
  58. stock_config,
  59. modules_user_config,
  60. modules_stock_config,
  61. modules,
  62. pythond_packages,
  63. )
  64. DIRS = dirs()
  65. sys.path.append(DIRS.pythond_packages)
  66. from bases.collection import safe_print
  67. from bases.loggers import PythonDLogger
  68. from bases.loaders import load_config
  69. try:
  70. from collections import OrderedDict
  71. except ImportError:
  72. from third_party.ordereddict import OrderedDict
  73. END_TASK_MARKER = None
  74. IS_ATTY = sys.stdout.isatty()
  75. PLUGIN_CONF_FILE = 'python.d.conf'
  76. MODULE_SUFFIX = '.chart.py'
  77. OBSOLETED_MODULES = (
  78. 'apache_cache', # replaced by web_log
  79. 'cpuidle', # rewritten in C
  80. 'cpufreq', # rewritten in C
  81. 'gunicorn_log', # replaced by web_log
  82. 'linux_power_supply', # rewritten in C
  83. 'nginx_log', # replaced by web_log
  84. 'mdstat', # rewritten in C
  85. 'sslcheck', # memory leak bug https://github.com/netdata/netdata/issues/5624
  86. )
  87. AVAILABLE_MODULES = [
  88. m[:-len(MODULE_SUFFIX)] for m in sorted(os.listdir(DIRS.modules))
  89. if m.endswith(MODULE_SUFFIX) and m[:-len(MODULE_SUFFIX)] not in OBSOLETED_MODULES
  90. ]
  91. PLUGIN_BASE_CONF = {
  92. 'enabled': True,
  93. 'default_run': True,
  94. 'gc_run': True,
  95. 'gc_interval': 300,
  96. }
  97. JOB_BASE_CONF = {
  98. 'update_every': os.getenv(ENV_NETDATA_UPDATE_EVERY, 1),
  99. 'priority': 60000,
  100. 'autodetection_retry': 0,
  101. 'chart_cleanup': 10,
  102. 'penalty': True,
  103. 'name': str(),
  104. }
  105. class HeartBeat(threading.Thread):
  106. def __init__(self, every):
  107. threading.Thread.__init__(self)
  108. self.daemon = True
  109. self.every = every
  110. def run(self):
  111. while True:
  112. time.sleep(self.every)
  113. if IS_ATTY:
  114. continue
  115. safe_print('\n')
  116. def load_module(name):
  117. abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
  118. module = SourceFileLoader(name, abs_path)
  119. if isinstance(module, types.ModuleType):
  120. return module
  121. return module.load_module()
  122. def multi_path_find(name, paths):
  123. for path in paths:
  124. abs_name = os.path.join(path, name)
  125. if os.path.isfile(abs_name):
  126. return abs_name
  127. return ''
  128. Task = collections.namedtuple(
  129. 'Task',
  130. [
  131. 'module_name',
  132. 'explicitly_enabled',
  133. ],
  134. )
  135. Result = collections.namedtuple(
  136. 'Result',
  137. [
  138. 'module_name',
  139. 'jobs_configs',
  140. ],
  141. )
  142. class ModuleChecker(multiprocessing.Process):
  143. def __init__(
  144. self,
  145. task_queue,
  146. result_queue,
  147. ):
  148. multiprocessing.Process.__init__(self)
  149. self.log = PythonDLogger()
  150. self.log.job_name = 'checker'
  151. self.task_queue = task_queue
  152. self.result_queue = result_queue
  153. def run(self):
  154. self.log.info('starting...')
  155. HeartBeat(1).start()
  156. while self.run_once():
  157. pass
  158. self.log.info('terminating...')
  159. def run_once(self):
  160. task = self.task_queue.get()
  161. if task is END_TASK_MARKER:
  162. self.task_queue.task_done()
  163. self.result_queue.put(END_TASK_MARKER)
  164. return False
  165. result = self.do_task(task)
  166. if result:
  167. self.result_queue.put(result)
  168. self.task_queue.task_done()
  169. return True
  170. def do_task(self, task):
  171. self.log.info("{0} : checking".format(task.module_name))
  172. # LOAD SOURCE
  173. module = Module(task.module_name)
  174. try:
  175. module.load_source()
  176. except Exception as error:
  177. self.log.warning("{0} : error on loading source : {1}, skipping module".format(
  178. task.module_name,
  179. error,
  180. ))
  181. return None
  182. else:
  183. self.log.info("{0} : source successfully loaded".format(task.module_name))
  184. if module.is_disabled_by_default() and not task.explicitly_enabled:
  185. self.log.info("{0} : disabled by default".format(task.module_name))
  186. return None
  187. # LOAD CONFIG
  188. paths = [
  189. DIRS.modules_user_config,
  190. DIRS.modules_stock_config,
  191. ]
  192. conf_abs_path = multi_path_find(
  193. name='{0}.conf'.format(task.module_name),
  194. paths=paths,
  195. )
  196. if conf_abs_path:
  197. self.log.info("{0} : found config file '{1}'".format(task.module_name, conf_abs_path))
  198. try:
  199. module.load_config(conf_abs_path)
  200. except Exception as error:
  201. self.log.warning("{0} : error on loading config : {1}, skipping module".format(
  202. task.module_name, error))
  203. return None
  204. else:
  205. self.log.info("{0} : config was not found in '{1}', using default 1 job config".format(
  206. task.module_name, paths))
  207. # CHECK JOBS
  208. jobs = module.create_jobs()
  209. self.log.info("{0} : created {1} job(s) from the config".format(task.module_name, len(jobs)))
  210. successful_jobs_configs = list()
  211. for job in jobs:
  212. if job.autodetection_retry() > 0:
  213. successful_jobs_configs.append(job.config)
  214. self.log.info("{0}[{1}]: autodetection job, will be checked in main".format(task.module_name, job.name))
  215. continue
  216. try:
  217. job.init()
  218. except Exception as error:
  219. self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job)".format(
  220. task.module_name, job.name, error))
  221. continue
  222. try:
  223. ok = job.check()
  224. except Exception as error:
  225. self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
  226. task.module_name, job.name, error))
  227. continue
  228. if not ok:
  229. self.log.info("{0}[{1}] : check failed, skipping the job".format(task.module_name, job.name))
  230. continue
  231. self.log.info("{0}[{1}] : check successful".format(task.module_name, job.name))
  232. job.config['autodetection_retry'] = job.config['update_every']
  233. successful_jobs_configs.append(job.config)
  234. if not successful_jobs_configs:
  235. self.log.info("{0} : all jobs failed, skipping module".format(task.module_name))
  236. return None
  237. return Result(module.source.__name__, successful_jobs_configs)
  238. class JobConf(OrderedDict):
  239. def __init__(self, *args):
  240. OrderedDict.__init__(self, *args)
  241. def set_defaults_from_module(self, module):
  242. for k in [k for k in JOB_BASE_CONF if hasattr(module, k)]:
  243. self[k] = getattr(module, k)
  244. def set_defaults_from_config(self, module_config):
  245. for k in [k for k in JOB_BASE_CONF if k in module_config]:
  246. self[k] = module_config[k]
  247. def set_job_name(self, name):
  248. self['job_name'] = re.sub(r'\s+', '_', name)
  249. def set_override_name(self, name):
  250. self['override_name'] = re.sub(r'\s+', '_', name)
  251. def as_dict(self):
  252. return copy.deepcopy(OrderedDict(self))
  253. class Job:
  254. def __init__(
  255. self,
  256. service,
  257. module_name,
  258. config,
  259. ):
  260. self.service = service
  261. self.config = config
  262. self.module_name = module_name
  263. self.name = config['job_name']
  264. self.override_name = config['override_name']
  265. self.wrapped = None
  266. def init(self):
  267. self.wrapped = self.service(configuration=self.config.as_dict())
  268. def check(self):
  269. return self.wrapped.check()
  270. def post_check(self, min_update_every):
  271. if self.wrapped.update_every < min_update_every:
  272. self.wrapped.update_every = min_update_every
  273. def create(self):
  274. return self.wrapped.create()
  275. def autodetection_retry(self):
  276. return self.config['autodetection_retry']
  277. def run(self):
  278. self.wrapped.run()
  279. class Module:
  280. def __init__(self, name):
  281. self.name = name
  282. self.source = None
  283. self.config = dict()
  284. def is_disabled_by_default(self):
  285. return bool(getattr(self.source, 'disabled_by_default', False))
  286. def load_source(self):
  287. self.source = load_module(self.name)
  288. def load_config(self, abs_path):
  289. self.config = load_config(abs_path) or dict()
  290. def gather_jobs_configs(self):
  291. job_names = [v for v in self.config if isinstance(self.config[v], dict)]
  292. if len(job_names) == 0:
  293. job_conf = JobConf(JOB_BASE_CONF)
  294. job_conf.set_defaults_from_module(self.source)
  295. job_conf.update(self.config)
  296. job_conf.set_job_name(self.name)
  297. job_conf.set_override_name(job_conf.pop('name'))
  298. return [job_conf]
  299. configs = list()
  300. for job_name in job_names:
  301. raw_job_conf = self.config[job_name]
  302. job_conf = JobConf(JOB_BASE_CONF)
  303. job_conf.set_defaults_from_module(self.source)
  304. job_conf.set_defaults_from_config(self.config)
  305. job_conf.update(raw_job_conf)
  306. job_conf.set_job_name(job_name)
  307. job_conf.set_override_name(job_conf.pop('name'))
  308. configs.append(job_conf)
  309. return configs
  310. def create_jobs(self, jobs_conf=None):
  311. return [Job(self.source.Service, self.name, conf) for conf in jobs_conf or self.gather_jobs_configs()]
  312. class JobRunner(threading.Thread):
  313. def __init__(self, job):
  314. threading.Thread.__init__(self)
  315. self.daemon = True
  316. self.wrapped = job
  317. def run(self):
  318. self.wrapped.run()
  319. class PluginConf(dict):
  320. def __init__(self, *args):
  321. dict.__init__(self, *args)
  322. def is_module_enabled(self, module_name, explicit):
  323. if module_name in self:
  324. return self[module_name]
  325. if explicit:
  326. return False
  327. return self['default_run']
class Plugin:
    """Orchestrates the whole plugin run.

    Lifecycle: setup() spawns a ModuleChecker child process to vet every
    enabled module/job; create_jobs()/prepare_jobs() re-create the surviving
    jobs in THIS process; run() starts one JobRunner daemon thread per job and
    then serve()s forever, periodically rechecking deferred autodetection
    jobs and garbage-collecting.
    """
    def __init__(
            self,
            min_update_every=1,
            modules_to_run=tuple(AVAILABLE_MODULES),
    ):
        self.log = PythonDLogger()
        self.config = PluginConf(PLUGIN_BASE_CONF)
        # joinable queues: the checker process consumes tasks and produces results
        self.task_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.JoinableQueue()
        self.min_update_every = min_update_every
        self.modules_to_run = modules_to_run
        # jobs whose check failed but have autodetection_retry > 0
        self.auto_detection_jobs = list()
        self.tasks = list()
        self.results = list()
        # module_name -> list of already-served job names (override_name or name)
        self.checked_jobs = collections.defaultdict(list)
        # serve() loop counter; drives gc and retry scheduling
        self.runs = 0

    @staticmethod
    def shutdown():
        # tell netdata to disable this plugin, then exit
        safe_print('DISABLE')
        exit(0)

    def run(self):
        """Start a JobRunner thread per prepared job, then block in serve()."""
        jobs = self.create_jobs()
        if not jobs:
            return
        for job in self.prepare_jobs(jobs):
            self.log.info('{0}[{1}] : started in thread'.format(job.module_name, job.name))
            JobRunner(job).start()
        self.serve()

    def enqueue_tasks(self):
        # END_TASK_MARKER tells the checker process there is no more work
        for task in self.tasks:
            self.task_queue.put(task)
        self.task_queue.put(END_TASK_MARKER)

    def dequeue_results(self):
        # drain until the checker forwards the sentinel back
        while True:
            result = self.result_queue.get()
            self.result_queue.task_done()
            if result is END_TASK_MARKER:
                break
            self.results.append(result)

    def load_config(self):
        """Load python.d.conf (user dir first, then stock). Returns False only
        on a parse/load error; a missing file just keeps the defaults."""
        paths = [
            DIRS.user_config,
            DIRS.stock_config,
        ]
        self.log.info("checking for config in {0}".format(paths))
        abs_path = multi_path_find(name=PLUGIN_CONF_FILE, paths=paths)
        if not abs_path:
            self.log.warning('config was not found, using defaults')
            return True
        self.log.info("config found, loading config '{0}'".format(abs_path))
        try:
            config = load_config(abs_path) or dict()
        except Exception as error:
            self.log.error('error on loading config : {0}'.format(error))
            return False
        self.log.info('config successfully loaded')
        self.config.update(config)
        return True

    def setup(self):
        """Run the whole pre-flight: config, task list, checker process.

        Returns True when at least one module survived checking; False means
        the caller should DISABLE the plugin.
        """
        self.log.info('starting setup')
        if not self.load_config():
            return False
        if not self.config['enabled']:
            self.log.info('disabled in configuration file')
            return False
        for mod in self.modules_to_run:
            if self.config.is_module_enabled(mod, False):
                # second lookup (explicit=True) records whether the user
                # explicitly enabled it, which overrides disabled_by_default
                task = Task(mod, self.config.is_module_enabled(mod, True))
                self.tasks.append(task)
            else:
                self.log.info("{0} : disabled in configuration file".format(mod))
        if not self.tasks:
            self.log.info('no modules to run')
            return False
        worker = ModuleChecker(self.task_queue, self.result_queue)
        self.log.info('starting checker process ({0} module(s) to check)'.format(len(self.tasks)))
        worker.start()
        # TODO: timeouts?
        # NOTE: order matters — enqueue, wait for all tasks to be processed,
        # drain results, then close both queues before joining the worker.
        self.enqueue_tasks()
        self.task_queue.join()
        self.dequeue_results()
        self.result_queue.join()
        self.task_queue.close()
        self.result_queue.close()
        self.log.info('stopping checker process')
        worker.join()
        if not self.results:
            self.log.info('no modules to run')
            return False
        self.log.info("setup complete, {0} active module(s) : '{1}'".format(
            len(self.results),
            [v.module_name for v in self.results])
        )
        return True

    def create_jobs(self):
        """Re-create Job objects in this process from the checker's results
        (module sources must be re-loaded here; they were loaded in the child)."""
        jobs = list()
        for result in self.results:
            module = Module(result.module_name)
            try:
                module.load_source()
            except Exception as error:
                self.log.warning("{0} : error on loading module source : {1}, skipping module".format(
                    result.module_name, error))
                continue
            module_jobs = module.create_jobs(result.jobs_configs)
            self.log.info("{0} : created {1} job(s)".format(module.name, len(module_jobs)))
            jobs.extend(module_jobs)
        return jobs

    def prepare_jobs(self, jobs):
        """init()/check()/create() each job; return the list ready to run.

        Failed checks with autodetection_retry > 0 are parked in
        auto_detection_jobs for periodic rechecking by serve().
        """
        prepared = list()
        for job in jobs:
            # jobs sharing the same effective name serve the same target:
            # only the first successful one is kept
            check_name = job.override_name or job.name
            if check_name in self.checked_jobs[job.module_name]:
                self.log.info('{0}[{1}] : already served by another job, skipping the job'.format(
                    job.module_name, job.name))
                continue
            try:
                job.init()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue
            self.log.info("{0}[{1}] : init successful".format(job.module_name, job.name))
            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue
            if not ok:
                self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.name))
                if job.autodetection_retry() > 0:
                    self.log.info('{0}[{1}] : will recheck every {2} second(s)'.format(
                        job.module_name, job.name, job.autodetection_retry()))
                    self.auto_detection_jobs.append(job)
                continue
            self.log.info('{0}[{1}] : check successful'.format(job.module_name, job.name))
            job.post_check(int(self.min_update_every))
            if not job.create():
                self.log.info('{0}[{1}] : create failed'.format(job.module_name, job.name))
            self.checked_jobs[job.module_name].append(check_name)
            prepared.append(job)
        return prepared

    def serve(self):
        """Main loop: tick once per second, gc on schedule, retry parked jobs.

        Returns (ending the plugin) when no worker threads remain and there is
        nothing left to retry.
        """
        gc_run = self.config['gc_run']
        gc_interval = self.config['gc_interval']
        while True:
            self.runs += 1
            # threads: main + heartbeat
            if threading.active_count() <= 2 and not self.auto_detection_jobs:
                return
            time.sleep(1)
            if gc_run and self.runs % gc_interval == 0:
                v = gc.collect()
                self.log.debug('GC collection run result: {0}'.format(v))
            # keep only the jobs that still need retrying
            self.auto_detection_jobs = [job for job in self.auto_detection_jobs if not self.retry_job(job)]

    def retry_job(self, job):
        """Recheck a parked job when its retry interval comes up.

        Returns True when the job should be dropped from the retry list
        (either it now works and was started, or it can never work), False to
        keep retrying later.
        """
        stop_retrying = True
        retry_later = False
        if self.runs % job.autodetection_retry() != 0:
            return retry_later
        check_name = job.override_name or job.name
        if check_name in self.checked_jobs[job.module_name]:
            self.log.info("{0}[{1}]: already served by another job, give up on retrying".format(
                job.module_name, job.name))
            return stop_retrying
        try:
            ok = job.check()
        except Exception as error:
            self.log.warning("{0}[{1}] : unhandled exception on recheck : {2}, give up on retrying".format(
                job.module_name, job.name, error))
            return stop_retrying
        if not ok:
            self.log.info('{0}[{1}] : recheck failed, will retry in {2} second(s)'.format(
                job.module_name, job.name, job.autodetection_retry()))
            return retry_later
        self.log.info('{0}[{1}] : recheck successful'.format(job.module_name, job.name))
        if not job.create():
            return stop_retrying
        job.post_check(int(self.min_update_every))
        self.checked_jobs[job.module_name].append(check_name)
        return stop_retrying
  511. def parse_cmd():
  512. opts = sys.argv[:][1:]
  513. debug = False
  514. trace = False
  515. update_every = 1
  516. modules_to_run = list()
  517. v = next((opt for opt in opts if opt.isdigit() and int(opt) >= 1), None)
  518. if v:
  519. update_every = v
  520. opts.remove(v)
  521. if 'debug' in opts:
  522. debug = True
  523. opts.remove('debug')
  524. if 'trace' in opts:
  525. trace = True
  526. opts.remove('trace')
  527. if opts:
  528. modules_to_run = list(opts)
  529. return collections.namedtuple(
  530. 'CMD',
  531. [
  532. 'update_every',
  533. 'debug',
  534. 'trace',
  535. 'modules_to_run',
  536. ],
  537. )(
  538. update_every,
  539. debug,
  540. trace,
  541. modules_to_run,
  542. )
  543. def main():
  544. cmd = parse_cmd()
  545. logger = PythonDLogger()
  546. if cmd.debug:
  547. logger.logger.severity = 'DEBUG'
  548. if cmd.trace:
  549. logger.log_traceback = True
  550. logger.info('using python v{0}'.format(PY_VERSION[0]))
  551. unknown_modules = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
  552. if unknown_modules:
  553. logger.error('unknown modules : {0}'.format(sorted(list(unknown_modules))))
  554. safe_print('DISABLE')
  555. return
  556. plugin = Plugin(
  557. cmd.update_every,
  558. cmd.modules_to_run or AVAILABLE_MODULES,
  559. )
  560. HeartBeat(1).start()
  561. if not plugin.setup():
  562. safe_print('DISABLE')
  563. return
  564. plugin.run()
  565. logger.info('exiting from main...')
  566. plugin.shutdown()
  567. if __name__ == '__main__':
  568. main()