@@ -2,8 +2,9 @@ from __future__ import annotations
import functools
import inspect
+import sys
from contextlib import contextmanager
-from typing import Any, Callable, Generator, Iterable, Set, Tuple, Type
+from typing import Any, Callable, Generator, Iterable, MutableMapping, MutableSet, Set, Tuple, Type
from unittest import TestCase
import pytest
@@ -11,6 +12,8 @@ from django.conf import settings
from django.db import connections, router
from django.db.models import Model
from django.db.models.fields.related import RelatedField
+from django.db.models.signals import post_migrate
+from django.db.transaction import get_connection
from django.test import override_settings
from sentry import deletions
@@ -186,29 +189,110 @@ def exempt_from_silo_limits() -> Generator[None, None, None]:
-def reset_test_role(role: str, using: str | None = None, create_role: bool | None = None) -> None:
- with connections["default"].cursor() as connection:
- connection.execute("SELECT 1 FROM pg_roles WHERE rolname = %s", [role])
- if connection.fetchone():
- connection.execute(f"REASSIGN OWNED BY {role} TO postgres")
- connection.execute(f"DROP OWNED BY {role} CASCADE")
- connection.execute(f"DROP ROLE {role}")
- connection.execute(f"CREATE ROLE {role}")
- connection.execute(f"GRANT USAGE ON SCHEMA public TO {role};")
- connection.execute(f"GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO {role};")
- connection.execute(f"GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO {role};")
+def reset_test_role(role: str, using: str, create_role: bool) -> None:
+ connection_names = [conn.alias for conn in connections.all()]
+ if create_role:
+ role_exists = False
+ with get_connection(using).cursor() as connection:
+ connection.execute("SELECT 1 FROM pg_roles WHERE rolname = %s", [role])
+ role_exists = connection.fetchone()
-def restrict_role_by_silo(mode: SiloMode, role: str) -> None:
- for model in iter_models():
- silo_limit = getattr(model._meta, "silo_limit", None)
- if silo_limit is None or mode not in silo_limit.modes:
- restrict_role(role, model, "ALL PRIVILEGES")
+ if role_exists:
+ # Drop role permissions on each connection, or we can't drop the role.
+ for alias in connection_names:
+ with get_connection(alias).cursor() as conn:
+ conn.execute(f"REASSIGN OWNED BY {role} TO postgres")
+ conn.execute(f"DROP OWNED BY {role} CASCADE")
+ # Drop and re-create the role as required.
+ with get_connection(using).cursor() as conn:
+ conn.execute(f"DROP ROLE {role}")
+ with get_connection(using).cursor() as conn:
+ conn.execute(f"CREATE ROLE {role}")
+ # Create permissions on the current connection as we'll build up permissions incrementally.
+ with get_connection(using).cursor() as conn:
+ conn.execute(f"GRANT USAGE ON SCHEMA public TO {role};")
+ conn.execute(f"GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO {role};")
+ conn.execute(f"GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO {role};")
+_role_created: bool = False
+_role_privileges_created: MutableMapping[str, bool] = {}
+def create_model_role_guards(app_config: Any, using: str, **kwargs: Any):
+ global _role_created
+ if "pytest" not in sys.modules:
+ return
+ from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
+ from sentry.models import (
+ Organization,
+ OrganizationMapping,
+ OrganizationMember,
+ OrganizationMemberMapping,
+ )
+ from sentry.testutils.silo import iter_models, reset_test_role, restrict_role
+ if not app_config or app_config.name != "sentry":
+ return
+ with get_connection(using).cursor() as conn:
+ conn.execute("SET ROLE 'postgres'")
+ if not _role_privileges_created.get(using, False):
+ reset_test_role(role="postgres_unprivileged", using=using, create_role=not _role_created)
+ _role_created = True
+ _role_privileges_created[using] = True
+ # Protect Foreign Keys using hybrid cloud models from being deleted without using the privileged user.
+ # Deletion should only occur when the developer is actively aware of the need to generate outboxes.
+ seen_models: MutableSet[type] = set()
+ for model in iter_models(app_config.name):
+ for field in model._meta.fields:
+ if not isinstance(field, HybridCloudForeignKey):
+ continue
+ fk_model = field.foreign_model
+ if fk_model is None or fk_model in seen_models:
+ continue
+ seen_models.add(fk_model)
+ restrict_role(
+ role="postgres_unprivileged", model=fk_model, revocation_type="DELETE", using=using
+ )
+ # Protect organization members from being updated without also invoking the correct outbox logic.
+ # If you hit test failures as a result of lacking these privileges, first ensure that you create the correct
+ # outboxes in a transaction, and cover that transaction with `in_test_psql_role_override`
+ restrict_role(role="postgres_unprivileged", model=OrganizationMember, revocation_type="INSERT")
+ restrict_role(role="postgres_unprivileged", model=OrganizationMember, revocation_type="UPDATE")
+ restrict_role(role="postgres_unprivileged", model=Organization, revocation_type="INSERT")
+ restrict_role(role="postgres_unprivileged", model=Organization, revocation_type="UPDATE")
+ restrict_role(role="postgres_unprivileged", model=OrganizationMapping, revocation_type="INSERT")
+ restrict_role(role="postgres_unprivileged", model=OrganizationMapping, revocation_type="UPDATE")
+ # OrganizationMember objects need to cascade, but they can't use the standard hybrid cloud foreign key because the
+ # identifiers are not snowflake ids.
+ restrict_role(role="postgres_unprivileged", model=OrganizationMember, revocation_type="DELETE")
+ restrict_role(
+ role="postgres_unprivileged", model=OrganizationMemberMapping, revocation_type="INSERT"
+ )
+# Listen to django's migration signal so that we're not trapped inside
+# test method transactions.
+post_migrate.connect(create_model_role_guards, dispatch_uid="create_model_role_guards", weak=False)
+def restrict_role(role: str, model: Any, revocation_type: str, using: str = "default") -> None:
+ if router.db_for_write(model) != using:
+ return
-def restrict_role(role: str, model: Any, revocation_type: str) -> None:
using = router.db_for_write(model)
- with connections[using].cursor() as connection:
+ with get_connection(using).cursor() as connection:
connection.execute(f"REVOKE {revocation_type} ON public.{model._meta.db_table} FROM {role}")