12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733 |
- #!/usr/bin/env bash
- '''':;
- if [[ "$OSTYPE" == "darwin"* ]]; then
- export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
- fi
- exec "$(command -v python || command -v python3 || command -v python2 ||
- echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
- # -*- coding: utf-8 -*-
- # Description:
- # Author: Pawel Krupa (paulfantom)
- # Author: Ilya Mashchenko (l2isbad)
- # SPDX-License-Identifier: GPL-3.0-or-later
- import collections
- import copy
- import gc
- import multiprocessing
- import os
- import re
- import sys
- import time
- import threading
- import types
- PY_VERSION = sys.version_info[:2]
- if PY_VERSION > (3, 1):
- from importlib.machinery import SourceFileLoader
- else:
- from imp import load_source as SourceFileLoader
- ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
- ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
- ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
- ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
def dirs():
    """Resolve every directory the plugin uses into an immutable named tuple.

    Each location comes from the corresponding NETDATA_* environment
    variable when set, falling back to the build-time substituted default.
    """
    user_config_dir = os.getenv(ENV_NETDATA_USER_CONFIG_DIR, '@configdir_POST@')
    stock_config_dir = os.getenv(ENV_NETDATA_STOCK_CONFIG_DIR, '@libconfigdir_POST@')
    plugins_dir = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__))
    modules_dir = os.path.abspath(plugins_dir + '/../python.d')

    Dirs = collections.namedtuple(
        'Dirs',
        [
            'user_config',
            'stock_config',
            'modules_user_config',
            'modules_stock_config',
            'modules',
            'pythond_packages',
        ],
    )
    return Dirs(
        user_config=user_config_dir,
        stock_config=stock_config_dir,
        modules_user_config=os.path.join(user_config_dir, 'python.d'),
        modules_stock_config=os.path.join(stock_config_dir, 'python.d'),
        modules=modules_dir,
        pythond_packages=os.path.join(modules_dir, 'python_modules'),
    )
# Resolved directory layout, computed once at import time.
DIRS = dirs()

# Make the bundled third-party packages importable.
sys.path.append(DIRS.pythond_packages)

from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config

try:
    from collections import OrderedDict
except ImportError:
    # pre-2.7 interpreters: fall back to the bundled implementation
    from third_party.ordereddict import OrderedDict

# Sentinel placed on the task/result queues to signal "no more work".
END_TASK_MARKER = None

# True when run from an interactive terminal (debugging) rather than netdata.
IS_ATTY = sys.stdout.isatty()

PLUGIN_CONF_FILE = 'python.d.conf'
MODULE_SUFFIX = '.chart.py'

# Modules that must never be loaded even if their source is still on disk.
OBSOLETED_MODULES = (
    'apache_cache',  # replaced by web_log
    'cpuidle',  # rewritten in C
    'cpufreq',  # rewritten in C
    'gunicorn_log',  # replaced by web_log
    'linux_power_supply',  # rewritten in C
    'nginx_log',  # replaced by web_log
    'mdstat',  # rewritten in C
    'sslcheck',  # memory leak bug https://github.com/netdata/netdata/issues/5624
)

# Runnable module names: every '<name>.chart.py' file minus the obsoleted set.
AVAILABLE_MODULES = [
    m[:-len(MODULE_SUFFIX)] for m in sorted(os.listdir(DIRS.modules))
    if m.endswith(MODULE_SUFFIX) and m[:-len(MODULE_SUFFIX)] not in OBSOLETED_MODULES
]
# Plugin-level defaults, overridable via python.d.conf.
PLUGIN_BASE_CONF = {
    'enabled': True,       # master switch for the whole plugin
    'default_run': True,   # run modules not explicitly listed in the config
    'gc_run': True,        # periodically force a garbage collection cycle
    'gc_interval': 300,    # seconds between forced gc cycles
}

# Per-job defaults, merged under every job's own configuration.
JOB_BASE_CONF = {
    # BUG FIX: os.getenv returns a *string* when the variable is set but the
    # fallback here is an int; normalize to int so 'update_every' always has
    # a consistent type for arithmetic/comparison downstream.
    'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'penalty': True,
    'name': str(),
}
def heartbeat():
    """Emit an empty line so netdata knows the plugin is still alive.

    Suppressed when attached to a terminal, where it would only be noise.
    """
    if not IS_ATTY:
        safe_print('\n')
class HeartBeat(threading.Thread):
    """Daemon thread emitting a heartbeat at a fixed interval (seconds)."""

    def __init__(self, every):
        super(HeartBeat, self).__init__()
        self.daemon = True   # never keep the process alive on its own
        self.every = every   # pause between heartbeats, in seconds

    def run(self):
        while True:
            time.sleep(self.every)
            heartbeat()
def load_module(name):
    """Load chart module `name` from the modules directory and return it.

    On Python 3 SourceFileLoader is a loader object that still needs
    load_module(); the Python 2 imp.load_source shim returns the module
    directly - both shapes are handled.
    """
    path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
    loaded = SourceFileLoader(name, path)
    if isinstance(loaded, types.ModuleType):
        return loaded
    return loaded.load_module()
def multi_path_find(name, paths):
    """Return the first existing file called `name` among `paths`, else ''."""
    candidates = (os.path.join(directory, name) for directory in paths)
    return next((c for c in candidates if os.path.isfile(c)), '')
# Unit of work sent to the checker process: which module to check and
# whether the user explicitly enabled it in python.d.conf.
Task = collections.namedtuple('Task', ['module_name', 'explicitly_enabled'])

# Checker answer: the module name plus the job configs that passed check().
Result = collections.namedtuple('Result', ['module_name', 'jobs_configs'])
class ModuleChecker(multiprocessing.Process):
    """Worker process that load-checks modules and reports usable job configs.

    Consumes Task items from `task_queue` until the END_TASK_MARKER sentinel
    arrives, and puts a Result on `result_queue` for every module that yields
    at least one working (or auto-detectable) job.
    """

    def __init__(
            self,
            task_queue,
            result_queue,
    ):
        multiprocessing.Process.__init__(self)
        self.log = PythonDLogger()
        self.log.job_name = 'checker'
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        self.log.info('starting...')
        HeartBeat(1).start()
        while self.run_once():
            pass
        self.log.info('terminating...')

    def run_once(self):
        """Handle one queued task; return False once the end marker is seen."""
        task = self.task_queue.get()

        if task is END_TASK_MARKER:
            # TODO: find better solution, understand why heartbeat thread doesn't work
            heartbeat()
            self.task_queue.task_done()
            # forward the sentinel so the consumer side stops too
            self.result_queue.put(END_TASK_MARKER)
            return False

        result = self.do_task(task)
        if result:
            self.result_queue.put(result)
        self.task_queue.task_done()

        return True

    def do_task(self, task):
        """Fully check one module; return a Result or None if unusable."""
        self.log.info("{0} : checking".format(task.module_name))

        # LOAD SOURCE
        module = Module(task.module_name)
        try:
            module.load_source()
        except Exception as error:
            self.log.warning("{0} : error on loading source : {1}, skipping module".format(
                task.module_name,
                error,
            ))
            return None
        else:
            self.log.info("{0} : source successfully loaded".format(task.module_name))

        # disabled-by-default modules only run when explicitly enabled in config
        if module.is_disabled_by_default() and not task.explicitly_enabled:
            self.log.info("{0} : disabled by default".format(task.module_name))
            return None

        # LOAD CONFIG
        paths = [
            DIRS.modules_user_config,
            DIRS.modules_stock_config,
        ]

        conf_abs_path = multi_path_find(
            name='{0}.conf'.format(task.module_name),
            paths=paths,
        )

        if conf_abs_path:
            self.log.info("{0} : found config file '{1}'".format(task.module_name, conf_abs_path))
            try:
                module.load_config(conf_abs_path)
            except Exception as error:
                self.log.warning("{0} : error on loading config : {1}, skipping module".format(
                    task.module_name, error))
                return None
        else:
            # no config at all: the module still gets one default job
            self.log.info("{0} : config was not found in '{1}', using default 1 job config".format(
                task.module_name, paths))

        # CHECK JOBS
        jobs = module.create_jobs()
        self.log.info("{0} : created {1} job(s) from the config".format(task.module_name, len(jobs)))

        successful_jobs_configs = list()
        for job in jobs:
            if job.autodetection_retry() > 0:
                # retryable jobs are passed through unchecked; the main
                # process rechecks them periodically instead
                successful_jobs_configs.append(job.config)
                self.log.info("{0}[{1}]: autodetection job, will be checked in main".format(task.module_name, job.name))
                continue

            try:
                job.init()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job)".format(
                    task.module_name, job.name, error))
                continue

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    task.module_name, job.name, error))
                continue

            if not ok:
                self.log.info("{0}[{1}] : check failed, skipping the job".format(task.module_name, job.name))
                continue

            self.log.info("{0}[{1}] : check successful".format(task.module_name, job.name))

            # NOTE(review): a successful job gets autodetection_retry set to
            # its update interval - presumably so the main process treats it
            # as retryable when it rechecks; confirm intended behavior
            job.config['autodetection_retry'] = job.config['update_every']
            successful_jobs_configs.append(job.config)

        if not successful_jobs_configs:
            self.log.info("{0} : all jobs failed, skipping module".format(task.module_name))
            return None

        return Result(module.source.__name__, successful_jobs_configs)
class JobConf(OrderedDict):
    """Ordered job configuration with helpers for layering defaults."""

    def __init__(self, *args):
        OrderedDict.__init__(self, *args)

    def set_defaults_from_module(self, module):
        """Copy any base-conf keys defined as module-level attributes."""
        for key in JOB_BASE_CONF:
            if hasattr(module, key):
                self[key] = getattr(module, key)

    def set_defaults_from_config(self, module_config):
        """Copy any base-conf keys present at the config's top level."""
        for key in JOB_BASE_CONF:
            if key in module_config:
                self[key] = module_config[key]

    def set_job_name(self, name):
        # whitespace is not allowed inside job names
        self['job_name'] = re.sub(r'\s+', '_', name)

    def set_override_name(self, name):
        self['override_name'] = re.sub(r'\s+', '_', name)

    def as_dict(self):
        """Return a deep, detached copy as a plain OrderedDict."""
        return copy.deepcopy(OrderedDict(self))
class Job:
    """A single collection job: thin wrapper around a module's Service."""

    def __init__(
            self,
            service,
            module_name,
            config,
    ):
        self.module_name = module_name
        self.service = service   # Service class to instantiate on init()
        self.config = config     # JobConf driving this job
        self.name = config['job_name']
        self.override_name = config['override_name']
        self.wrapped = None      # Service instance, created by init()

    def init(self):
        """Instantiate the underlying Service from this job's config."""
        self.wrapped = self.service(configuration=self.config.as_dict())

    def check(self):
        return self.wrapped.check()

    def post_check(self, min_update_every):
        # never collect faster than the plugin-wide minimum interval
        if self.wrapped.update_every < min_update_every:
            self.wrapped.update_every = min_update_every

    def create(self):
        return self.wrapped.create()

    def autodetection_retry(self):
        return self.config['autodetection_retry']

    def run(self):
        self.wrapped.run()
class Module:
    """A chart module: its loaded source plus its parsed configuration."""

    def __init__(self, name):
        self.name = name
        self.source = None    # module object, populated by load_source()
        self.config = dict()  # parsed .conf contents, populated by load_config()

    def is_disabled_by_default(self):
        # modules may opt out of automatic runs via a module-level flag
        return bool(getattr(self.source, 'disabled_by_default', False))

    def load_source(self):
        self.source = load_module(self.name)

    def load_config(self, abs_path):
        self.config = load_config(abs_path) or dict()

    def _base_job_conf(self):
        """New JobConf seeded with global then module-level defaults."""
        conf = JobConf(JOB_BASE_CONF)
        conf.set_defaults_from_module(self.source)
        return conf

    def gather_jobs_configs(self):
        """Build one JobConf per job described by the config.

        A config without nested dicts describes a single anonymous job;
        otherwise each nested dict is a named job that inherits the
        config's top-level keys as defaults.
        """
        named_jobs = [key for key in self.config if isinstance(self.config[key], dict)]

        if not named_jobs:
            conf = self._base_job_conf()
            conf.update(self.config)
            conf.set_job_name(self.name)
            conf.set_override_name(conf.pop('name'))
            return [conf]

        configs = list()
        for job_name in named_jobs:
            conf = self._base_job_conf()
            conf.set_defaults_from_config(self.config)
            conf.update(self.config[job_name])
            conf.set_job_name(job_name)
            conf.set_override_name(conf.pop('name'))
            configs.append(conf)
        return configs

    def create_jobs(self, jobs_conf=None):
        return [Job(self.source.Service, self.name, conf)
                for conf in jobs_conf or self.gather_jobs_configs()]
class JobRunner(threading.Thread):
    """Daemon thread that drives one job's collection loop."""

    def __init__(self, job):
        super(JobRunner, self).__init__()
        self.daemon = True
        self.wrapped = job

    def run(self):
        self.wrapped.run()
class PluginConf(dict):
    """python.d.conf contents: per-module enable flags plus plugin options."""

    def __init__(self, *args):
        dict.__init__(self, *args)

    def is_module_enabled(self, module_name, explicit):
        """Return the module's enable flag.

        For modules not mentioned in the config, `explicit` decides the
        answer: an explicit lookup fails, otherwise 'default_run' applies.
        """
        try:
            return self[module_name]
        except KeyError:
            pass
        return False if explicit else self['default_run']
class Plugin:
    """Main orchestrator for the whole plugin run.

    Flow: setup() drives a separate ModuleChecker process over two joinable
    queues, create_jobs()/prepare_jobs() rebuild and verify the surviving
    jobs in the main process, run() launches one daemon thread per job and
    then serve() ticks once a second, retrying auto-detection jobs and
    running periodic gc.
    """

    def __init__(
            self,
            min_update_every=1,
            modules_to_run=tuple(AVAILABLE_MODULES),
    ):
        # min_update_every: lower bound on any job's collection interval
        # modules_to_run: module names to consider (defaults to all available)
        self.log = PythonDLogger()
        self.config = PluginConf(PLUGIN_BASE_CONF)
        self.task_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.JoinableQueue()
        self.min_update_every = min_update_every
        self.modules_to_run = modules_to_run
        # jobs whose check failed but that want periodic rechecking
        self.auto_detection_jobs = list()
        self.tasks = list()    # Task items destined for the checker process
        self.results = list()  # Result items received back from it
        # module_name -> job names already running, to avoid duplicate jobs
        self.checked_jobs = collections.defaultdict(list)
        # seconds served so far; drives gc and retry scheduling
        self.runs = 0

    @staticmethod
    def shutdown():
        # tell netdata to disable the plugin, then stop the whole process
        safe_print('DISABLE')
        exit(0)

    def run(self):
        """Start a daemon thread per prepared job, then block in serve()."""
        jobs = self.create_jobs()
        if not jobs:
            return

        for job in self.prepare_jobs(jobs):
            self.log.info('{0}[{1}] : started in thread'.format(job.module_name, job.name))
            JobRunner(job).start()

        self.serve()

    def enqueue_tasks(self):
        for task in self.tasks:
            self.task_queue.put(task)
        # sentinel: tells the checker process there is no more work
        self.task_queue.put(END_TASK_MARKER)

    def dequeue_results(self):
        """Drain the result queue until the forwarded sentinel arrives."""
        while True:
            result = self.result_queue.get()
            self.result_queue.task_done()
            if result is END_TASK_MARKER:
                break
            self.results.append(result)

    def load_config(self):
        """Load python.d.conf; return False only on a read/parse error."""
        paths = [
            DIRS.user_config,
            DIRS.stock_config,
        ]
        self.log.info("checking for config in {0}".format(paths))
        abs_path = multi_path_find(name=PLUGIN_CONF_FILE, paths=paths)

        if not abs_path:
            # a missing config is fine - the built-in defaults apply
            self.log.warning('config was not found, using defaults')
            return True

        self.log.info("config found, loading config '{0}'".format(abs_path))
        try:
            config = load_config(abs_path) or dict()
        except Exception as error:
            self.log.error('error on loading config : {0}'.format(error))
            return False

        self.log.info('config successfully loaded')
        self.config.update(config)
        return True

    def setup(self):
        """Run the checker over all enabled modules; True if any survive."""
        self.log.info('starting setup')
        if not self.load_config():
            return False

        if not self.config['enabled']:
            self.log.info('disabled in configuration file')
            return False

        for mod in self.modules_to_run:
            if self.config.is_module_enabled(mod, False):
                # second lookup (explicit=True) records whether the user
                # enabled the module by name rather than via default_run
                task = Task(mod, self.config.is_module_enabled(mod, True))
                self.tasks.append(task)
            else:
                self.log.info("{0} : disabled in configuration file".format(mod))

        if not self.tasks:
            self.log.info('no modules to run')
            return False

        worker = ModuleChecker(self.task_queue, self.result_queue)
        self.log.info('starting checker process ({0} module(s) to check)'.format(len(self.tasks)))
        worker.start()

        # TODO: timeouts?
        self.enqueue_tasks()
        self.task_queue.join()
        self.dequeue_results()
        self.result_queue.join()
        self.task_queue.close()
        self.result_queue.close()

        self.log.info('stopping checker process')
        worker.join()

        if not self.results:
            self.log.info('no modules to run')
            return False

        self.log.info("setup complete, {0} active module(s) : '{1}'".format(
            len(self.results),
            [v.module_name for v in self.results])
        )

        return True

    def create_jobs(self):
        """Reload surviving modules in this process and build their jobs.

        The checker ran in a child process, so module sources are loaded
        again here before jobs can be created from the reported configs.
        """
        jobs = list()
        for result in self.results:
            module = Module(result.module_name)
            try:
                module.load_source()
            except Exception as error:
                self.log.warning("{0} : error on loading module source : {1}, skipping module".format(
                    result.module_name, error))
                continue

            module_jobs = module.create_jobs(result.jobs_configs)
            self.log.info("{0} : created {1} job(s)".format(module.name, len(module_jobs)))
            jobs.extend(module_jobs)

        return jobs

    def prepare_jobs(self, jobs):
        """init()/check() every job; return those ready to run.

        Jobs whose check fails but that have autodetection_retry > 0 are
        queued for periodic rechecking instead of being dropped.
        """
        prepared = list()

        for job in jobs:
            check_name = job.override_name or job.name
            if check_name in self.checked_jobs[job.module_name]:
                self.log.info('{0}[{1}] : already served by another job, skipping the job'.format(
                    job.module_name, job.name))
                continue

            try:
                job.init()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue

            self.log.info("{0}[{1}] : init successful".format(job.module_name, job.name))

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue

            if not ok:
                self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.name))
                if job.autodetection_retry() > 0:
                    self.log.info('{0}[{1}] : will recheck every {2} second(s)'.format(
                        job.module_name, job.name, job.autodetection_retry()))
                    self.auto_detection_jobs.append(job)
                continue

            self.log.info('{0}[{1}] : check successful'.format(job.module_name, job.name))

            job.post_check(int(self.min_update_every))

            if not job.create():
                # NOTE(review): a failed create() is only logged - the job is
                # still marked checked and scheduled below; confirm intended
                self.log.info('{0}[{1}] : create failed'.format(job.module_name, job.name))

            self.checked_jobs[job.module_name].append(check_name)
            prepared.append(job)

        return prepared

    def serve(self):
        """Main loop: tick once per second until all job threads are gone."""
        gc_run = self.config['gc_run']
        gc_interval = self.config['gc_interval']

        while True:
            self.runs += 1

            # threads: main + heartbeat
            if threading.active_count() <= 2 and not self.auto_detection_jobs:
                return

            time.sleep(1)

            if gc_run and self.runs % gc_interval == 0:
                v = gc.collect()
                self.log.debug('GC collection run result: {0}'.format(v))

            # keep only jobs that still want another retry later
            self.auto_detection_jobs = [job for job in self.auto_detection_jobs if not self.retry_job(job)]

    def retry_job(self, job):
        """Recheck one auto-detection job; return True to stop retrying it."""
        stop_retrying = True
        retry_later = False

        # only recheck on multiples of the job's own retry interval
        if self.runs % job.autodetection_retry() != 0:
            return retry_later

        check_name = job.override_name or job.name
        if check_name in self.checked_jobs[job.module_name]:
            self.log.info("{0}[{1}]: already served by another job, give up on retrying".format(
                job.module_name, job.name))
            return stop_retrying

        try:
            ok = job.check()
        except Exception as error:
            self.log.warning("{0}[{1}] : unhandled exception on recheck : {2}, give up on retrying".format(
                job.module_name, job.name, error))
            return stop_retrying

        if not ok:
            self.log.info('{0}[{1}] : recheck failed, will retry in {2} second(s)'.format(
                job.module_name, job.name, job.autodetection_retry()))
            return retry_later

        self.log.info('{0}[{1}] : recheck successful'.format(job.module_name, job.name))

        if not job.create():
            return stop_retrying

        job.post_check(int(self.min_update_every))
        self.checked_jobs[job.module_name].append(check_name)
        JobRunner(job).start()

        return stop_retrying
def parse_cmd():
    """Parse the plugin's command line arguments.

    Recognized tokens, in any order: a positive integer (the update
    interval in seconds), 'debug' and 'trace' flags; every remaining
    argument is treated as a module name to run.

    Returns:
        CMD namedtuple (update_every, debug, trace, modules_to_run).
    """
    opts = sys.argv[1:]
    debug = False
    trace = False
    update_every = 1
    modules_to_run = list()

    v = next((opt for opt in opts if opt.isdigit() and int(opt) >= 1), None)
    if v:
        # BUG FIX: argv tokens are strings; convert so 'update_every' is
        # always an int, matching its default above.
        update_every = int(v)
        opts.remove(v)
    if 'debug' in opts:
        debug = True
        opts.remove('debug')
    if 'trace' in opts:
        trace = True
        opts.remove('trace')
    if opts:
        modules_to_run = list(opts)

    return collections.namedtuple(
        'CMD',
        [
            'update_every',
            'debug',
            'trace',
            'modules_to_run',
        ],
    )(
        update_every,
        debug,
        trace,
        modules_to_run,
    )
def main():
    """Plugin entry point: parse args, run setup, start collecting."""
    cmd = parse_cmd()
    logger = PythonDLogger()

    # command-line flags tune logging before anything else happens
    if cmd.debug:
        logger.logger.severity = 'DEBUG'
    if cmd.trace:
        logger.log_traceback = True

    logger.info('using python v{0}'.format(PY_VERSION[0]))

    # refuse to start if the user asked for modules that do not exist
    unknown_modules = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
    if unknown_modules:
        logger.error('unknown modules : {0}'.format(sorted(list(unknown_modules))))
        safe_print('DISABLE')
        return

    selected = cmd.modules_to_run or AVAILABLE_MODULES
    plugin = Plugin(cmd.update_every, selected)

    HeartBeat(1).start()

    if not plugin.setup():
        safe_print('DISABLE')
        return

    plugin.run()
    logger.info('exiting from main...')
    plugin.shutdown()
- if __name__ == '__main__':
- main()
|