|
@@ -7,11 +7,10 @@ import re
|
|
|
import sys
|
|
|
from collections import defaultdict
|
|
|
from contextlib import contextmanager
|
|
|
-from typing import Any, Callable, Dict, Iterable, List, MutableMapping, MutableSet, Set, Tuple, Type
|
|
|
+from typing import Any, Callable, Dict, Iterable, MutableMapping, MutableSet, Set, Tuple, Type
|
|
|
from unittest import TestCase
|
|
|
|
|
|
import pytest
|
|
|
-from django.apps import apps
|
|
|
from django.conf import settings
|
|
|
from django.db import connections, router
|
|
|
from django.db.models import Model
|
|
@@ -199,7 +198,6 @@ def assume_test_silo_mode(desired_silo: SiloMode) -> Any:
|
|
|
|
|
|
|
|
|
def reset_test_role(role: str, using: str, create_role: bool) -> None:
|
|
|
- # Deprecated, will remove once getsentry is updated.
|
|
|
connection_names = [conn.alias for conn in connections.all()]
|
|
|
|
|
|
if create_role:
|
|
@@ -234,7 +232,6 @@ _role_privileges_created: MutableMapping[str, bool] = {}
|
|
|
|
|
|
|
|
|
def create_model_role_guards(app_config: Any, using: str, **kwargs: Any):
|
|
|
- # Deprecated, will remove once getsentry is updated.
|
|
|
global _role_created
|
|
|
if "pytest" not in sys.argv[0] or not settings.USE_ROLE_SWAPPING_IN_TESTS:
|
|
|
return
|
|
@@ -299,7 +296,6 @@ post_migrate.connect(create_model_role_guards, dispatch_uid="create_model_role_g
|
|
|
|
|
|
|
|
|
def restrict_role(role: str, model: Any, revocation_type: str, using: str = "default") -> None:
|
|
|
- # Deprecated, will remove once getsentry is updated.
|
|
|
if router.db_for_write(model) != using:
|
|
|
return
|
|
|
|
|
@@ -341,43 +337,18 @@ def protected_table(table: str, operation: str) -> re.Pattern:
|
|
|
return re.compile(f'{operation}[^"]+"{table}"', re.IGNORECASE)
|
|
|
|
|
|
|
|
|
-_protected_operations: List[re.Pattern] = []
|
|
|
+protected_operations = (
|
|
|
+ protected_table("sentry_organizationmember", "insert"),
|
|
|
+ protected_table("sentry_organizationmember", "update"),
|
|
|
+ protected_table("sentry_organizationmember", "delete"),
|
|
|
+ protected_table("sentry_organization", "insert"),
|
|
|
+ protected_table("sentry_organization", "update"),
|
|
|
+ protected_table("sentry_organizationmapping", "insert"),
|
|
|
+ protected_table("sentry_organizationmapping", "update"),
|
|
|
+ protected_table("sentry_organizationmembermapping", "insert"),
|
|
|
+)
|
|
|
|
|
|
-
|
|
|
-def get_protected_operations() -> List[re.Pattern]:
|
|
|
- if len(_protected_operations):
|
|
|
- return _protected_operations
|
|
|
-
|
|
|
- # Protect Foreign Keys using hybrid cloud models from being deleted without using the
|
|
|
- # privileged user. Deletion should only occur when the developer is actively aware
|
|
|
- # of the need to generate outboxes.
|
|
|
- seen_models: MutableSet[type] = set()
|
|
|
- for app_config in apps.get_app_configs():
|
|
|
- for model in iter_models(app_config.name):
|
|
|
- for field in model._meta.fields:
|
|
|
- if not isinstance(field, HybridCloudForeignKey):
|
|
|
- continue
|
|
|
- fk_model = field.foreign_model
|
|
|
- if fk_model is None or fk_model in seen_models:
|
|
|
- continue
|
|
|
- seen_models.add(fk_model)
|
|
|
- _protected_operations.append(protected_table(fk_model._meta.db_table, "delete"))
|
|
|
-
|
|
|
- # Protect inserts/updates that require outbox messages.
|
|
|
- _protected_operations.extend(
|
|
|
- [
|
|
|
- protected_table("sentry_organizationmember", "insert"),
|
|
|
- protected_table("sentry_organizationmember", "update"),
|
|
|
- protected_table("sentry_organizationmember", "delete"),
|
|
|
- protected_table("sentry_organization", "insert"),
|
|
|
- protected_table("sentry_organization", "update"),
|
|
|
- protected_table("sentry_organizationmapping", "insert"),
|
|
|
- protected_table("sentry_organizationmapping", "update"),
|
|
|
- protected_table("sentry_organizationmembermapping", "insert"),
|
|
|
- ]
|
|
|
- )
|
|
|
-
|
|
|
- return _protected_operations
|
|
|
+fence_re = re.compile(r"select\s*\'(?P<operation>start|end)_role_override", re.IGNORECASE)
|
|
|
|
|
|
|
|
|
def validate_protected_queries(queries: Iterable[Dict[str, str]]) -> None:
|
|
@@ -400,7 +371,7 @@ def validate_protected_queries(queries: Iterable[Dict[str, str]]) -> None:
|
|
|
else:
|
|
|
raise AssertionError("Invalid fencing operation encounted")
|
|
|
|
|
|
- for protected in get_protected_operations():
|
|
|
+ for protected in protected_operations:
|
|
|
if protected.match(sql):
|
|
|
if fence_depth == 0:
|
|
|
msg = [
|
|
@@ -425,6 +396,8 @@ def validate_protected_queries(queries: Iterable[Dict[str, str]]) -> None:
|
|
|
|
|
|
|
|
|
def iter_models(app_name: str | None = None) -> Iterable[Type[Model]]:
|
|
|
+ from django.apps import apps
|
|
|
+
|
|
|
for app, app_models in apps.all_models.items():
|
|
|
if app == app_name or app_name is None:
|
|
|
for model in app_models.values():
|