rabbitmq.chart.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. # -*- coding: utf-8 -*-
  2. # Description: rabbitmq netdata python.d module
  3. # Author: l2isbad
  4. from collections import namedtuple
  5. from json import loads
  6. from socket import gethostbyname, gaierror
  7. from threading import Thread
  8. try:
  9. from queue import Queue
  10. except ImportError:
  11. from Queue import Queue
  12. from bases.FrameworkServices.UrlService import UrlService
  13. # default module values (can be overridden per job in `config`)
  14. update_every = 1
  15. priority = 60000
  16. retries = 60
  17. METHODS = namedtuple('METHODS', ['get_data', 'url', 'stats'])
  18. NODE_STATS = ['fd_used',
  19. 'mem_used',
  20. 'sockets_used',
  21. 'proc_used',
  22. 'disk_free'
  23. ]
  24. OVERVIEW_STATS = ['object_totals.channels',
  25. 'object_totals.consumers',
  26. 'object_totals.connections',
  27. 'object_totals.queues',
  28. 'object_totals.exchanges',
  29. 'queue_totals.messages_ready',
  30. 'queue_totals.messages_unacknowledged',
  31. 'message_stats.ack',
  32. 'message_stats.redeliver',
  33. 'message_stats.deliver',
  34. 'message_stats.publish'
  35. ]
  36. ORDER = ['queued_messages', 'message_rates', 'global_counts',
  37. 'file_descriptors', 'socket_descriptors', 'erlang_processes', 'memory', 'disk_space']
  38. CHARTS = {
  39. 'file_descriptors': {
  40. 'options': [None, 'File Descriptors', 'descriptors', 'overview',
  41. 'rabbitmq.file_descriptors', 'line'],
  42. 'lines': [
  43. ['fd_used', 'used', 'absolute']
  44. ]},
  45. 'memory': {
  46. 'options': [None, 'Memory', 'MB', 'overview',
  47. 'rabbitmq.memory', 'line'],
  48. 'lines': [
  49. ['mem_used', 'used', 'absolute', 1, 1024 << 10]
  50. ]},
  51. 'disk_space': {
  52. 'options': [None, 'Disk Space', 'GB', 'overview',
  53. 'rabbitmq.disk_space', 'line'],
  54. 'lines': [
  55. ['disk_free', 'free', 'absolute', 1, 1024 ** 3]
  56. ]},
  57. 'socket_descriptors': {
  58. 'options': [None, 'Socket Descriptors', 'descriptors', 'overview',
  59. 'rabbitmq.sockets', 'line'],
  60. 'lines': [
  61. ['sockets_used', 'used', 'absolute']
  62. ]},
  63. 'erlang_processes': {
  64. 'options': [None, 'Erlang Processes', 'processes', 'overview',
  65. 'rabbitmq.processes', 'line'],
  66. 'lines': [
  67. ['proc_used', 'used', 'absolute']
  68. ]},
  69. 'global_counts': {
  70. 'options': [None, 'Global Counts', 'counts', 'overview',
  71. 'rabbitmq.global_counts', 'line'],
  72. 'lines': [
  73. ['object_totals_channels', 'channels', 'absolute'],
  74. ['object_totals_consumers', 'consumers', 'absolute'],
  75. ['object_totals_connections', 'connections', 'absolute'],
  76. ['object_totals_queues', 'queues', 'absolute'],
  77. ['object_totals_exchanges', 'exchanges', 'absolute']
  78. ]},
  79. 'queued_messages': {
  80. 'options': [None, 'Queued Messages', 'messages', 'overview',
  81. 'rabbitmq.queued_messages', 'stacked'],
  82. 'lines': [
  83. ['queue_totals_messages_ready', 'ready', 'absolute'],
  84. ['queue_totals_messages_unacknowledged', 'unacknowledged', 'absolute']
  85. ]},
  86. 'message_rates': {
  87. 'options': [None, 'Message Rates', 'messages/s', 'overview',
  88. 'rabbitmq.message_rates', 'stacked'],
  89. 'lines': [
  90. ['message_stats_ack', 'ack', 'incremental'],
  91. ['message_stats_redeliver', 'redeliver', 'incremental'],
  92. ['message_stats_deliver', 'deliver', 'incremental'],
  93. ['message_stats_publish', 'publish', 'incremental']
  94. ]}
  95. }
  96. class Service(UrlService):
  97. def __init__(self, configuration=None, name=None):
  98. UrlService.__init__(self, configuration=configuration, name=name)
  99. self.order = ORDER
  100. self.definitions = CHARTS
  101. self.host = self.configuration.get('host', '127.0.0.1')
  102. self.port = self.configuration.get('port', 15672)
  103. self.scheme = self.configuration.get('scheme', 'http')
  104. def check(self):
  105. # We can't start if <host> AND <port> not specified
  106. if not (self.host and self.port):
  107. self.error('Host is not defined in the module configuration file')
  108. return False
  109. # Hostname -> ip address
  110. try:
  111. self.host = gethostbyname(self.host)
  112. except gaierror as error:
  113. self.error(str(error))
  114. return False
  115. # Add handlers (auth, self signed cert accept)
  116. self.url = '{scheme}://{host}:{port}/api'.format(scheme=self.scheme,
  117. host=self.host,
  118. port=self.port)
  119. # Add methods
  120. api_node = self.url + '/nodes'
  121. api_overview = self.url + '/overview'
  122. self.methods = [METHODS(get_data=self._get_overview_stats,
  123. url=api_node,
  124. stats=NODE_STATS),
  125. METHODS(get_data=self._get_overview_stats,
  126. url=api_overview,
  127. stats=OVERVIEW_STATS)]
  128. return UrlService.check(self)
  129. def _get_data(self):
  130. threads = list()
  131. queue = Queue()
  132. result = dict()
  133. for method in self.methods:
  134. th = Thread(target=method.get_data,
  135. args=(queue, method.url, method.stats))
  136. th.start()
  137. threads.append(th)
  138. for thread in threads:
  139. thread.join()
  140. result.update(queue.get())
  141. return result or None
  142. def _get_overview_stats(self, queue, url, stats):
  143. """
  144. Format data received from http request
  145. :return: dict
  146. """
  147. raw_data = self._get_raw_data(url)
  148. if not raw_data:
  149. return queue.put(dict())
  150. data = loads(raw_data)
  151. data = data[0] if isinstance(data, list) else data
  152. to_netdata = fetch_data(raw_data=data, metrics=stats)
  153. return queue.put(to_netdata)
  154. def fetch_data(raw_data, metrics):
  155. data = dict()
  156. for metric in metrics:
  157. value = raw_data
  158. metrics_list = metric.split('.')
  159. try:
  160. for m in metrics_list:
  161. value = value[m]
  162. except KeyError:
  163. continue
  164. data['_'.join(metrics_list)] = value
  165. return data