rabbitmq.chart.py 15 KB


  1. # -*- coding: utf-8 -*-
  2. # Description: rabbitmq netdata python.d module
  3. # Author: ilyam8
  4. # SPDX-License-Identifier: GPL-3.0-or-later
  5. from json import loads
  6. from bases.FrameworkServices.UrlService import UrlService
  7. API_NODE = 'api/nodes'
  8. API_OVERVIEW = 'api/overview'
  9. API_QUEUES = 'api/queues'
  10. API_VHOSTS = 'api/vhosts'
  11. NODE_STATS = [
  12. 'fd_used',
  13. 'mem_used',
  14. 'sockets_used',
  15. 'proc_used',
  16. 'disk_free',
  17. 'run_queue'
  18. ]
  19. OVERVIEW_STATS = [
  20. 'object_totals.channels',
  21. 'object_totals.consumers',
  22. 'object_totals.connections',
  23. 'object_totals.queues',
  24. 'object_totals.exchanges',
  25. 'queue_totals.messages_ready',
  26. 'queue_totals.messages_unacknowledged',
  27. 'message_stats.ack',
  28. 'message_stats.redeliver',
  29. 'message_stats.deliver',
  30. 'message_stats.publish',
  31. 'churn_rates.connection_created_details.rate',
  32. 'churn_rates.connection_closed_details.rate',
  33. 'churn_rates.channel_created_details.rate',
  34. 'churn_rates.channel_closed_details.rate',
  35. 'churn_rates.queue_created_details.rate',
  36. 'churn_rates.queue_declared_details.rate',
  37. 'churn_rates.queue_deleted_details.rate'
  38. ]
  39. QUEUE_STATS = [
  40. 'messages',
  41. 'messages_paged_out',
  42. 'messages_persistent',
  43. 'messages_ready',
  44. 'messages_unacknowledged',
  45. 'message_stats.ack',
  46. 'message_stats.confirm',
  47. 'message_stats.deliver',
  48. 'message_stats.get',
  49. 'message_stats.get_no_ack',
  50. 'message_stats.publish',
  51. 'message_stats.redeliver',
  52. 'message_stats.return_unroutable',
  53. ]
  54. VHOST_MESSAGE_STATS = [
  55. 'message_stats.ack',
  56. 'message_stats.confirm',
  57. 'message_stats.deliver',
  58. 'message_stats.get',
  59. 'message_stats.get_no_ack',
  60. 'message_stats.publish',
  61. 'message_stats.redeliver',
  62. 'message_stats.return_unroutable',
  63. ]
  64. ORDER = [
  65. 'queued_messages',
  66. 'connection_churn_rates',
  67. 'channel_churn_rates',
  68. 'queue_churn_rates',
  69. 'message_rates',
  70. 'global_counts',
  71. 'file_descriptors',
  72. 'socket_descriptors',
  73. 'erlang_processes',
  74. 'erlang_run_queue',
  75. 'memory',
  76. 'disk_space'
  77. ]
  78. CHARTS = {
  79. 'file_descriptors': {
  80. 'options': [None, 'File Descriptors', 'descriptors', 'overview', 'rabbitmq.file_descriptors', 'line'],
  81. 'lines': [
  82. ['fd_used', 'used', 'absolute']
  83. ]
  84. },
  85. 'memory': {
  86. 'options': [None, 'Memory', 'MiB', 'overview', 'rabbitmq.memory', 'area'],
  87. 'lines': [
  88. ['mem_used', 'used', 'absolute', 1, 1 << 20]
  89. ]
  90. },
  91. 'disk_space': {
  92. 'options': [None, 'Disk Space', 'GiB', 'overview', 'rabbitmq.disk_space', 'area'],
  93. 'lines': [
  94. ['disk_free', 'free', 'absolute', 1, 1 << 30]
  95. ]
  96. },
  97. 'socket_descriptors': {
  98. 'options': [None, 'Socket Descriptors', 'descriptors', 'overview', 'rabbitmq.sockets', 'line'],
  99. 'lines': [
  100. ['sockets_used', 'used', 'absolute']
  101. ]
  102. },
  103. 'erlang_processes': {
  104. 'options': [None, 'Erlang Processes', 'processes', 'overview', 'rabbitmq.processes', 'line'],
  105. 'lines': [
  106. ['proc_used', 'used', 'absolute']
  107. ]
  108. },
  109. 'erlang_run_queue': {
  110. 'options': [None, 'Erlang Run Queue', 'processes', 'overview', 'rabbitmq.erlang_run_queue', 'line'],
  111. 'lines': [
  112. ['run_queue', 'length', 'absolute']
  113. ]
  114. },
  115. 'global_counts': {
  116. 'options': [None, 'Global Counts', 'counts', 'overview', 'rabbitmq.global_counts', 'line'],
  117. 'lines': [
  118. ['object_totals_channels', 'channels', 'absolute'],
  119. ['object_totals_consumers', 'consumers', 'absolute'],
  120. ['object_totals_connections', 'connections', 'absolute'],
  121. ['object_totals_queues', 'queues', 'absolute'],
  122. ['object_totals_exchanges', 'exchanges', 'absolute']
  123. ]
  124. },
  125. 'connection_churn_rates': {
  126. 'options': [None, 'Connection Churn Rates', 'operations/s', 'overview', 'rabbitmq.connection_churn_rates', 'line'],
  127. 'lines': [
  128. ['churn_rates_connection_created_details_rate', 'created', 'absolute'],
  129. ['churn_rates_connection_closed_details_rate', 'closed', 'absolute']
  130. ]
  131. },
  132. 'channel_churn_rates': {
  133. 'options': [None, 'Channel Churn Rates', 'operations/s', 'overview', 'rabbitmq.channel_churn_rates', 'line'],
  134. 'lines': [
  135. ['churn_rates_channel_created_details_rate', 'created', 'absolute'],
  136. ['churn_rates_channel_closed_details_rate', 'closed', 'absolute']
  137. ]
  138. },
  139. 'queue_churn_rates': {
  140. 'options': [None, 'Queue Churn Rates', 'operations/s', 'overview', 'rabbitmq.queue_churn_rates', 'line'],
  141. 'lines': [
  142. ['churn_rates_queue_created_details_rate', 'created', 'absolute'],
  143. ['churn_rates_queue_declared_details_rate', 'declared', 'absolute'],
  144. ['churn_rates_queue_deleted_details_rate', 'deleted', 'absolute']
  145. ]
  146. },
  147. 'queued_messages': {
  148. 'options': [None, 'Queued Messages', 'messages', 'overview', 'rabbitmq.queued_messages', 'stacked'],
  149. 'lines': [
  150. ['queue_totals_messages_ready', 'ready', 'absolute'],
  151. ['queue_totals_messages_unacknowledged', 'unacknowledged', 'absolute']
  152. ]
  153. },
  154. 'message_rates': {
  155. 'options': [None, 'Message Rates', 'messages/s', 'overview', 'rabbitmq.message_rates', 'line'],
  156. 'lines': [
  157. ['message_stats_ack', 'ack', 'incremental'],
  158. ['message_stats_redeliver', 'redeliver', 'incremental'],
  159. ['message_stats_deliver', 'deliver', 'incremental'],
  160. ['message_stats_publish', 'publish', 'incremental']
  161. ]
  162. }
  163. }
  164. def vhost_chart_template(name):
  165. order = [
  166. 'vhost_{0}_message_stats'.format(name),
  167. ]
  168. family = 'vhost {0}'.format(name)
  169. charts = {
  170. order[0]: {
  171. 'options': [
  172. None, 'Vhost "{0}" Messages'.format(name), 'messages/s', family, 'rabbitmq.vhost_messages', 'stacked'],
  173. 'lines': [
  174. ['vhost_{0}_message_stats_ack'.format(name), 'ack', 'incremental'],
  175. ['vhost_{0}_message_stats_confirm'.format(name), 'confirm', 'incremental'],
  176. ['vhost_{0}_message_stats_deliver'.format(name), 'deliver', 'incremental'],
  177. ['vhost_{0}_message_stats_get'.format(name), 'get', 'incremental'],
  178. ['vhost_{0}_message_stats_get_no_ack'.format(name), 'get_no_ack', 'incremental'],
  179. ['vhost_{0}_message_stats_publish'.format(name), 'publish', 'incremental'],
  180. ['vhost_{0}_message_stats_redeliver'.format(name), 'redeliver', 'incremental'],
  181. ['vhost_{0}_message_stats_return_unroutable'.format(name), 'return_unroutable', 'incremental'],
  182. ]
  183. },
  184. }
  185. return order, charts
  186. def queue_chart_template(queue_id):
  187. vhost, name = queue_id
  188. order = [
  189. 'vhost_{0}_queue_{1}_queued_message'.format(vhost, name),
  190. 'vhost_{0}_queue_{1}_messages_stats'.format(vhost, name),
  191. ]
  192. family = 'vhost {0}'.format(vhost)
  193. charts = {
  194. order[0]: {
  195. 'options': [
  196. None, 'Queue "{0}" in "{1}" queued messages'.format(name, vhost), 'messages', family, 'rabbitmq.queue_messages', 'line'],
  197. 'lines': [
  198. ['vhost_{0}_queue_{1}_messages'.format(vhost, name), 'messages', 'absolute'],
  199. ['vhost_{0}_queue_{1}_messages_paged_out'.format(vhost, name), 'paged_out', 'absolute'],
  200. ['vhost_{0}_queue_{1}_messages_persistent'.format(vhost, name), 'persistent', 'absolute'],
  201. ['vhost_{0}_queue_{1}_messages_ready'.format(vhost, name), 'ready', 'absolute'],
  202. ['vhost_{0}_queue_{1}_messages_unacknowledged'.format(vhost, name), 'unack', 'absolute'],
  203. ]
  204. },
  205. order[1]: {
  206. 'options': [
  207. None, 'Queue "{0}" in "{1}" messages stats'.format(name, vhost), 'messages/s', family, 'rabbitmq.queue_messages_stats', 'line'],
  208. 'lines': [
  209. ['vhost_{0}_queue_{1}_message_stats_ack'.format(vhost, name), 'ack', 'incremental'],
  210. ['vhost_{0}_queue_{1}_message_stats_confirm'.format(vhost, name), 'confirm', 'incremental'],
  211. ['vhost_{0}_queue_{1}_message_stats_deliver'.format(vhost, name), 'deliver', 'incremental'],
  212. ['vhost_{0}_queue_{1}_message_stats_get'.format(vhost, name), 'get', 'incremental'],
  213. ['vhost_{0}_queue_{1}_message_stats_get_no_ack'.format(vhost, name), 'get_no_ack', 'incremental'],
  214. ['vhost_{0}_queue_{1}_message_stats_publish'.format(vhost, name), 'publish', 'incremental'],
  215. ['vhost_{0}_queue_{1}_message_stats_redeliver'.format(vhost, name), 'redeliver', 'incremental'],
  216. ['vhost_{0}_queue_{1}_message_stats_return_unroutable'.format(vhost, name), 'return_unroutable', 'incremental'],
  217. ]
  218. },
  219. }
  220. return order, charts
  221. class VhostStatsBuilder:
  222. def __init__(self):
  223. self.stats = None
  224. def set(self, raw_stats):
  225. self.stats = raw_stats
  226. def name(self):
  227. return self.stats['name']
  228. def has_msg_stats(self):
  229. return bool(self.stats.get('message_stats'))
  230. def msg_stats(self):
  231. name = self.name()
  232. stats = fetch_data(raw_data=self.stats, metrics=VHOST_MESSAGE_STATS)
  233. return dict(('vhost_{0}_{1}'.format(name, k), v) for k, v in stats.items())
  234. class QueueStatsBuilder:
  235. def __init__(self):
  236. self.stats = None
  237. def set(self, raw_stats):
  238. self.stats = raw_stats
  239. def id(self):
  240. return self.stats['vhost'], self.stats['name']
  241. def queue_stats(self):
  242. vhost, name = self.id()
  243. stats = fetch_data(raw_data=self.stats, metrics=QUEUE_STATS)
  244. return dict(('vhost_{0}_queue_{1}_{2}'.format(vhost, name, k), v) for k, v in stats.items())
  245. class Service(UrlService):
  246. def __init__(self, configuration=None, name=None):
  247. UrlService.__init__(self, configuration=configuration, name=name)
  248. self.order = ORDER
  249. self.definitions = CHARTS
  250. self.url = '{0}://{1}:{2}'.format(
  251. configuration.get('scheme', 'http'),
  252. configuration.get('host', '127.0.0.1'),
  253. configuration.get('port', 15672),
  254. )
  255. self.node_name = str()
  256. self.vhost = VhostStatsBuilder()
  257. self.collected_vhosts = set()
  258. self.collect_queues_metrics = configuration.get('collect_queues_metrics', False)
  259. self.debug("collect_queues_metrics is {0}".format("enabled" if self.collect_queues_metrics else "disabled"))
  260. if self.collect_queues_metrics:
  261. self.queue = QueueStatsBuilder()
  262. self.collected_queues = set()
  263. def _get_data(self):
  264. data = dict()
  265. stats = self.get_overview_stats()
  266. if not stats:
  267. return None
  268. data.update(stats)
  269. stats = self.get_nodes_stats()
  270. if not stats:
  271. return None
  272. data.update(stats)
  273. stats = self.get_vhosts_stats()
  274. if stats:
  275. data.update(stats)
  276. if self.collect_queues_metrics:
  277. stats = self.get_queues_stats()
  278. if stats:
  279. data.update(stats)
  280. return data or None
  281. def get_overview_stats(self):
  282. url = '{0}/{1}'.format(self.url, API_OVERVIEW)
  283. self.debug("doing http request to '{0}'".format(url))
  284. raw = self._get_raw_data(url)
  285. if not raw:
  286. return None
  287. data = loads(raw)
  288. self.node_name = data['node']
  289. self.debug("found node name: '{0}'".format(self.node_name))
  290. stats = fetch_data(raw_data=data, metrics=OVERVIEW_STATS)
  291. self.debug("number of metrics: {0}".format(len(stats)))
  292. return stats
  293. def get_nodes_stats(self):
  294. if self.node_name == "":
  295. self.error("trying to get node stats, but node name is not set")
  296. return None
  297. url = '{0}/{1}/{2}'.format(self.url, API_NODE, self.node_name)
  298. self.debug("doing http request to '{0}'".format(url))
  299. raw = self._get_raw_data(url)
  300. if not raw:
  301. return None
  302. data = loads(raw)
  303. stats = fetch_data(raw_data=data, metrics=NODE_STATS)
  304. handle_disabled_disk_monitoring(stats)
  305. self.debug("number of metrics: {0}".format(len(stats)))
  306. return stats
  307. def get_vhosts_stats(self):
  308. url = '{0}/{1}'.format(self.url, API_VHOSTS)
  309. self.debug("doing http request to '{0}'".format(url))
  310. raw = self._get_raw_data(url)
  311. if not raw:
  312. return None
  313. data = dict()
  314. vhosts = loads(raw)
  315. charts_initialized = len(self.charts) > 0
  316. for vhost in vhosts:
  317. self.vhost.set(vhost)
  318. if not self.vhost.has_msg_stats():
  319. continue
  320. if charts_initialized and self.vhost.name() not in self.collected_vhosts:
  321. self.collected_vhosts.add(self.vhost.name())
  322. self.add_vhost_charts(self.vhost.name())
  323. data.update(self.vhost.msg_stats())
  324. self.debug("number of vhosts: {0}, metrics: {1}".format(len(vhosts), len(data)))
  325. return data
  326. def get_queues_stats(self):
  327. url = '{0}/{1}'.format(self.url, API_QUEUES)
  328. self.debug("doing http request to '{0}'".format(url))
  329. raw = self._get_raw_data(url)
  330. if not raw:
  331. return None
  332. data = dict()
  333. queues = loads(raw)
  334. charts_initialized = len(self.charts) > 0
  335. for queue in queues:
  336. self.queue.set(queue)
  337. if self.queue.id()[0] not in self.collected_vhosts:
  338. continue
  339. if charts_initialized and self.queue.id() not in self.collected_queues:
  340. self.collected_queues.add(self.queue.id())
  341. self.add_queue_charts(self.queue.id())
  342. data.update(self.queue.queue_stats())
  343. self.debug("number of queues: {0}, metrics: {1}".format(len(queues), len(data)))
  344. return data
  345. def add_vhost_charts(self, vhost_name):
  346. order, charts = vhost_chart_template(vhost_name)
  347. for chart_name in order:
  348. params = [chart_name] + charts[chart_name]['options']
  349. dimensions = charts[chart_name]['lines']
  350. new_chart = self.charts.add_chart(params)
  351. for dimension in dimensions:
  352. new_chart.add_dimension(dimension)
  353. def add_queue_charts(self, queue_id):
  354. order, charts = queue_chart_template(queue_id)
  355. for chart_name in order:
  356. params = [chart_name] + charts[chart_name]['options']
  357. dimensions = charts[chart_name]['lines']
  358. new_chart = self.charts.add_chart(params)
  359. for dimension in dimensions:
  360. new_chart.add_dimension(dimension)
  361. def fetch_data(raw_data, metrics):
  362. data = dict()
  363. for metric in metrics:
  364. value = raw_data
  365. metrics_list = metric.split('.')
  366. try:
  367. for m in metrics_list:
  368. value = value[m]
  369. except (KeyError, TypeError):
  370. continue
  371. data['_'.join(metrics_list)] = value
  372. return data
  373. def handle_disabled_disk_monitoring(node_stats):
  374. # https://github.com/netdata/netdata/issues/7218
  375. # can be "disk_free": "disk_free_monitoring_disabled"
  376. v = node_stats.get('disk_free')
  377. if v and not isinstance(v, int):
  378. del node_stats['disk_free']