python.d.plugin.in 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855
  1. #!/usr/bin/env bash
  2. '''':;
  3. pybinary=$(which python3 || which python || which python2)
  4. filtered=()
  5. for arg in "$@"
  6. do
  7. case $arg in
  8. -p*) pybinary=${arg:2}
  9. shift 1 ;;
  10. *) filtered+=("$arg") ;;
  11. esac
  12. done
  13. if [ "$pybinary" = "" ]
  14. then
  15. echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM"
  16. exit 1
  17. fi
  18. exec "$pybinary" "$0" "${filtered[@]}" # '''
  19. # -*- coding: utf-8 -*-
  20. # Description:
  21. # Author: Pawel Krupa (paulfantom)
  22. # Author: Ilya Mashchenko (l2isbad)
  23. # SPDX-License-Identifier: GPL-3.0-or-later
  24. import collections
  25. import copy
  26. import gc
  27. import json
  28. import os
  29. import pprint
  30. import re
  31. import sys
  32. import time
  33. import threading
  34. import types
  35. try:
  36. from queue import Queue
  37. except ImportError:
  38. from Queue import Queue
  39. PY_VERSION = sys.version_info[:2] # (major=3, minor=7, micro=3, releaselevel='final', serial=0)
  40. if PY_VERSION > (3, 1):
  41. from importlib.machinery import SourceFileLoader
  42. else:
  43. from imp import load_source as SourceFileLoader
  44. ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
  45. ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
  46. ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
  47. ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
  48. ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
  49. ENV_NETDATA_LOCK_DIR = 'NETDATA_LOCK_DIR'
  50. def add_pythond_packages():
  51. pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__))
  52. pythond = os.path.abspath(pluginsd + '/../python.d')
  53. packages = os.path.join(pythond, 'python_modules')
  54. sys.path.append(packages)
  55. add_pythond_packages()
  56. from bases.collection import safe_print
  57. from bases.loggers import PythonDLogger
  58. from bases.loaders import load_config
  59. from third_party import filelock
  60. try:
  61. from collections import OrderedDict
  62. except ImportError:
  63. from third_party.ordereddict import OrderedDict
  64. def dirs():
  65. var_lib = os.getenv(
  66. ENV_NETDATA_LIB_DIR,
  67. '@varlibdir_POST@',
  68. )
  69. plugin_user_config = os.getenv(
  70. ENV_NETDATA_USER_CONFIG_DIR,
  71. '@configdir_POST@',
  72. )
  73. plugin_stock_config = os.getenv(
  74. ENV_NETDATA_STOCK_CONFIG_DIR,
  75. '@libconfigdir_POST@',
  76. )
  77. pluginsd = os.getenv(
  78. ENV_NETDATA_PLUGINS_DIR,
  79. os.path.dirname(__file__),
  80. )
  81. locks = os.getenv(
  82. ENV_NETDATA_LOCK_DIR,
  83. os.path.join('@varlibdir_POST@', 'lock')
  84. )
  85. modules_user_config = os.path.join(plugin_user_config, 'python.d')
  86. modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
  87. modules = os.path.abspath(pluginsd + '/../python.d')
  88. Dirs = collections.namedtuple(
  89. 'Dirs',
  90. [
  91. 'plugin_user_config',
  92. 'plugin_stock_config',
  93. 'modules_user_config',
  94. 'modules_stock_config',
  95. 'modules',
  96. 'var_lib',
  97. 'locks',
  98. ]
  99. )
  100. return Dirs(
  101. plugin_user_config,
  102. plugin_stock_config,
  103. modules_user_config,
  104. modules_stock_config,
  105. modules,
  106. var_lib,
  107. locks,
  108. )
  109. DIRS = dirs()
  110. IS_ATTY = sys.stdout.isatty()
  111. MODULE_SUFFIX = '.chart.py'
  112. def available_modules():
  113. obsolete = (
  114. 'apache_cache', # replaced by web_log
  115. 'cpuidle', # rewritten in C
  116. 'cpufreq', # rewritten in C
  117. 'gunicorn_log', # replaced by web_log
  118. 'linux_power_supply', # rewritten in C
  119. 'nginx_log', # replaced by web_log
  120. 'mdstat', # rewritten in C
  121. 'sslcheck', # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624
  122. 'unbound', # rewritten in Go
  123. )
  124. files = sorted(os.listdir(DIRS.modules))
  125. modules = [m[:-len(MODULE_SUFFIX)] for m in files if m.endswith(MODULE_SUFFIX)]
  126. avail = [m for m in modules if m not in obsolete]
  127. return tuple(avail)
  128. AVAILABLE_MODULES = available_modules()
  129. JOB_BASE_CONF = {
  130. 'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
  131. 'priority': 60000,
  132. 'autodetection_retry': 0,
  133. 'chart_cleanup': 10,
  134. 'penalty': True,
  135. 'name': str(),
  136. }
  137. PLUGIN_BASE_CONF = {
  138. 'enabled': True,
  139. 'default_run': True,
  140. 'gc_run': True,
  141. 'gc_interval': 300,
  142. }
  143. def multi_path_find(name, *paths):
  144. for path in paths:
  145. abs_name = os.path.join(path, name)
  146. if os.path.isfile(abs_name):
  147. return abs_name
  148. return str()
  149. def load_module(name):
  150. abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
  151. module = SourceFileLoader('pythond_' + name, abs_path)
  152. if isinstance(module, types.ModuleType):
  153. return module
  154. return module.load_module()
  155. class ModuleConfig:
  156. def __init__(self, name, config=None):
  157. self.name = name
  158. self.config = config or OrderedDict()
  159. def load(self, abs_path):
  160. self.config.update(load_config(abs_path) or dict())
  161. def defaults(self):
  162. keys = (
  163. 'update_every',
  164. 'priority',
  165. 'autodetection_retry',
  166. 'chart_cleanup',
  167. 'penalty',
  168. )
  169. return dict((k, self.config[k]) for k in keys if k in self.config)
  170. def create_job(self, job_name, job_config=None):
  171. job_config = job_config or dict()
  172. config = OrderedDict()
  173. config.update(job_config)
  174. config['job_name'] = job_name
  175. for k, v in self.defaults().items():
  176. config.setdefault(k, v)
  177. return config
  178. def job_names(self):
  179. return [v for v in self.config if isinstance(self.config.get(v), dict)]
  180. def single_job(self):
  181. return [self.create_job(self.name, self.config)]
  182. def multi_job(self):
  183. return [self.create_job(n, self.config[n]) for n in self.job_names()]
  184. def create_jobs(self):
  185. return self.multi_job() or self.single_job()
  186. class JobsConfigsBuilder:
  187. def __init__(self, config_dirs):
  188. self.config_dirs = config_dirs
  189. self.log = PythonDLogger()
  190. self.job_defaults = None
  191. self.module_defaults = None
  192. self.min_update_every = None
  193. def load_module_config(self, module_name):
  194. name = '{0}.conf'.format(module_name)
  195. self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs))
  196. config = ModuleConfig(module_name)
  197. abs_path = multi_path_find(name, *self.config_dirs)
  198. if not abs_path:
  199. self.log.warning("[{0}] '{1}' was not found".format(module_name, name))
  200. return config
  201. self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path))
  202. try:
  203. config.load(abs_path)
  204. except Exception as error:
  205. self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error)))
  206. return None
  207. self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path))
  208. return config
  209. @staticmethod
  210. def apply_defaults(jobs, defaults):
  211. if defaults is None:
  212. return
  213. for k, v in defaults.items():
  214. for job in jobs:
  215. job.setdefault(k, v)
  216. def set_min_update_every(self, jobs, min_update_every):
  217. if min_update_every is None:
  218. return
  219. for job in jobs:
  220. if 'update_every' in job and job['update_every'] < self.min_update_every:
  221. job['update_every'] = self.min_update_every
  222. def build(self, module_name):
  223. config = self.load_module_config(module_name)
  224. if config is None:
  225. return None
  226. configs = config.create_jobs()
  227. self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs)))
  228. self.apply_defaults(configs, self.module_defaults)
  229. self.apply_defaults(configs, self.job_defaults)
  230. self.set_min_update_every(configs, self.min_update_every)
  231. return configs
  232. JOB_STATUS_ACTIVE = 'active'
  233. JOB_STATUS_RECOVERING = 'recovering'
  234. JOB_STATUS_DROPPED = 'dropped'
  235. JOB_STATUS_INIT = 'initial'
  236. class Job(threading.Thread):
  237. inf = -1
  238. def __init__(self, service, module_name, config):
  239. threading.Thread.__init__(self)
  240. self.daemon = True
  241. self.service = service
  242. self.module_name = module_name
  243. self.config = config
  244. self.real_name = config['job_name']
  245. self.actual_name = config['override_name'] or self.real_name
  246. self.autodetection_retry = config['autodetection_retry']
  247. self.checks = self.inf
  248. self.job = None
  249. self.status = JOB_STATUS_INIT
  250. def is_inited(self):
  251. return self.job is not None
  252. def init(self):
  253. self.job = self.service(configuration=copy.deepcopy(self.config))
  254. def full_name(self):
  255. return self.job.name
  256. def check(self):
  257. ok = self.job.check()
  258. self.checks -= self.checks != self.inf and not ok
  259. return ok
  260. def create(self):
  261. self.job.create()
  262. def need_to_recheck(self):
  263. return self.autodetection_retry != 0 and self.checks != 0
  264. def run(self):
  265. self.job.run()
  266. class ModuleSrc:
  267. def __init__(self, name):
  268. self.name = name
  269. self.src = None
  270. def load(self):
  271. self.src = load_module(self.name)
  272. def get(self, key):
  273. return getattr(self.src, key, None)
  274. def service(self):
  275. return self.get('Service')
  276. def defaults(self):
  277. keys = (
  278. 'update_every',
  279. 'priority',
  280. 'autodetection_retry',
  281. 'chart_cleanup',
  282. 'penalty',
  283. )
  284. return dict((k, self.get(k)) for k in keys if self.get(k) is not None)
  285. def is_disabled_by_default(self):
  286. return bool(self.get('disabled_by_default'))
  287. class JobsStatuses:
  288. def __init__(self):
  289. self.items = OrderedDict()
  290. def dump(self):
  291. return json.dumps(self.items, indent=2)
  292. def get(self, module_name, job_name):
  293. if module_name not in self.items:
  294. return None
  295. return self.items[module_name].get(job_name)
  296. def has(self, module_name, job_name):
  297. return self.get(module_name, job_name) is not None
  298. def from_file(self, path):
  299. with open(path) as f:
  300. data = json.load(f)
  301. return self.from_json(data)
  302. @staticmethod
  303. def from_json(items):
  304. if not isinstance(items, dict):
  305. raise Exception('items obj has wrong type : {0}'.format(type(items)))
  306. if not items:
  307. return JobsStatuses()
  308. v = OrderedDict()
  309. for mod_name in sorted(items):
  310. if not items[mod_name]:
  311. continue
  312. v[mod_name] = OrderedDict()
  313. for job_name in sorted(items[mod_name]):
  314. v[mod_name][job_name] = items[mod_name][job_name]
  315. rv = JobsStatuses()
  316. rv.items = v
  317. return rv
  318. @staticmethod
  319. def from_jobs(jobs):
  320. v = OrderedDict()
  321. for job in jobs:
  322. status = job.status
  323. if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING):
  324. continue
  325. if job.module_name not in v:
  326. v[job.module_name] = OrderedDict()
  327. v[job.module_name][job.real_name] = status
  328. rv = JobsStatuses()
  329. rv.items = v
  330. return rv
  331. class StdoutSaver:
  332. @staticmethod
  333. def save(dump):
  334. print(dump)
  335. class CachedFileSaver:
  336. def __init__(self, path):
  337. self.last_save_success = False
  338. self.last_saved_dump = str()
  339. self.path = path
  340. def save(self, dump):
  341. if self.last_save_success and self.last_saved_dump == dump:
  342. return
  343. try:
  344. with open(self.path, 'w') as out:
  345. out.write(dump)
  346. except Exception:
  347. self.last_save_success = False
  348. raise
  349. self.last_saved_dump = dump
  350. self.last_save_success = True
  351. class PluginConfig(dict):
  352. def __init__(self, *args):
  353. dict.__init__(self, *args)
  354. def is_module_explicitly_enabled(self, module_name):
  355. return self._is_module_enabled(module_name, True)
  356. def is_module_enabled(self, module_name):
  357. return self._is_module_enabled(module_name, False)
  358. def _is_module_enabled(self, module_name, explicit):
  359. if module_name in self:
  360. return self[module_name]
  361. if explicit:
  362. return False
  363. return self['default_run']
  364. class FileLockRegistry:
  365. def __init__(self, path):
  366. self.path = path
  367. self.locks = dict()
  368. def register(self, name):
  369. if name in self.locks:
  370. return
  371. file = os.path.join(self.path, '{0}.collector.lock'.format(name))
  372. lock = filelock.FileLock(file)
  373. lock.acquire(timeout=0)
  374. self.locks[name] = lock
  375. def unregister(self, name):
  376. if name not in self.locks:
  377. return
  378. lock = self.locks[name]
  379. lock.release()
  380. del self.locks[name]
  381. class DummyRegistry:
  382. def register(self, name):
  383. pass
  384. def unregister(self, name):
  385. pass
  386. class Plugin:
  387. config_name = 'python.d.conf'
  388. jobs_status_dump_name = 'pythond-jobs-statuses.json'
  389. def __init__(self, modules_to_run, min_update_every, registry):
  390. self.modules_to_run = modules_to_run
  391. self.min_update_every = min_update_every
  392. self.config = PluginConfig(PLUGIN_BASE_CONF)
  393. self.log = PythonDLogger()
  394. self.registry = registry
  395. self.started_jobs = collections.defaultdict(dict)
  396. self.jobs = list()
  397. self.saver = None
  398. self.runs = 0
  399. def load_config_file(self, filepath, expected):
  400. self.log.debug("looking for '{0}'".format(filepath))
  401. if not os.path.isfile(filepath):
  402. log = self.log.info if not expected else self.log.error
  403. log("'{0}' was not found".format(filepath))
  404. return dict()
  405. try:
  406. config = load_config(filepath)
  407. except Exception as error:
  408. self.log.error("error on loading '{0}' : {1}".format(filepath, repr(error)))
  409. return dict()
  410. self.log.debug("'{0}' is loaded".format(filepath))
  411. return config
  412. def load_config(self):
  413. user_config = self.load_config_file(
  414. filepath=os.path.join(DIRS.plugin_user_config, self.config_name),
  415. expected=False,
  416. )
  417. stock_config = self.load_config_file(
  418. filepath=os.path.join(DIRS.plugin_stock_config, self.config_name),
  419. expected=True,
  420. )
  421. self.config.update(stock_config)
  422. self.config.update(user_config)
  423. def load_job_statuses(self):
  424. self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib))
  425. abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib)
  426. if not abs_path:
  427. self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name))
  428. return
  429. self.log.debug("loading '{0}'".format(abs_path))
  430. try:
  431. statuses = JobsStatuses().from_file(abs_path)
  432. except Exception as error:
  433. self.log.warning("error on loading '{0}' : {1}".format(abs_path, repr(error)))
  434. return None
  435. self.log.debug("'{0}' is loaded".format(abs_path))
  436. return statuses
  437. def create_jobs(self, job_statuses=None):
  438. paths = [
  439. DIRS.modules_user_config,
  440. DIRS.modules_stock_config,
  441. ]
  442. builder = JobsConfigsBuilder(paths)
  443. builder.job_defaults = JOB_BASE_CONF
  444. builder.min_update_every = self.min_update_every
  445. jobs = list()
  446. for mod_name in self.modules_to_run:
  447. if not self.config.is_module_enabled(mod_name):
  448. self.log.info("[{0}] is disabled in the configuration file, skipping it".format(mod_name))
  449. continue
  450. src = ModuleSrc(mod_name)
  451. try:
  452. src.load()
  453. except Exception as error:
  454. self.log.warning("[{0}] error on loading source : {1}, skipping it".format(mod_name, repr(error)))
  455. continue
  456. if not (src.service() and callable(src.service())):
  457. self.log.warning("[{0}] has no callable Service object, skipping it".format(mod_name))
  458. continue
  459. if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(mod_name):
  460. self.log.info("[{0}] is disabled by default, skipping it".format(mod_name))
  461. continue
  462. builder.module_defaults = src.defaults()
  463. configs = builder.build(mod_name)
  464. if not configs:
  465. self.log.info("[{0}] has no job configs, skipping it".format(mod_name))
  466. continue
  467. for config in configs:
  468. config['job_name'] = re.sub(r'\s+', '_', config['job_name'])
  469. config['override_name'] = re.sub(r'\s+', '_', config.pop('name'))
  470. job = Job(src.service(), mod_name, config)
  471. was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name)
  472. if was_previously_active and job.autodetection_retry == 0:
  473. self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format(
  474. job.module_name, job.real_name))
  475. job.checks = 11
  476. job.autodetection_retry = 30
  477. jobs.append(job)
  478. return jobs
  479. def setup(self):
  480. self.load_config()
  481. if not self.config['enabled']:
  482. self.log.info('disabled in the configuration file')
  483. return False
  484. statuses = self.load_job_statuses()
  485. self.jobs = self.create_jobs(statuses)
  486. if not self.jobs:
  487. self.log.info('no jobs to run')
  488. return False
  489. if not IS_ATTY:
  490. abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name)
  491. self.saver = CachedFileSaver(abs_path)
  492. return True
  493. def start_jobs(self, *jobs):
  494. for job in jobs:
  495. if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING):
  496. continue
  497. if job.actual_name in self.started_jobs[job.module_name]:
  498. self.log.info('{0}[{1}] : already served by another job, skipping it'.format(
  499. job.module_name, job.real_name))
  500. job.status = JOB_STATUS_DROPPED
  501. continue
  502. if not job.is_inited():
  503. try:
  504. job.init()
  505. except Exception as error:
  506. self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
  507. job.module_name, job.real_name, repr(error)))
  508. job.status = JOB_STATUS_DROPPED
  509. continue
  510. try:
  511. ok = job.check()
  512. except Exception as error:
  513. self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
  514. job.module_name, job.real_name, repr(error)))
  515. job.status = JOB_STATUS_DROPPED
  516. continue
  517. if not ok:
  518. self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name))
  519. job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED
  520. continue
  521. self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name))
  522. try:
  523. self.registry.register(job.full_name())
  524. except filelock.Timeout as error:
  525. self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format(
  526. job.module_name, job.real_name, error))
  527. job.status = JOB_STATUS_DROPPED
  528. continue
  529. except Exception as error:
  530. self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format(
  531. job.module_name, job.real_name, error))
  532. job.status = JOB_STATUS_DROPPED
  533. continue
  534. try:
  535. job.create()
  536. except Exception as error:
  537. self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format(
  538. job.module_name, job.real_name, repr(error)))
  539. job.status = JOB_STATUS_DROPPED
  540. try:
  541. self.registry.unregister(job.full_name())
  542. except Exception as error:
  543. self.log.warning('{0}[{1}] : deregistration failed: {2}'.format(
  544. job.module_name, job.real_name, error))
  545. continue
  546. self.started_jobs[job.module_name] = job.actual_name
  547. job.status = JOB_STATUS_ACTIVE
  548. job.start()
  549. @staticmethod
  550. def keep_alive():
  551. if not IS_ATTY:
  552. safe_print('\n')
  553. def garbage_collection(self):
  554. if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0:
  555. v = gc.collect()
  556. self.log.debug('GC collection run result: {0}'.format(v))
  557. def restart_recovering_jobs(self):
  558. for job in self.jobs:
  559. if job.status != JOB_STATUS_RECOVERING:
  560. continue
  561. if self.runs % job.autodetection_retry != 0:
  562. continue
  563. self.start_jobs(job)
  564. def cleanup_jobs(self):
  565. self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED]
  566. def have_alive_jobs(self):
  567. return next(
  568. (True for job in self.jobs if job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)),
  569. False,
  570. )
  571. def save_job_statuses(self):
  572. if self.saver is None:
  573. return
  574. if self.runs % 10 != 0:
  575. return
  576. dump = JobsStatuses().from_jobs(self.jobs).dump()
  577. try:
  578. self.saver.save(dump)
  579. except Exception as error:
  580. self.log.error("error on saving jobs statuses dump : {0}".format(repr(error)))
  581. def serve_once(self):
  582. if not self.have_alive_jobs():
  583. self.log.info('no jobs to serve')
  584. return False
  585. time.sleep(1)
  586. self.runs += 1
  587. self.keep_alive()
  588. self.garbage_collection()
  589. self.cleanup_jobs()
  590. self.restart_recovering_jobs()
  591. self.save_job_statuses()
  592. return True
  593. def serve(self):
  594. while self.serve_once():
  595. pass
  596. def run(self):
  597. self.start_jobs(*self.jobs)
  598. self.serve()
  599. def parse_command_line():
  600. opts = sys.argv[:][1:]
  601. debug = False
  602. trace = False
  603. nolock = False
  604. update_every = 1
  605. modules_to_run = list()
  606. def find_first_positive_int(values):
  607. return next((v for v in values if v.isdigit() and int(v) >= 1), None)
  608. u = find_first_positive_int(opts)
  609. if u is not None:
  610. update_every = int(u)
  611. opts.remove(u)
  612. if 'debug' in opts:
  613. debug = True
  614. opts.remove('debug')
  615. if 'trace' in opts:
  616. trace = True
  617. opts.remove('trace')
  618. if 'nolock' in opts:
  619. nolock = True
  620. opts.remove('nolock')
  621. if opts:
  622. modules_to_run = list(opts)
  623. cmd = collections.namedtuple(
  624. 'CMD',
  625. [
  626. 'update_every',
  627. 'debug',
  628. 'trace',
  629. 'nolock',
  630. 'modules_to_run',
  631. ])
  632. return cmd(
  633. update_every,
  634. debug,
  635. trace,
  636. nolock,
  637. modules_to_run,
  638. )
  639. def guess_module(modules, *names):
  640. def guess(n):
  641. found = None
  642. for i, _ in enumerate(n):
  643. cur = [x for x in modules if x.startswith(name[:i + 1])]
  644. if not cur:
  645. return found
  646. found = cur
  647. return found
  648. guessed = list()
  649. for name in names:
  650. name = name.lower()
  651. m = guess(name)
  652. if m:
  653. guessed.extend(m)
  654. return sorted(set(guessed))
  655. def disable():
  656. if not IS_ATTY:
  657. safe_print('DISABLE')
  658. exit(0)
  659. def main():
  660. cmd = parse_command_line()
  661. log = PythonDLogger()
  662. if cmd.debug:
  663. log.logger.severity = 'DEBUG'
  664. if cmd.trace:
  665. log.log_traceback = True
  666. log.info('using python v{0}'.format(PY_VERSION[0]))
  667. unknown = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
  668. if unknown:
  669. log.error('unknown modules : {0}'.format(sorted(list(unknown))))
  670. guessed = guess_module(AVAILABLE_MODULES, *cmd.modules_to_run)
  671. if guessed:
  672. log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1)))
  673. return
  674. if DIRS.locks and not cmd.nolock:
  675. registry = FileLockRegistry(DIRS.locks)
  676. else:
  677. registry = DummyRegistry()
  678. p = Plugin(
  679. cmd.modules_to_run or AVAILABLE_MODULES,
  680. cmd.update_every,
  681. registry,
  682. )
  683. try:
  684. if not p.setup():
  685. return
  686. p.run()
  687. except KeyboardInterrupt:
  688. pass
  689. log.info('exiting from main...')
  690. if __name__ == "__main__":
  691. main()
  692. disable()