rabbitmq.chart.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. # -*- coding: utf-8 -*-
  2. # Description: rabbitmq netdata python.d module
  3. # Author: l2isbad
  4. # SPDX-License-Identifier: GPL-3.0-or-later
  5. from collections import namedtuple
  6. from json import loads
  7. from socket import gethostbyname, gaierror
  8. from threading import Thread
  9. try:
  10. from queue import Queue
  11. except ImportError:
  12. from Queue import Queue
  13. from bases.FrameworkServices.UrlService import UrlService
  14. # default module values (can be overridden per job in `config`)
  15. update_every = 1
  16. priority = 60000
  17. METHODS = namedtuple('METHODS', ['get_data', 'url', 'stats'])
  18. NODE_STATS = [
  19. 'fd_used',
  20. 'mem_used',
  21. 'sockets_used',
  22. 'proc_used',
  23. 'disk_free',
  24. 'run_queue'
  25. ]
  26. OVERVIEW_STATS = [
  27. 'object_totals.channels',
  28. 'object_totals.consumers',
  29. 'object_totals.connections',
  30. 'object_totals.queues',
  31. 'object_totals.exchanges',
  32. 'queue_totals.messages_ready',
  33. 'queue_totals.messages_unacknowledged',
  34. 'message_stats.ack',
  35. 'message_stats.redeliver',
  36. 'message_stats.deliver',
  37. 'message_stats.publish'
  38. ]
  39. ORDER = [
  40. 'queued_messages',
  41. 'message_rates',
  42. 'global_counts',
  43. 'file_descriptors',
  44. 'socket_descriptors',
  45. 'erlang_processes',
  46. 'erlang_run_queue',
  47. 'memory',
  48. 'disk_space'
  49. ]
  50. CHARTS = {
  51. 'file_descriptors': {
  52. 'options': [None, 'File Descriptors', 'descriptors', 'overview', 'rabbitmq.file_descriptors', 'line'],
  53. 'lines': [
  54. ['fd_used', 'used', 'absolute']
  55. ]
  56. },
  57. 'memory': {
  58. 'options': [None, 'Memory', 'MB', 'overview', 'rabbitmq.memory', 'area'],
  59. 'lines': [
  60. ['mem_used', 'used', 'absolute', 1, 1024 << 10]
  61. ]
  62. },
  63. 'disk_space': {
  64. 'options': [None, 'Disk Space', 'GB', 'overview', 'rabbitmq.disk_space', 'area'],
  65. 'lines': [
  66. ['disk_free', 'free', 'absolute', 1, 1024 ** 3]
  67. ]
  68. },
  69. 'socket_descriptors': {
  70. 'options': [None, 'Socket Descriptors', 'descriptors', 'overview', 'rabbitmq.sockets', 'line'],
  71. 'lines': [
  72. ['sockets_used', 'used', 'absolute']
  73. ]
  74. },
  75. 'erlang_processes': {
  76. 'options': [None, 'Erlang Processes', 'processes', 'overview', 'rabbitmq.processes', 'line'],
  77. 'lines': [
  78. ['proc_used', 'used', 'absolute']
  79. ]
  80. },
  81. 'erlang_run_queue': {
  82. 'options': [None, 'Erlang Run Queue', 'processes', 'overview', 'rabbitmq.erlang_run_queue', 'line'],
  83. 'lines': [
  84. ['run_queue', 'length', 'absolute']
  85. ]
  86. },
  87. 'global_counts': {
  88. 'options': [None, 'Global Counts', 'counts', 'overview', 'rabbitmq.global_counts', 'line'],
  89. 'lines': [
  90. ['object_totals_channels', 'channels', 'absolute'],
  91. ['object_totals_consumers', 'consumers', 'absolute'],
  92. ['object_totals_connections', 'connections', 'absolute'],
  93. ['object_totals_queues', 'queues', 'absolute'],
  94. ['object_totals_exchanges', 'exchanges', 'absolute']
  95. ]
  96. },
  97. 'queued_messages': {
  98. 'options': [None, 'Queued Messages', 'messages', 'overview', 'rabbitmq.queued_messages', 'stacked'],
  99. 'lines': [
  100. ['queue_totals_messages_ready', 'ready', 'absolute'],
  101. ['queue_totals_messages_unacknowledged', 'unacknowledged', 'absolute']
  102. ]
  103. },
  104. 'message_rates': {
  105. 'options': [None, 'Message Rates', 'messages/s', 'overview', 'rabbitmq.message_rates', 'line'],
  106. 'lines': [
  107. ['message_stats_ack', 'ack', 'incremental'],
  108. ['message_stats_redeliver', 'redeliver', 'incremental'],
  109. ['message_stats_deliver', 'deliver', 'incremental'],
  110. ['message_stats_publish', 'publish', 'incremental']
  111. ]
  112. }
  113. }
  114. class Service(UrlService):
  115. def __init__(self, configuration=None, name=None):
  116. UrlService.__init__(self, configuration=configuration, name=name)
  117. self.order = ORDER
  118. self.definitions = CHARTS
  119. self.host = self.configuration.get('host', '127.0.0.1')
  120. self.port = self.configuration.get('port', 15672)
  121. self.scheme = self.configuration.get('scheme', 'http')
  122. def check(self):
  123. # We can't start if <host> AND <port> not specified
  124. if not (self.host and self.port):
  125. self.error('Host is not defined in the module configuration file')
  126. return False
  127. # Hostname -> ip address
  128. try:
  129. self.host = gethostbyname(self.host)
  130. except gaierror as error:
  131. self.error(str(error))
  132. return False
  133. # Add handlers (auth, self signed cert accept)
  134. self.url = '{scheme}://{host}:{port}/api'.format(scheme=self.scheme,
  135. host=self.host,
  136. port=self.port)
  137. # Add methods
  138. api_node = self.url + '/nodes'
  139. api_overview = self.url + '/overview'
  140. self.methods = [METHODS(get_data=self._get_overview_stats,
  141. url=api_node,
  142. stats=NODE_STATS),
  143. METHODS(get_data=self._get_overview_stats,
  144. url=api_overview,
  145. stats=OVERVIEW_STATS)]
  146. return UrlService.check(self)
  147. def _get_data(self):
  148. threads = list()
  149. queue = Queue()
  150. result = dict()
  151. for method in self.methods:
  152. th = Thread(target=method.get_data,
  153. args=(queue, method.url, method.stats))
  154. th.start()
  155. threads.append(th)
  156. for thread in threads:
  157. thread.join()
  158. result.update(queue.get())
  159. return result or None
  160. def _get_overview_stats(self, queue, url, stats):
  161. """
  162. Format data received from http request
  163. :return: dict
  164. """
  165. raw_data = self._get_raw_data(url)
  166. if not raw_data:
  167. return queue.put(dict())
  168. data = loads(raw_data)
  169. data = data[0] if isinstance(data, list) else data
  170. to_netdata = fetch_data(raw_data=data, metrics=stats)
  171. return queue.put(to_netdata)
  172. def fetch_data(raw_data, metrics):
  173. data = dict()
  174. for metric in metrics:
  175. value = raw_data
  176. metrics_list = metric.split('.')
  177. try:
  178. for m in metrics_list:
  179. value = value[m]
  180. except KeyError:
  181. continue
  182. data['_'.join(metrics_list)] = value
  183. return data