settings.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. from contextlib import contextmanager
  2. from typing import Generator, List, Optional, Union
  3. from django.core.exceptions import SuspiciousOperation
  4. from django.db import DEFAULT_DB_ALIAS, connections
  5. @contextmanager
  6. def postgres_set_local(
  7. *,
  8. using: str = DEFAULT_DB_ALIAS,
  9. **options: Optional[Union[str, int, float, List[str]]],
  10. ) -> Generator[None, None, None]:
  11. """Sets the specified PostgreSQL options using SET LOCAL so that they apply
  12. to the current transacton only.
  13. The effect is undone when the context manager exits.
  14. See https://www.postgresql.org/docs/current/runtime-config-client.html
  15. for an overview of all available options.
  16. """
  17. connection = connections[using]
  18. qn = connection.ops.quote_name
  19. if not connection.in_atomic_block:
  20. raise SuspiciousOperation(
  21. "SET LOCAL makes no sense outside a transaction. Start a transaction first."
  22. )
  23. sql = []
  24. params: List[Union[str, int, float, List[str]]] = []
  25. for name, value in options.items():
  26. if value is None:
  27. sql.append(f"SET LOCAL {qn(name)} TO DEFAULT")
  28. continue
  29. # Settings that accept a list of values are actually
  30. # stored as string lists. We cannot just pass a list
  31. # of values. We have to create the comma separated
  32. # string ourselves.
  33. if isinstance(value, list) or isinstance(value, tuple):
  34. placeholder = ", ".join(["%s" for _ in value])
  35. params.extend(value)
  36. else:
  37. placeholder = "%s"
  38. params.append(value)
  39. sql.append(f"SET LOCAL {qn(name)} = {placeholder}")
  40. with connection.cursor() as cursor:
  41. cursor.execute(
  42. "SELECT name, setting FROM pg_settings WHERE name = ANY(%s)",
  43. (list(options.keys()),),
  44. )
  45. original_values = dict(cursor.fetchall())
  46. cursor.execute("; ".join(sql), params)
  47. yield
  48. # Put everything back to how it was. DEFAULT is
  49. # not good enough as a outer SET LOCAL might
  50. # have set a different value.
  51. with connection.cursor() as cursor:
  52. sql = []
  53. params = []
  54. for name, value in options.items():
  55. original_value = original_values.get(name)
  56. if original_value:
  57. sql.append(f"SET LOCAL {qn(name)} = {original_value}")
  58. else:
  59. sql.append(f"SET LOCAL {qn(name)} TO DEFAULT")
  60. cursor.execute("; ".join(sql), params)
  61. @contextmanager
  62. def postgres_set_local_search_path(
  63. search_path: List[str], *, using: str = DEFAULT_DB_ALIAS
  64. ) -> Generator[None, None, None]:
  65. """Sets the search path to the specified schemas."""
  66. with postgres_set_local(search_path=search_path, using=using):
  67. yield
  68. @contextmanager
  69. def postgres_prepend_local_search_path(
  70. search_path: List[str], *, using: str = DEFAULT_DB_ALIAS
  71. ) -> Generator[None, None, None]:
  72. """Prepends the current local search path with the specified schemas."""
  73. connection = connections[using]
  74. with connection.cursor() as cursor:
  75. cursor.execute("SHOW search_path")
  76. [
  77. original_search_path,
  78. ] = cursor.fetchone()
  79. placeholders = ", ".join(["%s" for _ in search_path])
  80. cursor.execute(
  81. f"SET LOCAL search_path = {placeholders}, {original_search_path}",
  82. tuple(search_path),
  83. )
  84. yield
  85. cursor.execute(f"SET LOCAL search_path = {original_search_path}")
  86. @contextmanager
  87. def postgres_reset_local_search_path(
  88. *, using: str = DEFAULT_DB_ALIAS
  89. ) -> Generator[None, None, None]:
  90. """Resets the local search path to the default."""
  91. with postgres_set_local(search_path=None, using=using):
  92. yield