introspection.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. from dataclasses import dataclass
  2. from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
  3. from django.db.backends.postgresql.introspection import ( # type: ignore[import]
  4. DatabaseIntrospection,
  5. )
  6. from psqlextra.types import PostgresPartitioningMethod
  7. from . import base_impl
  8. PARTITIONING_STRATEGY_TO_METHOD = {
  9. "r": PostgresPartitioningMethod.RANGE,
  10. "l": PostgresPartitioningMethod.LIST,
  11. "h": PostgresPartitioningMethod.HASH,
  12. }
  13. @dataclass
  14. class PostgresIntrospectedPartitionTable:
  15. """Data container for information about a partition."""
  16. name: str
  17. full_name: str
  18. comment: Optional[str]
  19. @dataclass
  20. class PostgresIntrospectedPartitonedTable:
  21. """Data container for information about a partitioned table."""
  22. name: str
  23. method: PostgresPartitioningMethod
  24. key: List[str]
  25. partitions: List[PostgresIntrospectedPartitionTable]
  26. def partition_by_name(
  27. self, name: str
  28. ) -> Optional[PostgresIntrospectedPartitionTable]:
  29. """Finds the partition with the specified name."""
  30. return next(
  31. (
  32. partition
  33. for partition in self.partitions
  34. if partition.name == name
  35. ),
  36. None,
  37. )
  38. if TYPE_CHECKING:
  39. class Introspection(DatabaseIntrospection):
  40. pass
  41. else:
  42. Introspection = base_impl.introspection()
  43. class PostgresIntrospection(Introspection):
  44. """Adds introspection features specific to PostgreSQL."""
  45. # TODO: This class is a mess, both here and in the
  46. # the base.
  47. #
  48. # Some methods return untyped dicts, some named tuples,
  49. # some flat lists of strings. It's horribly inconsistent.
  50. #
  51. # Most methods are poorly named. For example; `get_table_description`
  52. # does not return a complete table description. It merely returns
  53. # the columns.
  54. #
  55. # We do our best in this class to stay consistent with
  56. # the base in Django by respecting its naming scheme
  57. # and commonly used return types. Creating an API that
  58. # matches the look&feel from the Django base class
  59. # is more important than fixing those issues.
  60. def get_partitioned_tables(
  61. self, cursor
  62. ) -> List[PostgresIntrospectedPartitonedTable]:
  63. """Gets a list of partitioned tables."""
  64. cursor.execute(
  65. """
  66. SELECT
  67. pg_class.relname,
  68. pg_partitioned_table.partstrat
  69. FROM
  70. pg_partitioned_table
  71. JOIN
  72. pg_class
  73. ON
  74. pg_class.oid = pg_partitioned_table.partrelid
  75. """
  76. )
  77. return [
  78. PostgresIntrospectedPartitonedTable(
  79. name=row[0],
  80. method=PARTITIONING_STRATEGY_TO_METHOD[row[1]],
  81. key=self.get_partition_key(cursor, row[0]),
  82. partitions=self.get_partitions(cursor, row[0]),
  83. )
  84. for row in cursor.fetchall()
  85. ]
  86. def get_partitioned_table(self, cursor, table_name: str):
  87. """Gets a single partitioned table."""
  88. return next(
  89. (
  90. table
  91. for table in self.get_partitioned_tables(cursor)
  92. if table.name == table_name
  93. ),
  94. None,
  95. )
  96. def get_partitions(
  97. self, cursor, table_name
  98. ) -> List[PostgresIntrospectedPartitionTable]:
  99. """Gets a list of partitions belonging to the specified partitioned
  100. table."""
  101. sql = """
  102. SELECT
  103. child.relname,
  104. pg_description.description
  105. FROM pg_inherits
  106. JOIN
  107. pg_class parent
  108. ON
  109. pg_inherits.inhparent = parent.oid
  110. JOIN
  111. pg_class child
  112. ON
  113. pg_inherits.inhrelid = child.oid
  114. JOIN
  115. pg_namespace nmsp_parent
  116. ON
  117. nmsp_parent.oid = parent.relnamespace
  118. JOIN
  119. pg_namespace nmsp_child
  120. ON
  121. nmsp_child.oid = child.relnamespace
  122. LEFT JOIN
  123. pg_description
  124. ON
  125. pg_description.objoid = child.oid
  126. WHERE
  127. parent.relname = %s
  128. """
  129. cursor.execute(sql, (table_name,))
  130. return [
  131. PostgresIntrospectedPartitionTable(
  132. name=row[0].replace(f"{table_name}_", ""),
  133. full_name=row[0],
  134. comment=row[1] or None,
  135. )
  136. for row in cursor.fetchall()
  137. ]
  138. def get_partition_key(self, cursor, table_name: str) -> List[str]:
  139. """Gets the partition key for the specified partitioned table.
  140. Returns:
  141. A list of column names that are part of the
  142. partition key.
  143. """
  144. sql = """
  145. SELECT
  146. col.column_name
  147. FROM
  148. (SELECT partrelid,
  149. partnatts,
  150. CASE partstrat
  151. WHEN 'l' THEN 'list'
  152. WHEN 'r' THEN 'range'
  153. WHEN 'h' THEN 'hash'
  154. END AS partition_strategy,
  155. Unnest(partattrs) column_index
  156. FROM pg_partitioned_table) pt
  157. JOIN
  158. pg_class par
  159. ON par.oid = pt.partrelid
  160. JOIN
  161. information_schema.COLUMNS col
  162. ON
  163. col.table_schema = par.relnamespace :: regnamespace :: text
  164. AND col.table_name = par.relname
  165. AND ordinal_position = pt.column_index
  166. WHERE
  167. table_name = %s
  168. """
  169. cursor.execute(sql, (table_name,))
  170. return [row[0] for row in cursor.fetchall()]
  171. def get_columns(self, cursor, table_name: str):
  172. return self.get_table_description(cursor, table_name)
  173. def get_schema_list(self, cursor) -> List[str]:
  174. """A flat list of available schemas."""
  175. cursor.execute(
  176. """
  177. SELECT
  178. schema_name
  179. FROM
  180. information_schema.schemata
  181. """,
  182. tuple(),
  183. )
  184. return [name for name, in cursor.fetchall()]
  185. def get_constraints(self, cursor, table_name: str):
  186. """Retrieve any constraints or keys (unique, pk, fk, check, index)
  187. across one or more columns.
  188. Also retrieve the definition of expression-based indexes.
  189. """
  190. constraints = super().get_constraints(cursor, table_name)
  191. # standard Django implementation does not return the definition
  192. # for indexes, only for constraints, let's patch that up
  193. cursor.execute(
  194. "SELECT indexname, indexdef FROM pg_indexes WHERE tablename = %s",
  195. (table_name,),
  196. )
  197. for index_name, definition in cursor.fetchall():
  198. # PostgreSQL 13 or older won't give a definition if the
  199. # index is actually a primary key.
  200. constraint = constraints.get(index_name)
  201. if not constraint:
  202. continue
  203. if constraint.get("definition") is None:
  204. constraint["definition"] = definition
  205. return constraints
  206. def get_table_locks(self, cursor) -> List[Tuple[str, str, str]]:
  207. cursor.execute(
  208. """
  209. SELECT
  210. n.nspname,
  211. t.relname,
  212. l.mode
  213. FROM pg_locks l
  214. INNER JOIN pg_class t ON t.oid = l.relation
  215. INNER JOIN pg_namespace n ON n.oid = t.relnamespace
  216. WHERE t.relnamespace >= 2200
  217. ORDER BY n.nspname, t.relname, l.mode
  218. """
  219. )
  220. return cursor.fetchall()
  221. def get_storage_settings(self, cursor, table_name: str) -> Dict[str, str]:
  222. sql = """
  223. SELECT
  224. unnest(c.reloptions || array(select 'toast.' || x from pg_catalog.unnest(tc.reloptions) x))
  225. FROM
  226. pg_catalog.pg_class c
  227. LEFT JOIN
  228. pg_catalog.pg_class tc ON (c.reltoastrelid = tc.oid)
  229. LEFT JOIN
  230. pg_catalog.pg_am am ON (c.relam = am.oid)
  231. WHERE
  232. c.relname::text = %s
  233. AND pg_catalog.pg_table_is_visible(c.oid)
  234. """
  235. cursor.execute(sql, (table_name,))
  236. storage_settings = {}
  237. for row in cursor.fetchall():
  238. # It's hard to believe, but storage settings are really
  239. # represented as `key=value` strings in Postgres.
  240. # See: https://www.postgresql.org/docs/current/catalog-pg-class.html
  241. name, value = row[0].split("=")
  242. storage_settings[name] = value
  243. return storage_settings
  244. def get_relations(self, cursor, table_name: str):
  245. """Gets a dictionary {field_name: (field_name_other_table,
  246. other_table)} representing all relations in the specified table.
  247. This is overriden because the query in Django does not handle
  248. relations between tables in different schemas properly.
  249. """
  250. cursor.execute(
  251. """
  252. SELECT a1.attname, c2.relname, a2.attname
  253. FROM pg_constraint con
  254. LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
  255. LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
  256. LEFT JOIN pg_attribute a1 ON c1.oid = a1.attrelid AND a1.attnum = con.conkey[1]
  257. LEFT JOIN pg_attribute a2 ON c2.oid = a2.attrelid AND a2.attnum = con.confkey[1]
  258. WHERE
  259. con.conrelid = %s::regclass AND
  260. con.contype = 'f' AND
  261. pg_catalog.pg_table_is_visible(c1.oid)
  262. """,
  263. [table_name],
  264. )
  265. return {row[0]: (row[2], row[1]) for row in cursor.fetchall()}