Browse Source

ref(hc): Add enforce_constraints for nested transactions (#55123)

Addresses issue where nested transactions do not check constraints by
default.
Create helper and use it in any hybrid cloud service.
Zach Collins 1 year ago
parent
commit
539bb5bdf7

+ 1 - 1
migrations_lockfile.txt

@@ -7,5 +7,5 @@ will then be regenerated, and you should be able to merge without conflicts.
 
 nodestore: 0002_nodestore_no_dictfield
 replays: 0003_add_size_to_recording_segment
-sentry: 0535_add_created_date_to_outbox_model
+sentry: 0536_backfill_tombstones
 social_auth: 0002_default_auto_field

+ 14 - 0
src/sentry/db/postgres/transactions.py

@@ -5,6 +5,7 @@ import threading
 
 from django.conf import settings
 from django.db import connections, transaction
+from django.db.transaction import Atomic, get_connection
 
 from sentry.silo import SiloMode
 from sentry.utils.env import in_test_environment
@@ -99,3 +100,16 @@ def in_test_assert_no_transaction(msg: str):
         assert not hybrid_cloud.simulated_transaction_watermarks.connection_transaction_depth_above_watermark(
             connection=conn
         ), msg
+
+
+@contextlib.contextmanager
+def enforce_constraints(transaction: Atomic):
+    """
+    nested transaction in django do not check constraints by default, meaning IntegrityErrors can 'float' to callers
+    of functions that happen to wrap with additional transaction scopes.  Using this context manager around a transaction
+    will force constraints to be checked at the en of that transaction (or savepoint) even if it happens to be nested,
+    allowing you to handle the IntegrityError correctly.
+    """
+    with transaction:
+        yield
+        get_connection(transaction.using or "default").check_constraints()

+ 59 - 0
src/sentry/migrations/0536_backfill_tombstones.py

@@ -0,0 +1,59 @@
+# Generated by Django 3.2.20 on 2023-08-01 20:51
+
+from django.db import IntegrityError, migrations, transaction
+from django.db.models import Max
+
+from sentry.new_migrations.migrations import CheckedMigration
+from sentry.utils.query import RangeQuerySetWrapperWithProgressBar
+
+
+def backfill_tombstones(apps, schema_editor):
+    RegionTombstone = apps.get_model("sentry", "RegionTombstone")
+    ControlTombstone = apps.get_model("sentry", "ControlTombstone")
+
+    max_rt = RegionTombstone.objects.aggregate(Max("id"))["id__max"] or 0
+    max_ct = ControlTombstone.objects.aggregate(Max("id"))["id__max"] or 0
+
+    for rt in RangeQuerySetWrapperWithProgressBar(RegionTombstone.objects.filter(id__lte=max_rt)):
+        try:
+            with transaction.atomic("default"):
+                ControlTombstone.objects.create(
+                    table_name=rt.table_name, object_identifier=rt.object_identifier
+                )
+        except IntegrityError:
+            pass
+
+    for ct in RangeQuerySetWrapperWithProgressBar(ControlTombstone.objects.filter(id__lte=max_ct)):
+        try:
+            with transaction.atomic("default"):
+                RegionTombstone.objects.create(
+                    table_name=ct.table_name, object_identifier=ct.object_identifier
+                )
+        except IntegrityError:
+            pass
+
+
+class Migration(CheckedMigration):
+    # This flag is used to mark that a migration shouldn't be automatically run in production. For
+    # the most part, this should only be used for operations where it's safe to run the migration
+    # after your code has deployed. So this should not be used for most operations that alter the
+    # schema of a table.
+    # Here are some things that make sense to mark as dangerous:
+    # - Large data migrations. Typically we want these to be run manually by ops so that they can
+    #   be monitored and not block the deploy for a long period of time while they run.
+    # - Adding indexes to large tables. Since this can take a long time, we'd generally prefer to
+    #   have ops run this and not block the deploy. Note that while adding an index is a schema
+    #   change, it's completely safe to run the operation after the code has deployed.
+    is_dangerous = False
+
+    dependencies = [
+        ("sentry", "0535_add_created_date_to_outbox_model"),
+    ]
+
+    operations = [
+        migrations.RunPython(
+            backfill_tombstones,
+            migrations.RunPython.noop,
+            hints={"tables": ["sentry_controltombstone", "sentry_regiontombstone"]},
+        ),
+    ]

+ 5 - 4
src/sentry/models/outbox.py

@@ -31,6 +31,7 @@ from sentry.db.models import (
 )
 from sentry.db.postgres.transactions import (
     django_test_transaction_water_mark,
+    enforce_constraints,
     in_test_assert_no_transaction,
 )
 from sentry.services.hybrid_cloud import REGION_NAME_LENGTH
@@ -501,7 +502,7 @@ _outbox_context = OutboxContext()
 @contextlib.contextmanager
 def outbox_context(
     inner: Atomic | None = None, flush: bool | None = None
-) -> Generator[None, None, None]:
+) -> Generator[Atomic | None, None, None]:
     # If we don't specify our flush, use the outer specified override
     if flush is None:
         flush = _outbox_context.flushing_enabled
@@ -515,16 +516,16 @@ def outbox_context(
 
     if inner:
         assert inner.using is not None
-        with unguarded_write(using=inner.using), inner:
+        with unguarded_write(using=inner.using), enforce_constraints(inner):
             _outbox_context.flushing_enabled = flush
             try:
-                yield
+                yield inner
             finally:
                 _outbox_context.flushing_enabled = original
     else:
         _outbox_context.flushing_enabled = flush
         try:
-            yield
+            yield None
         finally:
             _outbox_context.flushing_enabled = original
 

+ 9 - 14
src/sentry/models/team.py

@@ -313,20 +313,15 @@ class Team(Model, SnowflakeIdMixin):
         ).delete()
 
         if new_team != self:
-            # Delete the old team
-            cursor = connections[router.db_for_write(Team)].cursor()
-            # we use a cursor here to avoid automatic cascading of relations
-            # in Django
-            try:
-                with outbox_context(transaction.atomic(router.db_for_write(Team)), flush=False):
-                    cursor.execute("DELETE FROM sentry_team WHERE id = %s", [self.id])
-                    self.outbox_for_update().save()
-                    cursor.execute("DELETE FROM sentry_actor WHERE team_id = %s", [new_team.id])
-            finally:
-                cursor.close()
-
-            # Change whatever new_team's actor is to the one from the old team.
-            with transaction.atomic(router.db_for_write(Team)):
+            with outbox_context(
+                transaction.atomic(router.db_for_write(Team)), flush=False
+            ), connections[router.db_for_write(Team)].cursor() as cursor:
+                # we use a cursor here to avoid automatic cascading of relations
+                # in Django
+                cursor.execute("DELETE FROM sentry_team WHERE id = %s", [self.id])
+                self.outbox_for_update().save()
+                cursor.execute("DELETE FROM sentry_actor WHERE team_id = %s", [new_team.id])
+
                 Actor.objects.filter(id=self.actor_id).update(team_id=new_team.id)
                 new_team.actor_id = self.actor_id
                 new_team.save()

+ 5 - 0
src/sentry/models/tombstone.py

@@ -11,6 +11,7 @@ from sentry.db.models import (
     Model,
     control_silo_only_model,
     region_silo_only_model,
+    sane_repr,
 )
 from sentry.silo import SiloMode
 
@@ -58,9 +59,13 @@ class RegionTombstone(TombstoneBase):
         app_label = "sentry"
         db_table = "sentry_regiontombstone"
 
+    __repr__ = sane_repr("id", "table_name", "object_identifier")
+
 
 @control_silo_only_model
 class ControlTombstone(TombstoneBase):
     class Meta:
         app_label = "sentry"
         db_table = "sentry_controltombstone"
+
+    __repr__ = sane_repr("id", "table_name", "object_identifier")

+ 16 - 3
src/sentry/receivers/outbox/__init__.py

@@ -51,7 +51,12 @@ from __future__ import annotations
 
 from typing import Any, Protocol, Type, TypeVar
 
-from sentry.services.hybrid_cloud.tombstone import RpcTombstone, tombstone_service
+from sentry.services.hybrid_cloud.tombstone import (
+    RpcTombstone,
+    control_tombstone_service,
+    region_tombstone_service,
+)
+from sentry.silo import SiloMode
 
 
 class ModelLike(Protocol):
@@ -61,10 +66,18 @@ class ModelLike(Protocol):
 T = TypeVar("T", bound=ModelLike)
 
 
-def maybe_process_tombstone(model: Type[T], object_identifier: int) -> T | None:
+def maybe_process_tombstone(
+    model: Type[T], object_identifier: int, region_name: str | None = None
+) -> T | None:
     if instance := model.objects.filter(id=object_identifier).last():
         return instance
 
     tombstone = RpcTombstone(table_name=model._meta.db_table, identifier=object_identifier)
-    tombstone_service.record_remote_tombstone(tombstone=tombstone)
+    # tombstones sent from control must have a region name, and monolith needs to provide a region_name
+    if region_name or SiloMode.get_current_mode() == SiloMode.CONTROL:
+        region_tombstone_service.record_remote_tombstone(
+            region_name=region_name, tombstone=tombstone
+        )
+    else:
+        control_tombstone_service.record_remote_tombstone(tombstone=tombstone)
     return None

+ 19 - 9
src/sentry/receivers/outbox/control.py

@@ -35,7 +35,7 @@ logger = logging.getLogger(__name__)
 
 @receiver(process_control_outbox, sender=OutboxCategory.USER_UPDATE)
 def process_user_updates(object_identifier: int, region_name: str, **kwds: Any):
-    if (user := maybe_process_tombstone(User, object_identifier)) is None:
+    if (user := maybe_process_tombstone(User, object_identifier, region_name=region_name)) is None:
         return
     organization_service.update_region_user(
         user=RpcRegionUser(
@@ -48,33 +48,43 @@ def process_user_updates(object_identifier: int, region_name: str, **kwds: Any):
 
 
 @receiver(process_control_outbox, sender=OutboxCategory.INTEGRATION_UPDATE)
-def process_integration_updates(object_identifier: int, **kwds: Any):
-    if (integration := maybe_process_tombstone(Integration, object_identifier)) is None:
+def process_integration_updates(object_identifier: int, region_name: str, **kwds: Any):
+    if (
+        integration := maybe_process_tombstone(
+            Integration, object_identifier, region_name=region_name
+        )
+    ) is None:
         return
     integration  # Currently we do not sync any other integration changes, but if we did, you can use this variable.
 
 
 @receiver(process_control_outbox, sender=OutboxCategory.API_APPLICATION_UPDATE)
-def process_api_application_updates(object_identifier: int, **kwds: Any):
-    if (api_application := maybe_process_tombstone(ApiApplication, object_identifier)) is None:
+def process_api_application_updates(object_identifier: int, region_name: str, **kwds: Any):
+    if (
+        api_application := maybe_process_tombstone(
+            ApiApplication, object_identifier, region_name=region_name
+        )
+    ) is None:
         return
     api_application  # Currently we do not sync any other api application changes, but if we did, you can use this variable.
 
 
 @receiver(process_control_outbox, sender=OutboxCategory.SENTRY_APP_INSTALLATION_UPDATE)
-def process_sentry_app_installation_updates(object_identifier: int, **kwds: Any):
+def process_sentry_app_installation_updates(object_identifier: int, region_name: str, **kwds: Any):
     if (
-        sentry_app_installation := maybe_process_tombstone(SentryAppInstallation, object_identifier)
+        sentry_app_installation := maybe_process_tombstone(
+            SentryAppInstallation, object_identifier, region_name=region_name
+        )
     ) is None:
         return
     sentry_app_installation  # Currently we do not sync any other api application changes, but if we did, you can use this variable.
 
 
 @receiver(process_control_outbox, sender=OutboxCategory.ORGANIZATION_INTEGRATION_UPDATE)
-def process_organization_integration_update(object_identifier: int, **kwds: Any):
+def process_organization_integration_update(object_identifier: int, region_name: str, **kwds: Any):
     if (
         organization_integration := maybe_process_tombstone(
-            OrganizationIntegration, object_identifier
+            OrganizationIntegration, object_identifier, region_name=region_name
         )
     ) is None:
         return

+ 4 - 2
src/sentry/services/hybrid_cloud/log/impl.py

@@ -2,8 +2,9 @@ from __future__ import annotations
 
 import datetime
 
-from django.db import IntegrityError, router
+from django.db import IntegrityError, router, transaction
 
+from sentry.db.postgres.transactions import enforce_constraints
 from sentry.models import AuditLogEntry, OutboxCategory, OutboxScope, RegionOutbox, User, UserIP
 from sentry.services.hybrid_cloud.log import AuditLogEvent, LogService, UserIpEvent
 from sentry.silo import unguarded_write
@@ -13,7 +14,8 @@ class DatabaseBackedLogService(LogService):
     def record_audit_log(self, *, event: AuditLogEvent) -> None:
         entry = AuditLogEntry.from_event(event)
         try:
-            entry.save()
+            with enforce_constraints(transaction.atomic(router.db_for_write(AuditLogEntry))):
+                entry.save()
         except IntegrityError as e:
             error_message = str(e)
             if '"auth_user"' in error_message:

+ 5 - 2
src/sentry/services/hybrid_cloud/organization/impl.py

@@ -7,6 +7,7 @@ from django.dispatch import Signal
 
 from sentry import roles
 from sentry.api.serializers import serialize
+from sentry.db.postgres.transactions import enforce_constraints
 from sentry.models import (
     Activity,
     ControlOutbox,
@@ -449,7 +450,9 @@ class DatabaseBackedOrganizationService(OrganizationService):
 
         for team in from_member.teams.all():
             try:
-                with transaction.atomic(router.db_for_write(OrganizationMemberTeam)):
+                with enforce_constraints(
+                    transaction.atomic(router.db_for_write(OrganizationMemberTeam))
+                ):
                     OrganizationMemberTeam.objects.create(organizationmember=to_member, team=team)
             except IntegrityError:
                 pass
@@ -468,7 +471,7 @@ class DatabaseBackedOrganizationService(OrganizationService):
                 user_id=from_user_id, project__organization_id=organization_id
             ):
                 try:
-                    with transaction.atomic(router.db_for_write(model)):
+                    with enforce_constraints(transaction.atomic(router.db_for_write(model))):
                         obj.update(user_id=to_user_id)
                 except IntegrityError:
                     pass

Some files were not shown because too many files changed in this diff