# -*- coding: utf-8 -*-
# Description:
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (ilyam8)
# SPDX-License-Identifier: GPL-3.0-or-later

import os
from time import sleep, time

from bases.charts import Charts, ChartError, create_runtime_chart
from bases.collection import safe_print
from bases.loggers import PythonDLogger
from third_party.monotonic import monotonic

  12. RUNTIME_CHART_UPDATE = 'BEGIN netdata.runtime_{job_name} {since_last}\n' \
  13. 'SET run_time = {elapsed}\n' \
  14. 'END\n'
  15. PENALTY_EVERY = 5
  16. MAX_PENALTY = 10 * 60 # 10 minutes
  17. ND_INTERNAL_MONITORING_DISABLED = os.getenv("NETDATA_INTERNALS_MONITORING") == "NO"
  18. class RuntimeCounters:
  19. def __init__(self, configuration):
  20. """
  21. :param configuration: <dict>
  22. """
  23. self.update_every = int(configuration.pop('update_every'))
  24. self.do_penalty = configuration.pop('penalty')
  25. self.start_mono = 0
  26. self.start_real = 0
  27. self.retries = 0
  28. self.penalty = 0
  29. self.elapsed = 0
  30. self.prev_update = 0
  31. self.runs = 1
  32. def calc_next(self):
  33. self.start_mono = monotonic()
  34. return self.start_mono - (self.start_mono % self.update_every) + self.update_every + self.penalty
  35. def sleep_until_next(self):
  36. next_time = self.calc_next()
  37. while self.start_mono < next_time:
  38. sleep(next_time - self.start_mono)
  39. self.start_mono = monotonic()
  40. self.start_real = time()
  41. def handle_retries(self):
  42. self.retries += 1
  43. if self.do_penalty and self.retries % PENALTY_EVERY == 0:
  44. self.penalty = round(min(self.retries * self.update_every / 2, MAX_PENALTY))
  45. def clean_module_name(name):
  46. if name.startswith('pythond_'):
  47. return name[8:]
  48. return name
  49. class SimpleService(PythonDLogger, object):
  50. """
  51. Prototype of Service class.
  52. Implemented basic functionality to run jobs by `python.d.plugin`
  53. """
  54. def __init__(self, configuration, name=''):
  55. """
  56. :param configuration: <dict>
  57. :param name: <str>
  58. """
  59. PythonDLogger.__init__(self)
  60. self.configuration = configuration
  61. self.order = list()
  62. self.definitions = dict()
  63. self.module_name = clean_module_name(self.__module__)
  64. self.job_name = configuration.pop('job_name')
  65. self.actual_job_name = self.job_name or self.module_name
  66. self.override_name = configuration.pop('override_name')
  67. self.fake_name = None
  68. self._runtime_counters = RuntimeCounters(configuration=configuration)
  69. self.charts = Charts(job_name=self.actual_name,
  70. actual_job_name=self.actual_job_name,
  71. priority=configuration.pop('priority'),
  72. cleanup=configuration.pop('chart_cleanup'),
  73. get_update_every=self.get_update_every,
  74. module_name=self.module_name)
  75. def __repr__(self):
  76. return '<{cls_bases}: {name}>'.format(cls_bases=', '.join(c.__name__ for c in self.__class__.__bases__),
  77. name=self.name)
  78. @property
  79. def name(self):
  80. name = self.override_name or self.job_name
  81. if name and name != self.module_name:
  82. return '_'.join([self.module_name, name])
  83. return self.module_name
  84. def actual_name(self):
  85. return self.fake_name or self.name
  86. @property
  87. def runs_counter(self):
  88. return self._runtime_counters.runs
  89. @property
  90. def update_every(self):
  91. return self._runtime_counters.update_every
  92. @update_every.setter
  93. def update_every(self, value):
  94. """
  95. :param value: <int>
  96. :return:
  97. """
  98. self._runtime_counters.update_every = value
  99. def get_update_every(self):
  100. return self.update_every
  101. def check(self):
  102. """
  103. check() prototype
  104. :return: boolean
  105. """
  106. self.debug("job doesn't implement check() method. Using default which simply invokes get_data().")
  107. data = self.get_data()
  108. if data and isinstance(data, dict):
  109. return True
  110. self.debug('returned value is wrong: {0}'.format(data))
  111. return False
  112. @create_runtime_chart
  113. def create(self):
  114. for chart_name in self.order:
  115. chart_config = self.definitions.get(chart_name)
  116. if not chart_config:
  117. self.debug("create() => [NOT ADDED] chart '{chart_name}' not in definitions. "
  118. "Skipping it.".format(chart_name=chart_name))
  119. continue
  120. # create chart
  121. chart_params = [chart_name] + chart_config['options']
  122. try:
  123. self.charts.add_chart(params=chart_params)
  124. except ChartError as error:
  125. self.error("create() => [NOT ADDED] (chart '{chart}': {error})".format(chart=chart_name,
  126. error=error))
  127. continue
  128. # add dimensions to chart
  129. for dimension in chart_config['lines']:
  130. try:
  131. self.charts[chart_name].add_dimension(dimension)
  132. except ChartError as error:
  133. self.error("create() => [NOT ADDED] (dimension '{dimension}': {error})".format(dimension=dimension,
  134. error=error))
  135. continue
  136. # add variables to chart
  137. if 'variables' in chart_config:
  138. for variable in chart_config['variables']:
  139. try:
  140. self.charts[chart_name].add_variable(variable)
  141. except ChartError as error:
  142. self.error("create() => [NOT ADDED] (variable '{var}': {error})".format(var=variable,
  143. error=error))
  144. continue
  145. del self.order
  146. del self.definitions
  147. # True if job has at least 1 chart else False
  148. return bool(self.charts)
  149. def run(self):
  150. """
  151. Runs job in thread. Handles retries.
  152. Exits when job failed or timed out.
  153. :return: None
  154. """
  155. job = self._runtime_counters
  156. self.debug('started, update frequency: {freq}'.format(freq=job.update_every))
  157. while True:
  158. job.sleep_until_next()
  159. since = 0
  160. if job.prev_update:
  161. since = int((job.start_real - job.prev_update) * 1e6)
  162. try:
  163. updated = self.update(interval=since)
  164. except Exception as error:
  165. self.error('update() unhandled exception: {error}'.format(error=error))
  166. updated = False
  167. job.runs += 1
  168. if not updated:
  169. job.handle_retries()
  170. else:
  171. job.elapsed = int((monotonic() - job.start_mono) * 1e3)
  172. job.prev_update = job.start_real
  173. job.retries, job.penalty = 0, 0
  174. if not ND_INTERNAL_MONITORING_DISABLED:
  175. safe_print(RUNTIME_CHART_UPDATE.format(job_name=self.name,
  176. since_last=since,
  177. elapsed=job.elapsed))
  178. self.debug('update => [{status}] (elapsed time: {elapsed}, failed retries in a row: {retries})'.format(
  179. status='OK' if updated else 'FAILED',
  180. elapsed=job.elapsed if updated else '-',
  181. retries=job.retries))
  182. def update(self, interval):
  183. """
  184. :return:
  185. """
  186. data = self.get_data()
  187. if not data:
  188. self.debug('get_data() returned no data')
  189. return False
  190. elif not isinstance(data, dict):
  191. self.debug('get_data() returned incorrect type data')
  192. return False
  193. updated = False
  194. for chart in self.charts:
  195. if chart.flags.obsoleted:
  196. if chart.can_be_updated(data):
  197. chart.refresh()
  198. else:
  199. continue
  200. elif self.charts.cleanup and chart.penalty >= self.charts.cleanup:
  201. chart.obsolete()
  202. self.info("chart '{0}' was suppressed due to non updating".format(chart.name))
  203. continue
  204. ok = chart.update(data, interval)
  205. if ok:
  206. updated = True
  207. if not updated:
  208. self.debug('none of the charts has been updated')
  209. return updated
  210. def get_data(self):
  211. return self._get_data()
  212. def _get_data(self):
  213. raise NotImplementedError