python.d.plugin.in 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733
  1. #!/usr/bin/env bash
  2. '''':;
  3. if [[ "$OSTYPE" == "darwin"* ]]; then
  4. export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
  5. fi
  6. exec "$(command -v python || command -v python3 || command -v python2 ||
  7. echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
  8. # -*- coding: utf-8 -*-
  9. # Description:
  10. # Author: Pawel Krupa (paulfantom)
  11. # Author: Ilya Mashchenko (l2isbad)
  12. # SPDX-License-Identifier: GPL-3.0-or-later
  13. import collections
  14. import copy
  15. import gc
  16. import multiprocessing
  17. import os
  18. import re
  19. import sys
  20. import time
  21. import threading
  22. import types
# Interpreter version as a (major, minor) tuple; used for the loader choice
# below and logged at startup in main().
PY_VERSION = sys.version_info[:2]

# Source-loader selection: importlib.machinery on python >= 3.2, otherwise the
# legacy imp.load_source, aliased to the same name so load_module() can treat
# both uniformly (see load_module()'s ModuleType check).
if PY_VERSION > (3, 1):
    from importlib.machinery import SourceFileLoader
else:
    from imp import load_source as SourceFileLoader
  28. ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
  29. ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
  30. ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
  31. ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
  32. def dirs():
  33. user_config = os.getenv(
  34. ENV_NETDATA_USER_CONFIG_DIR,
  35. '@configdir_POST@',
  36. )
  37. stock_config = os.getenv(
  38. ENV_NETDATA_STOCK_CONFIG_DIR,
  39. '@libconfigdir_POST@',
  40. )
  41. modules_user_config = os.path.join(user_config, 'python.d')
  42. modules_stock_config = os.path.join(stock_config, 'python.d')
  43. modules = os.path.abspath(
  44. os.getenv(
  45. ENV_NETDATA_PLUGINS_DIR,
  46. os.path.dirname(__file__),
  47. ) + '/../python.d'
  48. )
  49. pythond_packages = os.path.join(modules, 'python_modules')
  50. return collections.namedtuple(
  51. 'Dirs',
  52. [
  53. 'user_config',
  54. 'stock_config',
  55. 'modules_user_config',
  56. 'modules_stock_config',
  57. 'modules',
  58. 'pythond_packages',
  59. ]
  60. )(
  61. user_config,
  62. stock_config,
  63. modules_user_config,
  64. modules_stock_config,
  65. modules,
  66. pythond_packages,
  67. )
DIRS = dirs()

# Make the bundled python_modules directory importable before pulling in the
# plugin's own base packages below.
sys.path.append(DIRS.pythond_packages)

from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config

try:
    from collections import OrderedDict
except ImportError:
    # fall back to the bundled backport when collections.OrderedDict is unavailable
    from third_party.ordereddict import OrderedDict

# Sentinel placed on the task/result queues to signal "no more work" (see
# ModuleChecker.run_once and Plugin.dequeue_results).
END_TASK_MARKER = None

# When attached to a terminal the keep-alive newline in heartbeat() is suppressed.
IS_ATTY = sys.stdout.isatty()

PLUGIN_CONF_FILE = 'python.d.conf'

# Every collector module file is named '<module>.chart.py'.
MODULE_SUFFIX = '.chart.py'
# Modules that must never be loaded any more; the reason is noted per entry.
OBSOLETED_MODULES = (
    'apache_cache',  # replaced by web_log
    'cpuidle',  # rewritten in C
    'cpufreq',  # rewritten in C
    'gunicorn_log',  # replaced by web_log
    'linux_power_supply',  # rewritten in C
    'nginx_log',  # replaced by web_log
    'mdstat',  # rewritten in C
    'sslcheck',  # memory leak bug https://github.com/netdata/netdata/issues/5624
)

# Sorted names of every '<name>.chart.py' file in the modules directory,
# minus the obsoleted ones.
AVAILABLE_MODULES = [
    m[:-len(MODULE_SUFFIX)] for m in sorted(os.listdir(DIRS.modules))
    if m.endswith(MODULE_SUFFIX) and m[:-len(MODULE_SUFFIX)] not in OBSOLETED_MODULES
]
  95. PLUGIN_BASE_CONF = {
  96. 'enabled': True,
  97. 'default_run': True,
  98. 'gc_run': True,
  99. 'gc_interval': 300,
  100. }
  101. JOB_BASE_CONF = {
  102. 'update_every': os.getenv(ENV_NETDATA_UPDATE_EVERY, 1),
  103. 'priority': 60000,
  104. 'autodetection_retry': 0,
  105. 'chart_cleanup': 10,
  106. 'penalty': True,
  107. 'name': str(),
  108. }
  109. def heartbeat():
  110. if IS_ATTY:
  111. return
  112. safe_print('\n')
  113. class HeartBeat(threading.Thread):
  114. def __init__(self, every):
  115. threading.Thread.__init__(self)
  116. self.daemon = True
  117. self.every = every
  118. def run(self):
  119. while True:
  120. time.sleep(self.every)
  121. heartbeat()
def load_module(name):
    """Load the collector module '<name>.chart.py' from the modules directory.

    Works with both loader flavours aliased to SourceFileLoader above: the
    legacy imp.load_source returns a module object directly, while
    importlib's SourceFileLoader returns a loader that needs load_module().
    """
    abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
    module = SourceFileLoader(name, abs_path)
    if isinstance(module, types.ModuleType):
        return module
    return module.load_module()
  128. def multi_path_find(name, paths):
  129. for path in paths:
  130. abs_name = os.path.join(path, name)
  131. if os.path.isfile(abs_name):
  132. return abs_name
  133. return ''
  134. Task = collections.namedtuple(
  135. 'Task',
  136. [
  137. 'module_name',
  138. 'explicitly_enabled',
  139. ],
  140. )
  141. Result = collections.namedtuple(
  142. 'Result',
  143. [
  144. 'module_name',
  145. 'jobs_configs',
  146. ],
  147. )
class ModuleChecker(multiprocessing.Process):
    """Worker process that loads candidate modules, runs their check() phase
    and sends a Result per viable module back over the result queue.

    It consumes Task items from task_queue until it sees END_TASK_MARKER,
    which it echoes onto result_queue so the main process knows it is done.
    """

    def __init__(
            self,
            task_queue,
            result_queue,
    ):
        multiprocessing.Process.__init__(self)
        self.log = PythonDLogger()
        self.log.job_name = 'checker'
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        self.log.info('starting...')
        HeartBeat(1).start()
        while self.run_once():
            pass
        self.log.info('terminating...')

    def run_once(self):
        """Process one task; return False when the end marker was consumed."""
        task = self.task_queue.get()
        if task is END_TASK_MARKER:
            # TODO: find better solution, understand why heartbeat thread doesn't work
            heartbeat()
            self.task_queue.task_done()
            # propagate the sentinel so Plugin.dequeue_results stops too
            self.result_queue.put(END_TASK_MARKER)
            return False
        result = self.do_task(task)
        if result:
            self.result_queue.put(result)
        self.task_queue.task_done()
        return True

    def do_task(self, task):
        """Check one module end to end; return a Result or None to skip it."""
        self.log.info("{0} : checking".format(task.module_name))

        # LOAD SOURCE
        module = Module(task.module_name)
        try:
            module.load_source()
        except Exception as error:
            self.log.warning("{0} : error on loading source : {1}, skipping module".format(
                task.module_name,
                error,
            ))
            return None
        else:
            self.log.info("{0} : source successfully loaded".format(task.module_name))

        # modules flagged disabled_by_default only run when explicitly enabled
        if module.is_disabled_by_default() and not task.explicitly_enabled:
            self.log.info("{0} : disabled by default".format(task.module_name))
            return None

        # LOAD CONFIG
        paths = [
            DIRS.modules_user_config,
            DIRS.modules_stock_config,
        ]
        conf_abs_path = multi_path_find(
            name='{0}.conf'.format(task.module_name),
            paths=paths,
        )
        if conf_abs_path:
            self.log.info("{0} : found config file '{1}'".format(task.module_name, conf_abs_path))
            try:
                module.load_config(conf_abs_path)
            except Exception as error:
                self.log.warning("{0} : error on loading config : {1}, skipping module".format(
                    task.module_name, error))
                return None
        else:
            self.log.info("{0} : config was not found in '{1}', using default 1 job config".format(
                task.module_name, paths))

        # CHECK JOBS
        jobs = module.create_jobs()
        self.log.info("{0} : created {1} job(s) from the config".format(task.module_name, len(jobs)))

        successful_jobs_configs = list()
        for job in jobs:
            if job.autodetection_retry() > 0:
                # deferred: the main process rechecks these in Plugin.retry_job
                successful_jobs_configs.append(job.config)
                self.log.info("{0}[{1}]: autodetection job, will be checked in main".format(task.module_name, job.name))
                continue
            try:
                job.init()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job)".format(
                    task.module_name, job.name, error))
                continue
            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    task.module_name, job.name, error))
                continue
            if not ok:
                self.log.info("{0}[{1}] : check failed, skipping the job".format(task.module_name, job.name))
                continue
            self.log.info("{0}[{1}] : check successful".format(task.module_name, job.name))
            # the job passed here but is re-initialized in the main process;
            # give it a retry interval so a transient failure there recovers
            job.config['autodetection_retry'] = job.config['update_every']
            successful_jobs_configs.append(job.config)

        if not successful_jobs_configs:
            self.log.info("{0} : all jobs failed, skipping module".format(task.module_name))
            return None

        return Result(module.source.__name__, successful_jobs_configs)
  246. class JobConf(OrderedDict):
  247. def __init__(self, *args):
  248. OrderedDict.__init__(self, *args)
  249. def set_defaults_from_module(self, module):
  250. for k in [k for k in JOB_BASE_CONF if hasattr(module, k)]:
  251. self[k] = getattr(module, k)
  252. def set_defaults_from_config(self, module_config):
  253. for k in [k for k in JOB_BASE_CONF if k in module_config]:
  254. self[k] = module_config[k]
  255. def set_job_name(self, name):
  256. self['job_name'] = re.sub(r'\s+', '_', name)
  257. def set_override_name(self, name):
  258. self['override_name'] = re.sub(r'\s+', '_', name)
  259. def as_dict(self):
  260. return copy.deepcopy(OrderedDict(self))
  261. class Job:
  262. def __init__(
  263. self,
  264. service,
  265. module_name,
  266. config,
  267. ):
  268. self.service = service
  269. self.config = config
  270. self.module_name = module_name
  271. self.name = config['job_name']
  272. self.override_name = config['override_name']
  273. self.wrapped = None
  274. def init(self):
  275. self.wrapped = self.service(configuration=self.config.as_dict())
  276. def check(self):
  277. return self.wrapped.check()
  278. def post_check(self, min_update_every):
  279. if self.wrapped.update_every < min_update_every:
  280. self.wrapped.update_every = min_update_every
  281. def create(self):
  282. return self.wrapped.create()
  283. def autodetection_retry(self):
  284. return self.config['autodetection_retry']
  285. def run(self):
  286. self.wrapped.run()
class Module:
    """A collector module: its loaded source and its parsed config file."""

    def __init__(self, name):
        self.name = name
        # module object, set by load_source()
        self.source = None
        # parsed '<name>.conf' contents, set by load_config()
        self.config = dict()

    def is_disabled_by_default(self):
        return bool(getattr(self.source, 'disabled_by_default', False))

    def load_source(self):
        self.source = load_module(self.name)

    def load_config(self, abs_path):
        self.config = load_config(abs_path) or dict()

    def gather_jobs_configs(self):
        """Build one JobConf per job section in the config.

        Dict-valued top-level keys are job sections; every other top-level
        key is a module-level setting shared by all jobs. With no job
        sections, a single default job is built from the module-level keys.
        """
        job_names = [v for v in self.config if isinstance(self.config[v], dict)]

        if len(job_names) == 0:
            # single default job: base conf <- module attrs <- whole config
            job_conf = JobConf(JOB_BASE_CONF)
            job_conf.set_defaults_from_module(self.source)
            job_conf.update(self.config)
            job_conf.set_job_name(self.name)
            # 'name' becomes the override name and must not stay in the conf
            job_conf.set_override_name(job_conf.pop('name'))
            return [job_conf]

        configs = list()
        for job_name in job_names:
            # per job: base conf <- module attrs <- module-level keys <- job section
            raw_job_conf = self.config[job_name]
            job_conf = JobConf(JOB_BASE_CONF)
            job_conf.set_defaults_from_module(self.source)
            job_conf.set_defaults_from_config(self.config)
            job_conf.update(raw_job_conf)
            job_conf.set_job_name(job_name)
            job_conf.set_override_name(job_conf.pop('name'))
            configs.append(job_conf)

        return configs

    def create_jobs(self, jobs_conf=None):
        """Wrap each job config (given, or gathered from self.config) in a Job."""
        return [Job(self.source.Service, self.name, conf) for conf in jobs_conf or self.gather_jobs_configs()]
  320. class JobRunner(threading.Thread):
  321. def __init__(self, job):
  322. threading.Thread.__init__(self)
  323. self.daemon = True
  324. self.wrapped = job
  325. def run(self):
  326. self.wrapped.run()
  327. class PluginConf(dict):
  328. def __init__(self, *args):
  329. dict.__init__(self, *args)
  330. def is_module_enabled(self, module_name, explicit):
  331. if module_name in self:
  332. return self[module_name]
  333. if explicit:
  334. return False
  335. return self['default_run']
class Plugin:
    """Main orchestrator: checks modules in a worker process, then runs each
    surviving job in its own daemon thread while serving retries and GC."""

    def __init__(
            self,
            min_update_every=1,
            modules_to_run=tuple(AVAILABLE_MODULES),
    ):
        self.log = PythonDLogger()
        self.config = PluginConf(PLUGIN_BASE_CONF)
        # work/result channels shared with the ModuleChecker process
        self.task_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.JoinableQueue()
        self.min_update_every = min_update_every
        self.modules_to_run = modules_to_run
        # jobs whose check failed but that have autodetection_retry > 0
        self.auto_detection_jobs = list()
        self.tasks = list()
        self.results = list()
        # module_name -> list of job names already running, to dedupe jobs
        self.checked_jobs = collections.defaultdict(list)
        # serve() loop counter, drives gc and retry scheduling
        self.runs = 0

    @staticmethod
    def shutdown():
        # tell netdata to disable this plugin, then exit
        safe_print('DISABLE')
        exit(0)

    def run(self):
        """Start every prepared job in a thread and block in serve()."""
        jobs = self.create_jobs()
        if not jobs:
            return
        for job in self.prepare_jobs(jobs):
            self.log.info('{0}[{1}] : started in thread'.format(job.module_name, job.name))
            JobRunner(job).start()
        self.serve()

    def enqueue_tasks(self):
        for task in self.tasks:
            self.task_queue.put(task)
        # sentinel tells the checker process to finish
        self.task_queue.put(END_TASK_MARKER)

    def dequeue_results(self):
        while True:
            result = self.result_queue.get()
            self.result_queue.task_done()
            if result is END_TASK_MARKER:
                break
            self.results.append(result)

    def load_config(self):
        """Load python.d.conf into self.config; return False only on a parse error."""
        paths = [
            DIRS.user_config,
            DIRS.stock_config,
        ]
        self.log.info("checking for config in {0}".format(paths))
        abs_path = multi_path_find(name=PLUGIN_CONF_FILE, paths=paths)
        if not abs_path:
            # a missing config is fine: defaults apply
            self.log.warning('config was not found, using defaults')
            return True
        self.log.info("config found, loading config '{0}'".format(abs_path))
        try:
            config = load_config(abs_path) or dict()
        except Exception as error:
            self.log.error('error on loading config : {0}'.format(error))
            return False
        self.log.info('config successfully loaded')
        self.config.update(config)
        return True

    def setup(self):
        """Run the full check phase; return True when there is work to do."""
        self.log.info('starting setup')
        if not self.load_config():
            return False
        if not self.config['enabled']:
            self.log.info('disabled in configuration file')
            return False

        for mod in self.modules_to_run:
            if self.config.is_module_enabled(mod, False):
                # second flag: was the module explicitly listed in the config?
                task = Task(mod, self.config.is_module_enabled(mod, True))
                self.tasks.append(task)
            else:
                self.log.info("{0} : disabled in configuration file".format(mod))
        if not self.tasks:
            self.log.info('no modules to run')
            return False

        worker = ModuleChecker(self.task_queue, self.result_queue)
        self.log.info('starting checker process ({0} module(s) to check)'.format(len(self.tasks)))
        worker.start()

        # TODO: timeouts?
        # drain both queues fully before closing them and joining the worker
        self.enqueue_tasks()
        self.task_queue.join()
        self.dequeue_results()
        self.result_queue.join()
        self.task_queue.close()
        self.result_queue.close()
        self.log.info('stopping checker process')
        worker.join()

        if not self.results:
            self.log.info('no modules to run')
            return False

        self.log.info("setup complete, {0} active module(s) : '{1}'".format(
            len(self.results),
            [v.module_name for v in self.results])
        )
        return True

    def create_jobs(self):
        """Re-load each surviving module in this process and build its jobs."""
        jobs = list()
        for result in self.results:
            module = Module(result.module_name)
            try:
                module.load_source()
            except Exception as error:
                self.log.warning("{0} : error on loading module source : {1}, skipping module".format(
                    result.module_name, error))
                continue
            module_jobs = module.create_jobs(result.jobs_configs)
            self.log.info("{0} : created {1} job(s)".format(module.name, len(module_jobs)))
            jobs.extend(module_jobs)
        return jobs

    def prepare_jobs(self, jobs):
        """Init/check each job in-process; return those ready to run.

        Failed jobs with autodetection_retry > 0 are parked in
        self.auto_detection_jobs for later rechecks by serve()/retry_job().
        """
        prepared = list()
        for job in jobs:
            # jobs sharing an (override) name serve the same target: keep one
            check_name = job.override_name or job.name
            if check_name in self.checked_jobs[job.module_name]:
                self.log.info('{0}[{1}] : already served by another job, skipping the job'.format(
                    job.module_name, job.name))
                continue
            try:
                job.init()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue
            self.log.info("{0}[{1}] : init successful".format(job.module_name, job.name))
            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue
            if not ok:
                self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.name))
                if job.autodetection_retry() > 0:
                    self.log.info('{0}[{1}] : will recheck every {2} second(s)'.format(
                        job.module_name, job.name, job.autodetection_retry()))
                    self.auto_detection_jobs.append(job)
                continue
            self.log.info('{0}[{1}] : check successful'.format(job.module_name, job.name))
            job.post_check(int(self.min_update_every))
            if not job.create():
                self.log.info('{0}[{1}] : create failed'.format(job.module_name, job.name))
            self.checked_jobs[job.module_name].append(check_name)
            prepared.append(job)
        return prepared

    def serve(self):
        """Once-a-second supervision loop: GC, retries, and liveness check."""
        gc_run = self.config['gc_run']
        gc_interval = self.config['gc_interval']

        while True:
            self.runs += 1

            # threads: main + heartbeat
            if threading.active_count() <= 2 and not self.auto_detection_jobs:
                return

            time.sleep(1)

            if gc_run and self.runs % gc_interval == 0:
                v = gc.collect()
                self.log.debug('GC collection run result: {0}'.format(v))

            # drop jobs that either started successfully or gave up
            self.auto_detection_jobs = [job for job in self.auto_detection_jobs if not self.retry_job(job)]

    def retry_job(self, job):
        """Recheck one parked job; return True when it should stop retrying
        (started successfully or failed permanently), False to retry later."""
        stop_retrying = True
        retry_later = False

        # only recheck on multiples of the job's retry interval
        if self.runs % job.autodetection_retry() != 0:
            return retry_later

        check_name = job.override_name or job.name
        if check_name in self.checked_jobs[job.module_name]:
            self.log.info("{0}[{1}]: already served by another job, give up on retrying".format(
                job.module_name, job.name))
            return stop_retrying

        try:
            ok = job.check()
        except Exception as error:
            self.log.warning("{0}[{1}] : unhandled exception on recheck : {2}, give up on retrying".format(
                job.module_name, job.name, error))
            return stop_retrying

        if not ok:
            self.log.info('{0}[{1}] : recheck failed, will retry in {2} second(s)'.format(
                job.module_name, job.name, job.autodetection_retry()))
            return retry_later
        self.log.info('{0}[{1}] : recheck successful'.format(job.module_name, job.name))

        if not job.create():
            return stop_retrying
        job.post_check(int(self.min_update_every))
        self.checked_jobs[job.module_name].append(check_name)
        JobRunner(job).start()

        return stop_retrying
  520. def parse_cmd():
  521. opts = sys.argv[:][1:]
  522. debug = False
  523. trace = False
  524. update_every = 1
  525. modules_to_run = list()
  526. v = next((opt for opt in opts if opt.isdigit() and int(opt) >= 1), None)
  527. if v:
  528. update_every = v
  529. opts.remove(v)
  530. if 'debug' in opts:
  531. debug = True
  532. opts.remove('debug')
  533. if 'trace' in opts:
  534. trace = True
  535. opts.remove('trace')
  536. if opts:
  537. modules_to_run = list(opts)
  538. return collections.namedtuple(
  539. 'CMD',
  540. [
  541. 'update_every',
  542. 'debug',
  543. 'trace',
  544. 'modules_to_run',
  545. ],
  546. )(
  547. update_every,
  548. debug,
  549. trace,
  550. modules_to_run,
  551. )
def main():
    """Plugin entry point: parse args, set up logging, check and run modules.

    Prints 'DISABLE' to stdout (netdata's disable signal) when given unknown
    module names or when setup finds nothing to run.
    """
    cmd = parse_cmd()
    logger = PythonDLogger()

    if cmd.debug:
        logger.logger.severity = 'DEBUG'
    if cmd.trace:
        logger.log_traceback = True

    logger.info('using python v{0}'.format(PY_VERSION[0]))

    # reject unknown module names up front rather than failing per-module later
    unknown_modules = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
    if unknown_modules:
        logger.error('unknown modules : {0}'.format(sorted(list(unknown_modules))))
        safe_print('DISABLE')
        return

    plugin = Plugin(
        cmd.update_every,
        cmd.modules_to_run or AVAILABLE_MODULES,
    )

    HeartBeat(1).start()

    if not plugin.setup():
        safe_print('DISABLE')
        return

    plugin.run()
    logger.info('exiting from main...')
    plugin.shutdown()


if __name__ == '__main__':
    main()