12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946 |
#!/usr/bin/env bash
'''':;
# bash/python polyglot: bash executes this header (python sees it as a
# docstring) to locate an interpreter, then re-execs this file with it.
# A '-pPATH' option overrides the detected interpreter; all other args pass through.
pybinary=$(which python3 || which python || which python2)
filtered=()
for arg in "$@"
do
    case $arg in
        -p*) pybinary=${arg:2}
             shift 1 ;;
        *)   filtered+=("$arg") ;;
    esac
done
if [ "$pybinary" = "" ]
then
    echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM"
    exit 1
fi
exec "$pybinary" "$0" "${filtered[@]}" # '''
# -*- coding: utf-8 -*-
# Description:
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later

import collections
import copy
import gc
import json
import os
import pprint
import re
import sys
import threading
import time
import types

try:
    from queue import Queue
except ImportError:
    # py2 fallback
    from Queue import Queue

PY_VERSION = sys.version_info[:2]  # (major, minor)

# importlib replaced the deprecated imp module in py3; keep a common alias.
if PY_VERSION > (3, 1):
    from importlib.machinery import SourceFileLoader
else:
    from imp import load_source as SourceFileLoader
# Names of the environment variables netdata exports to external plugins.
ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_USER_PLUGINS_DIRS = 'NETDATA_USER_PLUGINS_DIRS'
ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
ENV_NETDATA_LOCK_DIR = 'NETDATA_LOCK_DIR'


def add_pythond_packages():
    """Append the bundled python.d 'python_modules' directory to sys.path.

    Must run before any 'bases'/'third_party' import, since those packages
    live under <plugins dir>/../python.d/python_modules.
    """
    pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__))
    pythond = os.path.abspath(pluginsd + '/../python.d')
    packages = os.path.join(pythond, 'python_modules')
    sys.path.append(packages)


add_pythond_packages()
# These imports rely on add_pythond_packages() having extended sys.path.
from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config
from third_party import filelock

try:
    from collections import OrderedDict
except ImportError:
    # bundled fallback for very old interpreters without collections.OrderedDict
    from third_party.ordereddict import OrderedDict
def dirs():
    """Resolve the netdata directory layout.

    Each path comes from the corresponding environment variable when set;
    otherwise the '@..._POST@' placeholders (substituted at package build
    time) are used as defaults.  Returns an immutable namedtuple.
    """
    var_lib = os.getenv(
        ENV_NETDATA_LIB_DIR,
        '@varlibdir_POST@',
    )
    plugin_user_config = os.getenv(
        ENV_NETDATA_USER_CONFIG_DIR,
        '@configdir_POST@',
    )
    plugin_stock_config = os.getenv(
        ENV_NETDATA_STOCK_CONFIG_DIR,
        '@libconfigdir_POST@',
    )
    pluginsd = os.getenv(
        ENV_NETDATA_PLUGINS_DIR,
        os.path.dirname(__file__),
    )
    locks = os.getenv(
        ENV_NETDATA_LOCK_DIR,
        os.path.join('@varlibdir_POST@', 'lock')
    )
    modules_user_config = os.path.join(plugin_user_config, 'python.d')
    modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
    modules = os.path.abspath(pluginsd + '/../python.d')
    # space-separated list of extra plugin dirs, each with a python.d subdir
    user_modules = [os.path.join(p, 'python.d') for p in
                    os.getenv(ENV_NETDATA_USER_PLUGINS_DIRS, "").split(" ") if
                    p]

    Dirs = collections.namedtuple(
        'Dirs',
        [
            'plugin_user_config',
            'plugin_stock_config',
            'modules_user_config',
            'modules_stock_config',
            'modules',
            'user_modules',
            'var_lib',
            'locks',
        ]
    )
    return Dirs(
        plugin_user_config,
        plugin_stock_config,
        modules_user_config,
        modules_stock_config,
        modules,
        user_modules,
        var_lib,
        locks,
    )


DIRS = dirs()

# True when run interactively from a terminal (debug mode) rather than by netdata.
IS_ATTY = sys.stdout.isatty() or sys.stderr.isatty()
MODULE_SUFFIX = '.chart.py'


def find_available_modules(*directories):
    """Scan *directories* for '<name>.chart.py' module files.

    Unreadable or missing directories are silently skipped.  Returns a list
    of AvailableModule(filepath, name) namedtuples, sorted per directory.
    """
    AvailableModule = collections.namedtuple(
        'AvailableModule',
        [
            'filepath',
            'name',
        ]
    )
    available = list()
    for d in directories:
        try:
            if not os.path.isdir(d):
                continue
            files = sorted(os.listdir(d))
        except OSError:
            continue
        modules = [m for m in files if m.endswith(MODULE_SUFFIX)]
        available.extend([AvailableModule(os.path.join(d, m), m[:-len(MODULE_SUFFIX)]) for m in modules])
    return available
def available_modules():
    """Return modules to consider: user modules first, then stock ones.

    Obsolete stock modules are filtered out; a user module with the same
    name shadows the stock module (first occurrence wins).
    """
    obsolete = (
        'apache_cache',  # replaced by web_log
        'cpuidle',  # rewritten in C
        'cpufreq',  # rewritten in C
        'gunicorn_log',  # replaced by web_log
        'linux_power_supply',  # rewritten in C
        'nginx_log',  # replaced by web_log
        'mdstat',  # rewritten in C
        'sslcheck',  # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624
        'unbound',  # rewritten in Go
    )
    stock = [m for m in find_available_modules(DIRS.modules) if m.name not in obsolete]
    user = find_available_modules(*DIRS.user_modules)
    available, seen = list(), set()
    for m in user + stock:
        if m.name in seen:
            continue
        seen.add(m.name)
        available.append(m)
    return available


AVAILABLE_MODULES = available_modules()
# Defaults applied to every job config unless overridden.
JOB_BASE_CONF = {
    'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'penalty': True,
    'name': str(),
}

# Defaults for the plugin-level python.d.conf options.
PLUGIN_BASE_CONF = {
    'enabled': True,
    'default_run': True,
    'gc_run': True,
    'gc_interval': 300,
}
def multi_path_find(name, *paths):
    """Return the first existing file named *name* among *paths*.

    Returns an empty string when the file is found in none of them.
    """
    for path in paths:
        abs_name = os.path.join(path, name)
        if os.path.isfile(abs_name):
            return abs_name
    return str()
def load_module(name, filepath):
    """Load the module at *filepath* under the alias 'pythond_<name>'.

    SourceFileLoader is either importlib's loader class (py3) or the legacy
    imp.load_source function (py2); the latter already returns a module, the
    former needs load_module() to be called on the loader instance.
    """
    module = SourceFileLoader('pythond_' + name, filepath)
    if isinstance(module, types.ModuleType):
        return module
    return module.load_module()
class ModuleConfig:
    """A module's .conf content and the job configs derived from it."""

    def __init__(self, name, config=None):
        self.name = name
        self.config = config or OrderedDict()
        # set by load(): whether the config came from the stock config dir
        self.is_stock = False

    def load(self, abs_path):
        """Merge the config file at *abs_path* into this config."""
        if not IS_ATTY:
            self.is_stock = abs_path.startswith(DIRS.modules_stock_config)
        self.config.update(load_config(abs_path) or dict())

    def defaults(self):
        """Return the module-level defaults present in the config."""
        keys = (
            'update_every',
            'priority',
            'autodetection_retry',
            'chart_cleanup',
            'penalty',
        )
        return dict((k, self.config[k]) for k in keys if k in self.config)

    def create_job(self, job_name, job_config=None):
        """Build a single job config, filling in module-level defaults."""
        job_config = job_config or dict()

        config = OrderedDict()
        config.update(job_config)
        config['job_name'] = job_name
        config['__is_stock'] = self.is_stock
        for k, v in self.defaults().items():
            config.setdefault(k, v)

        return config

    def job_names(self):
        """Names of explicitly defined jobs (top-level dict values)."""
        return [v for v in self.config if isinstance(self.config.get(v), dict)]

    def single_job(self):
        return [self.create_job(self.name, self.config)]

    def multi_job(self):
        return [self.create_job(n, self.config[n]) for n in self.job_names()]

    def create_jobs(self):
        """All job configs; falls back to one implicit job named after the module."""
        return self.multi_job() or self.single_job()
class JobsConfigsBuilder:
    """Builds final job configs for a module from its config file plus defaults."""

    def __init__(self, config_dirs):
        self.config_dirs = config_dirs
        self.log = PythonDLogger()
        self.job_defaults = None
        self.module_defaults = None
        self.min_update_every = None

    def load_module_config(self, module_name):
        """Locate and load '<module>.conf'; missing file yields an empty config,
        a broken file yields None."""
        name = '{0}.conf'.format(module_name)
        self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs))
        config = ModuleConfig(module_name)

        abs_path = multi_path_find(name, *self.config_dirs)
        if not abs_path:
            self.log.warning("[{0}] '{1}' was not found".format(module_name, name))
            return config

        self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path))
        try:
            config.load(abs_path)
        except Exception as error:
            self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error)))
            return None

        self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path))
        return config

    @staticmethod
    def apply_defaults(jobs, defaults):
        """Fill missing keys in every job config from *defaults* (no overwrite)."""
        if defaults is None:
            return
        for k, v in defaults.items():
            for job in jobs:
                job.setdefault(k, v)

    def set_min_update_every(self, jobs, min_update_every):
        """Raise each job's 'update_every' up to *min_update_every*.

        Fix: the original checked the parameter for None but then compared
        and assigned self.min_update_every; use the parameter consistently.
        """
        if min_update_every is None:
            return
        for job in jobs:
            if 'update_every' in job and job['update_every'] < min_update_every:
                job['update_every'] = min_update_every

    def build(self, module_name):
        """Return the list of job configs for *module_name*, or None on error."""
        config = self.load_module_config(module_name)
        if config is None:
            return None

        configs = config.create_jobs()
        if not config.is_stock:
            self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs)))

        self.apply_defaults(configs, self.module_defaults)
        self.apply_defaults(configs, self.job_defaults)
        self.set_min_update_every(configs, self.min_update_every)

        return configs
JOB_STATUS_ACTIVE = 'active'
JOB_STATUS_RECOVERING = 'recovering'
JOB_STATUS_DROPPED = 'dropped'
JOB_STATUS_INIT = 'initial'


class Job(threading.Thread):
    """A collector job: wraps a module's Service instance in a daemon thread."""

    inf = -1  # sentinel: unlimited re-check attempts

    def __init__(self, service, module_name, config):
        threading.Thread.__init__(self)
        self.daemon = True
        self.service = service
        self.module_name = module_name
        self.config = config
        self.real_name = config['job_name']
        self.actual_name = config['override_name'] or self.real_name
        self.autodetection_retry = config['autodetection_retry']
        self.checks = self.inf
        self.job = None
        self.is_stock = config.get('__is_stock', False)
        self.status = JOB_STATUS_INIT

    def is_inited(self):
        return self.job is not None

    def init(self):
        """Instantiate the Service with a private copy of the config."""
        self.job = self.service(configuration=copy.deepcopy(self.config))

    def full_name(self):
        return self.job.name

    def check(self):
        """Run the service check; stock jobs are muted to avoid log noise.

        Decrements the remaining check attempts on failure unless unlimited.
        """
        if self.is_stock:
            self.job.logger.mute()

        ok = self.job.check()
        self.job.logger.unmute()
        # bool arithmetic: subtract 1 only when limited attempts and check failed
        self.checks -= self.checks != self.inf and not ok

        return ok

    def create(self):
        self.job.create()

    def need_to_recheck(self):
        return self.autodetection_retry != 0 and self.checks != 0

    def run(self):
        self.job.run()
class ModuleSrc:
    """Lazy wrapper around a module source file and its loaded attributes."""

    def __init__(self, m):
        self.name = m.name
        self.filepath = m.filepath
        self.src = None

    def load(self):
        """Actually import the module source."""
        self.src = load_module(self.name, self.filepath)

    def get(self, key):
        """Attribute *key* of the loaded module, or None if absent/not loaded."""
        return getattr(self.src, key, None)

    def service(self):
        return self.get('Service')

    def defaults(self):
        """Module-level job defaults declared as module globals."""
        keys = (
            'update_every',
            'priority',
            'autodetection_retry',
            'chart_cleanup',
            'penalty',
        )
        return dict((k, self.get(k)) for k in keys if self.get(k) is not None)

    def is_disabled_by_default(self):
        return bool(self.get('disabled_by_default'))
class JobsStatuses:
    """Jobs runtime statuses keyed by module then job name, JSON-(de)serializable."""

    def __init__(self):
        self.items = OrderedDict()

    def dump(self):
        return json.dumps(self.items, indent=2)

    def get(self, module_name, job_name):
        """Status string for the job, or None if unknown."""
        if module_name not in self.items:
            return None
        return self.items[module_name].get(job_name)

    def has(self, module_name, job_name):
        return self.get(module_name, job_name) is not None

    def from_file(self, path):
        with open(path) as f:
            data = json.load(f)
        return self.from_json(data)

    @staticmethod
    def from_json(items):
        """Build from a plain dict, sorting keys and dropping empty modules."""
        if not isinstance(items, dict):
            raise Exception('items obj has wrong type : {0}'.format(type(items)))
        if not items:
            return JobsStatuses()

        v = OrderedDict()
        for mod_name in sorted(items):
            if not items[mod_name]:
                continue
            v[mod_name] = OrderedDict()
            for job_name in sorted(items[mod_name]):
                v[mod_name][job_name] = items[mod_name][job_name]

        rv = JobsStatuses()
        rv.items = v
        return rv

    @staticmethod
    def from_jobs(jobs):
        """Snapshot only active/recovering jobs (the ones worth persisting)."""
        v = OrderedDict()
        for job in jobs:
            status = job.status
            if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING):
                continue
            if job.module_name not in v:
                v[job.module_name] = OrderedDict()
            v[job.module_name][job.real_name] = status

        rv = JobsStatuses()
        rv.items = v
        return rv
class StdoutSaver:
    """Statuses saver that just prints the dump to stdout (debug/tty mode)."""

    @staticmethod
    def save(dump):
        print(dump)
class CachedFileSaver:
    """Statuses saver that writes to a file, skipping the write when the
    content is unchanged since the last successful save."""

    def __init__(self, path):
        self.last_save_success = False
        self.last_saved_dump = str()
        self.path = path

    def save(self, dump):
        """Write *dump* to the file; re-raises write errors after marking failure."""
        if self.last_save_success and self.last_saved_dump == dump:
            return
        try:
            with open(self.path, 'w') as out:
                out.write(dump)
        except Exception:
            self.last_save_success = False
            raise
        self.last_saved_dump = dump
        self.last_save_success = True
class PluginConfig(dict):
    """python.d.conf content: per-module enable flags plus plugin options."""

    def __init__(self, *args):
        dict.__init__(self, *args)

    def is_module_explicitly_enabled(self, module_name):
        """True only when the module is explicitly enabled in the config."""
        return self._is_module_enabled(module_name, True)

    def is_module_enabled(self, module_name):
        """True when enabled explicitly or via the 'default_run' policy."""
        return self._is_module_enabled(module_name, False)

    def _is_module_enabled(self, module_name, explicit):
        if module_name in self:
            return self[module_name]
        if explicit:
            return False
        return self['default_run']
class FileLockRegistry:
    """Per-collector file locks, preventing the same collector job from
    running simultaneously in python.d and go.d plugins."""

    def __init__(self, path):
        self.path = path
        self.locks = dict()

    @staticmethod
    def rename(name):
        # go version name is 'docker'
        if name.startswith("dockerd"):
            name = "docker" + name[7:]
        return name

    def register(self, name):
        """Acquire '<name>.collector.lock' (non-blocking); no-op if already held."""
        name = self.rename(name)
        if name in self.locks:
            return
        file = os.path.join(self.path, '{0}.collector.lock'.format(name))
        lock = filelock.FileLock(file)
        lock.acquire(timeout=0)
        self.locks[name] = lock

    def unregister(self, name):
        """Release and forget the lock for *name*; no-op if not held."""
        name = self.rename(name)
        if name not in self.locks:
            return
        lock = self.locks[name]
        lock.release()
        del self.locks[name]
class DummyRegistry:
    """No-op registry used when job locking is disabled or unavailable."""

    def register(self, name):
        pass

    def unregister(self, name):
        pass
class Plugin:
    """The python.d plugin orchestrator: loads configs, builds, starts and
    serves collector jobs, and periodically persists their statuses."""

    config_name = 'python.d.conf'
    jobs_status_dump_name = 'pythond-jobs-statuses.json'

    def __init__(self, modules_to_run, min_update_every, registry):
        self.modules_to_run = modules_to_run
        self.min_update_every = min_update_every
        self.config = PluginConfig(PLUGIN_BASE_CONF)
        self.log = PythonDLogger()
        self.registry = registry
        # module_name -> {actual_name: True} of jobs already started
        self.started_jobs = collections.defaultdict(dict)
        self.jobs = list()
        self.saver = None
        self.runs = 0

    def load_config_file(self, filepath, expected):
        """Load a config file; a missing *expected* file is an error, otherwise info."""
        self.log.debug("looking for '{0}'".format(filepath))
        if not os.path.isfile(filepath):
            log = self.log.info if not expected else self.log.error
            log("'{0}' was not found".format(filepath))
            return dict()
        try:
            config = load_config(filepath)
        except Exception as error:
            self.log.error("error on loading '{0}' : {1}".format(filepath, repr(error)))
            return dict()
        self.log.debug("'{0}' is loaded".format(filepath))
        return config

    def load_config(self):
        """Merge stock then user python.d.conf over the built-in defaults."""
        user_config = self.load_config_file(
            filepath=os.path.join(DIRS.plugin_user_config, self.config_name),
            expected=False,
        )
        stock_config = self.load_config_file(
            filepath=os.path.join(DIRS.plugin_stock_config, self.config_name),
            expected=True,
        )
        self.config.update(stock_config)
        self.config.update(user_config)

    def load_job_statuses(self):
        """Load the previous run's job statuses dump, or None if absent/broken."""
        self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib))
        abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib)
        if not abs_path:
            self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name))
            return None
        self.log.debug("loading '{0}'".format(abs_path))
        try:
            statuses = JobsStatuses().from_file(abs_path)
        except Exception as error:
            self.log.error("'{0}' invalid JSON format: {1}".format(
                abs_path, ' '.join([v.strip() for v in str(error).split('\n')])))
            return None
        self.log.debug("'{0}' is loaded".format(abs_path))
        return statuses

    def create_jobs(self, job_statuses=None):
        """Build Job objects for every enabled, loadable module."""
        paths = [
            DIRS.modules_user_config,
            DIRS.modules_stock_config,
        ]
        builder = JobsConfigsBuilder(paths)
        builder.job_defaults = JOB_BASE_CONF
        builder.min_update_every = self.min_update_every

        jobs = list()
        for m in self.modules_to_run:
            if not self.config.is_module_enabled(m.name):
                self.log.info("[{0}] is disabled in the configuration file, skipping it".format(m.name))
                continue

            src = ModuleSrc(m)
            try:
                src.load()
            except Exception as error:
                self.log.warning("[{0}] error on loading source : {1}, skipping it".format(m.name, repr(error)))
                continue
            self.log.debug("[{0}] loaded module source : '{1}'".format(m.name, m.filepath))

            if not (src.service() and callable(src.service())):
                self.log.warning("[{0}] has no callable Service object, skipping it".format(m.name))
                continue

            if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(m.name):
                self.log.info("[{0}] is disabled by default, skipping it".format(m.name))
                continue

            builder.module_defaults = src.defaults()
            configs = builder.build(m.name)
            if not configs:
                self.log.info("[{0}] has no job configs, skipping it".format(m.name))
                continue

            for config in configs:
                config['job_name'] = re.sub(r'\s+', '_', config['job_name'])
                config['override_name'] = re.sub(r'\s+', '_', config.pop('name'))

                job = Job(src.service(), m.name, config)

                # jobs active on the previous run get a grace period to recover
                was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name)
                if was_previously_active and job.autodetection_retry == 0:
                    self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format(
                        job.module_name, job.real_name))
                    job.checks = 11
                    job.autodetection_retry = 30

                jobs.append(job)

        return jobs

    def setup(self):
        """Prepare everything; returns False when there is nothing to do."""
        self.load_config()
        if not self.config['enabled']:
            self.log.info('disabled in the configuration file')
            return False

        statuses = self.load_job_statuses()

        self.jobs = self.create_jobs(statuses)
        if not self.jobs:
            self.log.info('no jobs to run')
            return False

        if not IS_ATTY:
            abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name)
            self.saver = CachedFileSaver(abs_path)
        return True

    def start_jobs(self, *jobs):
        """Init, check, register, create and start every startable job."""
        for job in jobs:
            if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING):
                continue

            if job.actual_name in self.started_jobs[job.module_name]:
                self.log.info('{0}[{1}] : already served by another job, skipping it'.format(
                    job.module_name, job.real_name))
                job.status = JOB_STATUS_DROPPED
                continue

            if not job.is_inited():
                try:
                    job.init()
                except Exception as error:
                    self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                        job.module_name, job.real_name, repr(error)))
                    job.status = JOB_STATUS_DROPPED
                    continue

            try:
                ok = job.check()
            except Exception as error:
                if not job.is_stock:
                    self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                        job.module_name, job.real_name, repr(error)))
                job.status = JOB_STATUS_DROPPED
                continue
            if not ok:
                if not job.is_stock:
                    self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name))
                job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED
                continue
            self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name))

            try:
                self.registry.register(job.full_name())
            except filelock.Timeout as error:
                self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format(
                    job.module_name, job.real_name, error))
                job.status = JOB_STATUS_DROPPED
                continue
            except Exception as error:
                self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format(
                    job.module_name, job.real_name, error))
                job.status = JOB_STATUS_DROPPED
                continue

            try:
                job.create()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format(
                    job.module_name, job.real_name, repr(error)))
                job.status = JOB_STATUS_DROPPED
                try:
                    self.registry.unregister(job.full_name())
                except Exception as error:
                    self.log.warning('{0}[{1}] : deregistration failed: {2}'.format(
                        job.module_name, job.real_name, error))
                continue

            # Fix: record the name as a dict key (started_jobs is a
            # defaultdict(dict)); the original assigned the string itself,
            # turning the later 'in' membership test into substring matching.
            self.started_jobs[job.module_name][job.actual_name] = True
            job.status = JOB_STATUS_ACTIVE
            job.start()

    @staticmethod
    def keep_alive():
        """Emit a newline so netdata knows the plugin is alive (non-tty only)."""
        if not IS_ATTY:
            safe_print('\n')

    def garbage_collection(self):
        if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0:
            v = gc.collect()
            self.log.debug('GC collection run result: {0}'.format(v))

    def restart_recovering_jobs(self):
        for job in self.jobs:
            if job.status != JOB_STATUS_RECOVERING:
                continue
            if self.runs % job.autodetection_retry != 0:
                continue
            self.start_jobs(job)

    def cleanup_jobs(self):
        self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED]

    def have_alive_jobs(self):
        return next(
            (True for job in self.jobs if job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)),
            False,
        )

    def save_job_statuses(self):
        """Persist statuses every 10th run (saver set only in non-tty mode)."""
        if self.saver is None:
            return
        if self.runs % 10 != 0:
            return
        dump = JobsStatuses().from_jobs(self.jobs).dump()
        try:
            self.saver.save(dump)
        except Exception as error:
            self.log.error("error on saving jobs statuses dump : {0}".format(repr(error)))

    def serve_once(self):
        """One 1-second housekeeping tick; returns False when nothing is alive."""
        if not self.have_alive_jobs():
            self.log.info('no jobs to serve')
            return False

        time.sleep(1)
        self.runs += 1

        self.keep_alive()
        self.garbage_collection()
        self.cleanup_jobs()
        self.restart_recovering_jobs()
        self.save_job_statuses()

        return True

    def serve(self):
        while self.serve_once():
            pass

    def run(self):
        self.start_jobs(*self.jobs)
        self.serve()
def parse_command_line(args=None):
    """Parse plugin command line options into a CMD namedtuple.

    args: options to parse; defaults to sys.argv[1:] (backward compatible).
    Recognized: the first positive integer becomes update_every; 'debug',
    'trace' and 'nolock' are flags; all remaining options are module names.
    """
    opts = list(sys.argv[1:] if args is None else args)
    debug = False
    trace = False
    nolock = False
    update_every = 1
    modules_to_run = list()

    def find_first_positive_int(values):
        return next((v for v in values if v.isdigit() and int(v) >= 1), None)

    u = find_first_positive_int(opts)
    if u is not None:
        update_every = int(u)
        opts.remove(u)
    if 'debug' in opts:
        debug = True
        opts.remove('debug')
    if 'trace' in opts:
        trace = True
        opts.remove('trace')
    if 'nolock' in opts:
        nolock = True
        opts.remove('nolock')
    if opts:
        modules_to_run = list(opts)

    cmd = collections.namedtuple(
        'CMD',
        [
            'update_every',
            'debug',
            'trace',
            'nolock',
            'modules_to_run',
        ])
    return cmd(
        update_every,
        debug,
        trace,
        nolock,
        modules_to_run,
    )
def guess_module(modules, *names):
    """Return sorted module names sharing the longest possible prefix with any of *names*.

    Used to suggest modules when the user passes an unknown name.
    """

    def guess(n):
        # Fix: use the parameter 'n' instead of closing over the outer loop
        # variable 'name' (same value here, but fragile and misleading).
        found = None
        for i, _ in enumerate(n):
            cur = [x for x in modules if x.startswith(n[:i + 1])]
            if not cur:
                return found
            found = cur
        return found

    guessed = list()
    for name in names:
        name = name.lower()
        m = guess(name)
        if m:
            guessed.extend(m)
    return sorted(set(guessed))
def disable():
    """Tell netdata to disable this plugin (non-tty only), then exit cleanly."""
    if not IS_ATTY:
        safe_print('DISABLE')
    exit(0)
def get_modules_to_run(cmd):
    """Filter AVAILABLE_MODULES by the names requested on the command line.

    With no explicit names, all available modules run.  Duplicates are
    dropped, keeping the first (user-over-stock) occurrence.
    """
    if not cmd.modules_to_run:
        return AVAILABLE_MODULES

    modules_to_run, seen = list(), set()
    for m in AVAILABLE_MODULES:
        if m.name not in cmd.modules_to_run or m.name in seen:
            continue
        seen.add(m.name)
        modules_to_run.append(m)

    return modules_to_run
def main():
    """Plugin entry point: configure logging, validate modules, run the plugin."""
    cmd = parse_command_line()
    log = PythonDLogger()

    # severity from the environment first, command line flags override it
    level = os.getenv('NETDATA_LOG_LEVEL') or str()
    level = level.lower()
    if level == 'debug':
        log.logger.severity = 'DEBUG'
    elif level == 'info':
        log.logger.severity = 'INFO'
    elif level == 'warn' or level == 'warning':
        log.logger.severity = 'WARNING'
    elif level == 'err' or level == 'error':
        log.logger.severity = 'ERROR'

    if cmd.debug:
        log.logger.severity = 'DEBUG'
    if cmd.trace:
        log.log_traceback = True

    log.info('using python v{0}'.format(PY_VERSION[0]))

    if DIRS.locks and not cmd.nolock:
        registry = FileLockRegistry(DIRS.locks)
    else:
        registry = DummyRegistry()

    unique_avail_module_names = set([m.name for m in AVAILABLE_MODULES])
    unknown = set(cmd.modules_to_run) - unique_avail_module_names
    if unknown:
        log.error('unknown modules : {0}'.format(sorted(list(unknown))))
        guessed = guess_module(unique_avail_module_names, *cmd.modules_to_run)
        if guessed:
            log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1)))
        return

    p = Plugin(
        get_modules_to_run(cmd),
        cmd.update_every,
        registry,
    )

    # cheap attempt to reduce chance of python.d job running before go.d
    # TODO: better implementation needed
    if not IS_ATTY:
        time.sleep(1.5)

    try:
        if not p.setup():
            return
        p.run()
    except KeyboardInterrupt:
        pass
    log.info('exiting from main...')
if __name__ == "__main__":
    main()
    # main() returned: ask netdata not to respawn the plugin
    disable()
|