|
- from typing import TYPE_CHECKING, Any, List, Optional, Type, cast
- from unittest import mock
- import django
- from django.core.exceptions import (
- FieldDoesNotExist,
- ImproperlyConfigured,
- SuspiciousOperation,
- )
- from django.db import transaction
- from django.db.backends.ddl_references import Statement
- from django.db.backends.postgresql.schema import ( # type: ignore[import]
- DatabaseSchemaEditor,
- )
- from django.db.models import Field, Model
- from psqlextra.settings import (
- postgres_prepend_local_search_path,
- postgres_reset_local_search_path,
- )
- from psqlextra.type_assertions import is_sql_with_params
- from psqlextra.types import PostgresPartitioningMethod
- from . import base_impl
- from .introspection import PostgresIntrospection
- from .side_effects import (
- HStoreRequiredSchemaEditorSideEffect,
- HStoreUniqueSchemaEditorSideEffect,
- )
- if TYPE_CHECKING:
- class SchemaEditor(DatabaseSchemaEditor):
- pass
- else:
- SchemaEditor = base_impl.schema_editor()
- class PostgresSchemaEditor(SchemaEditor):
- """Schema editor that adds extra methods for PostgreSQL specific features
- and hooks into existing implementations to add side effects specific to
- PostgreSQL."""
- sql_add_pk = "ALTER TABLE %s ADD PRIMARY KEY (%s)"
- sql_create_fk_not_valid = f"{SchemaEditor.sql_create_fk} NOT VALID"
- sql_validate_fk = "ALTER TABLE %s VALIDATE CONSTRAINT %s"
- sql_create_sequence_with_owner = "CREATE SEQUENCE %s OWNED BY %s.%s"
- sql_alter_table_storage_setting = "ALTER TABLE %s SET (%s = %s)"
- sql_reset_table_storage_setting = "ALTER TABLE %s RESET (%s)"
- sql_alter_table_schema = "ALTER TABLE %s SET SCHEMA %s"
- sql_create_schema = "CREATE SCHEMA %s"
- sql_delete_schema = "DROP SCHEMA %s"
- sql_delete_schema_cascade = "DROP SCHEMA %s CASCADE"
- sql_create_view = "CREATE VIEW %s AS (%s)"
- sql_replace_view = "CREATE OR REPLACE VIEW %s AS (%s)"
- sql_drop_view = "DROP VIEW IF EXISTS %s"
- sql_create_materialized_view = (
- "CREATE MATERIALIZED VIEW %s AS (%s) WITH DATA"
- )
- sql_drop_materialized_view = "DROP MATERIALIZED VIEW %s"
- sql_refresh_materialized_view = "REFRESH MATERIALIZED VIEW %s"
- sql_refresh_materialized_view_concurrently = (
- "REFRESH MATERIALIZED VIEW CONCURRENTLY %s"
- )
- sql_partition_by = " PARTITION BY %s (%s)"
- sql_add_default_partition = "CREATE TABLE %s PARTITION OF %s DEFAULT"
- sql_add_hash_partition = "CREATE TABLE %s PARTITION OF %s FOR VALUES WITH (MODULUS %s, REMAINDER %s)"
- sql_add_range_partition = (
- "CREATE TABLE %s PARTITION OF %s FOR VALUES FROM (%s) TO (%s)"
- )
- sql_add_list_partition = (
- "CREATE TABLE %s PARTITION OF %s FOR VALUES IN (%s)"
- )
- sql_delete_partition = "DROP TABLE %s"
- sql_table_comment = "COMMENT ON TABLE %s IS %s"
- side_effects: List[DatabaseSchemaEditor] = [
- cast(DatabaseSchemaEditor, HStoreUniqueSchemaEditorSideEffect()),
- cast(DatabaseSchemaEditor, HStoreRequiredSchemaEditorSideEffect()),
- ]
- def __init__(self, connection, collect_sql=False, atomic=True):
- super().__init__(connection, collect_sql, atomic)
- for side_effect in self.side_effects:
- side_effect.execute = self.execute
- side_effect.quote_name = self.quote_name
- self.deferred_sql = []
- self.introspection = PostgresIntrospection(self.connection)
- def create_schema(self, name: str) -> None:
- """Creates a Postgres schema."""
- self.execute(self.sql_create_schema % self.quote_name(name))
- def delete_schema(self, name: str, cascade: bool) -> None:
- """Drops a Postgres schema."""
- sql = (
- self.sql_delete_schema
- if not cascade
- else self.sql_delete_schema_cascade
- )
- self.execute(sql % self.quote_name(name))
- def create_model(self, model: Type[Model]) -> None:
- """Creates a new model."""
- super().create_model(model)
- for side_effect in self.side_effects:
- side_effect.create_model(model)
- def delete_model(self, model: Type[Model]) -> None:
- """Drops/deletes an existing model."""
- for side_effect in self.side_effects:
- side_effect.delete_model(model)
- super().delete_model(model)
- def clone_model_structure_to_schema(
- self, model: Type[Model], *, schema_name: str
- ) -> None:
- """Creates a clone of the columns for the specified model in a separate
- schema.
- The table will have exactly the same name as the model table
- in the default schema. It will have none of the constraints,
- foreign keys and indexes.
- Use this to create a temporary clone of a model table to
- replace the original model table later on. The lack of
- indices and constraints allows for greater write speeds.
- The original model table will be unaffected.
- Arguments:
- model:
- Model to clone the table of into the
- specified schema.
- schema_name:
- Name of the schema to create the cloned
- table in.
- """
- table_name = model._meta.db_table
- quoted_table_name = self.quote_name(model._meta.db_table)
- quoted_schema_name = self.quote_name(schema_name)
- quoted_table_fqn = f"{quoted_schema_name}.{quoted_table_name}"
- self.execute(
- self.sql_create_table
- % {
- "table": quoted_table_fqn,
- "definition": f"LIKE {quoted_table_name} INCLUDING ALL EXCLUDING CONSTRAINTS EXCLUDING INDEXES",
- }
- )
- # Copy sequences
- #
- # Django 4.0 and older do not use IDENTITY so Postgres does
- # not copy the sequences into the new table. We do it manually.
- if django.VERSION < (4, 1):
- with self.connection.cursor() as cursor:
- sequences = self.introspection.get_sequences(cursor, table_name)
- for sequence in sequences:
- if sequence["table"] != table_name:
- continue
- quoted_sequence_name = self.quote_name(sequence["name"])
- quoted_sequence_fqn = (
- f"{quoted_schema_name}.{quoted_sequence_name}"
- )
- quoted_column_name = self.quote_name(sequence["column"])
- self.execute(
- self.sql_create_sequence_with_owner
- % (
- quoted_sequence_fqn,
- quoted_table_fqn,
- quoted_column_name,
- )
- )
- self.execute(
- self.sql_alter_column
- % {
- "table": quoted_table_fqn,
- "changes": self.sql_alter_column_default
- % {
- "column": quoted_column_name,
- "default": "nextval('%s')" % quoted_sequence_fqn,
- },
- }
- )
- # Copy storage settings
- #
- # Postgres only copies column-level storage options, not
- # the table-level storage options.
- with self.connection.cursor() as cursor:
- storage_settings = self.introspection.get_storage_settings(
- cursor, model._meta.db_table
- )
- for setting_name, setting_value in storage_settings.items():
- self.alter_table_storage_setting(
- quoted_table_fqn, setting_name, setting_value
- )
- def clone_model_constraints_and_indexes_to_schema(
- self, model: Type[Model], *, schema_name: str
- ) -> None:
- """Adds the constraints, foreign keys and indexes to a model table that
- was cloned into a separate table without them by
- `clone_model_structure_to_schema`.
- Arguments:
- model:
- Model for which the cloned table was created.
- schema_name:
- Name of the schema in which the cloned table
- resides.
- """
- with postgres_prepend_local_search_path(
- [schema_name], using=self.connection.alias
- ):
- for constraint in model._meta.constraints:
- self.add_constraint(model, constraint) # type: ignore[attr-defined]
- for index in model._meta.indexes:
- self.add_index(model, index)
- if model._meta.unique_together:
- self.alter_unique_together(
- model, tuple(), model._meta.unique_together
- )
- if model._meta.index_together:
- self.alter_index_together(
- model, tuple(), model._meta.index_together
- )
- for field in model._meta.local_concrete_fields: # type: ignore[attr-defined]
- # Django creates primary keys later added to the model with
- # a custom name. We want the name as it was created originally.
- if field.primary_key:
- with postgres_reset_local_search_path(
- using=self.connection.alias
- ):
- [primary_key_name] = self._constraint_names( # type: ignore[attr-defined]
- model, primary_key=True
- )
- self.execute(
- self.sql_create_pk
- % {
- "table": self.quote_name(model._meta.db_table),
- "name": self.quote_name(primary_key_name),
- "columns": self.quote_name(
- field.db_column or field.attname
- ),
- }
- )
- continue
- # Django creates foreign keys in a single statement which acquires
- # a AccessExclusiveLock on the referenced table. We want to avoid
- # that and created the FK as NOT VALID. We can run VALIDATE in
- # a separate transaction later to validate the entries without
- # acquiring a AccessExclusiveLock.
- if field.remote_field:
- with postgres_reset_local_search_path(
- using=self.connection.alias
- ):
- [fk_name] = self._constraint_names( # type: ignore[attr-defined]
- model, [field.column], foreign_key=True
- )
- sql = Statement(
- self.sql_create_fk_not_valid,
- table=self.quote_name(model._meta.db_table),
- name=self.quote_name(fk_name),
- column=self.quote_name(field.column),
- to_table=self.quote_name(
- field.target_field.model._meta.db_table
- ),
- to_column=self.quote_name(field.target_field.column),
- deferrable=self.connection.ops.deferrable_sql(),
- )
- self.execute(sql)
- # It's hard to alter a field's check because it is defined
- # by the field class, not the field instance. Handle this
- # manually.
- field_check = field.db_parameters(self.connection).get("check")
- if field_check:
- with postgres_reset_local_search_path(
- using=self.connection.alias
- ):
- [field_check_name] = self._constraint_names( # type: ignore[attr-defined]
- model,
- [field.column],
- check=True,
- exclude={
- constraint.name
- for constraint in model._meta.constraints
- },
- )
- self.execute(
- self._create_check_sql( # type: ignore[attr-defined]
- model, field_check_name, field_check
- )
- )
- # Clone the field and alter its state to math our current
- # table definition. This will cause Django see the missing
- # indices and create them.
- if field.remote_field:
- # We add the foreign key constraint ourselves with NOT VALID,
- # hence, we specify `db_constraint=False` on both old/new.
- # Django won't touch the foreign key constraint.
- old_field = self._clone_model_field(
- field, db_index=False, unique=False, db_constraint=False
- )
- new_field = self._clone_model_field(
- field, db_constraint=False
- )
- self.alter_field(model, old_field, new_field)
- else:
- old_field = self._clone_model_field(
- field, db_index=False, unique=False
- )
- new_field = self._clone_model_field(field)
- self.alter_field(model, old_field, new_field)
- def clone_model_foreign_keys_to_schema(
- self, model: Type[Model], schema_name: str
- ) -> None:
- """Validates the foreign keys in the cloned model table created by
- `clone_model_structure_to_schema` and
- `clone_model_constraints_and_indexes_to_schema`.
- Do NOT run this in the same transaction as the
- foreign keys were added to the table. It WILL
- acquire a long-lived AccessExclusiveLock.
- Arguments:
- model:
- Model for which the cloned table was created.
- schema_name:
- Name of the schema in which the cloned table
- resides.
- """
- constraint_names = self._constraint_names(model, foreign_key=True) # type: ignore[attr-defined]
- with postgres_prepend_local_search_path(
- [schema_name], using=self.connection.alias
- ):
- for fk_name in constraint_names:
- self.execute(
- self.sql_validate_fk
- % (
- self.quote_name(model._meta.db_table),
- self.quote_name(fk_name),
- )
- )
- def alter_table_storage_setting(
- self, table_name: str, name: str, value: str
- ) -> None:
- """Alters a storage setting for a table.
- See: https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-STORAGE-PARAMETERS
- Arguments:
- table_name:
- Name of the table to alter the setting for.
- name:
- Name of the setting to alter.
- value:
- Value to alter the setting to.
- Note that this is always a string, even if it looks
- like a number or a boolean. That's how Postgres
- stores storage settings internally.
- """
- self.execute(
- self.sql_alter_table_storage_setting
- % (self.quote_name(table_name), name, value)
- )
- def alter_model_storage_setting(
- self, model: Type[Model], name: str, value: str
- ) -> None:
- """Alters a storage setting for the model's table.
- See: https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-STORAGE-PARAMETERS
- Arguments:
- model:
- Model of which to alter the table
- setting.
- name:
- Name of the setting to alter.
- value:
- Value to alter the setting to.
- Note that this is always a string, even if it looks
- like a number or a boolean. That's how Postgres
- stores storage settings internally.
- """
- self.alter_table_storage_setting(model._meta.db_table, name, value)
- def reset_table_storage_setting(self, table_name: str, name: str) -> None:
- """Resets a table's storage setting to the database or server default.
- See: https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-STORAGE-PARAMETERS
- Arguments:
- table_name:
- Name of the table to reset the setting for.
- name:
- Name of the setting to reset.
- """
- self.execute(
- self.sql_reset_table_storage_setting
- % (self.quote_name(table_name), name)
- )
- def reset_model_storage_setting(
- self, model: Type[Model], name: str
- ) -> None:
- """Resets a model's table storage setting to the database or server
- default.
- See: https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-STORAGE-PARAMETERS
- Arguments:
- table_name:
- model:
- Model for which to reset the table setting for.
- name:
- Name of the setting to reset.
- """
- self.reset_table_storage_setting(model._meta.db_table, name)
- def alter_table_schema(self, table_name: str, schema_name: str) -> None:
- """Moves the specified table into the specified schema.
- WARNING: Moving models into a different schema than the default
- will break querying the model.
- Arguments:
- table_name:
- Name of the table to move into the specified schema.
- schema_name:
- Name of the schema to move the table to.
- """
- self.execute(
- self.sql_alter_table_schema
- % (self.quote_name(table_name), self.quote_name(schema_name))
- )
- def alter_model_schema(self, model: Type[Model], schema_name: str) -> None:
- """Moves the specified model's table into the specified schema.
- WARNING: Moving models into a different schema than the default
- will break querying the model.
- Arguments:
- model:
- Model of which to move the table.
- schema_name:
- Name of the schema to move the model's table to.
- """
- self.execute(
- self.sql_alter_table_schema
- % (
- self.quote_name(model._meta.db_table),
- self.quote_name(schema_name),
- )
- )
- def refresh_materialized_view_model(
- self, model: Type[Model], concurrently: bool = False
- ) -> None:
- """Refreshes a materialized view."""
- sql_template = (
- self.sql_refresh_materialized_view_concurrently
- if concurrently
- else self.sql_refresh_materialized_view
- )
- sql = sql_template % self.quote_name(model._meta.db_table)
- self.execute(sql)
- def create_view_model(self, model: Type[Model]) -> None:
- """Creates a new view model."""
- self._create_view_model(self.sql_create_view, model)
- def replace_view_model(self, model: Type[Model]) -> None:
- """Replaces a view model with a newer version.
- This is used to alter the backing query of a view.
- """
- self._create_view_model(self.sql_replace_view, model)
- def delete_view_model(self, model: Type[Model]) -> None:
- """Deletes a view model."""
- sql = self.sql_drop_view % self.quote_name(model._meta.db_table)
- self.execute(sql)
- def create_materialized_view_model(self, model: Type[Model]) -> None:
- """Creates a new materialized view model."""
- self._create_view_model(self.sql_create_materialized_view, model)
- def replace_materialized_view_model(self, model: Type[Model]) -> None:
- """Replaces a materialized view with a newer version.
- This is used to alter the backing query of a materialized view.
- Replacing a materialized view is a lot trickier than a normal view.
- For normal views we can use `CREATE OR REPLACE VIEW`, but for
- materialized views, we have to create the new view, copy all
- indexes and constraints and drop the old one.
- This operation is atomic as it runs in a transaction.
- """
- with self.connection.cursor() as cursor:
- constraints = self.introspection.get_constraints(
- cursor, model._meta.db_table
- )
- with transaction.atomic():
- self.delete_materialized_view_model(model)
- self.create_materialized_view_model(model)
- for constraint_name, constraint_options in constraints.items():
- if not constraint_options["definition"]:
- raise SuspiciousOperation(
- "Table %s has a constraint '%s' that no definition could be generated for",
- (model._meta.db_table, constraint_name),
- )
- self.execute(constraint_options["definition"])
- def delete_materialized_view_model(self, model: Type[Model]) -> None:
- """Deletes a materialized view model."""
- sql = self.sql_drop_materialized_view % self.quote_name(
- model._meta.db_table
- )
- self.execute(sql)
- def create_partitioned_model(self, model: Type[Model]) -> None:
- """Creates a new partitioned model."""
- meta = self._partitioning_properties_for_model(model)
- # get the sql statement that django creates for normal
- # table creations..
- sql, params = self._extract_sql(self.create_model, model)
- partitioning_key_sql = ", ".join(
- self.quote_name(field_name) for field_name in meta.key
- )
- # create a composite key that includes the partitioning key
- sql = sql.replace(" PRIMARY KEY", "")
- if model._meta.pk and model._meta.pk.name not in meta.key:
- sql = sql[:-1] + ", PRIMARY KEY (%s, %s))" % (
- self.quote_name(model._meta.pk.name),
- partitioning_key_sql,
- )
- else:
- sql = sql[:-1] + ", PRIMARY KEY (%s))" % (partitioning_key_sql,)
- # extend the standard CREATE TABLE statement with
- # 'PARTITION BY ...'
- sql += self.sql_partition_by % (
- meta.method.upper(),
- partitioning_key_sql,
- )
- self.execute(sql, params)
- def delete_partitioned_model(self, model: Type[Model]) -> None:
- """Drops the specified partitioned model."""
- return self.delete_model(model)
- def add_range_partition(
- self,
- model: Type[Model],
- name: str,
- from_values: Any,
- to_values: Any,
- comment: Optional[str] = None,
- ) -> None:
- """Creates a new range partition for the specified partitioned model.
- Arguments:
- model:
- Partitioned model to create a partition for.
- name:
- Name to give to the new partition.
- Final name will be "{table_name}_{partition_name}"
- from_values:
- Start of the partitioning key range of
- values that need to be stored in this
- partition.
- to_values:
- End of the partitioning key range of
- values that need to be stored in this
- partition.
- comment:
- Optionally, a comment to add on this
- partition table.
- """
- # asserts the model is a model set up for partitioning
- self._partitioning_properties_for_model(model)
- table_name = self.create_partition_table_name(model, name)
- sql = self.sql_add_range_partition % (
- self.quote_name(table_name),
- self.quote_name(model._meta.db_table),
- "%s",
- "%s",
- )
- with transaction.atomic():
- self.execute(sql, (from_values, to_values))
- if comment:
- self.set_comment_on_table(table_name, comment)
- def add_list_partition(
- self,
- model: Type[Model],
- name: str,
- values: List[Any],
- comment: Optional[str] = None,
- ) -> None:
- """Creates a new list partition for the specified partitioned model.
- Arguments:
- model:
- Partitioned model to create a partition for.
- name:
- Name to give to the new partition.
- Final name will be "{table_name}_{partition_name}"
- values:
- Partition key values that should be
- stored in this partition.
- comment:
- Optionally, a comment to add on this
- partition table.
- """
- # asserts the model is a model set up for partitioning
- self._partitioning_properties_for_model(model)
- table_name = self.create_partition_table_name(model, name)
- sql = self.sql_add_list_partition % (
- self.quote_name(table_name),
- self.quote_name(model._meta.db_table),
- ",".join(["%s" for _ in range(len(values))]),
- )
- with transaction.atomic():
- self.execute(sql, values)
- if comment:
- self.set_comment_on_table(table_name, comment)
- def add_hash_partition(
- self,
- model: Type[Model],
- name: str,
- modulus: int,
- remainder: int,
- comment: Optional[str] = None,
- ) -> None:
- """Creates a new hash partition for the specified partitioned model.
- Arguments:
- model:
- Partitioned model to create a partition for.
- name:
- Name to give to the new partition.
- Final name will be "{table_name}_{partition_name}"
- modulus:
- Integer value by which the key is divided.
- remainder:
- The remainder of the hash value when divided by modulus.
- comment:
- Optionally, a comment to add on this partition table.
- """
- # asserts the model is a model set up for partitioning
- self._partitioning_properties_for_model(model)
- table_name = self.create_partition_table_name(model, name)
- sql = self.sql_add_hash_partition % (
- self.quote_name(table_name),
- self.quote_name(model._meta.db_table),
- "%s",
- "%s",
- )
- with transaction.atomic():
- self.execute(sql, (modulus, remainder))
- if comment:
- self.set_comment_on_table(table_name, comment)
- def add_default_partition(
- self, model: Type[Model], name: str, comment: Optional[str] = None
- ) -> None:
- """Creates a new default partition for the specified partitioned model.
- A default partition is a partition where rows are routed to when
- no more specific partition is a match.
- Arguments:
- model:
- Partitioned model to create a partition for.
- name:
- Name to give to the new partition.
- Final name will be "{table_name}_{partition_name}"
- comment:
- Optionally, a comment to add on this
- partition table.
- """
- # asserts the model is a model set up for partitioning
- self._partitioning_properties_for_model(model)
- table_name = self.create_partition_table_name(model, name)
- sql = self.sql_add_default_partition % (
- self.quote_name(table_name),
- self.quote_name(model._meta.db_table),
- )
- with transaction.atomic():
- self.execute(sql)
- if comment:
- self.set_comment_on_table(table_name, comment)
- def delete_partition(self, model: Type[Model], name: str) -> None:
- """Deletes the partition with the specified name."""
- sql = self.sql_delete_partition % self.quote_name(
- self.create_partition_table_name(model, name)
- )
- self.execute(sql)
- def alter_db_table(
- self, model: Type[Model], old_db_table: str, new_db_table: str
- ) -> None:
- """Alters a table/model."""
- super().alter_db_table(model, old_db_table, new_db_table)
- for side_effect in self.side_effects:
- side_effect.alter_db_table(model, old_db_table, new_db_table)
- def add_field(self, model: Type[Model], field: Field) -> None:
- """Adds a new field to an exisiting model."""
- super().add_field(model, field)
- for side_effect in self.side_effects:
- side_effect.add_field(model, field)
- def remove_field(self, model: Type[Model], field: Field) -> None:
- """Removes a field from an existing model."""
- for side_effect in self.side_effects:
- side_effect.remove_field(model, field)
- super().remove_field(model, field)
- def alter_field(
- self,
- model: Type[Model],
- old_field: Field,
- new_field: Field,
- strict: bool = False,
- ) -> None:
- """Alters an existing field on an existing model."""
- super().alter_field(model, old_field, new_field, strict)
- for side_effect in self.side_effects:
- side_effect.alter_field(model, old_field, new_field, strict)
- def vacuum_table(
- self,
- table_name: str,
- columns: List[str] = [],
- *,
- full: bool = False,
- freeze: bool = False,
- verbose: bool = False,
- analyze: bool = False,
- disable_page_skipping: bool = False,
- skip_locked: bool = False,
- index_cleanup: bool = False,
- truncate: bool = False,
- parallel: Optional[int] = None,
- ) -> None:
- """Runs the VACUUM statement on the specified table with the specified
- options.
- Arguments:
- table_name:
- Name of the table to run VACUUM on.
- columns:
- Optionally, a list of columns to vacuum. If not
- specified, all columns are vacuumed.
- """
- if self.connection.in_atomic_block:
- raise SuspiciousOperation("Vacuum cannot be done in a transaction")
- options = []
- if full:
- options.append("FULL")
- if freeze:
- options.append("FREEZE")
- if verbose:
- options.append("VERBOSE")
- if analyze:
- options.append("ANALYZE")
- if disable_page_skipping:
- options.append("DISABLE_PAGE_SKIPPING")
- if skip_locked:
- options.append("SKIP_LOCKED")
- if index_cleanup:
- options.append("INDEX_CLEANUP")
- if truncate:
- options.append("TRUNCATE")
- if parallel is not None:
- options.append(f"PARALLEL {parallel}")
- sql = "VACUUM"
- if options:
- options_sql = ", ".join(options)
- sql += f" ({options_sql})"
- sql += f" {self.quote_name(table_name)}"
- if columns:
- columns_sql = ", ".join(
- [self.quote_name(column) for column in columns]
- )
- sql += f" ({columns_sql})"
- self.execute(sql)
- def vacuum_model(
- self, model: Type[Model], fields: List[Field] = [], **kwargs
- ) -> None:
- """Runs the VACUUM statement on the table of the specified model with
- the specified options.
- Arguments:
- table_name:
- model:
- Model of which to run VACUUM the table.
- fields:
- Optionally, a list of fields to vacuum. If not
- specified, all fields are vacuumed.
- """
- columns = [
- field.column
- for field in fields
- if getattr(field, "concrete", False) and field.column
- ]
- self.vacuum_table(model._meta.db_table, columns, **kwargs)
- def set_comment_on_table(self, table_name: str, comment: str) -> None:
- """Sets the comment on the specified table."""
- sql = self.sql_table_comment % (self.quote_name(table_name), "%s")
- self.execute(sql, (comment,))
- def _create_view_model(self, sql: str, model: Type[Model]) -> None:
- """Creates a new view model using the specified SQL query."""
- meta = self._view_properties_for_model(model)
- with self.connection.cursor() as cursor:
- view_sql = cursor.mogrify(*meta.query)
- if isinstance(view_sql, bytes):
- view_sql = view_sql.decode("utf-8")
- self.execute(sql % (self.quote_name(model._meta.db_table), view_sql))
- def _extract_sql(self, method, *args):
- """Calls the specified method with the specified arguments and
- intercepts the SQL statement it WOULD execute.
- We use this to figure out the exact SQL statement Django would
- execute. We can then make a small modification and execute it
- ourselves.
- """
- with mock.patch.object(self, "execute") as execute:
- method(*args)
- return tuple(execute.mock_calls[0])[1]
- @staticmethod
- def _view_properties_for_model(model: Type[Model]):
- """Gets the view options for the specified model.
- Raises:
- ImproperlyConfigured:
- When the specified model is not set up
- as a view.
- """
- meta = getattr(model, "_view_meta", None)
- if not meta:
- raise ImproperlyConfigured(
- (
- "Model '%s' is not properly configured to be a view."
- " Create the `ViewMeta` class as a child of '%s'."
- )
- % (model.__name__, model.__name__)
- )
- if not is_sql_with_params(meta.query):
- raise ImproperlyConfigured(
- (
- "Model '%s' is not properly configured to be a view."
- " Set the `query` and `key` attribute on the"
- " `ViewMeta` class as a child of '%s'"
- )
- % (model.__name__, model.__name__)
- )
- return meta
- @staticmethod
- def _partitioning_properties_for_model(model: Type[Model]):
- """Gets the partitioning options for the specified model.
- Raises:
- ImproperlyConfigured:
- When the specified model is not set up
- for partitioning.
- """
- meta = getattr(model, "_partitioning_meta", None)
- if not meta:
- raise ImproperlyConfigured(
- (
- "Model '%s' is not properly configured to be partitioned."
- " Create the `PartitioningMeta` class as a child of '%s'."
- )
- % (model.__name__, model.__name__)
- )
- if not meta.method or not meta.key:
- raise ImproperlyConfigured(
- (
- "Model '%s' is not properly configured to be partitioned."
- " Set the `method` and `key` attributes on the"
- " `PartitioningMeta` class as a child of '%s'"
- )
- % (model.__name__, model.__name__)
- )
- if meta.method not in PostgresPartitioningMethod:
- raise ImproperlyConfigured(
- (
- "Model '%s' is not properly configured to be partitioned."
- " '%s' is not a member of the PostgresPartitioningMethod enum."
- )
- % (model.__name__, meta.method)
- )
- if not isinstance(meta.key, list):
- raise ImproperlyConfigured(
- (
- "Model '%s' is not properly configured to be partitioned."
- " Partitioning key should be a list (of field names or values,"
- " depending on the partitioning method)."
- )
- % model.__name__
- )
- try:
- for field_name in meta.key:
- model._meta.get_field(field_name)
- except FieldDoesNotExist:
- raise ImproperlyConfigured(
- (
- "Model '%s' is not properly configured to be partitioned."
- " Field '%s' in partitioning key %s is not a valid field on"
- " '%s'."
- )
- % (model.__name__, field_name, meta.key, model.__name__)
- )
- return meta
- def create_partition_table_name(self, model: Type[Model], name: str) -> str:
- return "%s_%s" % (model._meta.db_table.lower(), name.lower())
- def _clone_model_field(self, field: Field, **overrides) -> Field:
- """Clones the specified model field and overrides its kwargs with the
- specified overrides.
- The cloned field will not be contributed to the model.
- """
- _, _, field_args, field_kwargs = field.deconstruct()
- cloned_field_args = field_args[:]
- cloned_field_kwargs = {**field_kwargs, **overrides}
- cloned_field = field.__class__(
- *cloned_field_args, **cloned_field_kwargs
- )
- cloned_field.model = field.model
- cloned_field.set_attributes_from_name(field.name)
- if cloned_field.remote_field and field.remote_field:
- cloned_field.remote_field.model = field.remote_field.model
- cloned_field.set_attributes_from_rel() # type: ignore[attr-defined]
- return cloned_field
|