gearman.chart.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. # Description: gearman netdata python.d module
  2. # Author: Kyle Agronick (agronick)
  3. # SPDX-License-Identifier: GPL-3.0+
  4. # Gearman Netdata Plugin
  5. from copy import deepcopy
  6. from bases.FrameworkServices.SocketService import SocketService
  7. CHARTS = {
  8. 'total_workers': {
  9. 'options': [None, 'Total Jobs', 'Jobs', 'Total Jobs', 'gearman.total_jobs', 'line'],
  10. 'lines': [
  11. ['total_pending', 'Pending', 'absolute'],
  12. ['total_running', 'Running', 'absolute'],
  13. ]
  14. },
  15. }
  16. def job_chart_template(job_name):
  17. return {
  18. 'options': [None, job_name, 'Jobs', 'Activity by Job', 'gearman.single_job', 'stacked'],
  19. 'lines': [
  20. ['{0}_pending'.format(job_name), 'Pending', 'absolute'],
  21. ['{0}_idle'.format(job_name), 'Idle', 'absolute'],
  22. ['{0}_running'.format(job_name), 'Running', 'absolute'],
  23. ]
  24. }
  25. def build_result_dict(job):
  26. """
  27. Get the status for each job
  28. :return: dict
  29. """
  30. total, running, available = job['metrics']
  31. idle = available - running
  32. pending = total - running
  33. return {
  34. '{0}_pending'.format(job['job_name']): pending,
  35. '{0}_idle'.format(job['job_name']): idle,
  36. '{0}_running'.format(job['job_name']): running,
  37. }
  38. def parse_worker_data(job):
  39. job_name = job[0]
  40. job_metrics = job[1:]
  41. return {
  42. 'job_name': job_name,
  43. 'metrics': job_metrics,
  44. }
  45. class GearmanReadException(BaseException):
  46. pass
  47. class Service(SocketService):
  48. def __init__(self, configuration=None, name=None):
  49. super(Service, self).__init__(configuration=configuration, name=name)
  50. self.request = "status\n"
  51. self._keep_alive = True
  52. self.host = self.configuration.get('host', 'localhost')
  53. self.port = self.configuration.get('port', 4730)
  54. self.tls = self.configuration.get('tls', False)
  55. self.cert = self.configuration.get('cert', None)
  56. self.key = self.configuration.get('key', None)
  57. self.active_jobs = set()
  58. self.definitions = deepcopy(CHARTS)
  59. self.order = ['total_workers']
  60. def _get_data(self):
  61. """
  62. Format data received from socket
  63. :return: dict
  64. """
  65. try:
  66. active_jobs = self.get_active_jobs()
  67. except GearmanReadException:
  68. return None
  69. found_jobs, job_data = self.process_jobs(active_jobs)
  70. self.remove_stale_jobs(found_jobs)
  71. return job_data
  72. def get_active_jobs(self):
  73. active_jobs = []
  74. for job in self.get_worker_data():
  75. parsed_job = parse_worker_data(job)
  76. # Gearman does not clean up old jobs
  77. # We only care about jobs that have
  78. # some relevant data
  79. if not any(parsed_job['metrics']):
  80. continue
  81. active_jobs.append(parsed_job)
  82. return active_jobs
  83. def get_worker_data(self):
  84. """
  85. Split the data returned from Gearman
  86. into a list of lists
  87. This returns the same output that you
  88. would get from a gearadmin --status
  89. command.
  90. Example output returned from
  91. _get_raw_data():
  92. prefix generic_worker4 78 78 500
  93. generic_worker2 78 78 500
  94. generic_worker3 0 0 760
  95. generic_worker1 0 0 500
  96. :return: list
  97. """
  98. try:
  99. raw = self._get_raw_data()
  100. except (ValueError, AttributeError):
  101. raise GearmanReadException()
  102. if raw is None:
  103. self.debug("Gearman returned no data")
  104. raise GearmanReadException()
  105. workers = list()
  106. for line in raw.splitlines()[:-1]:
  107. parts = line.split()
  108. if not parts:
  109. continue
  110. name = '_'.join(parts[:-3])
  111. try:
  112. values = [int(w) for w in parts[-3:]]
  113. except ValueError:
  114. continue
  115. w = [name]
  116. w.extend(values)
  117. workers.append(w)
  118. return workers
  119. def process_jobs(self, active_jobs):
  120. output = {
  121. 'total_pending': 0,
  122. 'total_idle': 0,
  123. 'total_running': 0,
  124. }
  125. found_jobs = set()
  126. for parsed_job in active_jobs:
  127. job_name = self.add_job(parsed_job)
  128. found_jobs.add(job_name)
  129. job_data = build_result_dict(parsed_job)
  130. for sum_value in ('pending', 'running', 'idle'):
  131. output['total_{0}'.format(sum_value)] += job_data['{0}_{1}'.format(job_name, sum_value)]
  132. output.update(job_data)
  133. return found_jobs, output
  134. def remove_stale_jobs(self, active_job_list):
  135. """
  136. Removes jobs that have no workers, pending jobs,
  137. or running jobs
  138. :param active_job_list: The latest list of active jobs
  139. :type active_job_list: iterable
  140. :return: None
  141. """
  142. for to_remove in self.active_jobs - active_job_list:
  143. self.remove_job(to_remove)
  144. def add_job(self, parsed_job):
  145. """
  146. Adds a job to the list of active jobs
  147. :param parsed_job: A parsed job dict
  148. :type parsed_job: dict
  149. :return: None
  150. """
  151. def add_chart(job_name):
  152. """
  153. Adds a new job chart
  154. :param job_name: The name of the job to add
  155. :type job_name: string
  156. :return: None
  157. """
  158. job_key = 'job_{0}'.format(job_name)
  159. template = job_chart_template(job_name)
  160. new_chart = self.charts.add_chart([job_key] + template['options'])
  161. for dimension in template['lines']:
  162. new_chart.add_dimension(dimension)
  163. if parsed_job['job_name'] not in self.active_jobs:
  164. add_chart(parsed_job['job_name'])
  165. self.active_jobs.add(parsed_job['job_name'])
  166. return parsed_job['job_name']
  167. def remove_job(self, job_name):
  168. """
  169. Removes a job to the list of active jobs
  170. :param job_name: The name of the job to remove
  171. :type job_name: string
  172. :return: None
  173. """
  174. def remove_chart(job_name):
  175. """
  176. Removes a job chart
  177. :param job_name: The name of the job to remove
  178. :type job_name: string
  179. :return: None
  180. """
  181. job_key = 'job_{0}'.format(job_name)
  182. self.charts[job_key].obsolete()
  183. del self.charts[job_key]
  184. remove_chart(job_name)
  185. self.active_jobs.remove(job_name)