python.d.plugin.in

#!/usr/bin/env bash
'''':;
pybinary=$(which python3 || which python || which python2)
filtered=()
for arg in "$@"
do
    case $arg in
        -p*) pybinary=${arg:2}
             shift 1 ;;
        *)   filtered+=("$arg") ;;
    esac
done
if [ "$pybinary" = "" ]
then
    echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM"
    exit 1
fi
exec "$pybinary" "$0" "${filtered[@]}" # '''

# -*- coding: utf-8 -*-
# Description:
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later
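# Note on the header above: this file is a bash/python polyglot. To bash, line 2
# ('''':;) is a no-op and the block that follows picks a Python interpreter
# (optionally forced with -p<path>) and re-executes this file with it. To Python,
# everything from line 2 down to the closing ''' on the exec line is a single
# unused string literal, so execution effectively starts at the imports below.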
import collections
import copy
import gc
import json
import os
import pprint
import re
import sys
import threading
import time
import types

try:
    from queue import Queue
except ImportError:
    from Queue import Queue

PY_VERSION = sys.version_info[:2]  # (major=3, minor=7, micro=3, releaselevel='final', serial=0)

if PY_VERSION > (3, 1):
    from importlib.machinery import SourceFileLoader
else:
    from imp import load_source as SourceFileLoader

ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_USER_PLUGINS_DIRS = 'NETDATA_USER_PLUGINS_DIRS'
ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
ENV_NETDATA_LOCK_DIR = 'NETDATA_LOCK_DIR'


def add_pythond_packages():
    pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__))
    pythond = os.path.abspath(pluginsd + '/../python.d')
    packages = os.path.join(pythond, 'python_modules')
    sys.path.append(packages)


add_pythond_packages()

from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config
from third_party import filelock

try:
    from collections import OrderedDict
except ImportError:
    from third_party.ordereddict import OrderedDict


def dirs():
    var_lib = os.getenv(
        ENV_NETDATA_LIB_DIR,
        '@varlibdir_POST@',
    )
    plugin_user_config = os.getenv(
        ENV_NETDATA_USER_CONFIG_DIR,
        '@configdir_POST@',
    )
    plugin_stock_config = os.getenv(
        ENV_NETDATA_STOCK_CONFIG_DIR,
        '@libconfigdir_POST@',
    )
    pluginsd = os.getenv(
        ENV_NETDATA_PLUGINS_DIR,
        os.path.dirname(__file__),
    )
    locks = os.getenv(
        ENV_NETDATA_LOCK_DIR,
        os.path.join('@varlibdir_POST@', 'lock')
    )
    modules_user_config = os.path.join(plugin_user_config, 'python.d')
    modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
    modules = os.path.abspath(pluginsd + '/../python.d')
    user_modules = [os.path.join(p, 'python.d') for p in
                    os.getenv(ENV_NETDATA_USER_PLUGINS_DIRS, "").split(" ") if
                    p]

    Dirs = collections.namedtuple(
        'Dirs',
        [
            'plugin_user_config',
            'plugin_stock_config',
            'modules_user_config',
            'modules_stock_config',
            'modules',
            'user_modules',
            'var_lib',
            'locks',
        ]
    )
    return Dirs(
        plugin_user_config,
        plugin_stock_config,
        modules_user_config,
        modules_stock_config,
        modules,
        user_modules,
        var_lib,
        locks,
    )


DIRS = dirs()

IS_ATTY = sys.stdout.isatty() or sys.stderr.isatty()

MODULE_SUFFIX = '.chart.py'


def find_available_modules(*directories):
    AvailableModule = collections.namedtuple(
        'AvailableModule',
        [
            'filepath',
            'name',
        ]
    )
    available = list()
    for d in directories:
        try:
            if not os.path.isdir(d):
                continue
            files = sorted(os.listdir(d))
        except OSError:
            continue
        modules = [m for m in files if m.endswith(MODULE_SUFFIX)]
        available.extend([AvailableModule(os.path.join(d, m), m[:-len(MODULE_SUFFIX)]) for m in modules])
    return available


def available_modules():
    obsolete = (
        'apache_cache',  # replaced by web_log
        'cpuidle',  # rewritten in C
        'cpufreq',  # rewritten in C
        'gunicorn_log',  # replaced by web_log
        'linux_power_supply',  # rewritten in C
        'nginx_log',  # replaced by web_log
        'mdstat',  # rewritten in C
        'sslcheck',  # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624
        'unbound',  # rewritten in Go
    )
    stock = [m for m in find_available_modules(DIRS.modules) if m.name not in obsolete]
    user = find_available_modules(*DIRS.user_modules)
    available, seen = list(), set()
    for m in user + stock:
        if m.name in seen:
            continue
        seen.add(m.name)
        available.append(m)
    return available


AVAILABLE_MODULES = available_modules()

JOB_BASE_CONF = {
    'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'penalty': True,
    'name': str(),
}

PLUGIN_BASE_CONF = {
    'enabled': True,
    'default_run': True,
    'gc_run': True,
    'gc_interval': 300,
}
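# JOB_BASE_CONF and PLUGIN_BASE_CONF are the built-in defaults; python.d.conf
# (loaded in Plugin.load_config) can override the plugin-level keys and
# enable/disable individual modules by name. A hypothetical example:
#
#   enabled: yes
#   default_run: yes
#   gc_run: yes
#   gc_interval: 300
#   apache: no      # disable the apache module
#   example: yes    # explicitly enable a module (needed for modules disabled by default)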
def multi_path_find(name, *paths):
    for path in paths:
        abs_name = os.path.join(path, name)
        if os.path.isfile(abs_name):
            return abs_name
    return str()


def load_module(name, filepath):
    module = SourceFileLoader('pythond_' + name, filepath)
    if isinstance(module, types.ModuleType):
        return module
    return module.load_module()
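# ModuleConfig (below) wraps a single module's '<module>.conf'. Top-level scalar
# keys act as defaults for every job, and each key whose value is itself a
# mapping defines one job. A hypothetical example:
#
#   update_every: 5
#   local:
#     url: 'http://127.0.0.1/status'
#   remote:
#     url: 'http://203.0.113.1/status'
#
# create_jobs() yields one config per nested mapping ('local', 'remote'), or a
# single job named after the module when there are no nested mappings.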
class ModuleConfig:
    def __init__(self, name, config=None):
        self.name = name
        self.config = config or OrderedDict()

    def load(self, abs_path):
        self.config.update(load_config(abs_path) or dict())

    def defaults(self):
        keys = (
            'update_every',
            'priority',
            'autodetection_retry',
            'chart_cleanup',
            'penalty',
        )
        return dict((k, self.config[k]) for k in keys if k in self.config)

    def create_job(self, job_name, job_config=None):
        job_config = job_config or dict()
        config = OrderedDict()
        config.update(job_config)
        config['job_name'] = job_name
        for k, v in self.defaults().items():
            config.setdefault(k, v)
        return config

    def job_names(self):
        return [v for v in self.config if isinstance(self.config.get(v), dict)]

    def single_job(self):
        return [self.create_job(self.name, self.config)]

    def multi_job(self):
        return [self.create_job(n, self.config[n]) for n in self.job_names()]

    def create_jobs(self):
        return self.multi_job() or self.single_job()


class JobsConfigsBuilder:
    def __init__(self, config_dirs):
        self.config_dirs = config_dirs
        self.log = PythonDLogger()
        self.job_defaults = None
        self.module_defaults = None
        self.min_update_every = None

    def load_module_config(self, module_name):
        name = '{0}.conf'.format(module_name)
        self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs))
        config = ModuleConfig(module_name)
        abs_path = multi_path_find(name, *self.config_dirs)
        if not abs_path:
            self.log.warning("[{0}] '{1}' was not found".format(module_name, name))
            return config
        self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path))
        try:
            config.load(abs_path)
        except Exception as error:
            self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error)))
            return None
        self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path))
        return config

    @staticmethod
    def apply_defaults(jobs, defaults):
        if defaults is None:
            return
        for k, v in defaults.items():
            for job in jobs:
                job.setdefault(k, v)

    def set_min_update_every(self, jobs, min_update_every):
        if min_update_every is None:
            return
        for job in jobs:
            if 'update_every' in job and job['update_every'] < self.min_update_every:
                job['update_every'] = self.min_update_every

    def build(self, module_name):
        config = self.load_module_config(module_name)
        if config is None:
            return None
        configs = config.create_jobs()
        self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs)))
        self.apply_defaults(configs, self.module_defaults)
        self.apply_defaults(configs, self.job_defaults)
        self.set_min_update_every(configs, self.min_update_every)
        return configs


JOB_STATUS_ACTIVE = 'active'
JOB_STATUS_RECOVERING = 'recovering'
JOB_STATUS_DROPPED = 'dropped'
JOB_STATUS_INIT = 'initial'
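# Job lifecycle: every job starts as 'initial'; after a successful check() it
# becomes 'active', after a failed check() it becomes 'recovering' (if it may
# be rechecked) or 'dropped'. Only 'active' and 'recovering' jobs are persisted
# to the statuses dump, and 'dropped' jobs are removed by cleanup_jobs().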
class Job(threading.Thread):
    inf = -1

    def __init__(self, service, module_name, config):
        threading.Thread.__init__(self)
        self.daemon = True
        self.service = service
        self.module_name = module_name
        self.config = config
        self.real_name = config['job_name']
        self.actual_name = config['override_name'] or self.real_name
        self.autodetection_retry = config['autodetection_retry']
        self.checks = self.inf
        self.job = None
        self.status = JOB_STATUS_INIT

    def is_inited(self):
        return self.job is not None

    def init(self):
        self.job = self.service(configuration=copy.deepcopy(self.config))

    def full_name(self):
        return self.job.name

    def check(self):
        ok = self.job.check()
        self.checks -= self.checks != self.inf and not ok
        return ok

    def create(self):
        self.job.create()

    def need_to_recheck(self):
        return self.autodetection_retry != 0 and self.checks != 0

    def run(self):
        self.job.run()


class ModuleSrc:
    def __init__(self, m):
        self.name = m.name
        self.filepath = m.filepath
        self.src = None

    def load(self):
        self.src = load_module(self.name, self.filepath)

    def get(self, key):
        return getattr(self.src, key, None)

    def service(self):
        return self.get('Service')

    def defaults(self):
        keys = (
            'update_every',
            'priority',
            'autodetection_retry',
            'chart_cleanup',
            'penalty',
        )
        return dict((k, self.get(k)) for k in keys if self.get(k) is not None)

    def is_disabled_by_default(self):
        return bool(self.get('disabled_by_default'))


class JobsStatuses:
    def __init__(self):
        self.items = OrderedDict()

    def dump(self):
        return json.dumps(self.items, indent=2)

    def get(self, module_name, job_name):
        if module_name not in self.items:
            return None
        return self.items[module_name].get(job_name)

    def has(self, module_name, job_name):
        return self.get(module_name, job_name) is not None

    def from_file(self, path):
        with open(path) as f:
            data = json.load(f)
            return self.from_json(data)

    @staticmethod
    def from_json(items):
        if not isinstance(items, dict):
            raise Exception('items obj has wrong type : {0}'.format(type(items)))
        if not items:
            return JobsStatuses()
        v = OrderedDict()
        for mod_name in sorted(items):
            if not items[mod_name]:
                continue
            v[mod_name] = OrderedDict()
            for job_name in sorted(items[mod_name]):
                v[mod_name][job_name] = items[mod_name][job_name]
        rv = JobsStatuses()
        rv.items = v
        return rv

    @staticmethod
    def from_jobs(jobs):
        v = OrderedDict()
        for job in jobs:
            status = job.status
            if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING):
                continue
            if job.module_name not in v:
                v[job.module_name] = OrderedDict()
            v[job.module_name][job.real_name] = status
        rv = JobsStatuses()
        rv.items = v
        return rv
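# JobsStatuses round-trips the on-disk dump written by CachedFileSaver, e.g.
# (hypothetical contents of pythond-jobs-statuses.json):
#
#   {
#     "apache": {
#       "local": "active"
#     },
#     "mysql": {
#       "remote": "recovering"
#     }
#   }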
class StdoutSaver:
    @staticmethod
    def save(dump):
        print(dump)


class CachedFileSaver:
    def __init__(self, path):
        self.last_save_success = False
        self.last_saved_dump = str()
        self.path = path

    def save(self, dump):
        if self.last_save_success and self.last_saved_dump == dump:
            return
        try:
            with open(self.path, 'w') as out:
                out.write(dump)
        except Exception:
            self.last_save_success = False
            raise
        self.last_saved_dump = dump
        self.last_save_success = True


class PluginConfig(dict):
    def __init__(self, *args):
        dict.__init__(self, *args)

    def is_module_explicitly_enabled(self, module_name):
        return self._is_module_enabled(module_name, True)

    def is_module_enabled(self, module_name):
        return self._is_module_enabled(module_name, False)

    def _is_module_enabled(self, module_name, explicit):
        if module_name in self:
            return self[module_name]
        if explicit:
            return False
        return self['default_run']
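# FileLockRegistry (below) creates one '<job full name>.collector.lock' file per
# started job under the netdata lock directory, so a job already locked by
# another collector process (e.g. go.d) raises filelock.Timeout and is skipped
# instead of being collected twice. The 'dockerd' -> 'docker' rename keeps the
# lock name in sync with the go.d collector of the same job.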
class FileLockRegistry:
    def __init__(self, path):
        self.path = path
        self.locks = dict()

    @staticmethod
    def rename(name):
        # go version name is 'docker'
        if name.startswith("dockerd"):
            name = "docker" + name[7:]
        return name

    def register(self, name):
        name = self.rename(name)
        if name in self.locks:
            return
        file = os.path.join(self.path, '{0}.collector.lock'.format(name))
        lock = filelock.FileLock(file)
        lock.acquire(timeout=0)
        self.locks[name] = lock

    def unregister(self, name):
        name = self.rename(name)
        if name not in self.locks:
            return
        lock = self.locks[name]
        lock.release()
        del self.locks[name]


class DummyRegistry:
    def register(self, name):
        pass

    def unregister(self, name):
        pass
class Plugin:
    config_name = 'python.d.conf'
    jobs_status_dump_name = 'pythond-jobs-statuses.json'

    def __init__(self, modules_to_run, min_update_every, registry):
        self.modules_to_run = modules_to_run
        self.min_update_every = min_update_every
        self.config = PluginConfig(PLUGIN_BASE_CONF)
        self.log = PythonDLogger()
        self.registry = registry
        self.started_jobs = collections.defaultdict(dict)
        self.jobs = list()
        self.saver = None
        self.runs = 0

    def load_config_file(self, filepath, expected):
        self.log.debug("looking for '{0}'".format(filepath))
        if not os.path.isfile(filepath):
            log = self.log.info if not expected else self.log.error
            log("'{0}' was not found".format(filepath))
            return dict()
        try:
            config = load_config(filepath)
        except Exception as error:
            self.log.error("error on loading '{0}' : {1}".format(filepath, repr(error)))
            return dict()
        self.log.debug("'{0}' is loaded".format(filepath))
        return config

    def load_config(self):
        user_config = self.load_config_file(
            filepath=os.path.join(DIRS.plugin_user_config, self.config_name),
            expected=False,
        )
        stock_config = self.load_config_file(
            filepath=os.path.join(DIRS.plugin_stock_config, self.config_name),
            expected=True,
        )
        self.config.update(stock_config)
        self.config.update(user_config)
    def load_job_statuses(self):
        self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib))
        abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib)
        if not abs_path:
            self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name))
            return
        self.log.debug("loading '{0}'".format(abs_path))
        try:
            statuses = JobsStatuses().from_file(abs_path)
        except Exception as error:
            self.log.error("error on loading '{0}' : {1}".format(
                abs_path, ' '.join([v.strip() for v in str(error).split('\n')])))
            return None
        self.log.debug("'{0}' is loaded".format(abs_path))
        return statuses
    def create_jobs(self, job_statuses=None):
        paths = [
            DIRS.modules_user_config,
            DIRS.modules_stock_config,
        ]
        builder = JobsConfigsBuilder(paths)
        builder.job_defaults = JOB_BASE_CONF
        builder.min_update_every = self.min_update_every

        jobs = list()
        for m in self.modules_to_run:
            if not self.config.is_module_enabled(m.name):
                self.log.info("[{0}] is disabled in the configuration file, skipping it".format(m.name))
                continue

            src = ModuleSrc(m)
            try:
                src.load()
            except Exception as error:
                self.log.warning("[{0}] error on loading source : {1}, skipping it".format(m.name, repr(error)))
                continue
            self.log.debug("[{0}] loaded module source : '{1}'".format(m.name, m.filepath))

            if not (src.service() and callable(src.service())):
                self.log.warning("[{0}] has no callable Service object, skipping it".format(m.name))
                continue

            if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(m.name):
                self.log.info("[{0}] is disabled by default, skipping it".format(m.name))
                continue

            builder.module_defaults = src.defaults()
            configs = builder.build(m.name)
            if not configs:
                self.log.info("[{0}] has no job configs, skipping it".format(m.name))
                continue

            for config in configs:
                config['job_name'] = re.sub(r'\s+', '_', config['job_name'])
                config['override_name'] = re.sub(r'\s+', '_', config.pop('name'))

                job = Job(src.service(), m.name, config)

                was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name)
                if was_previously_active and job.autodetection_retry == 0:
                    self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format(
                        job.module_name, job.real_name))
                    job.checks = 11
                    job.autodetection_retry = 30

                jobs.append(job)

        return jobs
    def setup(self):
        self.load_config()

        if not self.config['enabled']:
            self.log.info('disabled in the configuration file')
            return False

        statuses = self.load_job_statuses()

        self.jobs = self.create_jobs(statuses)
        if not self.jobs:
            self.log.info('no jobs to run')
            return False

        if not IS_ATTY:
            abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name)
            self.saver = CachedFileSaver(abs_path)
        return True

    def start_jobs(self, *jobs):
        for job in jobs:
            if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING):
                continue

            if job.actual_name in self.started_jobs[job.module_name]:
                self.log.info('{0}[{1}] : already served by another job, skipping it'.format(
                    job.module_name, job.real_name))
                job.status = JOB_STATUS_DROPPED
                continue

            if not job.is_inited():
                try:
                    job.init()
                except Exception as error:
                    self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                        job.module_name, job.real_name, repr(error)))
                    job.status = JOB_STATUS_DROPPED
                    continue

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    job.module_name, job.real_name, repr(error)))
                job.status = JOB_STATUS_DROPPED
                continue

            if not ok:
                self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name))
                job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED
                continue

            self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name))

            try:
                self.registry.register(job.full_name())
            except filelock.Timeout as error:
                self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format(
                    job.module_name, job.real_name, error))
                job.status = JOB_STATUS_DROPPED
                continue
            except Exception as error:
                self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format(
                    job.module_name, job.real_name, error))
                job.status = JOB_STATUS_DROPPED
                continue

            try:
                job.create()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format(
                    job.module_name, job.real_name, repr(error)))
                job.status = JOB_STATUS_DROPPED
                try:
                    self.registry.unregister(job.full_name())
                except Exception as error:
                    self.log.warning('{0}[{1}] : deregistration failed: {2}'.format(
                        job.module_name, job.real_name, error))
                continue

            self.started_jobs[job.module_name] = job.actual_name
            job.status = JOB_STATUS_ACTIVE
            job.start()
    @staticmethod
    def keep_alive():
        if not IS_ATTY:
            safe_print('\n')

    def garbage_collection(self):
        if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0:
            v = gc.collect()
            self.log.debug('GC collection run result: {0}'.format(v))

    def restart_recovering_jobs(self):
        for job in self.jobs:
            if job.status != JOB_STATUS_RECOVERING:
                continue
            if self.runs % job.autodetection_retry != 0:
                continue
            self.start_jobs(job)

    def cleanup_jobs(self):
        self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED]

    def have_alive_jobs(self):
        return next(
            (True for job in self.jobs if job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)),
            False,
        )

    def save_job_statuses(self):
        if self.saver is None:
            return
        if self.runs % 10 != 0:
            return
        dump = JobsStatuses().from_jobs(self.jobs).dump()
        try:
            self.saver.save(dump)
        except Exception as error:
            self.log.error("error on saving jobs statuses dump : {0}".format(repr(error)))

    def serve_once(self):
        if not self.have_alive_jobs():
            self.log.info('no jobs to serve')
            return False

        time.sleep(1)
        self.runs += 1

        self.keep_alive()
        self.garbage_collection()
        self.cleanup_jobs()
        self.restart_recovering_jobs()
        self.save_job_statuses()

        return True

    def serve(self):
        while self.serve_once():
            pass

    def run(self):
        self.start_jobs(*self.jobs)
        self.serve()
def parse_command_line():
    opts = sys.argv[:][1:]
    debug = False
    trace = False
    nolock = False
    update_every = 1
    modules_to_run = list()

    def find_first_positive_int(values):
        return next((v for v in values if v.isdigit() and int(v) >= 1), None)

    u = find_first_positive_int(opts)
    if u is not None:
        update_every = int(u)
        opts.remove(u)
    if 'debug' in opts:
        debug = True
        opts.remove('debug')
    if 'trace' in opts:
        trace = True
        opts.remove('trace')
    if 'nolock' in opts:
        nolock = True
        opts.remove('nolock')
    if opts:
        modules_to_run = list(opts)

    cmd = collections.namedtuple(
        'CMD',
        [
            'update_every',
            'debug',
            'trace',
            'nolock',
            'modules_to_run',
        ])
    return cmd(
        update_every,
        debug,
        trace,
        nolock,
        modules_to_run,
    )
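# parse_command_line() accepts its positional arguments in any order: the first
# positive integer becomes update_every, the literals 'debug', 'trace' and
# 'nolock' toggle the corresponding flags, and anything left over is treated as
# module names. A hypothetical invocation:
#
#   ./python.d.plugin 5 debug trace nolock apache nginx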
def guess_module(modules, *names):
    def guess(n):
        found = None
        for i, _ in enumerate(n):
            cur = [x for x in modules if x.startswith(n[:i + 1])]
            if not cur:
                return found
            found = cur
        return found

    guessed = list()
    for name in names:
        name = name.lower()
        m = guess(name)
        if m:
            guessed.extend(m)
    return sorted(set(guessed))
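# guess_module() does prefix matching against the available module names: for
# each query it keeps the modules matching the longest prefix that still
# matches something, e.g. guess_module({'apache', 'apache_cache', 'nginx'},
# 'apach') would return ['apache', 'apache_cache'].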
def disable():
    if not IS_ATTY:
        safe_print('DISABLE')
    exit(0)


def get_modules_to_run(cmd):
    if not cmd.modules_to_run:
        return AVAILABLE_MODULES

    modules_to_run, seen = list(), set()
    for m in AVAILABLE_MODULES:
        if m.name not in cmd.modules_to_run or m.name in seen:
            continue
        seen.add(m.name)
        modules_to_run.append(m)

    return modules_to_run


def main():
    cmd = parse_command_line()
    log = PythonDLogger()

    if cmd.debug:
        log.logger.severity = 'DEBUG'
    if cmd.trace:
        log.log_traceback = True

    log.info('using python v{0}'.format(PY_VERSION[0]))

    if DIRS.locks and not cmd.nolock:
        registry = FileLockRegistry(DIRS.locks)
    else:
        registry = DummyRegistry()

    unique_avail_module_names = set([m.name for m in AVAILABLE_MODULES])
    unknown = set(cmd.modules_to_run) - unique_avail_module_names
    if unknown:
        log.error('unknown modules : {0}'.format(sorted(list(unknown))))
        guessed = guess_module(unique_avail_module_names, *cmd.modules_to_run)
        if guessed:
            log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1)))
        return

    p = Plugin(
        get_modules_to_run(cmd),
        cmd.update_every,
        registry,
    )

    # cheap attempt to reduce chance of python.d job running before go.d
    # TODO: better implementation needed
    if not IS_ATTY:
        time.sleep(1.5)

    try:
        if not p.setup():
            return
        p.run()
    except KeyboardInterrupt:
        pass
    log.info('exiting from main...')


if __name__ == "__main__":
    main()
    disable()