123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- from contextlib import contextmanager
- from unittest import mock
- import django
- from django.db.migrations import (
- AddField,
- AlterField,
- CreateModel,
- DeleteModel,
- RemoveField,
- RenameField,
- )
- from django.db.migrations.autodetector import MigrationAutodetector
- from django.db.migrations.operations.fields import FieldOperation
- from psqlextra.models import (
- PostgresMaterializedViewModel,
- PostgresPartitionedModel,
- PostgresViewModel,
- )
- from psqlextra.types import PostgresPartitioningMethod
- from . import operations
- from .state import (
- PostgresMaterializedViewModelState,
- PostgresPartitionedModelState,
- PostgresViewModelState,
- )
# Reference to the original, unpatched `MigrationAutodetector.add_operation`,
# captured when this module is imported. `AddOperationHandler.add` calls it
# to actually register an operation; it has to be saved here because
# `patched_autodetector` replaces the attribute on the class for the
# duration of its context.
add_operation = MigrationAutodetector.add_operation
class AddOperationHandler:
    """Handler for when operations are being added to a new migration.

    This is where we intercept operations such as :see:CreateModel to
    replace it with our own.

    A handler carries the autodetector, the app label and the extra
    positional/keyword arguments of one intercepted `add_operation`
    call, so that they can be forwarded unchanged to the original
    `MigrationAutodetector.add_operation`.
    """

    def __init__(self, autodetector, app_label, args, kwargs):
        self.autodetector = autodetector
        self.app_label = app_label
        self.args = args
        self.kwargs = kwargs

    def add(self, operation):
        """Adds the specified operation to the list of operations to execute
        in the migration.

        Delegates to the original (unpatched) `add_operation`, passing
        along the extra arguments this handler was created with.
        """

        return add_operation(
            self.autodetector,
            self.app_label,
            operation,
            *self.args,
            **self.kwargs,
        )

    def add_field(self, operation: AddField):
        """Adds the specified :see:AddField operation to the list of
        operations to execute in the migration."""

        return self._transform_view_field_operations(operation)

    def remove_field(self, operation: RemoveField):
        """Adds the specified :see:RemoveField operation to the list of
        operations to execute in the migration."""

        return self._transform_view_field_operations(operation)

    def alter_field(self, operation: AlterField):
        """Adds the specified :see:AlterField operation to the list of
        operations to execute in the migration."""

        return self._transform_view_field_operations(operation)

    def rename_field(self, operation: RenameField):
        """Adds the specified :see:RenameField operation to the list of
        operations to execute in the migration."""

        return self._transform_view_field_operations(operation)

    def _targets_view_model(self, operation: FieldOperation) -> bool:
        """Gets whether the specified field operation targets a view model.

        On Django >= 4.0 the model state is inspected; it is looked up
        in the autodetector's `to_state` first and in `from_state` when
        `to_state` has no entry for the model. On older versions the
        model class from `new_apps` is inspected instead.
        """

        if django.VERSION >= (4, 0):
            model_identifier = (self.app_label, operation.model_name.lower())
            model_state = (
                self.autodetector.to_state.models.get(model_identifier)
                or self.autodetector.from_state.models[model_identifier]
            )
            return isinstance(model_state, PostgresViewModelState)

        model = self.autodetector.new_apps.get_model(
            self.app_label, operation.model_name
        )
        return issubclass(model, PostgresViewModel)

    def _transform_view_field_operations(self, operation: FieldOperation):
        """Transforms operations on fields on a (materialized) view into
        state only operations.

        One cannot add/remove/alter/rename fields on a (materialized)
        view, however, we do want Django's migration system to keep
        track of these kind of changes to the model. The
        :see:ApplyState operation just tells Django the operation was
        applied without actually applying it.

        Operations on any other kind of model are added unchanged.
        """

        # NOTE(review): only the plain view state/model class is checked;
        # materialized views are presumably covered through inheritance
        # from the view classes -- confirm in `.state` / `psqlextra.models`.
        if self._targets_view_model(operation):
            return self.add(operations.ApplyState(state_operation=operation))

        return self.add(operation)

    def add_create_model(self, operation: CreateModel):
        """Adds the specified :see:CreateModel operation to the list of
        operations to execute in the migration.

        Partitioned, materialized view and view models are routed to
        their dedicated `add_create_*` method; every other model is
        added as-is.
        """

        # The materialized view check is kept ahead of the view check, as
        # in the original ordering (presumably the former derives from the
        # latter, so the more specific type has to be tested first).
        if django.VERSION >= (4, 0):
            model_state = self.autodetector.to_state.models[
                self.app_label, operation.name.lower()
            ]

            if isinstance(model_state, PostgresPartitionedModelState):
                return self.add_create_partitioned_model(operation)
            if isinstance(model_state, PostgresMaterializedViewModelState):
                return self.add_create_materialized_view_model(operation)
            if isinstance(model_state, PostgresViewModelState):
                return self.add_create_view_model(operation)
        else:
            model = self.autodetector.new_apps.get_model(
                self.app_label, operation.name
            )

            if issubclass(model, PostgresPartitionedModel):
                return self.add_create_partitioned_model(operation)
            if issubclass(model, PostgresMaterializedViewModel):
                return self.add_create_materialized_view_model(operation)
            if issubclass(model, PostgresViewModel):
                return self.add_create_view_model(operation)

        return self.add(operation)

    def add_delete_model(self, operation: DeleteModel):
        """Adds the specified :see:DeleteModel operation to the list of
        operations to execute in the migration.

        The model being deleted is looked up in the *old* project state
        (`from_state` on Django >= 4.0, `old_apps` before that).
        Partitioned, materialized view and view models are routed to
        their dedicated `add_delete_*` method; every other model is
        added as-is.
        """

        if django.VERSION >= (4, 0):
            model_state = self.autodetector.from_state.models[
                self.app_label, operation.name.lower()
            ]

            if isinstance(model_state, PostgresPartitionedModelState):
                return self.add_delete_partitioned_model(operation)
            if isinstance(model_state, PostgresMaterializedViewModelState):
                return self.add_delete_materialized_view_model(operation)
            if isinstance(model_state, PostgresViewModelState):
                return self.add_delete_view_model(operation)
        else:
            model = self.autodetector.old_apps.get_model(
                self.app_label, operation.name
            )

            if issubclass(model, PostgresPartitionedModel):
                return self.add_delete_partitioned_model(operation)
            if issubclass(model, PostgresMaterializedViewModel):
                return self.add_delete_materialized_view_model(operation)
            if issubclass(model, PostgresViewModel):
                return self.add_delete_view_model(operation)

        return self.add(operation)

    def _created_model_options(
        self, operation: CreateModel, state_attr: str, meta_attr: str
    ):
        """Gets the extra options declared for the model being created.

        Arguments:
            operation:
                The :see:CreateModel operation for the model.

            state_attr:
                Name of the attribute on the model state that holds
                the options (used on Django >= 4.0).

            meta_attr:
                Name of the attribute on the model class that holds
                the meta object whose `original_attrs` are the
                options (used on Django < 4.0).
        """

        if django.VERSION >= (4, 0):
            model_state = self.autodetector.to_state.models[
                self.app_label, operation.name.lower()
            ]
            return getattr(model_state, state_attr)

        model = self.autodetector.new_apps.get_model(
            self.app_label, operation.name
        )
        return getattr(model, meta_attr).original_attrs

    def add_create_partitioned_model(self, operation: CreateModel):
        """Adds a :see:PostgresCreatePartitionedModel operation to the list
        of operations to execute in the migration.

        Unless the model is hash partitioned, a
        :see:PostgresAddDefaultPartition operation for a partition
        named "default" is added as well.
        """

        partitioning_options = self._created_model_options(
            operation, "partitioning_options", "_partitioning_meta"
        )

        _, args, kwargs = operation.deconstruct()

        # NOTE(review): the default partition is queued *before* the create
        # operation. This presumably relies on Django adding CreateModel
        # operations with `beginning=True`, which would put the operation
        # added last in front -- confirm before reordering these calls.
        if partitioning_options["method"] != PostgresPartitioningMethod.HASH:
            self.add(
                operations.PostgresAddDefaultPartition(
                    model_name=operation.name, name="default"
                )
            )

        partitioned_kwargs = {
            **kwargs,
            "partitioning_options": partitioning_options,
        }

        return self.add(
            operations.PostgresCreatePartitionedModel(
                *args,
                **partitioned_kwargs,
            )
        )

    def add_delete_partitioned_model(self, operation: DeleteModel):
        """Adds a :see:PostgresDeletePartitionedModel operation to the list
        of operations to execute in the migration."""

        _, args, kwargs = operation.deconstruct()
        return self.add(
            operations.PostgresDeletePartitionedModel(*args, **kwargs)
        )

    def add_create_view_model(self, operation: CreateModel):
        """Adds a :see:PostgresCreateViewModel operation to the list of
        operations to execute in the migration."""

        view_options = self._created_model_options(
            operation, "view_options", "_view_meta"
        )

        _, args, kwargs = operation.deconstruct()
        view_kwargs = {**kwargs, "view_options": view_options}

        return self.add(
            operations.PostgresCreateViewModel(*args, **view_kwargs)
        )

    def add_delete_view_model(self, operation: DeleteModel):
        """Adds a :see:PostgresDeleteViewModel operation to the list of
        operations to execute in the migration."""

        _, args, kwargs = operation.deconstruct()
        return self.add(operations.PostgresDeleteViewModel(*args, **kwargs))

    def add_create_materialized_view_model(self, operation: CreateModel):
        """Adds a :see:PostgresCreateMaterializedViewModel operation to the
        list of operations to execute in the migration."""

        view_options = self._created_model_options(
            operation, "view_options", "_view_meta"
        )

        _, args, kwargs = operation.deconstruct()
        view_kwargs = {**kwargs, "view_options": view_options}

        return self.add(
            operations.PostgresCreateMaterializedViewModel(
                *args,
                **view_kwargs,
            )
        )

    def add_delete_materialized_view_model(self, operation: DeleteModel):
        """Adds a :see:PostgresDeleteMaterializedViewModel operation to the
        list of operations to execute in the migration."""

        _, args, kwargs = operation.deconstruct()
        return self.add(
            operations.PostgresDeleteMaterializedViewModel(*args, **kwargs)
        )
@contextmanager
def patched_autodetector():
    """Patches the standard Django :see:MigrationAutodetector for the
    duration of the context.

    While the context is active, `MigrationAutodetector.add_operation`
    is replaced by a function that hands every operation to an
    :see:AddOperationHandler, which decides how the operation ends up
    in the migration.

    Patching is used because Django offers no other way to extend the
    auto detector.
    """

    # Dotted path of the attribute to replace, split per path segment.
    patch_target = (
        "django.db.migrations.autodetector"
        ".MigrationAutodetector"
        ".add_operation"
    )

    def _patched(autodetector, app_label, operation, *args, **kwargs):
        handler = AddOperationHandler(autodetector, app_label, args, kwargs)

        # Operation type -> handler method, tried from top to bottom.
        routes = (
            (CreateModel, handler.add_create_model),
            (DeleteModel, handler.add_delete_model),
            (AddField, handler.add_field),
            (RemoveField, handler.remove_field),
            (AlterField, handler.alter_field),
            (RenameField, handler.rename_field),
        )

        for operation_type, route in routes:
            if isinstance(operation, operation_type):
                return route(operation)

        # Anything we do not special-case is added untouched.
        return handler.add(operation)

    with mock.patch(patch_target, new=_patched):
        yield
|