
Revert "ref(hybrid-cloud): Modifies outbox processing to use an advisory lock for processing (#58264)"

This reverts commit 7a2ae94324a2ba5ac6c849e3406a0b5f9d7e8c21.

Co-authored-by: GabeVillalobos <5643012+GabeVillalobos@users.noreply.github.com>
getsentry-bot 1 year ago
Parent
Commit 56b1cf3317
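Net effect of the revert: the standalone `advisory_lock` helper and the `hybrid_cloud.outbox_lock.raise_on_contention` option are deleted, and shard processing in `src/sentry/models/outbox.py` returns to row-level locking via `select_for_update(nowait=...)`, backing off gracefully when Postgres reports lock contention rather than waiting on (or timing out against) an advisory lock. Short sketches of both locking patterns follow the relevant file diffs below.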

+ 0 - 27
src/sentry/db/postgres/advisory_lock.py

@@ -1,27 +0,0 @@
-import contextlib
-
-from django.db import connections
-
-from sentry.utils import metrics
-
-
-@contextlib.contextmanager
-def advisory_lock(using: str, lock_id: int, lock_timeout_seconds: int, lock_metric_name: str):
-    # Obtain the previously set lock timeout, so we can reset it after unlocking.
-    with connections[using].cursor() as cursor:
-        cursor.execute("show lock_timeout")
-        orig_timeout = cursor.fetchone()[0]
-        cursor.execute(f"SET local lock_timeout='{lock_timeout_seconds}s'")
-
-    try:
-        with metrics.timer(lock_metric_name), connections[using].cursor() as cursor:
-            cursor.execute("SELECT pg_advisory_lock(%s)", [lock_id])
-        yield
-    finally:
-        try:
-            with connections[using].cursor() as cursor:
-                cursor.execute("SELECT pg_advisory_unlock(%s)", [lock_id])
-                cursor.execute(f"SET lock_timeout='{orig_timeout}'")
-        except Exception:
-            # If unlocking fails for any reason, close the connection in order to free the lock.
-            connections[using].close()

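For context, the deleted helper was keyed by `lock_id()` (removed further down, in the `outbox.py` diff), which derives a signed 64-bit advisory-lock key from the shard columns via mmh3. A minimal sketch of that derivation and the helper's call shape; the shard column values and the `"default"` database alias are placeholders, and the `advisory_lock` call is shown commented out since its module is the one deleted above:

```python
# Minimal sketch of the advisory-lock key derivation used by the removed code.
# The shard column values below are placeholders, not real data.
import mmh3

shard_columns = ("1", "42")  # e.g. (shard_scope, shard_identifier), stringified
lock_id = mmh3.hash64(".".join(shard_columns))[0]  # first element: signed 64-bit int

# The helper was then invoked roughly like this (call shape per the diff above):
# with advisory_lock(using="default", lock_id=lock_id,
#                    lock_timeout_seconds=5,
#                    lock_metric_name="outbox.process_shard.acquire_lock"):
#     ...  # process the shard while holding the Postgres advisory lock
```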
+ 0 - 7
src/sentry/hybridcloud/options.py

@@ -112,10 +112,3 @@ register(
     default=False,
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
-
-register(
-    "hybrid_cloud.outbox_lock.raise_on_contention",
-    type=Bool,
-    default=False,
-    flags=FLAG_AUTOMATOR_MODIFIABLE,
-)

+ 81 - 56
src/sentry/models/outbox.py

@@ -22,7 +22,6 @@ from typing import (
     cast,
 )
 
-import mmh3
 import sentry_sdk
 from django import db
 from django.db import OperationalError, connections, models, router, transaction
@@ -34,7 +33,6 @@ from django.utils import timezone
 from sentry_sdk.tracing import Span
 from typing_extensions import Self
 
-from sentry import options
 from sentry.backup.scopes import RelocationScope
 from sentry.db.models import (
     BaseModel,
@@ -47,10 +45,13 @@ from sentry.db.models import (
     sane_repr,
 )
 from sentry.db.models.outboxes import HasControlReplicationHandlers, ReplicatedRegionModel
-from sentry.db.postgres.advisory_lock import advisory_lock
-from sentry.db.postgres.transactions import enforce_constraints, in_test_assert_no_transaction
+from sentry.db.postgres.transactions import (
+    django_test_transaction_water_mark,
+    enforce_constraints,
+    in_test_assert_no_transaction,
+)
 from sentry.services.hybrid_cloud import REGION_NAME_LENGTH
-from sentry.silo import unguarded_write
+from sentry.silo import SiloMode, unguarded_write
 from sentry.utils import metrics
 
 THE_PAST = datetime.datetime(2016, 8, 1, 0, 0, 0, 0, tzinfo=timezone.utc)
@@ -433,18 +434,37 @@ class OutboxBase(Model):
     @classmethod
     def prepare_next_from_shard(cls, row: Mapping[str, Any]) -> Self | None:
         using = router.db_for_write(cls)
-        with transaction.atomic(using=using):
-            next_outbox: OutboxBase | None
-            next_outbox = cls(**row).selected_messages_in_shard().order_by("id").first()
-            if not next_outbox:
-                return None
+        try:
+            with transaction.atomic(using=using, savepoint=False):
+                next_outbox: OutboxBase | None
+                next_outbox = (
+                    cls(**row)
+                    .selected_messages_in_shard()
+                    .order_by("id")
+                    .select_for_update(nowait=True)
+                    .first()
+                )
+                if not next_outbox:
+                    return None
+
+                # We rely on 'proof of failure by remaining' to handle retries -- basically, by scheduling this shard, we
+                # expect all objects to be drained before the next schedule comes around, or else we will run again.
+                # Note that the system does not strongly protect against concurrent processing -- this is expected in the
+                # case of drains, for instance.
+                now = timezone.now()
+                next_outbox.selected_messages_in_shard().update(
+                    scheduled_for=next_outbox.next_schedule(now), scheduled_from=now
+                )
 
-            now = timezone.now()
-            next_outbox.selected_messages_in_shard().update(
-                scheduled_for=next_outbox.next_schedule(now), scheduled_from=now
-            )
+                return next_outbox
 
-            return next_outbox
+        except OperationalError as e:
+            # If concurrent locking is happening on the table, gracefully pass and allow
+            # that work to process.
+            if "LockNotAvailable" in str(e):
+                return None
+            else:
+                raise
 
     def key_from(self, attrs: Iterable[str]) -> Mapping[str, Any]:
         return {k: _ensure_not_null(k, getattr(self, k)) for k in attrs}
@@ -504,48 +524,28 @@ class OutboxBase(Model):
         metrics.incr("outbox.saved", 1, tags=tags)
         super().save(**kwds)
 
-    def lock_id(self, attrs: Iterable[str]) -> int:
-        # 64 bit integer that roughly encodes a unique, serializable lock identifier
-        return mmh3.hash64(".".join(str(getattr(self, attr)) for attr in attrs))[0]
-
     @contextlib.contextmanager
     def process_shard(
-        self,
-        latest_shard_row: OutboxBase | None,
-        lock_timeout: int = 5,
+        self, latest_shard_row: OutboxBase | None
     ) -> Generator[OutboxBase | None, None, None]:
+        flush_all: bool = not bool(latest_shard_row)
+        next_shard_row: OutboxBase | None
         using: str = db.router.db_for_write(type(self))
+        with transaction.atomic(using=using), django_test_transaction_water_mark(using=using):
+            try:
+                next_shard_row = (
+                    self.selected_messages_in_shard(latest_shard_row=latest_shard_row)
+                    .select_for_update(nowait=flush_all)
+                    .first()
+                )
+            except OperationalError as e:
+                if "LockNotAvailable" in str(e):
+                    # If a non-task flush process is already running, allow it to proceed without contention.
+                    next_shard_row = None
+                else:
+                    raise e
 
-        shard_lock_id = self.lock_id(self.sharding_columns)
-        try:
-            with advisory_lock(
-                using=using,
-                lock_id=shard_lock_id,
-                lock_timeout_seconds=lock_timeout,
-                lock_metric_name="outbox.process_shard.acquire_lock",
-            ):
-                next_shard_row = self.selected_messages_in_shard(
-                    latest_shard_row=latest_shard_row
-                ).first()
-                yield next_shard_row
-        except OperationalError as e:
-            if latest_shard_row:
-                next_shard_row = self.selected_messages_in_shard(
-                    latest_shard_row=latest_shard_row
-                ).first()
-
-                # If performing a synchronous flush, we expect that all writes up to the highest
-                # id seen since the flush started have been processed.  If that is not the case in a
-                # high-contention scenario, we should raise an exception to avoid breaking read-after-write invariance.
-                if next_shard_row is not None:
-                    # TODO: Remove me -- once we are deployed past canary, we want these exceptions to block any
-                    # contention that occurs, to preserve read-after-write invariance.
-                    if options.get("hybrid_cloud.outbox_lock.raise_on_contention"):
-                        raise OutboxFlushError(
-                            f"Could not flush shard category={self.category} due to lock contention.",
-                            self,
-                        ) from e
-            yield None
+            yield next_shard_row
 
     @contextlib.contextmanager
     def process_coalesced(self) -> Generator[OutboxBase | None, None, None]:
@@ -619,9 +619,6 @@ class OutboxBase(Model):
     def drain_shard(
         self, flush_all: bool = False, _test_processing_barrier: threading.Barrier | None = None
     ) -> None:
-        # Do not waste too much time in flush_all case on a contentious lock.
-        lock_timeout = 5 if not flush_all else 1
-
         in_test_assert_no_transaction(
             "drain_shard should only be called outside of any active transaction!"
         )
@@ -637,7 +634,7 @@ class OutboxBase(Model):
 
         shard_row: OutboxBase | None
         while True:
-            with self.process_shard(latest_shard_row, lock_timeout=lock_timeout) as shard_row:
+            with self.process_shard(latest_shard_row) as shard_row:
                 if shard_row is None:
                     break
 
@@ -783,6 +780,16 @@ class ControlOutbox(ControlOutboxBase):
         )
 
 
+def outbox_silo_modes() -> List[SiloMode]:
+    cur = SiloMode.get_current_mode()
+    result: List[SiloMode] = []
+    if cur != SiloMode.REGION:
+        result.append(SiloMode.CONTROL)
+    if cur != SiloMode.CONTROL:
+        result.append(SiloMode.REGION)
+    return result
+
+
 class OutboxContext(threading.local):
     flushing_enabled: bool | None = None
 
@@ -823,3 +830,21 @@ def outbox_context(
 
 process_region_outbox = Signal()  # ["payload", "object_identifier"]
 process_control_outbox = Signal()  # ["payload", "region_name", "object_identifier"]
+
+
+# Add this in after we successfully deploy the job.
+# @receiver(post_migrate, weak=False, dispatch_uid="schedule_backfill_outboxes")
+# def schedule_backfill_outboxes(app_config, using, **kwargs):
+#     from sentry.tasks.backfill_outboxes import (
+#         schedule_backfill_outbox_jobs,
+#         schedule_backfill_outbox_jobs_control,
+#     )
+#     from sentry.utils.env import in_test_environment
+#
+#     if in_test_environment():
+#         return
+#
+#     if SiloMode.get_current_mode() != SiloMode.REGION:
+#         schedule_backfill_outbox_jobs_control.delay()
+#     if SiloMode.get_current_mode() != SiloMode.CONTROL:
+#         schedule_backfill_outbox_jobs.delay()

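The restored `select_for_update(nowait=True)` path relies on Postgres raising SQLSTATE 55P03 when the row lock is already held; Django surfaces that as an `OperationalError` whose message contains "LockNotAvailable", hence the string match in the hunks above. A standalone reproduction of the pattern with psycopg2; the DSN and the `sentry_regionoutbox` table name are assumptions for illustration:

```python
# Standalone sketch of the row-locking pattern the revert restores.
# The DSN and table name are assumptions for illustration.
import psycopg2
from psycopg2 import errors

conn = psycopg2.connect("dbname=sentry")  # assumed local database

try:
    with conn, conn.cursor() as cur:
        # NOWAIT: fail immediately (SQLSTATE 55P03) instead of waiting
        # if another transaction already holds the row lock.
        cur.execute(
            "SELECT id FROM sentry_regionoutbox ORDER BY id LIMIT 1 FOR UPDATE NOWAIT"
        )
        row = cur.fetchone()
except errors.LockNotAvailable:
    row = None  # another worker owns the shard; back off, as prepare_next_from_shard does
```

Matching on the exception text rather than the SQLSTATE keeps the check at the Django layer without importing driver-specific error classes, at the cost of being brittle if the driver's message format changes.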
+ 1 - 3
src/sentry/services/hybrid_cloud/replica/impl.py

@@ -77,10 +77,8 @@ def get_conflicting_unique_columns(
     scope_controlled_columns: List[str]
     if scope == scope.USER_SCOPE:
         scope_controlled_columns = [get_foreign_key_column(destination, User)]
+
         if isinstance(destination, AuthIdentityReplica):
-            # TODO: Unique columns along the auth identity shard are safe but not provably safe with this
-            # logic at the moment.  This scope_controlled_column override is an adhoc statement of the safety
-            # of this particular unique indexed column.
             scope_controlled_columns.append("ident")
     elif scope == scope.ORGANIZATION_SCOPE:
         scope_controlled_columns = list(

+ 32 - 34
src/sentry/tasks/check_auth.py

@@ -7,7 +7,6 @@ from django.utils import timezone
 from sentry.auth.exceptions import IdentityNotValid
 from sentry.models.authidentity import AuthIdentity
 from sentry.models.organizationmembermapping import OrganizationMemberMapping
-from sentry.models.outbox import outbox_context
 from sentry.services.hybrid_cloud.organization import RpcOrganizationMember, organization_service
 from sentry.silo import unguarded_write
 from sentry.silo.base import SiloMode
@@ -71,41 +70,40 @@ def check_auth_identity(auth_identity_id, **kwargs):
     prev_is_valid = not getattr(om.flags, "sso:invalid")
 
     provider = auth_provider.get_provider()
-    with outbox_context(flush=False):
-        try:
-            provider.refresh_identity(auth_identity)
-        except IdentityNotValid as exc:
-            if prev_is_valid:
-                logger.warning(
-                    "AuthIdentity(id=%s) notified as not valid: %s",
-                    auth_identity_id,
-                    str(exc),
-                    exc_info=True,
-                )
-                metrics.incr("auth.identities.invalidated", skip_internal=False)
-            is_linked = False
-            is_valid = False
-        except Exception as exc:
-            # to ensure security we count any kind of error as an invalidation
-            # event
-            metrics.incr("auth.identities.refresh_error", skip_internal=False)
-            logger.exception(
-                "AuthIdentity(id=%s) returned an error during validation: %s",
+    try:
+        provider.refresh_identity(auth_identity)
+    except IdentityNotValid as exc:
+        if prev_is_valid:
+            logger.warning(
+                "AuthIdentity(id=%s) notified as not valid: %s",
                 auth_identity_id,
                 str(exc),
+                exc_info=True,
             )
-            is_linked = True
-            is_valid = False
-        else:
-            is_linked = True
-            is_valid = True
+            metrics.incr("auth.identities.invalidated", skip_internal=False)
+        is_linked = False
+        is_valid = False
+    except Exception as exc:
+        # to ensure security we count any kind of error as an invalidation
+        # event
+        metrics.incr("auth.identities.refresh_error", skip_internal=False)
+        logger.exception(
+            "AuthIdentity(id=%s) returned an error during validation: %s",
+            auth_identity_id,
+            str(exc),
+        )
+        is_linked = True
+        is_valid = False
+    else:
+        is_linked = True
+        is_valid = True
 
-        if getattr(om.flags, "sso:linked") != is_linked:
-            with unguarded_write(using=router.db_for_write(OrganizationMemberMapping)):
-                # flags are not replicated, so it's ok not to create outboxes here.
-                setattr(om.flags, "sso:linked", is_linked)
-                setattr(om.flags, "sso:invalid", not is_valid)
-                organization_service.update_membership_flags(organization_member=om)
+    if getattr(om.flags, "sso:linked") != is_linked:
+        with unguarded_write(using=router.db_for_write(OrganizationMemberMapping)):
+            # flags are not replicated, so it's ok not to create outboxes here.
+            setattr(om.flags, "sso:linked", is_linked)
+            setattr(om.flags, "sso:invalid", not is_valid)
+            organization_service.update_membership_flags(organization_member=om)
 
-        now = timezone.now()
-        auth_identity.update(last_verified=now, last_synced=now)
+    now = timezone.now()
+    auth_identity.update(last_verified=now, last_synced=now)

+ 1 - 32
tests/sentry/models/test_outbox.py

@@ -8,8 +8,7 @@ from unittest.mock import call, patch
 import pytest
 import responses
 from django.conf import settings
-from django.db import connections, router
-from django.db.transaction import get_connection
+from django.db import connections
 from django.test import RequestFactory
 from pytest import raises
 from rest_framework import status
@@ -20,7 +19,6 @@ from sentry.models.organizationmemberteam import OrganizationMemberTeam
 from sentry.models.organizationmemberteamreplica import OrganizationMemberTeamReplica
 from sentry.models.outbox import (
     ControlOutbox,
-    OutboxBase,
     OutboxCategory,
     OutboxFlushError,
     OutboxScope,
@@ -33,7 +31,6 @@ from sentry.silo import SiloMode
 from sentry.tasks.deliver_from_outbox import enqueue_outbox_jobs
 from sentry.testutils.cases import TestCase, TransactionTestCase
 from sentry.testutils.factories import Factories
-from sentry.testutils.helpers import override_options
 from sentry.testutils.helpers.datetime import freeze_time
 from sentry.testutils.outbox import outbox_runner
 from sentry.testutils.region import override_regions
@@ -393,34 +390,6 @@ class OutboxDrainTest(TransactionTestCase):
 
         assert mock_process_region_outbox.call_count == 2
 
-    def test_holding_lock_too_long(self):
-        with override_options(
-            {
-                "hybrid_cloud.outbox_lock.raise_on_contention": True,
-            }
-        ):
-            outbox: OutboxBase = OrganizationMember(
-                id=1, organization_id=3, user_id=1
-            ).outbox_for_update()
-            with outbox_context(flush=False):
-                outbox.save()
-
-            def test_inside_locked():
-                nonlocal outbox
-                conn = get_connection(router.db_for_write(RegionOutbox))
-                with conn.cursor() as cursor:
-                    cursor.execute("SET lock_timeout = '1s'")
-                with outbox.process_shard(RegionOutbox(id=0)) as shard_outbox:
-                    assert shard_outbox is None
-                with pytest.raises(OutboxFlushError):
-                    with outbox.process_shard(RegionOutbox(id=outbox.id + 1)):
-                        pass
-
-            thread = threading.Thread(target=wrap_with_connection_closure(test_inside_locked))
-            with outbox.process_shard(None):
-                thread.start()
-                thread.join()
-
 
 @region_silo_test(stable=True)
 class RegionOutboxTest(TestCase):