123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- # Description: gearman netdata python.d module
- # Author: Kyle Agronick (agronick)
- # SPDX-License-Identifier: GPL-3.0+
- # Gearman Netdata Plugin
- from copy import deepcopy
- from bases.FrameworkServices.SocketService import SocketService
# Module-level chart definitions: a single 'total_workers' line chart whose
# two lines are the pending and running totals.
CHARTS = dict(
    total_workers=dict(
        options=[None, 'Total Jobs', 'Jobs', 'Total Jobs', 'gearman.total_jobs', 'line'],
        lines=[
            ['total_pending', 'Pending', 'absolute'],
            ['total_running', 'Running', 'absolute'],
        ],
    ),
)
def job_chart_template(job_name):
    """
    Build the chart definition used for a single job
    :param job_name: job name; placed in the options list and used as the
                     prefix of every line id
    :type job_name: string
    :return: dict with an 'options' list and a 'lines' list
    """
    line_specs = (
        ('{0}_pending', 'Pending'),
        ('{0}_idle', 'Idle'),
        ('{0}_running', 'Running'),
    )
    lines = [[id_format.format(job_name), label, 'absolute'] for id_format, label in line_specs]
    options = [None, job_name, 'Jobs', 'Activity by Job', 'gearman.single_job', 'stacked']
    return {'options': options, 'lines': lines}
def build_result_dict(job):
    """
    Compute the pending, idle and running values of one job
    :param job: dict with a 'job_name' and a three-item 'metrics' sequence
                (total, running, available)
    :type job: dict
    :return: dict keyed by '<job_name>_pending', '<job_name>_idle' and
             '<job_name>_running'
    """
    queued, in_progress, workers = job['metrics']
    name = job['job_name']
    return {
        '{0}_pending'.format(name): queued - in_progress,
        '{0}_idle'.format(name): workers - in_progress,
        '{0}_running'.format(name): in_progress,
    }
def parse_worker_data(job):
    """
    Split one status row into its name and its remaining values
    :param job: sequence whose first item is the job name
    :type job: list
    :return: dict with 'job_name' (first item) and 'metrics' (the rest)
    """
    return dict(job_name=job[0], metrics=job[1:])
class GearmanReadException(Exception):
    """
    Raised when the status data could not be read from Gearman
    """
    # Derives from Exception rather than BaseException: BaseException is
    # meant for interpreter-level signals such as KeyboardInterrupt and
    # SystemExit, and it is not caught by ordinary `except Exception` handlers.
class Service(SocketService):
    """
    Collects total and per-job counts from the Gearman "status" command
    and keeps one chart per job that currently reports data
    """

    def __init__(self, configuration=None, name=None):
        super(Service, self).__init__(configuration=configuration, name=name)
        self.request = "status\n"
        self._keep_alive = True
        self.host = self.configuration.get('host', 'localhost')
        self.port = self.configuration.get('port', 4730)
        self.tls = self.configuration.get('tls', False)
        self.cert = self.configuration.get('cert', None)
        self.key = self.configuration.get('key', None)
        # Names of the jobs that currently have a chart
        self.active_jobs = set()
        # Copied so that this instance never shares state with the
        # module-level CHARTS
        self.definitions = deepcopy(CHARTS)
        self.order = ['total_workers']

    def _get_data(self):
        """
        Format data received from socket
        :return: dict, or None when the status could not be read
        """
        try:
            active_jobs = self.get_active_jobs()
        except GearmanReadException:
            return None

        found_jobs, job_data = self.process_jobs(active_jobs)
        self.remove_stale_jobs(found_jobs)
        return job_data

    def get_active_jobs(self):
        """
        Parse the status rows and keep the jobs that report any data
        :return: list of parsed job dicts
        :raises GearmanReadException: propagated from get_worker_data()
        """
        active_jobs = []
        for job in self.get_worker_data():
            parsed_job = parse_worker_data(job)
            # Gearman does not clean up old jobs, so rows whose metrics
            # are all zero are skipped as irrelevant
            if any(parsed_job['metrics']):
                active_jobs.append(parsed_job)
        return active_jobs

    def get_worker_data(self):
        """
        Split the data returned from Gearman
        into a list of lists

        This returns the same output that you
        would get from a gearadmin --status
        command.

        Example output returned from
        _get_raw_data():
        prefix generic_worker4 78 78 500
        generic_worker2 78 78 500
        generic_worker3 0 0 760
        generic_worker1 0 0 500

        Each returned row is [name, int, int, int]. Name parts separated
        by whitespace are joined with '_'. Rows with fewer than three
        fields, or whose last three fields are not integers, are skipped.

        :return: list
        :raises GearmanReadException: when no data could be read
        """
        try:
            raw = self._get_raw_data()
        except (ValueError, AttributeError):
            raise GearmanReadException()

        if raw is None:
            self.debug("Gearman returned no data")
            raise GearmanReadException()

        workers = list()
        # The last line of the response is dropped before parsing
        for line in raw.splitlines()[:-1]:
            parts = line.split()
            # Every row must carry three trailing counters. A shorter row
            # (blank lines included) would produce a job with too few
            # metrics and make build_result_dict() fail on unpacking.
            if len(parts) < 3:
                continue

            name = '_'.join(parts[:-3])
            try:
                values = [int(w) for w in parts[-3:]]
            except ValueError:
                continue

            workers.append([name] + values)

        return workers

    def process_jobs(self, active_jobs):
        """
        Register every active job and build the data for this update
        :param active_jobs: parsed job dicts, as returned by get_active_jobs()
        :type active_jobs: list
        :return: tuple of (set of job names seen, dict with the totals and
                 the per-job values)
        """
        output = {
            'total_pending': 0,
            'total_idle': 0,
            'total_running': 0,
        }
        found_jobs = set()

        for parsed_job in active_jobs:
            job_name = self.add_job(parsed_job)
            found_jobs.add(job_name)
            job_data = build_result_dict(parsed_job)

            # Accumulate the totals before merging the per-job values
            for sum_value in ('pending', 'running', 'idle'):
                output['total_{0}'.format(sum_value)] += job_data['{0}_{1}'.format(job_name, sum_value)]

            output.update(job_data)

        return found_jobs, output

    def remove_stale_jobs(self, active_job_list):
        """
        Removes jobs that have no workers, pending jobs,
        or running jobs
        :param active_job_list: The latest set of active job names
        :type active_job_list: set
        :return: None
        """
        # The set difference is a new set, so remove_job() may safely
        # modify self.active_jobs while it is being iterated
        for to_remove in self.active_jobs - active_job_list:
            self.remove_job(to_remove)

    def add_job(self, parsed_job):
        """
        Adds a job to the set of active jobs, creating its chart
        the first time the job is seen
        :param parsed_job: A parsed job dict
        :type parsed_job: dict
        :return: The name of the job
        """
        job_name = parsed_job['job_name']

        if job_name not in self.active_jobs:
            job_key = 'job_{0}'.format(job_name)
            template = job_chart_template(job_name)
            new_chart = self.charts.add_chart([job_key] + template['options'])
            for dimension in template['lines']:
                new_chart.add_dimension(dimension)

        self.active_jobs.add(job_name)
        return job_name

    def remove_job(self, job_name):
        """
        Removes a job from the set of active jobs and removes its chart
        :param job_name: The name of the job to remove
        :type job_name: string
        :return: None
        """
        job_key = 'job_{0}'.format(job_name)
        # Mark the chart obsolete before dropping it from the chart collection
        self.charts[job_key].obsolete()
        del self.charts[job_key]
        self.active_jobs.remove(job_name)
|