rabbitmq.chart.py 6.4 KB


  1. # -*- coding: utf-8 -*-
  2. # Description: rabbitmq netdata python.d module
  3. # Author: l2isbad
  4. # SPDX-License-Identifier: GPL-3.0-or-later
  5. from collections import namedtuple
  6. from json import loads
  7. from socket import gethostbyname, gaierror
  8. from threading import Thread
  9. try:
  10. from queue import Queue
  11. except ImportError:
  12. from Queue import Queue
  13. from bases.FrameworkServices.UrlService import UrlService
  14. # default module values (can be overridden per job in `config`)
  15. update_every = 1
  16. priority = 60000
  17. retries = 60
  18. METHODS = namedtuple('METHODS', ['get_data', 'url', 'stats'])
  19. NODE_STATS = [
  20. 'fd_used',
  21. 'mem_used',
  22. 'sockets_used',
  23. 'proc_used',
  24. 'disk_free',
  25. 'run_queue'
  26. ]
  27. OVERVIEW_STATS = [
  28. 'object_totals.channels',
  29. 'object_totals.consumers',
  30. 'object_totals.connections',
  31. 'object_totals.queues',
  32. 'object_totals.exchanges',
  33. 'queue_totals.messages_ready',
  34. 'queue_totals.messages_unacknowledged',
  35. 'message_stats.ack',
  36. 'message_stats.redeliver',
  37. 'message_stats.deliver',
  38. 'message_stats.publish'
  39. ]
  40. ORDER = [
  41. 'queued_messages',
  42. 'message_rates',
  43. 'global_counts',
  44. 'file_descriptors',
  45. 'socket_descriptors',
  46. 'erlang_processes',
  47. 'erlang_run_queue',
  48. 'memory',
  49. 'disk_space'
  50. ]
  51. CHARTS = {
  52. 'file_descriptors': {
  53. 'options': [None, 'File Descriptors', 'descriptors', 'overview', 'rabbitmq.file_descriptors', 'line'],
  54. 'lines': [
  55. ['fd_used', 'used', 'absolute']
  56. ]
  57. },
  58. 'memory': {
  59. 'options': [None, 'Memory', 'MB', 'overview', 'rabbitmq.memory', 'line'],
  60. 'lines': [
  61. ['mem_used', 'used', 'absolute', 1, 1024 << 10]
  62. ]
  63. },
  64. 'disk_space': {
  65. 'options': [None, 'Disk Space', 'GB', 'overview', 'rabbitmq.disk_space', 'line'],
  66. 'lines': [
  67. ['disk_free', 'free', 'absolute', 1, 1024 ** 3]
  68. ]
  69. },
  70. 'socket_descriptors': {
  71. 'options': [None, 'Socket Descriptors', 'descriptors', 'overview', 'rabbitmq.sockets', 'line'],
  72. 'lines': [
  73. ['sockets_used', 'used', 'absolute']
  74. ]
  75. },
  76. 'erlang_processes': {
  77. 'options': [None, 'Erlang Processes', 'processes', 'overview', 'rabbitmq.processes', 'line'],
  78. 'lines': [
  79. ['proc_used', 'used', 'absolute']
  80. ]
  81. },
  82. 'erlang_run_queue': {
  83. 'options': [None, 'Erlang Run Queue', 'processes', 'overview', 'rabbitmq.erlang_run_queue', 'line'],
  84. 'lines': [
  85. ['run_queue', 'length', 'absolute']
  86. ]
  87. },
  88. 'global_counts': {
  89. 'options': [None, 'Global Counts', 'counts', 'overview', 'rabbitmq.global_counts', 'line'],
  90. 'lines': [
  91. ['object_totals_channels', 'channels', 'absolute'],
  92. ['object_totals_consumers', 'consumers', 'absolute'],
  93. ['object_totals_connections', 'connections', 'absolute'],
  94. ['object_totals_queues', 'queues', 'absolute'],
  95. ['object_totals_exchanges', 'exchanges', 'absolute']
  96. ]
  97. },
  98. 'queued_messages': {
  99. 'options': [None, 'Queued Messages', 'messages', 'overview', 'rabbitmq.queued_messages', 'stacked'],
  100. 'lines': [
  101. ['queue_totals_messages_ready', 'ready', 'absolute'],
  102. ['queue_totals_messages_unacknowledged', 'unacknowledged', 'absolute']
  103. ]
  104. },
  105. 'message_rates': {
  106. 'options': [None, 'Message Rates', 'messages/s', 'overview', 'rabbitmq.message_rates', 'stacked'],
  107. 'lines': [
  108. ['message_stats_ack', 'ack', 'incremental'],
  109. ['message_stats_redeliver', 'redeliver', 'incremental'],
  110. ['message_stats_deliver', 'deliver', 'incremental'],
  111. ['message_stats_publish', 'publish', 'incremental']
  112. ]
  113. }
  114. }
  115. class Service(UrlService):
  116. def __init__(self, configuration=None, name=None):
  117. UrlService.__init__(self, configuration=configuration, name=name)
  118. self.order = ORDER
  119. self.definitions = CHARTS
  120. self.host = self.configuration.get('host', '127.0.0.1')
  121. self.port = self.configuration.get('port', 15672)
  122. self.scheme = self.configuration.get('scheme', 'http')
  123. def check(self):
  124. # We can't start if <host> AND <port> not specified
  125. if not (self.host and self.port):
  126. self.error('Host is not defined in the module configuration file')
  127. return False
  128. # Hostname -> ip address
  129. try:
  130. self.host = gethostbyname(self.host)
  131. except gaierror as error:
  132. self.error(str(error))
  133. return False
  134. # Add handlers (auth, self signed cert accept)
  135. self.url = '{scheme}://{host}:{port}/api'.format(scheme=self.scheme,
  136. host=self.host,
  137. port=self.port)
  138. # Add methods
  139. api_node = self.url + '/nodes'
  140. api_overview = self.url + '/overview'
  141. self.methods = [METHODS(get_data=self._get_overview_stats,
  142. url=api_node,
  143. stats=NODE_STATS),
  144. METHODS(get_data=self._get_overview_stats,
  145. url=api_overview,
  146. stats=OVERVIEW_STATS)]
  147. return UrlService.check(self)
  148. def _get_data(self):
  149. threads = list()
  150. queue = Queue()
  151. result = dict()
  152. for method in self.methods:
  153. th = Thread(target=method.get_data,
  154. args=(queue, method.url, method.stats))
  155. th.start()
  156. threads.append(th)
  157. for thread in threads:
  158. thread.join()
  159. result.update(queue.get())
  160. return result or None
  161. def _get_overview_stats(self, queue, url, stats):
  162. """
  163. Format data received from http request
  164. :return: dict
  165. """
  166. raw_data = self._get_raw_data(url)
  167. if not raw_data:
  168. return queue.put(dict())
  169. data = loads(raw_data)
  170. data = data[0] if isinstance(data, list) else data
  171. to_netdata = fetch_data(raw_data=data, metrics=stats)
  172. return queue.put(to_netdata)
  173. def fetch_data(raw_data, metrics):
  174. data = dict()
  175. for metric in metrics:
  176. value = raw_data
  177. metrics_list = metric.split('.')
  178. try:
  179. for m in metrics_list:
  180. value = value[m]
  181. except KeyError:
  182. continue
  183. data['_'.join(metrics_list)] = value
  184. return data