python.d.plugin.in 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946
#!/usr/bin/env bash
'''':;
pybinary=$(which python3 || which python || which python2)
filtered=()
for arg in "$@"
do
case $arg in
-p*) pybinary=${arg:2}
shift 1 ;;
*) filtered+=("$arg") ;;
esac
done
if [ "$pybinary" = "" ]
then
echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM"
exit 1
fi
exec "$pybinary" "$0" "${filtered[@]}" # '''
# -*- coding: utf-8 -*-
# NOTE: the lines above form a bash/python polyglot trampoline.  bash treats
# the `''''` on line 2 as empty strings followed by the no-op `:`, then runs
# the shell code: it picks an interpreter (python3 -> python -> python2,
# overridable via a leading `-p<path>` argument, which is stripped from the
# arguments passed on), and exec's that interpreter on this very file.
# python, in turn, sees lines 2-18 as a single discarded triple-quoted string
# literal (opened by `'''` on line 2, closed by the `'''` inside the trailing
# shell comment), so python execution effectively starts below this header.
# Do not insert anything between the shebang and the `exec` line.
# Description:
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later
  24. import collections
  25. import copy
  26. import gc
  27. import json
  28. import os
  29. import pprint
  30. import re
  31. import sys
  32. import threading
  33. import time
  34. import types
# Queue module was renamed from 'Queue' (py2) to 'queue' (py3).
try:
    from queue import Queue
except ImportError:
    from Queue import Queue

# (major, minor) only, e.g. (3, 7) taken from
# (major=3, minor=7, micro=3, releaselevel='final', serial=0)
PY_VERSION = sys.version_info[:2]

if PY_VERSION > (3, 1):
    from importlib.machinery import SourceFileLoader
else:
    # py2 fallback: imp.load_source executes the file and returns the module
    # itself (not a loader object) — load_module() accounts for that.
    from imp import load_source as SourceFileLoader

# Names of environment variables exported by the netdata daemon for plugins.
ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_USER_PLUGINS_DIRS = 'NETDATA_USER_PLUGINS_DIRS'
ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
ENV_NETDATA_LOCK_DIR = 'NETDATA_LOCK_DIR'
  51. def add_pythond_packages():
  52. pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__))
  53. pythond = os.path.abspath(pluginsd + '/../python.d')
  54. packages = os.path.join(pythond, 'python_modules')
  55. sys.path.append(packages)
# Must run before the imports below: 'bases' and 'third_party' live inside the
# python_modules directory that this call appends to sys.path.
add_pythond_packages()

from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config
from third_party import filelock

# OrderedDict is in the stdlib since py2.7; use the bundled copy otherwise.
try:
    from collections import OrderedDict
except ImportError:
    from third_party.ordereddict import OrderedDict
  65. def dirs():
  66. var_lib = os.getenv(
  67. ENV_NETDATA_LIB_DIR,
  68. '@varlibdir_POST@',
  69. )
  70. plugin_user_config = os.getenv(
  71. ENV_NETDATA_USER_CONFIG_DIR,
  72. '@configdir_POST@',
  73. )
  74. plugin_stock_config = os.getenv(
  75. ENV_NETDATA_STOCK_CONFIG_DIR,
  76. '@libconfigdir_POST@',
  77. )
  78. pluginsd = os.getenv(
  79. ENV_NETDATA_PLUGINS_DIR,
  80. os.path.dirname(__file__),
  81. )
  82. locks = os.getenv(
  83. ENV_NETDATA_LOCK_DIR,
  84. os.path.join('@varlibdir_POST@', 'lock')
  85. )
  86. modules_user_config = os.path.join(plugin_user_config, 'python.d')
  87. modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
  88. modules = os.path.abspath(pluginsd + '/../python.d')
  89. user_modules = [os.path.join(p, 'python.d') for p in
  90. os.getenv(ENV_NETDATA_USER_PLUGINS_DIRS, "").split(" ") if
  91. p]
  92. Dirs = collections.namedtuple(
  93. 'Dirs',
  94. [
  95. 'plugin_user_config',
  96. 'plugin_stock_config',
  97. 'modules_user_config',
  98. 'modules_stock_config',
  99. 'modules',
  100. 'user_modules',
  101. 'var_lib',
  102. 'locks',
  103. ]
  104. )
  105. return Dirs(
  106. plugin_user_config,
  107. plugin_stock_config,
  108. modules_user_config,
  109. modules_stock_config,
  110. modules,
  111. user_modules,
  112. var_lib,
  113. locks,
  114. )
DIRS = dirs()

# True when run from a terminal (a manual/debug run) rather than by the daemon.
IS_ATTY = sys.stdout.isatty() or sys.stderr.isatty()

# Collector module files are named '<module>.chart.py'.
MODULE_SUFFIX = '.chart.py'
  118. def find_available_modules(*directories):
  119. AvailableModule = collections.namedtuple(
  120. 'AvailableModule',
  121. [
  122. 'filepath',
  123. 'name',
  124. ]
  125. )
  126. available = list()
  127. for d in directories:
  128. try:
  129. if not os.path.isdir(d):
  130. continue
  131. files = sorted(os.listdir(d))
  132. except OSError:
  133. continue
  134. modules = [m for m in files if m.endswith(MODULE_SUFFIX)]
  135. available.extend([AvailableModule(os.path.join(d, m), m[:-len(MODULE_SUFFIX)]) for m in modules])
  136. return available
  137. def available_modules():
  138. obsolete = (
  139. 'apache_cache', # replaced by web_log
  140. 'cpuidle', # rewritten in C
  141. 'cpufreq', # rewritten in C
  142. 'gunicorn_log', # replaced by web_log
  143. 'linux_power_supply', # rewritten in C
  144. 'nginx_log', # replaced by web_log
  145. 'mdstat', # rewritten in C
  146. 'sslcheck', # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624
  147. 'unbound', # rewritten in Go
  148. )
  149. stock = [m for m in find_available_modules(DIRS.modules) if m.name not in obsolete]
  150. user = find_available_modules(*DIRS.user_modules)
  151. available, seen = list(), set()
  152. for m in user + stock:
  153. if m.name in seen:
  154. continue
  155. seen.add(m.name)
  156. available.append(m)
  157. return available
AVAILABLE_MODULES = available_modules()

# Settings every job starts from; module/job configs may override them.
JOB_BASE_CONF = {
    'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'penalty': True,
    'name': str(),
}

# Plugin-wide defaults, overridable from python.d.conf.
PLUGIN_BASE_CONF = {
    'enabled': True,
    'default_run': True,
    'gc_run': True,
    'gc_interval': 300,
}
  173. def multi_path_find(name, *paths):
  174. for path in paths:
  175. abs_name = os.path.join(path, name)
  176. if os.path.isfile(abs_name):
  177. return abs_name
  178. return str()
  179. def load_module(name, filepath):
  180. module = SourceFileLoader('pythond_' + name, filepath)
  181. if isinstance(module, types.ModuleType):
  182. return module
  183. return module.load_module()
  184. class ModuleConfig:
  185. def __init__(self, name, config=None):
  186. self.name = name
  187. self.config = config or OrderedDict()
  188. self.is_stock = False
  189. def load(self, abs_path):
  190. if not IS_ATTY:
  191. self.is_stock = abs_path.startswith(DIRS.modules_stock_config)
  192. self.config.update(load_config(abs_path) or dict())
  193. def defaults(self):
  194. keys = (
  195. 'update_every',
  196. 'priority',
  197. 'autodetection_retry',
  198. 'chart_cleanup',
  199. 'penalty',
  200. )
  201. return dict((k, self.config[k]) for k in keys if k in self.config)
  202. def create_job(self, job_name, job_config=None):
  203. job_config = job_config or dict()
  204. config = OrderedDict()
  205. config.update(job_config)
  206. config['job_name'] = job_name
  207. config['__is_stock'] = self.is_stock
  208. for k, v in self.defaults().items():
  209. config.setdefault(k, v)
  210. return config
  211. def job_names(self):
  212. return [v for v in self.config if isinstance(self.config.get(v), dict)]
  213. def single_job(self):
  214. return [self.create_job(self.name, self.config)]
  215. def multi_job(self):
  216. return [self.create_job(n, self.config[n]) for n in self.job_names()]
  217. def create_jobs(self):
  218. return self.multi_job() or self.single_job()
  219. class JobsConfigsBuilder:
  220. def __init__(self, config_dirs):
  221. self.config_dirs = config_dirs
  222. self.log = PythonDLogger()
  223. self.job_defaults = None
  224. self.module_defaults = None
  225. self.min_update_every = None
  226. def load_module_config(self, module_name):
  227. name = '{0}.conf'.format(module_name)
  228. self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs))
  229. config = ModuleConfig(module_name)
  230. abs_path = multi_path_find(name, *self.config_dirs)
  231. if not abs_path:
  232. self.log.warning("[{0}] '{1}' was not found".format(module_name, name))
  233. return config
  234. self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path))
  235. try:
  236. config.load(abs_path)
  237. except Exception as error:
  238. self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error)))
  239. return None
  240. self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path))
  241. return config
  242. @staticmethod
  243. def apply_defaults(jobs, defaults):
  244. if defaults is None:
  245. return
  246. for k, v in defaults.items():
  247. for job in jobs:
  248. job.setdefault(k, v)
  249. def set_min_update_every(self, jobs, min_update_every):
  250. if min_update_every is None:
  251. return
  252. for job in jobs:
  253. if 'update_every' in job and job['update_every'] < self.min_update_every:
  254. job['update_every'] = self.min_update_every
  255. def build(self, module_name):
  256. config = self.load_module_config(module_name)
  257. if config is None:
  258. return None
  259. configs = config.create_jobs()
  260. if not config.is_stock:
  261. self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs)))
  262. self.apply_defaults(configs, self.module_defaults)
  263. self.apply_defaults(configs, self.job_defaults)
  264. self.set_min_update_every(configs, self.min_update_every)
  265. return configs
# Lifecycle states of a Job thread.
JOB_STATUS_ACTIVE = 'active'
JOB_STATUS_RECOVERING = 'recovering'
JOB_STATUS_DROPPED = 'dropped'
JOB_STATUS_INIT = 'initial'
  270. class Job(threading.Thread):
  271. inf = -1
  272. def __init__(self, service, module_name, config):
  273. threading.Thread.__init__(self)
  274. self.daemon = True
  275. self.service = service
  276. self.module_name = module_name
  277. self.config = config
  278. self.real_name = config['job_name']
  279. self.actual_name = config['override_name'] or self.real_name
  280. self.autodetection_retry = config['autodetection_retry']
  281. self.checks = self.inf
  282. self.job = None
  283. self.is_stock = config.get('__is_stock', False)
  284. self.status = JOB_STATUS_INIT
  285. def is_inited(self):
  286. return self.job is not None
  287. def init(self):
  288. self.job = self.service(configuration=copy.deepcopy(self.config))
  289. def full_name(self):
  290. return self.job.name
  291. def check(self):
  292. if self.is_stock:
  293. self.job.logger.mute()
  294. ok = self.job.check()
  295. self.job.logger.unmute()
  296. self.checks -= self.checks != self.inf and not ok
  297. return ok
  298. def create(self):
  299. self.job.create()
  300. def need_to_recheck(self):
  301. return self.autodetection_retry != 0 and self.checks != 0
  302. def run(self):
  303. self.job.run()
  304. class ModuleSrc:
  305. def __init__(self, m):
  306. self.name = m.name
  307. self.filepath = m.filepath
  308. self.src = None
  309. def load(self):
  310. self.src = load_module(self.name, self.filepath)
  311. def get(self, key):
  312. return getattr(self.src, key, None)
  313. def service(self):
  314. return self.get('Service')
  315. def defaults(self):
  316. keys = (
  317. 'update_every',
  318. 'priority',
  319. 'autodetection_retry',
  320. 'chart_cleanup',
  321. 'penalty',
  322. )
  323. return dict((k, self.get(k)) for k in keys if self.get(k) is not None)
  324. def is_disabled_by_default(self):
  325. return bool(self.get('disabled_by_default'))
  326. class JobsStatuses:
  327. def __init__(self):
  328. self.items = OrderedDict()
  329. def dump(self):
  330. return json.dumps(self.items, indent=2)
  331. def get(self, module_name, job_name):
  332. if module_name not in self.items:
  333. return None
  334. return self.items[module_name].get(job_name)
  335. def has(self, module_name, job_name):
  336. return self.get(module_name, job_name) is not None
  337. def from_file(self, path):
  338. with open(path) as f:
  339. data = json.load(f)
  340. return self.from_json(data)
  341. @staticmethod
  342. def from_json(items):
  343. if not isinstance(items, dict):
  344. raise Exception('items obj has wrong type : {0}'.format(type(items)))
  345. if not items:
  346. return JobsStatuses()
  347. v = OrderedDict()
  348. for mod_name in sorted(items):
  349. if not items[mod_name]:
  350. continue
  351. v[mod_name] = OrderedDict()
  352. for job_name in sorted(items[mod_name]):
  353. v[mod_name][job_name] = items[mod_name][job_name]
  354. rv = JobsStatuses()
  355. rv.items = v
  356. return rv
  357. @staticmethod
  358. def from_jobs(jobs):
  359. v = OrderedDict()
  360. for job in jobs:
  361. status = job.status
  362. if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING):
  363. continue
  364. if job.module_name not in v:
  365. v[job.module_name] = OrderedDict()
  366. v[job.module_name][job.real_name] = status
  367. rv = JobsStatuses()
  368. rv.items = v
  369. return rv
  370. class StdoutSaver:
  371. @staticmethod
  372. def save(dump):
  373. print(dump)
  374. class CachedFileSaver:
  375. def __init__(self, path):
  376. self.last_save_success = False
  377. self.last_saved_dump = str()
  378. self.path = path
  379. def save(self, dump):
  380. if self.last_save_success and self.last_saved_dump == dump:
  381. return
  382. try:
  383. with open(self.path, 'w') as out:
  384. out.write(dump)
  385. except Exception:
  386. self.last_save_success = False
  387. raise
  388. self.last_saved_dump = dump
  389. self.last_save_success = True
  390. class PluginConfig(dict):
  391. def __init__(self, *args):
  392. dict.__init__(self, *args)
  393. def is_module_explicitly_enabled(self, module_name):
  394. return self._is_module_enabled(module_name, True)
  395. def is_module_enabled(self, module_name):
  396. return self._is_module_enabled(module_name, False)
  397. def _is_module_enabled(self, module_name, explicit):
  398. if module_name in self:
  399. return self[module_name]
  400. if explicit:
  401. return False
  402. return self['default_run']
  403. class FileLockRegistry:
  404. def __init__(self, path):
  405. self.path = path
  406. self.locks = dict()
  407. @staticmethod
  408. def rename(name):
  409. # go version name is 'docker'
  410. if name.startswith("dockerd"):
  411. name = "docker" + name[7:]
  412. return name
  413. def register(self, name):
  414. name = self.rename(name)
  415. if name in self.locks:
  416. return
  417. file = os.path.join(self.path, '{0}.collector.lock'.format(name))
  418. lock = filelock.FileLock(file)
  419. lock.acquire(timeout=0)
  420. self.locks[name] = lock
  421. def unregister(self, name):
  422. name = self.rename(name)
  423. if name not in self.locks:
  424. return
  425. lock = self.locks[name]
  426. lock.release()
  427. del self.locks[name]
  428. class DummyRegistry:
  429. def register(self, name):
  430. pass
  431. def unregister(self, name):
  432. pass
  433. class Plugin:
  434. config_name = 'python.d.conf'
  435. jobs_status_dump_name = 'pythond-jobs-statuses.json'
  436. def __init__(self, modules_to_run, min_update_every, registry):
  437. self.modules_to_run = modules_to_run
  438. self.min_update_every = min_update_every
  439. self.config = PluginConfig(PLUGIN_BASE_CONF)
  440. self.log = PythonDLogger()
  441. self.registry = registry
  442. self.started_jobs = collections.defaultdict(dict)
  443. self.jobs = list()
  444. self.saver = None
  445. self.runs = 0
  446. def load_config_file(self, filepath, expected):
  447. self.log.debug("looking for '{0}'".format(filepath))
  448. if not os.path.isfile(filepath):
  449. log = self.log.info if not expected else self.log.error
  450. log("'{0}' was not found".format(filepath))
  451. return dict()
  452. try:
  453. config = load_config(filepath)
  454. except Exception as error:
  455. self.log.error("error on loading '{0}' : {1}".format(filepath, repr(error)))
  456. return dict()
  457. self.log.debug("'{0}' is loaded".format(filepath))
  458. return config
  459. def load_config(self):
  460. user_config = self.load_config_file(
  461. filepath=os.path.join(DIRS.plugin_user_config, self.config_name),
  462. expected=False,
  463. )
  464. stock_config = self.load_config_file(
  465. filepath=os.path.join(DIRS.plugin_stock_config, self.config_name),
  466. expected=True,
  467. )
  468. self.config.update(stock_config)
  469. self.config.update(user_config)
  470. def load_job_statuses(self):
  471. self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib))
  472. abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib)
  473. if not abs_path:
  474. self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name))
  475. return
  476. self.log.debug("loading '{0}'".format(abs_path))
  477. try:
  478. statuses = JobsStatuses().from_file(abs_path)
  479. except Exception as error:
  480. self.log.error("'{0}' invalid JSON format: {1}".format(
  481. abs_path, ' '.join([v.strip() for v in str(error).split('\n')])))
  482. return None
  483. self.log.debug("'{0}' is loaded".format(abs_path))
  484. return statuses
  485. def create_jobs(self, job_statuses=None):
  486. paths = [
  487. DIRS.modules_user_config,
  488. DIRS.modules_stock_config,
  489. ]
  490. builder = JobsConfigsBuilder(paths)
  491. builder.job_defaults = JOB_BASE_CONF
  492. builder.min_update_every = self.min_update_every
  493. jobs = list()
  494. for m in self.modules_to_run:
  495. if not self.config.is_module_enabled(m.name):
  496. self.log.info("[{0}] is disabled in the configuration file, skipping it".format(m.name))
  497. continue
  498. src = ModuleSrc(m)
  499. try:
  500. src.load()
  501. except Exception as error:
  502. self.log.warning("[{0}] error on loading source : {1}, skipping it".format(m.name, repr(error)))
  503. continue
  504. self.log.debug("[{0}] loaded module source : '{1}'".format(m.name, m.filepath))
  505. if not (src.service() and callable(src.service())):
  506. self.log.warning("[{0}] has no callable Service object, skipping it".format(m.name))
  507. continue
  508. if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(m.name):
  509. self.log.info("[{0}] is disabled by default, skipping it".format(m.name))
  510. continue
  511. builder.module_defaults = src.defaults()
  512. configs = builder.build(m.name)
  513. if not configs:
  514. self.log.info("[{0}] has no job configs, skipping it".format(m.name))
  515. continue
  516. for config in configs:
  517. config['job_name'] = re.sub(r'\s+', '_', config['job_name'])
  518. config['override_name'] = re.sub(r'\s+', '_', config.pop('name'))
  519. job = Job(src.service(), m.name, config)
  520. was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name)
  521. if was_previously_active and job.autodetection_retry == 0:
  522. self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format(
  523. job.module_name, job.real_name))
  524. job.checks = 11
  525. job.autodetection_retry = 30
  526. jobs.append(job)
  527. return jobs
  528. def setup(self):
  529. self.load_config()
  530. if not self.config['enabled']:
  531. self.log.info('disabled in the configuration file')
  532. return False
  533. statuses = self.load_job_statuses()
  534. self.jobs = self.create_jobs(statuses)
  535. if not self.jobs:
  536. self.log.info('no jobs to run')
  537. return False
  538. if not IS_ATTY:
  539. abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name)
  540. self.saver = CachedFileSaver(abs_path)
  541. return True
  542. def start_jobs(self, *jobs):
  543. for job in jobs:
  544. if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING):
  545. continue
  546. if job.actual_name in self.started_jobs[job.module_name]:
  547. self.log.info('{0}[{1}] : already served by another job, skipping it'.format(
  548. job.module_name, job.real_name))
  549. job.status = JOB_STATUS_DROPPED
  550. continue
  551. if not job.is_inited():
  552. try:
  553. job.init()
  554. except Exception as error:
  555. self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
  556. job.module_name, job.real_name, repr(error)))
  557. job.status = JOB_STATUS_DROPPED
  558. continue
  559. try:
  560. ok = job.check()
  561. except Exception as error:
  562. if not job.is_stock:
  563. self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
  564. job.module_name, job.real_name, repr(error)))
  565. job.status = JOB_STATUS_DROPPED
  566. continue
  567. if not ok:
  568. if not job.is_stock:
  569. self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name))
  570. job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED
  571. continue
  572. self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name))
  573. try:
  574. self.registry.register(job.full_name())
  575. except filelock.Timeout as error:
  576. self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format(
  577. job.module_name, job.real_name, error))
  578. job.status = JOB_STATUS_DROPPED
  579. continue
  580. except Exception as error:
  581. self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format(
  582. job.module_name, job.real_name, error))
  583. job.status = JOB_STATUS_DROPPED
  584. continue
  585. try:
  586. job.create()
  587. except Exception as error:
  588. self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format(
  589. job.module_name, job.real_name, repr(error)))
  590. job.status = JOB_STATUS_DROPPED
  591. try:
  592. self.registry.unregister(job.full_name())
  593. except Exception as error:
  594. self.log.warning('{0}[{1}] : deregistration failed: {2}'.format(
  595. job.module_name, job.real_name, error))
  596. continue
  597. self.started_jobs[job.module_name] = job.actual_name
  598. job.status = JOB_STATUS_ACTIVE
  599. job.start()
  600. @staticmethod
  601. def keep_alive():
  602. if not IS_ATTY:
  603. safe_print('\n')
  604. def garbage_collection(self):
  605. if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0:
  606. v = gc.collect()
  607. self.log.debug('GC collection run result: {0}'.format(v))
  608. def restart_recovering_jobs(self):
  609. for job in self.jobs:
  610. if job.status != JOB_STATUS_RECOVERING:
  611. continue
  612. if self.runs % job.autodetection_retry != 0:
  613. continue
  614. self.start_jobs(job)
  615. def cleanup_jobs(self):
  616. self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED]
  617. def have_alive_jobs(self):
  618. return next(
  619. (True for job in self.jobs if job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)),
  620. False,
  621. )
  622. def save_job_statuses(self):
  623. if self.saver is None:
  624. return
  625. if self.runs % 10 != 0:
  626. return
  627. dump = JobsStatuses().from_jobs(self.jobs).dump()
  628. try:
  629. self.saver.save(dump)
  630. except Exception as error:
  631. self.log.error("error on saving jobs statuses dump : {0}".format(repr(error)))
  632. def serve_once(self):
  633. if not self.have_alive_jobs():
  634. self.log.info('no jobs to serve')
  635. return False
  636. time.sleep(1)
  637. self.runs += 1
  638. self.keep_alive()
  639. self.garbage_collection()
  640. self.cleanup_jobs()
  641. self.restart_recovering_jobs()
  642. self.save_job_statuses()
  643. return True
  644. def serve(self):
  645. while self.serve_once():
  646. pass
  647. def run(self):
  648. self.start_jobs(*self.jobs)
  649. self.serve()
  650. def parse_command_line():
  651. opts = sys.argv[:][1:]
  652. debug = False
  653. trace = False
  654. nolock = False
  655. update_every = 1
  656. modules_to_run = list()
  657. def find_first_positive_int(values):
  658. return next((v for v in values if v.isdigit() and int(v) >= 1), None)
  659. u = find_first_positive_int(opts)
  660. if u is not None:
  661. update_every = int(u)
  662. opts.remove(u)
  663. if 'debug' in opts:
  664. debug = True
  665. opts.remove('debug')
  666. if 'trace' in opts:
  667. trace = True
  668. opts.remove('trace')
  669. if 'nolock' in opts:
  670. nolock = True
  671. opts.remove('nolock')
  672. if opts:
  673. modules_to_run = list(opts)
  674. cmd = collections.namedtuple(
  675. 'CMD',
  676. [
  677. 'update_every',
  678. 'debug',
  679. 'trace',
  680. 'nolock',
  681. 'modules_to_run',
  682. ])
  683. return cmd(
  684. update_every,
  685. debug,
  686. trace,
  687. nolock,
  688. modules_to_run,
  689. )
  690. def guess_module(modules, *names):
  691. def guess(n):
  692. found = None
  693. for i, _ in enumerate(n):
  694. cur = [x for x in modules if x.startswith(name[:i + 1])]
  695. if not cur:
  696. return found
  697. found = cur
  698. return found
  699. guessed = list()
  700. for name in names:
  701. name = name.lower()
  702. m = guess(name)
  703. if m:
  704. guessed.extend(m)
  705. return sorted(set(guessed))
  706. def disable():
  707. if not IS_ATTY:
  708. safe_print('DISABLE')
  709. exit(0)
  710. def get_modules_to_run(cmd):
  711. if not cmd.modules_to_run:
  712. return AVAILABLE_MODULES
  713. modules_to_run, seen = list(), set()
  714. for m in AVAILABLE_MODULES:
  715. if m.name not in cmd.modules_to_run or m.name in seen:
  716. continue
  717. seen.add(m.name)
  718. modules_to_run.append(m)
  719. return modules_to_run
  720. def main():
  721. cmd = parse_command_line()
  722. log = PythonDLogger()
  723. level = os.getenv('NETDATA_LOG_LEVEL') or str()
  724. level = level.lower()
  725. if level == 'debug':
  726. log.logger.severity = 'DEBUG'
  727. elif level == 'info':
  728. log.logger.severity = 'INFO'
  729. elif level == 'warn' or level == 'warning':
  730. log.logger.severity = 'WARNING'
  731. elif level == 'err' or level == 'error':
  732. log.logger.severity = 'ERROR'
  733. if cmd.debug:
  734. log.logger.severity = 'DEBUG'
  735. if cmd.trace:
  736. log.log_traceback = True
  737. log.info('using python v{0}'.format(PY_VERSION[0]))
  738. if DIRS.locks and not cmd.nolock:
  739. registry = FileLockRegistry(DIRS.locks)
  740. else:
  741. registry = DummyRegistry()
  742. unique_avail_module_names = set([m.name for m in AVAILABLE_MODULES])
  743. unknown = set(cmd.modules_to_run) - unique_avail_module_names
  744. if unknown:
  745. log.error('unknown modules : {0}'.format(sorted(list(unknown))))
  746. guessed = guess_module(unique_avail_module_names, *cmd.modules_to_run)
  747. if guessed:
  748. log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1)))
  749. return
  750. p = Plugin(
  751. get_modules_to_run(cmd),
  752. cmd.update_every,
  753. registry,
  754. )
  755. # cheap attempt to reduce chance of python.d job running before go.d
  756. # TODO: better implementation needed
  757. if not IS_ATTY:
  758. time.sleep(1.5)
  759. try:
  760. if not p.setup():
  761. return
  762. p.run()
  763. except KeyboardInterrupt:
  764. pass
  765. log.info('exiting from main...')
if __name__ == "__main__":
    main()
    # main() returns on setup failure, unknown modules, or KeyboardInterrupt;
    # disable() then tells the daemon not to restart this plugin and exits.
    disable()