postgres.chart.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
# -*- coding: utf-8 -*-
# Description: example netdata python.d module
# Authors: facetoe, dangtranhoang

from copy import deepcopy

# psycopg2 is an optional third-party dependency; its absence is recorded
# here and reported later from Service.check() instead of crashing on import.
try:
    import psycopg2
    from psycopg2 import extensions
    from psycopg2.extras import DictCursor
    from psycopg2 import OperationalError
    PSYCOPG2 = True
except ImportError:
    PSYCOPG2 = False

from bases.FrameworkServices.SimpleService import SimpleService

# default module values (can be overridden per job in the module config)
update_every = 1
priority = 60000
retries = 60
  18. METRICS = dict(
  19. DATABASE=['connections',
  20. 'xact_commit',
  21. 'xact_rollback',
  22. 'blks_read',
  23. 'blks_hit',
  24. 'tup_returned',
  25. 'tup_fetched',
  26. 'tup_inserted',
  27. 'tup_updated',
  28. 'tup_deleted',
  29. 'conflicts',
  30. 'size'],
  31. BACKENDS=['backends_active',
  32. 'backends_idle'],
  33. INDEX_STATS=['index_count',
  34. 'index_size'],
  35. TABLE_STATS=['table_size',
  36. 'table_count'],
  37. ARCHIVE=['ready_count',
  38. 'done_count',
  39. 'file_count'],
  40. BGWRITER=['writer_scheduled',
  41. 'writer_requested'],
  42. LOCKS=['ExclusiveLock',
  43. 'RowShareLock',
  44. 'SIReadLock',
  45. 'ShareUpdateExclusiveLock',
  46. 'AccessExclusiveLock',
  47. 'AccessShareLock',
  48. 'ShareRowExclusiveLock',
  49. 'ShareLock',
  50. 'RowExclusiveLock']
  51. )
  52. QUERIES = dict(
  53. ARCHIVE="""
  54. SELECT
  55. CAST(COUNT(*) AS INT) AS file_count,
  56. CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count,
  57. CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count
  58. FROM
  59. pg_catalog.pg_ls_dir('{0}/archive_status') AS archive_files (archive_file);
  60. """,
  61. BACKENDS="""
  62. SELECT
  63. count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active,
  64. (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle
  65. FROM pg_stat_activity;
  66. """,
  67. TABLE_STATS="""
  68. SELECT
  69. ((sum(relpages) * 8) * 1024) AS table_size,
  70. count(1) AS table_count
  71. FROM pg_class
  72. WHERE relkind IN ('r', 't');
  73. """,
  74. INDEX_STATS="""
  75. SELECT
  76. ((sum(relpages) * 8) * 1024) AS index_size,
  77. count(1) AS index_count
  78. FROM pg_class
  79. WHERE relkind = 'i';""",
  80. DATABASE="""
  81. SELECT
  82. datname AS database_name,
  83. sum(numbackends) AS connections,
  84. sum(xact_commit) AS xact_commit,
  85. sum(xact_rollback) AS xact_rollback,
  86. sum(blks_read) AS blks_read,
  87. sum(blks_hit) AS blks_hit,
  88. sum(tup_returned) AS tup_returned,
  89. sum(tup_fetched) AS tup_fetched,
  90. sum(tup_inserted) AS tup_inserted,
  91. sum(tup_updated) AS tup_updated,
  92. sum(tup_deleted) AS tup_deleted,
  93. sum(conflicts) AS conflicts,
  94. pg_database_size(datname) AS size
  95. FROM pg_stat_database
  96. WHERE datname IN %(databases)s
  97. GROUP BY datname;
  98. """,
  99. BGWRITER="""
  100. SELECT
  101. checkpoints_timed AS writer_scheduled,
  102. checkpoints_req AS writer_requested
  103. FROM pg_stat_bgwriter;""",
  104. LOCKS="""
  105. SELECT
  106. pg_database.datname as database_name,
  107. mode,
  108. count(mode) AS locks_count
  109. FROM pg_locks
  110. INNER JOIN pg_database ON pg_database.oid = pg_locks.database
  111. GROUP BY datname, mode
  112. ORDER BY datname, mode;
  113. """,
  114. FIND_DATABASES="""
  115. SELECT datname
  116. FROM pg_stat_database
  117. WHERE has_database_privilege((SELECT current_user), datname, 'connect')
  118. AND NOT datname ~* '^template\d+';
  119. """,
  120. IF_SUPERUSER="""
  121. SELECT current_setting('is_superuser') = 'on' AS is_superuser;
  122. """,
  123. DETECT_SERVER_VERSION="""
  124. SHOW server_version_num;
  125. """
  126. )
  127. QUERY_STATS = {
  128. QUERIES['DATABASE']: METRICS['DATABASE'],
  129. QUERIES['BACKENDS']: METRICS['BACKENDS'],
  130. QUERIES['LOCKS']: METRICS['LOCKS']
  131. }
  132. ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write', 'database_size',
  133. 'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'background_writer']
# Chart templates.  'options' is [name, title, units, family, context, type];
# 'lines' entries are [dimension_id, name, algorithm(, multiplier, divisor)].
# NOTE: the insertion order of the 'db_stat*' keys determines the order in
# which per-database copies are created (see create_dynamic_charts_).
CHARTS = {
    'db_stat_transactions': {
        'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions',
                    'line'],
        'lines': [
            ['xact_commit', 'committed', 'incremental'],
            ['xact_rollback', 'rolled back', 'incremental']
        ]},
    'db_stat_connections': {
        'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections',
                    'line'],
        'lines': [
            ['connections', 'connections', 'absolute']
        ]},
    'db_stat_tuple_read': {
        'options': [None, 'Tuple reads from db', 'reads/s', 'db statistics', 'postgres.db_stat_tuple_read', 'line'],
        'lines': [
            ['blks_read', 'disk', 'incremental'],
            ['blks_hit', 'cache', 'incremental']
        ]},
    'db_stat_tuple_returned': {
        'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned',
                    'line'],
        'lines': [
            ['tup_returned', 'sequential', 'incremental'],
            ['tup_fetched', 'bitmap', 'incremental']
        ]},
    'db_stat_tuple_write': {
        'options': [None, 'Tuples written to db', 'writes/s', 'db statistics', 'postgres.db_stat_tuple_write', 'line'],
        'lines': [
            ['tup_inserted', 'inserted', 'incremental'],
            ['tup_updated', 'updated', 'incremental'],
            ['tup_deleted', 'deleted', 'incremental'],
            ['conflicts', 'conflicts', 'incremental']
        ]},
    # 'lines' is intentionally empty here: one line per discovered database
    # is appended at runtime by create_dynamic_charts_.
    'database_size': {
        'options': [None, 'Database size', 'MB', 'database size', 'postgres.db_size', 'stacked'],
        'lines': [
        ]},
    'backend_process': {
        'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process',
                    'line'],
        'lines': [
            ['backends_active', 'active', 'absolute'],
            ['backends_idle', 'idle', 'absolute']
        ]},
    'index_count': {
        'options': [None, 'Total indexes', 'index', 'indexes', 'postgres.index_count', 'line'],
        'lines': [
            ['index_count', 'total', 'absolute']
        ]},
    'index_size': {
        'options': [None, 'Indexes size', 'MB', 'indexes', 'postgres.index_size', 'line'],
        'lines': [
            ['index_size', 'size', 'absolute', 1, 1024 * 1024]
        ]},
    'table_count': {
        'options': [None, 'Total Tables', 'tables', 'tables', 'postgres.table_count', 'line'],
        'lines': [
            ['table_count', 'total', 'absolute']
        ]},
    'table_size': {
        'options': [None, 'Tables size', 'MB', 'tables', 'postgres.table_size', 'line'],
        'lines': [
            ['table_size', 'size', 'absolute', 1, 1024 * 1024]
        ]},
    'wal': {
        'options': [None, 'Write-Ahead Logging Statistics', 'files/s', 'write ahead log', 'postgres.wal', 'line'],
        'lines': [
            ['file_count', 'total', 'incremental'],
            ['ready_count', 'ready', 'incremental'],
            ['done_count', 'done', 'incremental']
        ]},
    'background_writer': {
        'options': [None, 'Checkpoints', 'writes/s', 'background writer', 'postgres.background_writer', 'line'],
        'lines': [
            ['writer_scheduled', 'scheduled', 'incremental'],
            ['writer_requested', 'requested', 'incremental']
        ]}
}
class Service(SimpleService):
    """netdata service collecting PostgreSQL statistics via psycopg2.

    On each update the queries registered in self.queries are executed and
    their result columns are mapped onto chart dimensions in self.data.
    """

    def __init__(self, configuration=None, name=None):
        SimpleService.__init__(self, configuration=configuration, name=name)
        self.order = ORDER[:]
        self.definitions = deepcopy(CHARTS)
        # feature flags are popped so that the remaining configuration can be
        # passed straight to psycopg2.connect()
        self.table_stats = configuration.pop('table_stats', False)
        self.index_stats = configuration.pop('index_stats', False)
        self.database_poll = configuration.pop('database_poll', None)
        self.configuration = configuration
        # False while disconnected; a psycopg2 connection object afterwards
        # (reset back to False on OperationalError in _get_data)
        self.connection = False
        self.server_version = None
        self.data = dict()
        self.locks_zeroed = dict()
        self.databases = list()
        self.queries = QUERY_STATS.copy()

    def _connect(self):
        """Open a read-only autocommit connection unless one already exists.

        Returns (True, True) on success, (False, error_message) on failure.
        """
        params = dict(user='postgres',
                      database=None,
                      password=None,
                      host=None,
                      port=5432)
        params.update(self.configuration)
        if not self.connection:
            try:
                self.connection = psycopg2.connect(**params)
                self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
                self.connection.set_session(readonly=True)
            except OperationalError as error:
                return False, str(error)
        return True, True

    def check(self):
        """Verify psycopg2 availability and connectivity, then build charts.

        Returns True when the module is able to collect data.
        """
        if not PSYCOPG2:
            self.error('\'python-psycopg2\' module is needed to use postgres.chart.py')
            return False
        result, error = self._connect()
        if not result:
            # log the connection parameters with the password masked
            conf = dict((k, (lambda k, v: v if k != 'password' else '*****')(k, v))
                        for k, v in self.configuration.items())
            self.error('Failed to connect to %s. Error: %s' % (str(conf), error))
            return False
        try:
            cursor = self.connection.cursor()
            self.databases = discover_databases_(cursor, QUERIES['FIND_DATABASES'])
            is_superuser = check_if_superuser_(cursor, QUERIES['IF_SUPERUSER'])
            self.server_version = detect_server_version(cursor, QUERIES['DETECT_SERVER_VERSION'])
            cursor.close()
            # restrict collection to the whitespace-separated names given in
            # 'database_poll'; fall back to all discovered databases when the
            # filter matches nothing
            if self.database_poll and isinstance(self.database_poll, str):
                self.databases = [dbase for dbase in self.databases if dbase in self.database_poll.split()] \
                                 or self.databases
            self.locks_zeroed = populate_lock_types(self.databases)
            self.add_additional_queries_(is_superuser)
            self.create_dynamic_charts_()
            return True
        except Exception as error:
            self.error(str(error))
            return False

    def add_additional_queries_(self, is_superuser):
        """Register optional queries based on configuration and privileges.

        :param is_superuser: whether the connected role is a superuser; the
            bgwriter/WAL-archive queries are only added for superusers.
        """
        if self.index_stats:
            self.queries[QUERIES['INDEX_STATS']] = METRICS['INDEX_STATS']
        if self.table_stats:
            self.queries[QUERIES['TABLE_STATS']] = METRICS['TABLE_STATS']
        if is_superuser:
            self.queries[QUERIES['BGWRITER']] = METRICS['BGWRITER']
            # the WAL directory was renamed pg_xlog -> pg_wal in PostgreSQL 10
            # (server_version_num >= 100000)
            if self.server_version >= 100000:
                wal_dir_name = 'pg_wal'
            else:
                wal_dir_name = 'pg_xlog'
            self.queries[QUERIES['ARCHIVE'].format(wal_dir_name)] = METRICS['ARCHIVE']

    def create_dynamic_charts_(self):
        """Create per-database chart definitions for every discovered database."""
        for database_name in self.databases[::-1]:
            self.definitions['database_size']['lines'].append([database_name + '_size',
                                                               database_name, 'absolute', 1, 1024 * 1024])
            for chart_name in [name for name in CHARTS if name.startswith('db_stat')]:
                add_database_stat_chart_(order=self.order, definitions=self.definitions,
                                         name=chart_name, database_name=database_name)
            add_database_lock_chart_(order=self.order, definitions=self.definitions, database_name=database_name)

    def _get_data(self):
        """Run all registered queries and return the collected metrics dict.

        Returns None when disconnected; on OperationalError the cached
        connection is dropped so the next call reconnects.
        """
        result, error = self._connect()
        if result:
            cursor = self.connection.cursor(cursor_factory=DictCursor)
            try:
                # reset lock dimensions to zero first: the LOCKS query only
                # returns rows for lock modes currently held
                self.data.update(self.locks_zeroed)
                for query, metrics in self.queries.items():
                    self.query_stats_(cursor, query, metrics)
            except OperationalError:
                self.connection = False
                cursor.close()
                return None
            else:
                cursor.close()
                return self.data
        else:
            return None

    def query_stats_(self, cursor, query, metrics):
        """Execute *query* and store each metric column into self.data.

        Rows carrying a 'database_name' column get their metrics prefixed
        with that name; lock rows are matched by lock mode.
        """
        cursor.execute(query, dict(databases=tuple(self.databases)))
        for row in cursor:
            for metric in metrics:
                dimension_id = '_'.join([row['database_name'], metric]) if 'database_name' in row else metric
                if metric in row:
                    self.data[dimension_id] = int(row[metric])
                elif 'locks_count' in row:
                    self.data[dimension_id] = row['locks_count'] if metric == row['mode'] else 0
  316. def discover_databases_(cursor, query):
  317. cursor.execute(query)
  318. result = list()
  319. for db in [database[0] for database in cursor]:
  320. if db not in result:
  321. result.append(db)
  322. return result
  323. def check_if_superuser_(cursor, query):
  324. cursor.execute(query)
  325. return cursor.fetchone()[0]
  326. def detect_server_version(cursor, query):
  327. cursor.execute(query)
  328. return int(cursor.fetchone()[0])
  329. def populate_lock_types(databases):
  330. result = dict()
  331. for database in databases:
  332. for lock_type in METRICS['LOCKS']:
  333. key = '_'.join([database, lock_type])
  334. result[key] = 0
  335. return result
  336. def add_database_lock_chart_(order, definitions, database_name):
  337. def create_lines(database):
  338. result = list()
  339. for lock_type in METRICS['LOCKS']:
  340. dimension_id = '_'.join([database, lock_type])
  341. result.append([dimension_id, lock_type, 'absolute'])
  342. return result
  343. chart_name = database_name + '_locks'
  344. order.insert(-1, chart_name)
  345. definitions[chart_name] = {
  346. 'options':
  347. [None, 'Locks on db: ' + database_name, 'locks', 'db ' + database_name, 'postgres.db_locks', 'line'],
  348. 'lines': create_lines(database_name)
  349. }
  350. def add_database_stat_chart_(order, definitions, name, database_name):
  351. def create_lines(database, lines):
  352. result = list()
  353. for line in lines:
  354. new_line = ['_'.join([database, line[0]])] + line[1:]
  355. result.append(new_line)
  356. return result
  357. chart_template = CHARTS[name]
  358. chart_name = '_'.join([database_name, name])
  359. order.insert(0, chart_name)
  360. name, title, units, family, context, chart_type = chart_template['options']
  361. definitions[chart_name] = {
  362. 'options': [name, title + ': ' + database_name, units, 'db ' + database_name, context, chart_type],
  363. 'lines': create_lines(database_name, chart_template['lines'])}