|
@@ -198,22 +198,23 @@ def restrict_role(role: str, model: Any, revocation_type: str) -> None:
|
|
|
connection.execute(f"REVOKE {revocation_type} ON public.{model._meta.db_table} FROM {role}")
|
|
|
|
|
|
|
|
|
-def iter_models() -> Iterable[Type[Model]]:
|
|
|
+def iter_models(app_name: str | None = None) -> Iterable[Type[Model]]:
|
|
|
from django.apps import apps
|
|
|
|
|
|
for app, app_models in apps.all_models.items():
|
|
|
- for model in app_models.values():
|
|
|
- if (
|
|
|
- model.__module__.startswith("django.")
|
|
|
- or "tests." in model.__module__
|
|
|
- or "fixtures." in model.__module__
|
|
|
- ):
|
|
|
- continue
|
|
|
- yield model
|
|
|
+ if app == app_name or app_name is None:
|
|
|
+ for model in app_models.values():
|
|
|
+ if (
|
|
|
+ model.__module__.startswith("django.")
|
|
|
+ or "tests." in model.__module__
|
|
|
+ or "fixtures." in model.__module__
|
|
|
+ ):
|
|
|
+ continue
|
|
|
+ yield model
|
|
|
|
|
|
|
|
|
-def validate_models_have_silos(exemptions: Set[Type[Model]]) -> None:
|
|
|
- for model in iter_models():
|
|
|
+def validate_models_have_silos(exemptions: Set[Type[Model]], app_name: str | None = None) -> None:
|
|
|
+ for model in iter_models(app_name):
|
|
|
if model in exemptions:
|
|
|
continue
|
|
|
if not isinstance(getattr(model._meta, "silo_limit", None), ModelSiloLimit):
|
|
@@ -229,13 +230,17 @@ def validate_models_have_silos(exemptions: Set[Type[Model]]) -> None:
|
|
|
)
|
|
|
|
|
|
|
|
|
-def validate_no_cross_silo_foreign_keys(exemptions: Set[Tuple[Type[Model], Type[Model]]]) -> None:
|
|
|
- for model in iter_models():
|
|
|
+def validate_no_cross_silo_foreign_keys(
|
|
|
+ exemptions: Set[Tuple[Type[Model], Type[Model]]], app_name: str | None = None
|
|
|
+) -> None:
|
|
|
+ for model in iter_models(app_name):
|
|
|
validate_model_no_cross_silo_foreign_keys(model, exemptions)
|
|
|
|
|
|
|
|
|
-def validate_no_cross_silo_deletions(exemptions: Set[Tuple[Type[Model], Type[Model]]]) -> None:
|
|
|
- for model_class in iter_models():
|
|
|
+def validate_no_cross_silo_deletions(
|
|
|
+ exemptions: Set[Tuple[Type[Model], Type[Model]]], app_name: str | None = None
|
|
|
+) -> None:
|
|
|
+ for model_class in iter_models(app_name):
|
|
|
if not hasattr(model_class._meta, "silo_limit"):
|
|
|
continue
|
|
|
deletion_task: BaseDeletionTask = deletions.get(model=model_class, query={})
|