Browse Source

ref(Hybrid-Cloud): Patches transaction atomic to choose and assert correct silo for a given silo mode (#51967)

Gabe Villalobos 1 year ago
parent
commit
72c2ee94c1

+ 3 - 0
src/sentry/runner/initializer.py

@@ -9,6 +9,7 @@ from typing import Any, TypeVar
 import click
 from django.conf import settings
 
+from sentry.silo.patches.silo_aware_transaction_patch import patch_silo_aware_atomic
 from sentry.utils import metrics, warnings
 from sentry.utils.sdk import configure_sdk
 from sentry.utils.warnings import DeprecatedSettingWarning
@@ -374,6 +375,8 @@ def initialize_app(config: dict[str, Any], skip_service_validation: bool = False
 
     monkeypatch_django_migrations()
 
+    patch_silo_aware_atomic()
+
     apply_legacy_settings(settings)
 
     bind_cache_to_option_store()

+ 58 - 0
src/sentry/silo/patches/silo_aware_transaction_patch.py

@@ -0,0 +1,58 @@
+from typing import Optional
+
+from django import get_version
+from django.db import router, transaction
+from django.db.transaction import Atomic
+
+_default_atomic_impl = transaction.atomic
+
+
+class MismatchedSiloTransactionError(Exception):
+    pass
+
+
+def siloed_atomic(using: Optional[str] = None, savepoint: bool = True) -> Atomic:
+    from sentry.models import ControlOutbox, RegionOutbox
+    from sentry.silo import SiloMode
+
+    current_silo_mode = SiloMode.get_current_mode()
+    if not using:
+        model_to_route_to = RegionOutbox if current_silo_mode == SiloMode.REGION else ControlOutbox
+        using = router.db_for_write(model_to_route_to)
+
+    both_silos_route_to_same_db = router.db_for_write(ControlOutbox) == router.db_for_write(
+        RegionOutbox
+    )
+
+    if both_silos_route_to_same_db or current_silo_mode == SiloMode.MONOLITH:
+        pass
+    elif (
+        using == router.db_for_write(ControlOutbox)
+        and SiloMode.get_current_mode() != SiloMode.CONTROL
+    ):
+        raise MismatchedSiloTransactionError(
+            f"Cannot use transaction.atomic({using}) in Control Mode"
+        )
+
+    elif (
+        using == router.db_for_write(RegionOutbox)
+        and SiloMode.get_current_mode() != SiloMode.REGION
+    ):
+        raise MismatchedSiloTransactionError(
+            f"Cannot use transaction.atomic({using}) in Region Mode"
+        )
+
+    return _default_atomic_impl(using=using, savepoint=savepoint)
+
+
+def patch_silo_aware_atomic():
+    global _default_atomic_impl
+
+    current_django_version = get_version()
+    assert current_django_version.startswith("2.2."), (
+        "Newer versions of Django have an additional 'durable' parameter in atomic,"
+        + " verify the signature before updating the version check."
+    )
+
+    _default_atomic_impl = transaction.atomic
+    transaction.atomic = siloed_atomic  # type:ignore

+ 3 - 2
tests/sentry/deletions/test_sentry_app_installations.py

@@ -1,5 +1,6 @@
 import pytest
-from django.db import connection
+from django.db import router
+from django.db.transaction import get_connection
 
 from sentry import deletions
 from sentry.models import (
@@ -84,7 +85,7 @@ class TestSentryAppIntallationDeletionTask(TestCase):
 
         # The QuerySet will automatically NOT include deleted installs, so we
         # use a raw sql query to ensure it still exists.
-        c = connection.cursor()
+        c = get_connection(router.db_for_write(SentryAppInstallation)).cursor()
         c.execute(
             "SELECT COUNT(1) "
             "FROM sentry_sentryappinstallation "

+ 64 - 0
tests/sentry/silo/test_silo_aware_transaction_patch.py

@@ -0,0 +1,64 @@
+import os
+
+import pytest
+from django.db import router
+from django.test import override_settings
+
+from sentry.models import Organization, OrganizationMapping
+from sentry.silo import SiloMode
+from sentry.silo.patches.silo_aware_transaction_patch import (
+    MismatchedSiloTransactionError,
+    siloed_atomic,
+)
+from sentry.testutils import TestCase
+
+
+def is_running_in_split_db_mode() -> bool:
+    return bool(os.environ.get("SENTRY_USE_SPLIT_DBS"))
+
+
+class TestSiloAwareTransactionPatchInSingleDbMode(TestCase):
+    @pytest.mark.skipif(is_running_in_split_db_mode(), reason="only runs in single db mode")
+    def test_routes_to_correct_db_in_control_silo(self):
+        with override_settings(SILO_MODE=SiloMode.CONTROL):
+            transaction_in_test = siloed_atomic()
+            assert transaction_in_test.using == "default"
+
+    @pytest.mark.skipif(is_running_in_split_db_mode(), reason="only runs in single db mode")
+    def test_routes_to_correct_db_in_region_silo(self):
+
+        with override_settings(SILO_MODE=SiloMode.REGION):
+            transaction_in_test = siloed_atomic()
+            assert transaction_in_test.using == "default"
+
+    def test_correctly_accepts_using_for_atomic(self):
+        transaction_in_test = siloed_atomic(using="foobar")
+        assert transaction_in_test.using == "foobar"
+
+
+class TestSiloAwareTransactionPatchInSplitDbMode(TestCase):
+    @pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode")
+    def test_routes_to_correct_db_in_control_silo(self):
+        with override_settings(SILO_MODE=SiloMode.REGION):
+            transaction_in_test = siloed_atomic()
+            assert transaction_in_test.using == "default"
+
+    @pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode")
+    def test_fails_if_silo_mismatch_with_using_in_region_silo(self):
+        with override_settings(SILO_MODE=SiloMode.REGION), pytest.raises(
+            MismatchedSiloTransactionError
+        ):
+            siloed_atomic(using=router.db_for_write(OrganizationMapping))
+
+    @pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode")
+    def test_routes_to_correct_db_in_region_silo(self):
+        with override_settings(SILO_MODE=SiloMode.CONTROL):
+            transaction_in_test = siloed_atomic()
+            assert transaction_in_test.using == "control"
+
+    @pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode")
+    def test_fails_if_silo_mismatch_with_using_in_control_silo(self):
+        with override_settings(SILO_MODE=SiloMode.CONTROL), pytest.raises(
+            MismatchedSiloTransactionError
+        ):
+            siloed_atomic(using=router.db_for_write(Organization))