|
@@ -7,10 +7,11 @@ import re
|
|
|
import sys
|
|
|
from collections import defaultdict
|
|
|
from contextlib import contextmanager
|
|
|
-from typing import Any, Callable, Dict, Iterable, MutableMapping, MutableSet, Set, Tuple, Type
|
|
|
+from typing import Any, Callable, Dict, Iterable, List, MutableMapping, MutableSet, Set, Tuple, Type
|
|
|
from unittest import TestCase
|
|
|
|
|
|
import pytest
|
|
|
+from django.apps import apps
|
|
|
from django.conf import settings
|
|
|
from django.db import connections, router
|
|
|
from django.db.models import Model
|
|
@@ -198,6 +199,7 @@ def assume_test_silo_mode(desired_silo: SiloMode) -> Any:
|
|
|
|
|
|
|
|
|
def reset_test_role(role: str, using: str, create_role: bool) -> None:
|
|
|
+ # Deprecated, will remove once getsentry is updated.
|
|
|
connection_names = [conn.alias for conn in connections.all()]
|
|
|
|
|
|
if create_role:
|
|
@@ -232,6 +234,7 @@ _role_privileges_created: MutableMapping[str, bool] = {}
|
|
|
|
|
|
|
|
|
def create_model_role_guards(app_config: Any, using: str, **kwargs: Any):
|
|
|
+ # Deprecated, will remove once getsentry is updated.
|
|
|
global _role_created
|
|
|
if "pytest" not in sys.argv[0] or not settings.USE_ROLE_SWAPPING_IN_TESTS:
|
|
|
return
|
|
@@ -296,6 +299,7 @@ post_migrate.connect(create_model_role_guards, dispatch_uid="create_model_role_g
|
|
|
|
|
|
|
|
|
def restrict_role(role: str, model: Any, revocation_type: str, using: str = "default") -> None:
|
|
|
+ # Deprecated, will remove once getsentry is updated.
|
|
|
if router.db_for_write(model) != using:
|
|
|
return
|
|
|
|
|
@@ -337,18 +341,43 @@ def protected_table(table: str, operation: str) -> re.Pattern:
|
|
|
return re.compile(f'{operation}[^"]+"{table}"', re.IGNORECASE)
|
|
|
|
|
|
|
|
|
-protected_operations = (
|
|
|
- protected_table("sentry_organizationmember", "insert"),
|
|
|
- protected_table("sentry_organizationmember", "update"),
|
|
|
- protected_table("sentry_organizationmember", "delete"),
|
|
|
- protected_table("sentry_organization", "insert"),
|
|
|
- protected_table("sentry_organization", "update"),
|
|
|
- protected_table("sentry_organizationmapping", "insert"),
|
|
|
- protected_table("sentry_organizationmapping", "update"),
|
|
|
- protected_table("sentry_organizationmembermapping", "insert"),
|
|
|
-)
|
|
|
+_protected_operations: List[re.Pattern] = []
|
|
|
|
|
|
-fence_re = re.compile(r"select\s*\'(?P<operation>start|end)_role_override", re.IGNORECASE)
|
|
|
+
|
|
|
+def get_protected_operations() -> List[re.Pattern]:
|
|
|
+ if len(_protected_operations):
|
|
|
+ return _protected_operations
|
|
|
+
|
|
|
+ # Protect Foreign Keys using hybrid cloud models from being deleted without using the
|
|
|
+ # privileged user. Deletion should only occur when the developer is actively aware
|
|
|
+ # of the need to generate outboxes.
|
|
|
+ seen_models: MutableSet[type] = set()
|
|
|
+ for app_config in apps.get_app_configs():
|
|
|
+ for model in iter_models(app_config.name):
|
|
|
+ for field in model._meta.fields:
|
|
|
+ if not isinstance(field, HybridCloudForeignKey):
|
|
|
+ continue
|
|
|
+ fk_model = field.foreign_model
|
|
|
+ if fk_model is None or fk_model in seen_models:
|
|
|
+ continue
|
|
|
+ seen_models.add(fk_model)
|
|
|
+ _protected_operations.append(protected_table(fk_model._meta.db_table, "delete"))
|
|
|
+
|
|
|
+ # Protect inserts/updates that require outbox messages.
|
|
|
+ _protected_operations.extend(
|
|
|
+ [
|
|
|
+ protected_table("sentry_organizationmember", "insert"),
|
|
|
+ protected_table("sentry_organizationmember", "update"),
|
|
|
+ protected_table("sentry_organizationmember", "delete"),
|
|
|
+ protected_table("sentry_organization", "insert"),
|
|
|
+ protected_table("sentry_organization", "update"),
|
|
|
+ protected_table("sentry_organizationmapping", "insert"),
|
|
|
+ protected_table("sentry_organizationmapping", "update"),
|
|
|
+ protected_table("sentry_organizationmembermapping", "insert"),
|
|
|
+ ]
|
|
|
+ )
|
|
|
+
|
|
|
+ return _protected_operations
|
|
|
|
|
|
|
|
|
def validate_protected_queries(queries: Iterable[Dict[str, str]]) -> None:
|
|
@@ -371,7 +400,7 @@ def validate_protected_queries(queries: Iterable[Dict[str, str]]) -> None:
|
|
|
else:
|
|
|
raise AssertionError("Invalid fencing operation encounted")
|
|
|
|
|
|
- for protected in protected_operations:
|
|
|
+ for protected in get_protected_operations():
|
|
|
if protected.match(sql):
|
|
|
if fence_depth == 0:
|
|
|
msg = [
|
|
@@ -396,8 +425,6 @@ def validate_protected_queries(queries: Iterable[Dict[str, str]]) -> None:
|
|
|
|
|
|
|
|
|
def iter_models(app_name: str | None = None) -> Iterable[Type[Model]]:
|
|
|
- from django.apps import apps
|
|
|
-
|
|
|
for app, app_models in apps.all_models.items():
|
|
|
if app == app_name or app_name is None:
|
|
|
for model in app_models.values():
|