123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427 |
- #!/usr/bin/env bash
- '''':; exec "$(command -v python || command -v python3 || command -v python2 ||
- echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
- # -*- coding: utf-8 -*-
- # Description:
- # Author: Pawel Krupa (paulfantom)
- # Author: Ilya Mashchenko (l2isbad)
- # SPDX-License-Identifier: GPL-3.0-or-later
- import gc
- import os
- import sys
- import threading
- from re import sub
- from sys import version_info, argv
- from time import sleep
# Defaults for the periodic full garbage collection performed in
# Plugin.start(); Plugin.__init__ lets the 'gc_run' / 'gc_interval' keys of
# the loaded plugin configuration override them.
GC_RUN = True
GC_COLLECT_EVERY = 300

# (major, minor) of the running interpreter.
PY_VERSION = version_info[:2]

# Configuration roots. The environment wins; otherwise the '@...@'
# placeholders are used (presumably substituted at build time -- TODO confirm).
USER_CONFIG_DIR = os.getenv('NETDATA_USER_CONFIG_DIR', '@configdir_POST@')
STOCK_CONFIG_DIR = os.getenv('NETDATA_STOCK_CONFIG_DIR', '@libconfigdir_POST@')

# Per-module configuration files live in a 'python.d' sub-directory of each root.
PLUGINS_USER_CONFIG_DIR = os.path.join(USER_CONFIG_DIR, 'python.d')
PLUGINS_STOCK_CONFIG_DIR = os.path.join(STOCK_CONFIG_DIR, 'python.d')

# Directory holding the '*.chart.py' modules: '<plugins dir>/../python.d',
# where <plugins dir> defaults to the directory of this file.
PLUGINS_DIR = os.path.abspath(
    os.getenv('NETDATA_PLUGINS_DIR', os.path.dirname(__file__)) + '/../python.d'
)
PYTHON_MODULES_DIR = os.path.join(PLUGINS_DIR, 'python_modules')

# Must happen before the 'bases' / 'third_party' imports that follow.
sys.path.append(PYTHON_MODULES_DIR)
- from bases.loaders import ModuleAndConfigLoader # noqa: E402
- from bases.loggers import PythonDLogger # noqa: E402
- from bases.collection import setdefault_values, run_and_exit # noqa: E402
- try:
- from collections import OrderedDict
- except ImportError:
- from third_party.ordereddict import OrderedDict
# Per-job settings every job configuration is completed with (see
# Module.jobs_configurations_builder). Note that 'update_every' is a str when
# it comes from the environment and the int 1 otherwise.
BASE_CONFIG = {
    'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
    'retries': 60,
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'name': '',
}

# File name suffix that marks a file in PLUGINS_DIR as a runnable module.
MODULE_EXTENSION = '.chart.py'

# Module names (without the suffix) that are never offered for running.
OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log', 'cpufreq']
def module_ok(m):
    """
    Tell whether a directory entry is a module that may be run.

    :param m: <str> file name, e.g. 'example.chart.py'
    :return: <bool> True when the name ends with MODULE_EXTENSION and its stem
             is not listed in OBSOLETE_MODULES
    """
    if not m.endswith(MODULE_EXTENSION):
        return False
    stem = m[:-len(MODULE_EXTENSION)]
    return stem not in OBSOLETE_MODULES


# Every runnable module file found in PLUGINS_DIR, in sorted order.
ALL_MODULES = [entry for entry in sorted(os.listdir(PLUGINS_DIR)) if module_ok(entry)]
def parse_cmd():
    """
    Interpret the command line arguments (everything after the program name).

    :return: <tuple> (debug, trace, override_update_every, modules) where
             debug / trace are <bool> flags set by the literal arguments
             'debug' / 'trace', override_update_every is the first all-digit
             argument greater than 1 (kept as <str>) or False, and modules is
             the list of requested module file names that exist in
             ALL_MODULES, or ALL_MODULES itself when none was requested.
    """
    args = argv[1:]

    debug = 'debug' in args
    trace = 'trace' in args

    override_update_every = False
    for arg in args:
        if arg.isdigit() and int(arg) > 1:
            override_update_every = arg
            break

    # Arguments are module names without the suffix; keep command line order.
    requested = [arg + MODULE_EXTENSION for arg in args]
    modules = [file_name for file_name in requested if file_name in ALL_MODULES]

    return debug, trace, override_update_every, modules or ALL_MODULES
def multi_job_check(config):
    """
    Tell whether a module configuration describes several named jobs.

    :param config: <dict>
    :return: <bool> True when at least one value of config is itself a dict
    """
    return any(isinstance(value, dict) for value in config.values())
class RawModule(object):
    """
    Description of a module file that is enabled but not loaded yet.

    Derives from object like the other classes in this file, so it is a
    new-style class on Python 2 as well.
    """

    def __init__(self, name, path, explicitly_enabled=True):
        """
        :param name: <str> module name without the file extension
        :param path: <str> path of the module file
        :param explicitly_enabled: value of the module's own switch in the
               plugin configuration (whatever the caller passes; Plugin passes
               config.get(name), which is None when the key is absent)
        """
        self.name = name
        self.path = path
        self.explicitly_enabled = explicitly_enabled
class Job(object):
    """
    Thin wrapper around one initialized service job.

    Adds the bookkeeping the plugin needs (id, owning module name, check /
    create state, autodetection interval); every attribute that is not found
    on the wrapper itself is looked up on the wrapped job.
    """

    def __init__(self, initialized_job, job_id):
        """
        :param initialized_job: instance of <Class Service>
        :param job_id: <str>
        """
        self.job = initialized_job
        self.id = job_id  # key in Module.jobs
        self.module_name = initialized_job.__module__  # used in Plugin.delete_job()
        # Removed from the job configuration: only the plugin uses it.
        self.recheck_every = initialized_job.configuration.pop('autodetection_retry')
        self.checked = False  # used in Plugin.check_job()
        self.created = False  # used in Plugin.create_job_charts()

        # A command line override can only raise update_every, never lower it.
        current = initialized_job.update_every
        minimum = int(OVERRIDE_UPDATE_EVERY)
        if current < minimum:
            initialized_job.update_every = minimum

    def __getattr__(self, item):
        # Only reached when normal lookup on the wrapper fails.
        return getattr(self.job, item)

    def __repr__(self):
        return self.job.__repr__()

    def is_dead(self):
        """
        :return: <bool> True when the job has an ident but is no longer alive
        """
        if not self.ident:
            return False
        return not self.is_alive()

    def not_launched(self):
        """
        :return: <bool> True when the job has no ident yet
        """
        return not self.ident

    def is_autodetect(self):
        """
        :return: the 'autodetection_retry' value of the job (truthy when the
                 job should be re-checked periodically)
        """
        return self.recheck_every
class Module(object):
    """
    All jobs built from one loaded module and its configuration.

    After construction the instance acts as a small container of
    job_id -> Job: it can be iterated (over a snapshot of the jobs), indexed,
    shrunk with ``del`` and tested for truth (an empty container is falsy).
    """

    def __init__(self, service, config):
        """
        :param service: <Module>
        :param config: <dict>
        """
        self.service = service
        self.name = service.__name__
        # A generator: the configuration is only consumed in initialize_jobs().
        self.config = self.jobs_configurations_builder(config)
        self.jobs = OrderedDict()
        self.counter = 1
        self.initialize_jobs()

    def __repr__(self):
        return "<Class Module '{name}'>".format(name=self.name)

    def __iter__(self):
        # Iterate over a copy so that jobs can be deleted during iteration.
        return iter(OrderedDict(self.jobs).values())

    def __getitem__(self, item):
        return self.jobs[item]

    def __delitem__(self, key):
        del self.jobs[key]

    def __len__(self):
        return len(self.jobs)

    def __bool__(self):
        return bool(self.jobs)

    def __nonzero__(self):
        # Python 2 spelling of __bool__.
        return self.__bool__()

    def jobs_configurations_builder(self, config):
        """
        Yield one complete configuration per job described by config.

        Every BASE_CONFIG key is popped from config; the module level value,
        then the attribute of the same name on the service module, then
        BASE_CONFIG supply the per-job default. What is left is treated as a
        single unnamed job unless at least one value is a dict, in which case
        each dict value is a job and non-dict values are skipped.

        :param config: <dict>
        :return: generator of (job_id, job_name, job_config); job_id is
                 'job' followed by a zero padded counter, whitespace runs in
                 job_name and in the job's 'name' are replaced by '_'
        """
        counter = 0
        job_base_config = dict()

        for attr in BASE_CONFIG:
            job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr]))

        if not config:
            config = {str(): dict()}
        elif not multi_job_check(config):
            config = {str(): config}

        for job_name in config:
            if not isinstance(config[job_name], dict):
                continue

            job_config = setdefault_values(config[job_name], base_dict=job_base_config)
            # Normalise 'name' while job_name is still the real key of config:
            # doing it after job_name is rewritten raised KeyError for any job
            # name containing whitespace.
            config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name'])
            job_name = sub(r'\s+', '_', job_name)

            counter += 1
            job_id = 'job' + str(counter).zfill(3)

            yield job_id, job_name, job_config

    def initialize_jobs(self):
        """
        Instantiate ``service.Service`` for every job configuration.

        Jobs whose constructor raises are logged and skipped; the others are
        stored in self.jobs wrapped in Job. The configuration generator and
        the service module reference are dropped afterwards.

        :return:
        """
        for job_id, job_name, job_config in self.config:
            job_config['job_name'] = job_name
            job_config['override_name'] = job_config.pop('name')

            try:
                initialized_job = self.service.Service(configuration=job_config)
            except Exception as error:
                Logger.error("job initialization: '{module_name} {job_name}' "
                             "=> ['FAILED'] ({error})".format(module_name=self.name,
                                                              job_name=job_name,
                                                              error=error))
                continue
            else:
                Logger.debug("job initialization: '{module_name} {job_name}' "
                             "=> ['OK']".format(module_name=self.name,
                                                job_name=job_name or self.name))
                self.jobs[job_id] = Job(initialized_job=initialized_job,
                                        job_id=job_id)
        del self.config
        del self.service
class Plugin(object):
    """
    The plugin itself: loads the plugin configuration, loads and initializes
    the enabled modules, then checks, creates and starts their jobs and keeps
    supervising them in start().
    """

    def __init__(self):
        """
        Load 'python.d.conf' (user version first, stock version as fallback)
        and initialize all enabled modules.

        run_and_exit() is invoked when the plugin is disabled in the
        configuration or when no module ends up with at least one job.
        """
        self.loader = ModuleAndConfigLoader()
        self.modules = OrderedDict()
        self.sleep_time = 1
        self.runs_counter = 0

        user_config = os.path.join(USER_CONFIG_DIR, 'python.d.conf')
        stock_config = os.path.join(STOCK_CONFIG_DIR, 'python.d.conf')

        Logger.debug("loading '{0}'".format(user_config))
        self.config, error = self.loader.load_config_from_file(user_config)

        if error:
            Logger.error("cannot load '{0}': {1}. Will try stock version.".format(user_config, error))
            Logger.debug("loading '{0}'".format(stock_config))
            self.config, error = self.loader.load_config_from_file(stock_config)
        if error:
            Logger.error("cannot load '{0}': {1}".format(stock_config, error))

        self.do_gc = self.config.get("gc_run", GC_RUN)
        self.gc_interval = self.config.get("gc_interval", GC_COLLECT_EVERY)

        if not self.config.get('enabled', True):
            run_and_exit(Logger.info)('DISABLED in configuration file.')

        self.load_and_initialize_modules()
        if not self.modules:
            run_and_exit(Logger.info)('No modules to run. Exit...')

    def __iter__(self):
        # Iterate over a copy so that modules can be deleted during iteration.
        return iter(OrderedDict(self.modules).values())

    @property
    def jobs(self):
        """
        :return: generator over every job of every module
        """
        return (job for mod in self for job in mod)

    @property
    def dead_jobs(self):
        """
        :return: generator over the jobs for which is_dead() is true
        """
        return (job for job in self.jobs if job.is_dead())

    @property
    def autodetect_jobs(self):
        """
        :return: <list> jobs for which not_launched() is true
        """
        return [job for job in self.jobs if job.not_launched()]

    def enabled_modules(self):
        """
        Yield a RawModule for every module in MODULES_TO_RUN that the plugin
        configuration enables.

        A module's own key decides when it is present; otherwise the module
        runs only if 'default_run' is absent or truthy. Only truthiness of the
        configured values matters.
        """
        run_by_default = bool(self.config.get('default_run', True))

        for mod in MODULES_TO_RUN:
            mod_name = mod[:-len(MODULE_EXTENSION)]
            mod_path = os.path.join(PLUGINS_DIR, mod)

            if self.config.get(mod_name, run_by_default):
                yield RawModule(
                    name=mod_name,
                    path=mod_path,
                    explicitly_enabled=self.config.get(mod_name),
                )

    def load_and_initialize_modules(self):
        """
        Load the source and the configuration of every enabled module and
        store each module that ends up with at least one job in self.modules.
        """
        for mod in self.enabled_modules():

            # Load module from file ------------------------------------------------------------
            loaded_module, error = self.loader.load_module_from_file(mod.name, mod.path)
            log = Logger.error if error else Logger.debug
            log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK',
                                                                           module_name=mod.name))
            if error:
                Logger.error("load source error : {0}".format(error))
                continue

            # Load module config from file ------------------------------------------------------
            user_config = os.path.join(PLUGINS_USER_CONFIG_DIR, mod.name + '.conf')
            stock_config = os.path.join(PLUGINS_STOCK_CONFIG_DIR, mod.name + '.conf')

            Logger.debug("loading '{0}'".format(user_config))
            loaded_config, error = self.loader.load_config_from_file(user_config)
            if error:
                Logger.error("cannot load '{0}' : {1}. Will try stock version.".format(user_config, error))
                Logger.debug("loading '{0}'".format(stock_config))
                loaded_config, error = self.loader.load_config_from_file(stock_config)

            if error:
                Logger.error("cannot load '{0}': {1}".format(stock_config, error))

            # Skip disabled modules
            if getattr(loaded_module, 'disabled_by_default', False) and not mod.explicitly_enabled:
                Logger.info("module '{0}' disabled by default".format(loaded_module.__name__))
                continue

            # Module initialization ---------------------------------------------------
            initialized_module = Module(service=loaded_module, config=loaded_config)
            Logger.debug("module status: '{module_name}' => [{status}] "
                         "(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED',
                                                        module_name=initialized_module.name,
                                                        jobs_number=len(initialized_module)))
            if initialized_module:
                self.modules[initialized_module.name] = initialized_module

    @staticmethod
    def check_job(job):
        """
        :param job: <Job>
        :return: <bool> result of job.check(), or None when check() raised
                 (the exception is reported through job.error)
        """
        try:
            check_ok = bool(job.check())
        except Exception as error:
            job.error('check() unhandled exception: {error}'.format(error=error))
            return None
        else:
            return check_ok

    @staticmethod
    def create_job_charts(job):
        """
        :param job: <Job>
        :return: result of job.create(), or False when create() raised
                 (the exception is reported through job.error)
        """
        try:
            create_ok = job.create()
        except Exception as error:
            job.error('create() unhandled exception: {error}'.format(error=error))
            return False
        else:
            return create_ok

    def delete_job(self, job):
        """
        Remove the job from the module it belongs to.

        :param job: <Job>
        :return:
        """
        del self.modules[job.module_name][job.id]

    def run_check(self):
        """
        Run check() once for every job.

        A job whose name was already served by a successfully checked job is
        dropped. A failed job is kept for later re-checking only when it has a
        truthy autodetection interval and check() did not raise; otherwise it
        is deleted.
        """
        checked = list()
        for job in self.jobs:
            if job.name in checked:
                job.info('check() => [DROPPED] (already served by another job)')
                self.delete_job(job)
                continue
            ok = self.check_job(job)
            if ok:
                job.info('check() => [OK]')
                checked.append(job.name)
                job.checked = True
                continue
            if not job.is_autodetect() or ok is None:
                job.info('check() => [FAILED]')
                self.delete_job(job)
            else:
                job.info('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every))

    def run_create(self):
        """
        Create charts for every successfully checked job; jobs whose create()
        fails are deleted.
        """
        for job in self.jobs:
            if not job.checked:
                # skip autodetection_retry jobs
                continue
            ok = self.create_job_charts(job)
            if ok:
                job.debug('create() => [OK] (charts: {0})'.format(len(job.charts)))
                job.created = True
                continue
            job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts)))
            self.delete_job(job)

    def start(self):
        """
        Check, create and start the jobs, then supervise them forever.

        Each iteration sleeps sleep_time seconds, removes dead jobs and empty
        modules, retries pending autodetection jobs and, when enabled, runs a
        full garbage collection every gc_interval counted seconds.
        run_and_exit() is invoked once no other thread is active and no job is
        waiting for autodetection.
        """
        self.run_check()
        self.run_create()
        for job in self.jobs:
            if job.created:
                job.start()

        while True:
            if threading.active_count() <= 1 and not self.autodetect_jobs:
                run_and_exit(Logger.info)('FINISHED')

            sleep(self.sleep_time)
            self.cleanup()
            self.autodetect_retry()

            # FIXME: https://github.com/netdata/netdata/issues/3817
            # A zero (or otherwise falsy) gc_interval from the configuration
            # disables the periodic collection instead of raising
            # ZeroDivisionError in the modulo below.
            if self.do_gc and self.gc_interval and self.runs_counter % self.gc_interval == 0:
                v = gc.collect()
                Logger.debug("GC full collection run result: {0}".format(v))

    def cleanup(self):
        """
        Delete dead jobs, then delete modules that have no jobs left.
        """
        for job in self.dead_jobs:
            self.delete_job(job)
        for mod in self:
            if not mod:
                del self.modules[mod.name]

    def autodetect_retry(self):
        """
        Advance the run counter and re-check every not yet launched job whose
        recheck interval divides the counter; a job that now passes check() is
        created and started, or deleted when chart creation fails.
        """
        self.runs_counter += self.sleep_time
        for job in self.autodetect_jobs:
            if self.runs_counter % job.recheck_every == 0:
                checked = self.check_job(job)
                if checked:
                    created = self.create_job_charts(job)
                    if not created:
                        self.delete_job(job)
                        continue
                    job.start()
if __name__ == '__main__':
    # OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN and Logger are module globals that
    # Job, Module and Plugin read, so they must exist before Plugin() is built.
    DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd()
    Logger = PythonDLogger()

    # Command line switches only adjust the logger.
    if DEBUG:
        Logger.logger.severity = 'DEBUG'
    if TRACE:
        Logger.log_traceback = True

    python_major = PY_VERSION[0]
    Logger.info('Using python {version}'.format(version=python_major))

    plugin = Plugin()
    plugin.start()
|