postgres.chart.py 35 KB


  1. # -*- coding: utf-8 -*-
# Description: PostgreSQL netdata python.d module
  3. # Authors: facetoe, dangtranhoang
  4. # SPDX-License-Identifier: GPL-3.0-or-later
  5. from copy import deepcopy
  6. try:
  7. import psycopg2
  8. from psycopg2 import extensions
  9. from psycopg2.extras import DictCursor
  10. from psycopg2 import OperationalError
  11. PSYCOPG2 = True
  12. except ImportError:
  13. PSYCOPG2 = False
  14. from bases.FrameworkServices.SimpleService import SimpleService
# Connection defaults applied by Service.build_conn_params() when the job
# configuration does not provide a value.
DEFAULT_PORT = 5432
DEFAULT_USER = 'postgres'
DEFAULT_CONNECT_TIMEOUT = 2 # seconds
DEFAULT_STATEMENT_TIMEOUT = 5000 # ms
# Job configuration keys. Except for 'statement_timeout' (sent to the server
# through the libpq 'options' string) they are also used as the keyword names
# passed to psycopg2.connect() by build_conn_params().
CONN_PARAM_DSN = 'dsn'
CONN_PARAM_HOST = 'host'
CONN_PARAM_PORT = 'port'
CONN_PARAM_DATABASE = 'database'
CONN_PARAM_USER = 'user'
CONN_PARAM_PASSWORD = 'password'
CONN_PARAM_CONN_TIMEOUT = 'connect_timeout'
CONN_PARAM_STATEMENT_TIMEOUT = 'statement_timeout'
CONN_PARAM_SSL_MODE = 'sslmode'
CONN_PARAM_SSL_ROOT_CERT = 'sslrootcert'
CONN_PARAM_SSL_CRL = 'sslcrl'
CONN_PARAM_SSL_CERT = 'sslcert'
CONN_PARAM_SSL_KEY = 'sslkey'
# Logical query identifiers: resolved to SQL text by query_factory() and used
# as METRICS keys. QUERY_NAME_WAL_WRITES is only a METRICS key; its SQL comes
# from query_factory(QUERY_NAME_DIFF_LSN, ...) in Service.populate_queries().
QUERY_NAME_WAL = 'WAL'
QUERY_NAME_ARCHIVE = 'ARCHIVE'
QUERY_NAME_BACKENDS = 'BACKENDS'
QUERY_NAME_TABLE_STATS = 'TABLE_STATS'
QUERY_NAME_INDEX_STATS = 'INDEX_STATS'
QUERY_NAME_DATABASE = 'DATABASE'
QUERY_NAME_BGWRITER = 'BGWRITER'
QUERY_NAME_LOCKS = 'LOCKS'
QUERY_NAME_DATABASES = 'DATABASES'
QUERY_NAME_STANDBY = 'STANDBY'
QUERY_NAME_REPLICATION_SLOT = 'REPLICATION_SLOT'
QUERY_NAME_STANDBY_DELTA = 'STANDBY_DELTA'
QUERY_NAME_REPSLOT_FILES = 'REPSLOT_FILES'
QUERY_NAME_IF_SUPERUSER = 'IF_SUPERUSER'
QUERY_NAME_SERVER_VERSION = 'SERVER_VERSION'
QUERY_NAME_AUTOVACUUM = 'AUTOVACUUM'
QUERY_NAME_DIFF_LSN = 'DIFF_LSN'
QUERY_NAME_WAL_WRITES = 'WAL_WRITES'
# Metric names read from each query's result rows, keyed by query name.
# Service.query_stats() copies row[metric] into Service.data, prefixing the
# dimension id with the row's database_name / application_name / slot_name
# when such a column exists.
# The LOCKS entries are not result columns: they are lock mode names that
# query_stats() compares against each row's 'mode' column, storing the row's
# 'locks_count' on a match. zero_lock_types() and add_database_lock_chart()
# also iterate this list.
METRICS = {
    QUERY_NAME_DATABASE: [
        'connections',
        'xact_commit',
        'xact_rollback',
        'blks_read',
        'blks_hit',
        'tup_returned',
        'tup_fetched',
        'tup_inserted',
        'tup_updated',
        'tup_deleted',
        'conflicts',
        'temp_files',
        'temp_bytes',
        'size'
    ],
    QUERY_NAME_BACKENDS: [
        'backends_active',
        'backends_idle'
    ],
    QUERY_NAME_INDEX_STATS: [
        'index_count',
        'index_size'
    ],
    QUERY_NAME_TABLE_STATS: [
        'table_size',
        'table_count'
    ],
    QUERY_NAME_WAL: [
        'written_wal',
        'recycled_wal',
        'total_wal'
    ],
    QUERY_NAME_WAL_WRITES: [
        'wal_writes'
    ],
    QUERY_NAME_ARCHIVE: [
        'ready_count',
        'done_count',
        'file_count'
    ],
    QUERY_NAME_BGWRITER: [
        'checkpoint_scheduled',
        'checkpoint_requested',
        'buffers_checkpoint',
        'buffers_clean',
        'maxwritten_clean',
        'buffers_backend',
        'buffers_alloc',
        'buffers_backend_fsync'
    ],
    QUERY_NAME_LOCKS: [
        'ExclusiveLock',
        'RowShareLock',
        'SIReadLock',
        'ShareUpdateExclusiveLock',
        'AccessExclusiveLock',
        'AccessShareLock',
        'ShareRowExclusiveLock',
        'ShareLock',
        'RowExclusiveLock'
    ],
    QUERY_NAME_AUTOVACUUM: [
        'analyze',
        'vacuum_analyze',
        'vacuum',
        'vacuum_freeze',
        'brin_summarize'
    ],
    QUERY_NAME_STANDBY_DELTA: [
        'sent_delta',
        'write_delta',
        'flush_delta',
        'replay_delta'
    ],
    QUERY_NAME_REPSLOT_FILES: [
        'replslot_wal_keep',
        'replslot_files'
    ]
}
# Default version argument of query_factory(). It is below every version
# threshold query_factory() checks, so version-dependent queries resolve to
# their oldest variant when no version is passed.
NO_VERSION = 0
# Keys of the per-query variants stored in the QUERY_* dictionaries.
DEFAULT = 'DEFAULT'
# Variant chosen for server_version < 100000 (uses the pg_xlog / *_location names).
V96 = 'V96'
# Variant of QUERY_REPSLOT_FILES chosen for server_version < 110000.
V10 = 'V10'
# NOTE(review): V11 is not referenced in the visible part of this file.
V11 = 'V11'
# WAL segment file counts. The query lists the WAL directory ('pg_wal'; 'pg_xlog' in the
# V96 variant) and keeps names of 24 hex digits. It classifies a file as
# 'recycled' when its name sorts after the file holding the current WAL
# position, and as 'written' otherwise.
# During recovery the file-name function receives NULL, so the comparison
# is NULL and every file falls through to 'written'.
# Uses pg_ls_dir()/pg_stat_file() and FILTER. Service.populate_queries() only
# adds it in its is_superuser branch and for server_version >= 90400.
# NOTE(review): the ORDER BY inside the subquery does not influence the counts.
QUERY_WAL = {
    DEFAULT: """
SELECT
count(*) as total_wal,
count(*) FILTER (WHERE type = 'recycled') AS recycled_wal,
count(*) FILTER (WHERE type = 'written') AS written_wal
FROM
(SELECT
wal.name,
pg_walfile_name(
CASE pg_is_in_recovery()
WHEN true THEN NULL
ELSE pg_current_wal_lsn()
END ),
CASE
WHEN wal.name > pg_walfile_name(
CASE pg_is_in_recovery()
WHEN true THEN NULL
ELSE pg_current_wal_lsn()
END ) THEN 'recycled'
ELSE 'written'
END AS type
FROM pg_catalog.pg_ls_dir('pg_wal') AS wal(name)
WHERE name ~ '^[0-9A-F]{24}$'
ORDER BY
(pg_stat_file('pg_wal/'||name)).modification,
wal.name DESC) sub;
""",
    V96: """
SELECT
count(*) as total_wal,
count(*) FILTER (WHERE type = 'recycled') AS recycled_wal,
count(*) FILTER (WHERE type = 'written') AS written_wal
FROM
(SELECT
wal.name,
pg_xlogfile_name(
CASE pg_is_in_recovery()
WHEN true THEN NULL
ELSE pg_current_xlog_location()
END ),
CASE
WHEN wal.name > pg_xlogfile_name(
CASE pg_is_in_recovery()
WHEN true THEN NULL
ELSE pg_current_xlog_location()
END ) THEN 'recycled'
ELSE 'written'
END AS type
FROM pg_catalog.pg_ls_dir('pg_xlog') AS wal(name)
WHERE name ~ '^[0-9A-F]{24}$'
ORDER BY
(pg_stat_file('pg_xlog/'||name)).modification,
wal.name DESC) sub;
""",
}
  192. QUERY_ARCHIVE = {
  193. DEFAULT: """
  194. SELECT
  195. CAST(COUNT(*) AS INT) AS file_count,
  196. CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)),0) AS INT) AS ready_count,
  197. CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)),0) AS INT) AS done_count
  198. FROM
  199. pg_catalog.pg_ls_dir('pg_wal/archive_status') AS archive_files (archive_file);
  200. """,
  201. V96: """
  202. SELECT
  203. CAST(COUNT(*) AS INT) AS file_count,
  204. CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)),0) AS INT) AS ready_count,
  205. CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)),0) AS INT) AS done_count
  206. FROM
  207. pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
  208. """,
  209. }
# Backend counts from pg_stat_activity. backends_idle counts rows whose state
# is exactly 'idle'. backends_active is every other row, so rows in any other
# state (including a NULL state) count as active.
# NOTE(review): whether non-client processes listed in pg_stat_activity
# should count as "active" is not decided here — confirm intent.
QUERY_BACKEND = {
    DEFAULT: """
SELECT
count(*) - (SELECT count(*)
FROM pg_stat_activity
WHERE state = 'idle')
AS backends_active,
(SELECT count(*)
FROM pg_stat_activity
WHERE state = 'idle')
AS backends_idle
FROM pg_stat_activity;
""",
}
# Size and count of pg_class rows with relkind 'r' or 't'. The size in bytes
# is relpages * 8 * 1024, which hard-codes an 8 KiB page; QUERY_BGWRITER
# instead reads current_setting('block_size'). Enabled by the 'table_stats'
# job option.
QUERY_TABLE_STATS = {
    DEFAULT: """
SELECT
((sum(relpages) * 8) * 1024) AS table_size,
count(1) AS table_count
FROM pg_class
WHERE relkind IN ('r', 't');
""",
}
# Same as QUERY_TABLE_STATS for pg_class rows with relkind 'i'. Enabled by
# the 'index_stats' job option.
QUERY_INDEX_STATS = {
    DEFAULT: """
SELECT
((sum(relpages) * 8) * 1024) AS index_size,
count(1) AS index_count
FROM pg_class
WHERE relkind = 'i';
""",
}
# Per-database counters from pg_stat_database plus pg_database_size() in bytes.
# %(databases)s is filled by Service.query_stats(), which passes
# dict(databases=tuple(self.databases)); psycopg2 renders the tuple as an
# SQL list for IN.
QUERY_DATABASE = {
    DEFAULT: """
SELECT
datname AS database_name,
numbackends AS connections,
xact_commit AS xact_commit,
xact_rollback AS xact_rollback,
blks_read AS blks_read,
blks_hit AS blks_hit,
tup_returned AS tup_returned,
tup_fetched AS tup_fetched,
tup_inserted AS tup_inserted,
tup_updated AS tup_updated,
tup_deleted AS tup_deleted,
conflicts AS conflicts,
pg_database_size(datname) AS size,
temp_files AS temp_files,
temp_bytes AS temp_bytes
FROM pg_stat_database
WHERE datname IN %(databases)s ;
""",
}
# Checkpoint and buffer counters from pg_stat_bgwriter. Buffer counts are
# multiplied by the server's block_size so they are reported in bytes; the
# matching CHARTS entries divide by 1024 to display KiB/s.
QUERY_BGWRITER = {
    DEFAULT: """
SELECT
checkpoints_timed AS checkpoint_scheduled,
checkpoints_req AS checkpoint_requested,
buffers_checkpoint * current_setting('block_size')::numeric buffers_checkpoint,
buffers_clean * current_setting('block_size')::numeric buffers_clean,
maxwritten_clean,
buffers_backend * current_setting('block_size')::numeric buffers_backend,
buffers_alloc * current_setting('block_size')::numeric buffers_alloc,
buffers_backend_fsync
FROM pg_stat_bgwriter;
""",
}
# Number of pg_locks entries per (database, lock mode). Service.query_stats()
# matches each row's 'mode' against the METRICS[QUERY_NAME_LOCKS] names.
QUERY_LOCKS = {
    DEFAULT: """
SELECT
pg_database.datname as database_name,
mode,
count(mode) AS locks_count
FROM pg_locks
INNER JOIN pg_database
ON pg_database.oid = pg_locks.database
GROUP BY datname, mode
ORDER BY datname, mode;
""",
}
  291. QUERY_DATABASES = {
  292. DEFAULT: """
  293. SELECT
  294. datname
  295. FROM pg_stat_database
  296. WHERE
  297. has_database_privilege(
  298. (SELECT current_user), datname, 'connect')
  299. AND NOT datname ~* '^template\d ';
  300. """,
  301. }
# Distinct, non-NULL application_name values from pg_stat_replication.
# Service.check_queries() stores them in self.secondaries; each one is passed
# to add_replication_delta_chart().
QUERY_STANDBY = {
    DEFAULT: """
SELECT
application_name
FROM pg_stat_replication
WHERE application_name IS NOT NULL
GROUP BY application_name;
""",
}
# Names of all replication slots. Service.check_queries() stores them in
# self.replication_slots; each one is passed to add_replication_slot_chart().
QUERY_REPLICATION_SLOT = {
    DEFAULT: """
SELECT slot_name
FROM pg_replication_slots;
"""
}
# Per standby (application_name), the pg_wal_lsn_diff() / pg_xlog_location_diff()
# distance between a local reference position and the standby's
# sent/write/flush/replay position. The reference is the last received WAL
# position while in recovery, otherwise the current WAL position. The
# 'standby_delta' chart divides the values by 1024 and shows them in KiB.
# The V96 variant uses the pre-10 *_xlog_* function and *_location column names.
QUERY_STANDBY_DELTA = {
    DEFAULT: """
SELECT
application_name,
pg_wal_lsn_diff(
CASE pg_is_in_recovery()
WHEN true THEN pg_last_wal_receive_lsn()
ELSE pg_current_wal_lsn()
END,
sent_lsn) AS sent_delta,
pg_wal_lsn_diff(
CASE pg_is_in_recovery()
WHEN true THEN pg_last_wal_receive_lsn()
ELSE pg_current_wal_lsn()
END,
write_lsn) AS write_delta,
pg_wal_lsn_diff(
CASE pg_is_in_recovery()
WHEN true THEN pg_last_wal_receive_lsn()
ELSE pg_current_wal_lsn()
END,
flush_lsn) AS flush_delta,
pg_wal_lsn_diff(
CASE pg_is_in_recovery()
WHEN true THEN pg_last_wal_receive_lsn()
ELSE pg_current_wal_lsn()
END,
replay_lsn) AS replay_delta
FROM pg_stat_replication
WHERE application_name IS NOT NULL;
""",
    V96: """
SELECT
application_name,
pg_xlog_location_diff(
CASE pg_is_in_recovery()
WHEN true THEN pg_last_xlog_receive_location()
ELSE pg_current_xlog_location()
END,
sent_location) AS sent_delta,
pg_xlog_location_diff(
CASE pg_is_in_recovery()
WHEN true THEN pg_last_xlog_receive_location()
ELSE pg_current_xlog_location()
END,
write_location) AS write_delta,
pg_xlog_location_diff(
CASE pg_is_in_recovery()
WHEN true THEN pg_last_xlog_receive_location()
ELSE pg_current_xlog_location()
END,
flush_location) AS flush_delta,
pg_xlog_location_diff(
CASE pg_is_in_recovery()
WHEN true THEN pg_last_xlog_receive_location()
ELSE pg_current_xlog_location()
END,
replay_location) AS replay_delta
FROM pg_stat_replication
WHERE application_name IS NOT NULL;
""",
}
# Per replication slot it returns:
#   replslot_wal_keep: floor((WAL bytes between restart_lsn and the current
#       WAL position - restart_lsn's offset in its file) / segment size),
#       or 0 when that is NULL.
#   replslot_files: number of entries in pg_replslot/<slot_name> other than
#       'state' (the CASE yields NULL for 'state', and count() skips NULLs).
# The segment size is taken as-is from wal_segment_size in DEFAULT, while the
# V10 variant multiplies it by wal_block_size. This is presumably because the
# setting's unit differs before 11 — confirm.
# query_factory() picks V10 below 110000; Service.populate_queries() only adds
# this query for superusers (it uses pg_ls_dir()) on server_version >= 100000.
# NOTE(review): pg_current_wal_lsn() and pg_walfile_name_offset() are called
# without the pg_is_in_recovery() guard that QUERY_WAL / QUERY_DIFF_LSN use.
# On a standby that has replication slots this query may error — TODO confirm.
QUERY_REPSLOT_FILES = {
    DEFAULT: """
WITH wal_size AS (
SELECT
setting::int AS val
FROM pg_settings
WHERE name = 'wal_segment_size'
)
SELECT
slot_name,
slot_type,
replslot_wal_keep,
count(slot_file) AS replslot_files
FROM
(SELECT
slot.slot_name,
CASE
WHEN slot_file <> 'state' THEN 1
END AS slot_file ,
slot_type,
COALESCE (
floor(
(pg_wal_lsn_diff(pg_current_wal_lsn (),slot.restart_lsn)
- (pg_walfile_name_offset (restart_lsn)).file_offset) / (s.val)
),0) AS replslot_wal_keep
FROM pg_replication_slots slot
LEFT JOIN (
SELECT
slot2.slot_name,
pg_ls_dir('pg_replslot/' || slot2.slot_name) AS slot_file
FROM pg_replication_slots slot2
) files (slot_name, slot_file)
ON slot.slot_name = files.slot_name
CROSS JOIN wal_size s
) AS d
GROUP BY
slot_name,
slot_type,
replslot_wal_keep;
""",
    V10: """
WITH wal_size AS (
SELECT
current_setting('wal_block_size')::INT * setting::INT AS val
FROM pg_settings
WHERE name = 'wal_segment_size'
)
SELECT
slot_name,
slot_type,
replslot_wal_keep,
count(slot_file) AS replslot_files
FROM
(SELECT
slot.slot_name,
CASE
WHEN slot_file <> 'state' THEN 1
END AS slot_file ,
slot_type,
COALESCE (
floor(
(pg_wal_lsn_diff(pg_current_wal_lsn (),slot.restart_lsn)
- (pg_walfile_name_offset (restart_lsn)).file_offset) / (s.val)
),0) AS replslot_wal_keep
FROM pg_replication_slots slot
LEFT JOIN (
SELECT
slot2.slot_name,
pg_ls_dir('pg_replslot/' || slot2.slot_name) AS slot_file
FROM pg_replication_slots slot2
) files (slot_name, slot_file)
ON slot.slot_name = files.slot_name
CROSS JOIN wal_size s
) AS d
GROUP BY
slot_name,
slot_type,
replslot_wal_keep;
""",
}
# Single boolean row: whether the session's is_superuser setting is 'on'.
# Read by check_if_superuser().
QUERY_SUPERUSER = {
    DEFAULT: """
SELECT current_setting('is_superuser') = 'on' AS is_superuser;
""",
}
# Single row holding server_version_num; detect_server_version() converts it
# to int. The result is compared against thresholds such as 90400 / 100000.
QUERY_SHOW_VERSION = {
    DEFAULT: """
SHOW server_version_num;
""",
}
# Counts pg_stat_activity rows per autovacuum activity, by the prefix of the
# 'query' text. Wraparound-prevention vacuums are counted as vacuum_freeze
# and excluded from plain 'vacuum'.
# '%' is doubled because Service.query_stats() always passes a parameter dict
# to cursor.execute(), so psycopg2 treats a single '%' as a placeholder marker.
QUERY_AUTOVACUUM = {
    DEFAULT: """
SELECT
count(*) FILTER (WHERE query LIKE 'autovacuum: ANALYZE%%') AS analyze,
count(*) FILTER (WHERE query LIKE 'autovacuum: VACUUM ANALYZE%%') AS vacuum_analyze,
count(*) FILTER (WHERE query LIKE 'autovacuum: VACUUM%%'
AND query NOT LIKE 'autovacuum: VACUUM ANALYZE%%'
AND query NOT LIKE '%%to prevent wraparound%%') AS vacuum,
count(*) FILTER (WHERE query LIKE '%%to prevent wraparound%%') AS vacuum_freeze,
count(*) FILTER (WHERE query LIKE 'autovacuum: BRIN summarize%%') AS brin_summarize
FROM pg_stat_activity
WHERE query NOT LIKE '%%pg_stat_activity%%';
""",
}
# Distance of the reference WAL position from '0/0' (the last received
# position while in recovery, otherwise the current position). The
# 'wal_writes' chart plots it as 'incremental', i.e. the WAL write rate, in
# KiB/s. Stored under METRICS[QUERY_NAME_WAL_WRITES].
QUERY_DIFF_LSN = {
    DEFAULT: """
SELECT
pg_wal_lsn_diff(
CASE pg_is_in_recovery()
WHEN true THEN pg_last_wal_receive_lsn()
ELSE pg_current_wal_lsn()
END,
'0/0') as wal_writes ;
""",
    V96: """
SELECT
pg_xlog_location_diff(
CASE pg_is_in_recovery()
WHEN true THEN pg_last_xlog_receive_location()
ELSE pg_current_xlog_location()
END,
'0/0') as wal_writes ;
""",
}
  503. def query_factory(name, version=NO_VERSION):
  504. if name == QUERY_NAME_BACKENDS:
  505. return QUERY_BACKEND[DEFAULT]
  506. elif name == QUERY_NAME_TABLE_STATS:
  507. return QUERY_TABLE_STATS[DEFAULT]
  508. elif name == QUERY_NAME_INDEX_STATS:
  509. return QUERY_INDEX_STATS[DEFAULT]
  510. elif name == QUERY_NAME_DATABASE:
  511. return QUERY_DATABASE[DEFAULT]
  512. elif name == QUERY_NAME_BGWRITER:
  513. return QUERY_BGWRITER[DEFAULT]
  514. elif name == QUERY_NAME_LOCKS:
  515. return QUERY_LOCKS[DEFAULT]
  516. elif name == QUERY_NAME_DATABASES:
  517. return QUERY_DATABASES[DEFAULT]
  518. elif name == QUERY_NAME_STANDBY:
  519. return QUERY_STANDBY[DEFAULT]
  520. elif name == QUERY_NAME_REPLICATION_SLOT:
  521. return QUERY_REPLICATION_SLOT[DEFAULT]
  522. elif name == QUERY_NAME_IF_SUPERUSER:
  523. return QUERY_SUPERUSER[DEFAULT]
  524. elif name == QUERY_NAME_SERVER_VERSION:
  525. return QUERY_SHOW_VERSION[DEFAULT]
  526. elif name == QUERY_NAME_AUTOVACUUM:
  527. return QUERY_AUTOVACUUM[DEFAULT]
  528. elif name == QUERY_NAME_WAL:
  529. if version < 100000:
  530. return QUERY_WAL[V96]
  531. return QUERY_WAL[DEFAULT]
  532. elif name == QUERY_NAME_ARCHIVE:
  533. if version < 100000:
  534. return QUERY_ARCHIVE[V96]
  535. return QUERY_ARCHIVE[DEFAULT]
  536. elif name == QUERY_NAME_STANDBY_DELTA:
  537. if version < 100000:
  538. return QUERY_STANDBY_DELTA[V96]
  539. return QUERY_STANDBY_DELTA[DEFAULT]
  540. elif name == QUERY_NAME_REPSLOT_FILES:
  541. if version < 110000:
  542. return QUERY_REPSLOT_FILES[V10]
  543. return QUERY_REPSLOT_FILES[DEFAULT]
  544. elif name == QUERY_NAME_DIFF_LSN:
  545. if version < 100000:
  546. return QUERY_DIFF_LSN[V96]
  547. return QUERY_DIFF_LSN[DEFAULT]
  548. raise ValueError('unknown query')
# Default chart order. Service.__init__ copies this list, and dynamically
# created charts are inserted into the copy. Chart names starting with
# 'db_stat' are also the ones create_dynamic_charts() passes to
# add_database_stat_chart() for every database.
ORDER = [
    'db_stat_temp_files',
    'db_stat_temp_bytes',
    'db_stat_blks',
    'db_stat_tuple_returned',
    'db_stat_tuple_write',
    'db_stat_transactions',
    'db_stat_connections',
    'database_size',
    'backend_process',
    'index_count',
    'index_size',
    'table_count',
    'table_size',
    'wal',
    'wal_writes',
    'archive_wal',
    'checkpointer',
    'stat_bgwriter_alloc',
    'stat_bgwriter_checkpoint',
    'stat_bgwriter_backend',
    'stat_bgwriter_backend_fsync',
    'stat_bgwriter_bgwriter',
    'stat_bgwriter_maxwritten',
    'replication_slot',
    'standby_delta',
    'autovacuum'
]
# Static chart definitions; Service.__init__ deep-copies them.
# 'options' appear to follow the python.d layout
# [name, title, units, family, context, chart type], and 'lines' entries
# [dimension id, name, algorithm, multiplier, divisor] — confirm against the
# python.d framework.
# Dimension ids are the METRICS names produced by the queries.
# 'database_size' starts without lines: create_dynamic_charts() appends one
# '<database>_size' line per database.
# NOTE(review): the 'wal keeped' label reads as a typo for 'wal kept', and the
# 'postgres.bgwriter_bgwriter' context breaks the 'postgres.stat_bgwriter_*'
# pattern. Both are left unchanged because they are user-visible identifiers.
CHARTS = {
    'db_stat_transactions': {
        'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions',
                    'line'],
        'lines': [
            ['xact_commit', 'committed', 'incremental'],
            ['xact_rollback', 'rolled back', 'incremental']
        ]
    },
    'db_stat_connections': {
        'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections',
                    'line'],
        'lines': [
            ['connections', 'connections', 'absolute']
        ]
    },
    'db_stat_blks': {
        'options': [None, 'Disk blocks reads from db', 'reads/s', 'db statistics', 'postgres.db_stat_blks', 'line'],
        'lines': [
            ['blks_read', 'disk', 'incremental'],
            ['blks_hit', 'cache', 'incremental']
        ]
    },
    'db_stat_tuple_returned': {
        'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned',
                    'line'],
        'lines': [
            ['tup_returned', 'sequential', 'incremental'],
            ['tup_fetched', 'bitmap', 'incremental']
        ]
    },
    'db_stat_tuple_write': {
        'options': [None, 'Tuples written to db', 'writes/s', 'db statistics', 'postgres.db_stat_tuple_write', 'line'],
        'lines': [
            ['tup_inserted', 'inserted', 'incremental'],
            ['tup_updated', 'updated', 'incremental'],
            ['tup_deleted', 'deleted', 'incremental'],
            ['conflicts', 'conflicts', 'incremental']
        ]
    },
    'db_stat_temp_bytes': {
        'options': [None, 'Temp files written to disk', 'KiB/s', 'db statistics', 'postgres.db_stat_temp_bytes',
                    'line'],
        'lines': [
            ['temp_bytes', 'size', 'incremental', 1, 1024]
        ]
    },
    'db_stat_temp_files': {
        'options': [None, 'Temp files written to disk', 'files', 'db statistics', 'postgres.db_stat_temp_files',
                    'line'],
        'lines': [
            ['temp_files', 'files', 'incremental']
        ]
    },
    'database_size': {
        'options': [None, 'Database size', 'MiB', 'database size', 'postgres.db_size', 'stacked'],
        'lines': [
        ]
    },
    'backend_process': {
        'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process',
                    'line'],
        'lines': [
            ['backends_active', 'active', 'absolute'],
            ['backends_idle', 'idle', 'absolute']
        ]
    },
    'index_count': {
        'options': [None, 'Total indexes', 'index', 'indexes', 'postgres.index_count', 'line'],
        'lines': [
            ['index_count', 'total', 'absolute']
        ]
    },
    'index_size': {
        'options': [None, 'Indexes size', 'MiB', 'indexes', 'postgres.index_size', 'line'],
        'lines': [
            ['index_size', 'size', 'absolute', 1, 1024 * 1024]
        ]
    },
    'table_count': {
        'options': [None, 'Total Tables', 'tables', 'tables', 'postgres.table_count', 'line'],
        'lines': [
            ['table_count', 'total', 'absolute']
        ]
    },
    'table_size': {
        'options': [None, 'Tables size', 'MiB', 'tables', 'postgres.table_size', 'line'],
        'lines': [
            ['table_size', 'size', 'absolute', 1, 1024 * 1024]
        ]
    },
    'wal': {
        'options': [None, 'Write-Ahead Logs', 'files', 'wal', 'postgres.wal', 'line'],
        'lines': [
            ['written_wal', 'written', 'absolute'],
            ['recycled_wal', 'recycled', 'absolute'],
            ['total_wal', 'total', 'absolute']
        ]
    },
    'wal_writes': {
        'options': [None, 'Write-Ahead Logs', 'KiB/s', 'wal_writes', 'postgres.wal_writes', 'line'],
        'lines': [
            ['wal_writes', 'writes', 'incremental', 1, 1024]
        ]
    },
    'archive_wal': {
        'options': [None, 'Archive Write-Ahead Logs', 'files/s', 'archive wal', 'postgres.archive_wal', 'line'],
        'lines': [
            ['file_count', 'total', 'incremental'],
            ['ready_count', 'ready', 'incremental'],
            ['done_count', 'done', 'incremental']
        ]
    },
    'checkpointer': {
        'options': [None, 'Checkpoints', 'writes', 'checkpointer', 'postgres.checkpointer', 'line'],
        'lines': [
            ['checkpoint_scheduled', 'scheduled', 'incremental'],
            ['checkpoint_requested', 'requested', 'incremental']
        ]
    },
    'stat_bgwriter_alloc': {
        'options': [None, 'Buffers allocated', 'KiB/s', 'bgwriter', 'postgres.stat_bgwriter_alloc', 'line'],
        'lines': [
            ['buffers_alloc', 'alloc', 'incremental', 1, 1024]
        ]
    },
    'stat_bgwriter_checkpoint': {
        'options': [None, 'Buffers written during checkpoints', 'KiB/s', 'bgwriter',
                    'postgres.stat_bgwriter_checkpoint', 'line'],
        'lines': [
            ['buffers_checkpoint', 'checkpoint', 'incremental', 1, 1024]
        ]
    },
    'stat_bgwriter_backend': {
        'options': [None, 'Buffers written directly by a backend', 'KiB/s', 'bgwriter',
                    'postgres.stat_bgwriter_backend', 'line'],
        'lines': [
            ['buffers_backend', 'backend', 'incremental', 1, 1024]
        ]
    },
    'stat_bgwriter_backend_fsync': {
        'options': [None, 'Fsync by backend', 'times', 'bgwriter', 'postgres.stat_bgwriter_backend_fsync', 'line'],
        'lines': [
            ['buffers_backend_fsync', 'backend fsync', 'incremental']
        ]
    },
    'stat_bgwriter_bgwriter': {
        'options': [None, 'Buffers written by the background writer', 'KiB/s', 'bgwriter',
                    'postgres.bgwriter_bgwriter', 'line'],
        'lines': [
            ['buffers_clean', 'clean', 'incremental', 1, 1024]
        ]
    },
    'stat_bgwriter_maxwritten': {
        'options': [None, 'Too many buffers written', 'times', 'bgwriter', 'postgres.stat_bgwriter_maxwritten',
                    'line'],
        'lines': [
            ['maxwritten_clean', 'maxwritten', 'incremental']
        ]
    },
    'autovacuum': {
        'options': [None, 'Autovacuum workers', 'workers', 'autovacuum', 'postgres.autovacuum', 'line'],
        'lines': [
            ['analyze', 'analyze', 'absolute'],
            ['vacuum', 'vacuum', 'absolute'],
            ['vacuum_analyze', 'vacuum analyze', 'absolute'],
            ['vacuum_freeze', 'vacuum freeze', 'absolute'],
            ['brin_summarize', 'brin summarize', 'absolute']
        ]
    },
    'standby_delta': {
        'options': [None, 'Standby delta', 'KiB', 'replication delta', 'postgres.standby_delta', 'line'],
        'lines': [
            ['sent_delta', 'sent delta', 'absolute', 1, 1024],
            ['write_delta', 'write delta', 'absolute', 1, 1024],
            ['flush_delta', 'flush delta', 'absolute', 1, 1024],
            ['replay_delta', 'replay delta', 'absolute', 1, 1024]
        ]
    },
    'replication_slot': {
        'options': [None, 'Replication slot files', 'files', 'replication slot', 'postgres.replication_slot', 'line'],
        'lines': [
            ['replslot_wal_keep', 'wal keeped', 'absolute'],
            ['replslot_files', 'pg_replslot files', 'absolute']
        ]
    }
}
  764. class Service(SimpleService):
  765. def __init__(self, configuration=None, name=None):
  766. SimpleService.__init__(self, configuration=configuration, name=name)
  767. self.order = list(ORDER)
  768. self.definitions = deepcopy(CHARTS)
  769. self.do_table_stats = configuration.pop('table_stats', False)
  770. self.do_index_stats = configuration.pop('index_stats', False)
  771. self.databases_to_poll = configuration.pop('database_poll', None)
  772. self.configuration = configuration
  773. self.conn = None
  774. self.conn_params = dict()
  775. self.server_version = None
  776. self.is_superuser = False
  777. self.alive = False
  778. self.databases = list()
  779. self.secondaries = list()
  780. self.replication_slots = list()
  781. self.queries = dict()
  782. self.data = dict()
  783. def reconnect(self):
  784. return self.connect()
  785. def build_conn_params(self):
  786. conf = self.configuration
  787. # connection URIs: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
  788. if conf.get(CONN_PARAM_DSN):
  789. return {'dsn': conf[CONN_PARAM_DSN]}
  790. params = {
  791. CONN_PARAM_HOST: conf.get(CONN_PARAM_HOST),
  792. CONN_PARAM_PORT: conf.get(CONN_PARAM_PORT, DEFAULT_PORT),
  793. CONN_PARAM_DATABASE: conf.get(CONN_PARAM_DATABASE),
  794. CONN_PARAM_USER: conf.get(CONN_PARAM_USER, DEFAULT_USER),
  795. CONN_PARAM_PASSWORD: conf.get(CONN_PARAM_PASSWORD),
  796. CONN_PARAM_CONN_TIMEOUT: conf.get(CONN_PARAM_CONN_TIMEOUT, DEFAULT_CONNECT_TIMEOUT),
  797. 'options': '-c statement_timeout={0}'.format(
  798. conf.get(CONN_PARAM_STATEMENT_TIMEOUT, DEFAULT_STATEMENT_TIMEOUT)),
  799. }
  800. # https://www.postgresql.org/docs/current/libpq-ssl.html
  801. ssl_params = dict(
  802. (k, v) for k, v in {
  803. CONN_PARAM_SSL_MODE: conf.get(CONN_PARAM_SSL_MODE),
  804. CONN_PARAM_SSL_ROOT_CERT: conf.get(CONN_PARAM_SSL_ROOT_CERT),
  805. CONN_PARAM_SSL_CRL: conf.get(CONN_PARAM_SSL_CRL),
  806. CONN_PARAM_SSL_CERT: conf.get(CONN_PARAM_SSL_CERT),
  807. CONN_PARAM_SSL_KEY: conf.get(CONN_PARAM_SSL_KEY),
  808. }.items() if v)
  809. if CONN_PARAM_SSL_MODE not in ssl_params and len(ssl_params) > 0:
  810. raise ValueError("mandatory 'sslmode' param is missing, please set")
  811. params.update(ssl_params)
  812. return params
  813. def connect(self):
  814. if self.conn:
  815. self.conn.close()
  816. self.conn = None
  817. try:
  818. self.conn = psycopg2.connect(**self.conn_params)
  819. self.conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
  820. self.conn.set_session(readonly=True)
  821. except OperationalError as error:
  822. self.error(error)
  823. self.alive = False
  824. else:
  825. self.alive = True
  826. return self.alive
  827. def check(self):
  828. if not PSYCOPG2:
  829. self.error("'python-psycopg2' package is needed to use postgres module")
  830. return False
  831. try:
  832. self.conn_params = self.build_conn_params()
  833. except ValueError as error:
  834. self.error('error on creating connection params : {0}', error)
  835. return False
  836. if not self.connect():
  837. self.error('failed to connect to {0}'.format(hide_password(self.conn_params)))
  838. return False
  839. try:
  840. self.check_queries()
  841. except Exception as error:
  842. self.error(error)
  843. return False
  844. self.populate_queries()
  845. self.create_dynamic_charts()
  846. return True
  847. def get_data(self):
  848. if not self.alive and not self.reconnect():
  849. return None
  850. try:
  851. cursor = self.conn.cursor(cursor_factory=DictCursor)
  852. self.data.update(zero_lock_types(self.databases))
  853. for query, metrics in self.queries.items():
  854. self.query_stats(cursor, query, metrics)
  855. except OperationalError:
  856. self.alive = False
  857. return None
  858. cursor.close()
  859. return self.data
  860. def query_stats(self, cursor, query, metrics):
  861. cursor.execute(query, dict(databases=tuple(self.databases)))
  862. for row in cursor:
  863. for metric in metrics:
  864. # databases
  865. if 'database_name' in row:
  866. dimension_id = '_'.join([row['database_name'], metric])
  867. # secondaries
  868. elif 'application_name' in row:
  869. dimension_id = '_'.join([row['application_name'], metric])
  870. # replication slots
  871. elif 'slot_name' in row:
  872. dimension_id = '_'.join([row['slot_name'], metric])
  873. # other
  874. else:
  875. dimension_id = metric
  876. if metric in row:
  877. if row[metric] is not None:
  878. self.data[dimension_id] = int(row[metric])
  879. elif 'locks_count' in row:
  880. if metric == row['mode']:
  881. self.data[dimension_id] = row['locks_count']
  882. def check_queries(self):
  883. cursor = self.conn.cursor()
  884. self.server_version = detect_server_version(cursor, query_factory(QUERY_NAME_SERVER_VERSION))
  885. self.debug('server version: {0}'.format(self.server_version))
  886. self.is_superuser = check_if_superuser(cursor, query_factory(QUERY_NAME_IF_SUPERUSER))
  887. self.debug('superuser: {0}'.format(self.is_superuser))
  888. self.databases = discover(cursor, query_factory(QUERY_NAME_DATABASES))
  889. self.debug('discovered databases {0}'.format(self.databases))
  890. if self.databases_to_poll:
  891. to_poll = self.databases_to_poll.split()
  892. self.databases = [db for db in self.databases if db in to_poll] or self.databases
  893. self.secondaries = discover(cursor, query_factory(QUERY_NAME_STANDBY))
  894. self.debug('discovered secondaries: {0}'.format(self.secondaries))
  895. if self.server_version >= 94000:
  896. self.replication_slots = discover(cursor, query_factory(QUERY_NAME_REPLICATION_SLOT))
  897. self.debug('discovered replication slots: {0}'.format(self.replication_slots))
  898. cursor.close()
  899. def populate_queries(self):
  900. self.queries[query_factory(QUERY_NAME_DATABASE)] = METRICS[QUERY_NAME_DATABASE]
  901. self.queries[query_factory(QUERY_NAME_BACKENDS)] = METRICS[QUERY_NAME_BACKENDS]
  902. self.queries[query_factory(QUERY_NAME_LOCKS)] = METRICS[QUERY_NAME_LOCKS]
  903. self.queries[query_factory(QUERY_NAME_BGWRITER)] = METRICS[QUERY_NAME_BGWRITER]
  904. self.queries[query_factory(QUERY_NAME_DIFF_LSN, self.server_version)] = METRICS[QUERY_NAME_WAL_WRITES]
  905. self.queries[query_factory(QUERY_NAME_STANDBY_DELTA, self.server_version)] = METRICS[QUERY_NAME_STANDBY_DELTA]
  906. if self.do_index_stats:
  907. self.queries[query_factory(QUERY_NAME_INDEX_STATS)] = METRICS[QUERY_NAME_INDEX_STATS]
  908. if self.do_table_stats:
  909. self.queries[query_factory(QUERY_NAME_TABLE_STATS)] = METRICS[QUERY_NAME_TABLE_STATS]
  910. if self.is_superuser:
  911. self.queries[query_factory(QUERY_NAME_ARCHIVE, self.server_version)] = METRICS[QUERY_NAME_ARCHIVE]
  912. if self.server_version >= 90400:
  913. self.queries[query_factory(QUERY_NAME_WAL, self.server_version)] = METRICS[QUERY_NAME_WAL]
  914. if self.server_version >= 100000:
  915. self.queries[query_factory(QUERY_NAME_REPSLOT_FILES, self.server_version)] = METRICS[QUERY_NAME_REPSLOT_FILES]
  916. if self.server_version >= 90400:
  917. self.queries[query_factory(QUERY_NAME_AUTOVACUUM)] = METRICS[QUERY_NAME_AUTOVACUUM]
  918. def create_dynamic_charts(self):
  919. for database_name in self.databases[::-1]:
  920. dim = [
  921. database_name + '_size',
  922. database_name,
  923. 'absolute',
  924. 1,
  925. 1024 * 1024,
  926. ]
  927. self.definitions['database_size']['lines'].append(dim)
  928. for chart_name in [name for name in self.order if name.startswith('db_stat')]:
  929. add_database_stat_chart(
  930. order=self.order,
  931. definitions=self.definitions,
  932. name=chart_name,
  933. database_name=database_name,
  934. )
  935. add_database_lock_chart(
  936. order=self.order,
  937. definitions=self.definitions,
  938. database_name=database_name,
  939. )
  940. for application_name in self.secondaries[::-1]:
  941. add_replication_delta_chart(
  942. order=self.order,
  943. definitions=self.definitions,
  944. name='standby_delta',
  945. application_name=application_name,
  946. )
  947. for slot_name in self.replication_slots[::-1]:
  948. add_replication_slot_chart(
  949. order=self.order,
  950. definitions=self.definitions,
  951. name='replication_slot',
  952. slot_name=slot_name,
  953. )
  954. def discover(cursor, query):
  955. cursor.execute(query)
  956. result = list()
  957. for v in [value[0] for value in cursor]:
  958. if v not in result:
  959. result.append(v)
  960. return result
  961. def check_if_superuser(cursor, query):
  962. cursor.execute(query)
  963. return cursor.fetchone()[0]
  964. def detect_server_version(cursor, query):
  965. cursor.execute(query)
  966. return int(cursor.fetchone()[0])
  967. def zero_lock_types(databases):
  968. result = dict()
  969. for database in databases:
  970. for lock_type in METRICS['LOCKS']:
  971. key = '_'.join([database, lock_type])
  972. result[key] = 0
  973. return result
  974. def hide_password(config):
  975. return dict((k, v if k != 'password' else '*****') for k, v in config.items())
  976. def add_database_lock_chart(order, definitions, database_name):
  977. def create_lines(database):
  978. result = list()
  979. for lock_type in METRICS['LOCKS']:
  980. dimension_id = '_'.join([database, lock_type])
  981. result.append([dimension_id, lock_type, 'absolute'])
  982. return result
  983. chart_name = database_name + '_locks'
  984. order.insert(-1, chart_name)
  985. definitions[chart_name] = {
  986. 'options':
  987. [None, 'Locks on db: ' + database_name, 'locks', 'db ' + database_name, 'postgres.db_locks', 'line'],
  988. 'lines': create_lines(database_name)
  989. }
  990. def add_database_stat_chart(order, definitions, name, database_name):
  991. def create_lines(database, lines):
  992. result = list()
  993. for line in lines:
  994. new_line = ['_'.join([database, line[0]])] + line[1:]
  995. result.append(new_line)
  996. return result
  997. chart_template = CHARTS[name]
  998. chart_name = '_'.join([database_name, name])
  999. order.insert(0, chart_name)
  1000. name, title, units, _, context, chart_type = chart_template['options']
  1001. definitions[chart_name] = {
  1002. 'options': [name, title + ': ' + database_name, units, 'db ' + database_name, context, chart_type],
  1003. 'lines': create_lines(database_name, chart_template['lines'])}
  1004. def add_replication_delta_chart(order, definitions, name, application_name):
  1005. def create_lines(standby, lines):
  1006. result = list()
  1007. for line in lines:
  1008. new_line = ['_'.join([standby, line[0]])] + line[1:]
  1009. result.append(new_line)
  1010. return result
  1011. chart_template = CHARTS[name]
  1012. chart_name = '_'.join([application_name, name])
  1013. position = order.index('database_size')
  1014. order.insert(position, chart_name)
  1015. name, title, units, _, context, chart_type = chart_template['options']
  1016. definitions[chart_name] = {
  1017. 'options': [name, title + ': ' + application_name, units, 'replication delta', context, chart_type],
  1018. 'lines': create_lines(application_name, chart_template['lines'])}
  1019. def add_replication_slot_chart(order, definitions, name, slot_name):
  1020. def create_lines(slot, lines):
  1021. result = list()
  1022. for line in lines:
  1023. new_line = ['_'.join([slot, line[0]])] + line[1:]
  1024. result.append(new_line)
  1025. return result
  1026. chart_template = CHARTS[name]
  1027. chart_name = '_'.join([slot_name, name])
  1028. position = order.index('database_size')
  1029. order.insert(position, chart_name)
  1030. name, title, units, _, context, chart_type = chart_template['options']
  1031. definitions[chart_name] = {
  1032. 'options': [name, title + ': ' + slot_name, units, 'replication slot files', context, chart_type],
  1033. 'lines': create_lines(slot_name, chart_template['lines'])}