@@ -0,0 +1,336 @@
+from __future__ import annotations
+import abc
+import contextlib
+import datetime
+import sys
+from enum import IntEnum
+from typing import Any, Generator, Iterable, List, Mapping, Set
+from django.db import connections, models, router, transaction
+from django.db.models import Max
+from django.dispatch import Signal
+from django.utils import timezone
+from sentry.db.models import (
+ BoundedBigIntegerField,
+ BoundedPositiveIntegerField,
+ JSONField,
+ Model,
+ control_silo_only_model,
+ region_silo_only_model,
+ sane_repr,
+from sentry.silo import SiloMode
+THE_PAST = datetime.datetime(2016, 8, 1, 0, 0, 0, 0, tzinfo=timezone.utc)
+class OutboxScope(IntEnum):
+ def __str__(self):
+ return self.name
+ @classmethod
+ def as_choices(cls):
+ return [(i.value, i.value) for i in cls]
+class OutboxCategory(IntEnum):
+ @classmethod
+ def as_choices(cls):
+ return [(i.value, i.value) for i in cls]
+class WebhookProviderIdentifier(IntEnum):
+ SLACK = 0
+ GITHUB = 1
+def _ensure_not_null(k: str, v: Any) -> Any:
+ if v is None:
+ raise ValueError(f"Attribute {k} was None, but it needed to be set!")
+ return v
+class OutboxFlushError(Exception):
+ pass
+class OutboxBase(Model):
+ sharding_columns: Iterable[str]
+ coalesced_columns: Iterable[str]
+ @classmethod
+ def _unique_object_identifier(cls):
+ using = router.db_for_write(cls)
+ with transaction.atomic(using=using):
+ with connections[using].cursor() as cursor:
+ cursor.execute("SELECT nextval(%s)", [f"{cls._meta.db_table}_id_seq"])
+ return cursor.fetchone()[0]
+ @classmethod
+ def find_scheduled_shards(cls) -> Iterable[Mapping[str, Any]]:
+ return (
+ cls.objects.values(*cls.sharding_columns)
+ .annotate(
+ scheduled_for=Max("scheduled_for"),
+ id=Max("id"),
+ )
+ .filter(scheduled_for__lte=timezone.now())
+ .order_by("scheduled_for", "id")
+ )
+ @classmethod
+ def prepare_next_from_shard(cls, row: Mapping[str, Any]) -> OutboxBase | None:
+ with transaction.atomic(savepoint=False):
+ next_outbox: OutboxBase | None
+ next_outbox = (
+ cls(**row).selected_messages_in_shard().order_by("id").select_for_update().first()
+ )
+ if not next_outbox:
+ return None
+ # We rely on 'proof of failure by remaining' to handle retries -- basically, by scheduling this shard, we
+ # expect all objects to be drained before the next schedule comes around, or else we will run again.
+ # Note that the system does not strongly protect against concurrent processing -- this is expected in the
+ # case of drains, for instance.
+ now = timezone.now()
+ next_outbox.selected_messages_in_shard().update(
+ scheduled_for=next_outbox.next_schedule(now), scheduled_from=now
+ )
+ return next_outbox
+ def key_from(self, attrs: Iterable[str]) -> Mapping[str, Any]:
+ return {k: _ensure_not_null(k, getattr(self, k)) for k in attrs}
+ def selected_messages_in_shard(self) -> models.QuerySet:
+ return self.objects.filter(**self.key_from(self.sharding_columns))
+ def select_coalesced_messages(self) -> models.QuerySet:
+ return self.objects.filter(**self.key_from(self.coalesced_columns))
+ class Meta:
+ abstract = True
+ __include_in_export__ = False
+ # Different shard_scope, shard_identifier pairings of messages are always deliverable in parallel
+ shard_scope = BoundedPositiveIntegerField(choices=OutboxScope.as_choices(), null=False)
+ shard_identifier = BoundedBigIntegerField(null=False)
+ # Objects of equal scope, shard_identifier, category, and object_identifier are coalesced in processing.
+ category = BoundedPositiveIntegerField(choices=OutboxCategory.as_choices(), null=False)
+ object_identifier = BoundedBigIntegerField(null=False)
+ # payload is used for webhook payloads.
+ payload = JSONField(null=True)
+ # The point at which this object was scheduled, used as a diff from scheduled_for to determine the intended delay.
+ scheduled_from = models.DateTimeField(null=False, default=timezone.now)
+ # The point at which this object is intended to be replicated, used for backoff purposes. Keep in mind that
+ # the largest back off effectively applies to the entire 'shard' key.
+ scheduled_for = models.DateTimeField(null=False, default=THE_PAST)
+ def last_delay(self) -> datetime.timedelta:
+ return max(self.scheduled_for - self.scheduled_from, datetime.timedelta(seconds=1))
+ def next_schedule(self, now: datetime.datetime) -> datetime.datetime:
+ return now + (self.last_delay() * 2)
+ @contextlib.contextmanager
+ def process_coalesced(self) -> Generator[OutboxBase | None, None, None]:
+ # Do not, use a select for update here -- it is tempting, but a major performance issue.
+ # we should simply accept the occasional multiple sends than to introduce hard locking.
+ # so long as all objects sent are committed, and so long as any concurrent changes to data
+ # result in a future processing, we should always converge on non stale values.
+ coalesced: OutboxBase | None = self.select_coalesced_messages().last()
+ yield coalesced
+ if coalesced is not None:
+ self.select_coalesced_messages().filter(id__lte=coalesced.id).delete()
+ def process(self) -> bool:
+ with self.process_coalesced() as coalesced:
+ if coalesced is not None:
+ coalesced.send_signal()
+ return True
+ return False
+ @abc.abstractmethod
+ def send_signal(self):
+ pass
+ def drain_shard(self, max_updates_to_drain: int | None = None):
+ runs = 0
+ next_row: OutboxBase | None = self.selected_messages_in_shard().first()
+ while next_row is not None and (
+ max_updates_to_drain is None or runs < max_updates_to_drain
+ ):
+ runs += 1
+ next_row.process()
+ next_row: OutboxBase | None = self.selected_messages_in_shard().first()
+ if next_row is not None:
+ raise OutboxFlushError(
+ f"Could not flush items from shard {self.key_from(self.sharding_columns)!r}"
+ )
+MONOLITH_REGION_NAME = "--monolith--"
+# Outboxes bound from region silo -> control silo
+class RegionOutbox(OutboxBase):
+ def send_signal(self):
+ process_region_outbox.send(
+ sender=OutboxCategory(self.category),
+ payload=self.payload,
+ object_identifier=self.object_identifier,
+ )
+ sharding_columns = ("shard_scope", "shard_identifier")
+ coalesced_columns = ("shard_scope", "shard_identifier", "category", "object_identifier")
+ class Meta:
+ app_label = "sentry"
+ db_table = "sentry_regionoutbox"
+ index_together = (
+ (
+ "shard_scope",
+ "shard_identifier",
+ "category",
+ "object_identifier",
+ ),
+ (
+ "shard_scope",
+ "shard_identifier",
+ "scheduled_for",
+ ),
+ ("shard_scope", "shard_identifier", "id"),
+ )
+ __repr__ = sane_repr("shard_scope", "shard_identifier", "category", "object_identifier")
+# Outboxes bound from region silo -> control silo
+class ControlOutbox(OutboxBase):
+ sharding_columns = ("region_name", "shard_scope", "shard_identifier")
+ coalesced_columns = (
+ "region_name",
+ "shard_scope",
+ "shard_identifier",
+ "category",
+ "object_identifier",
+ )
+ region_name = models.CharField(max_length=48)
+ def send_signal(self):
+ process_control_outbox.send(
+ sender=self.category,
+ payload=self.payload,
+ region_name=self.region_name,
+ object_identifier=self.object_identifier,
+ )
+ class Meta:
+ app_label = "sentry"
+ db_table = "sentry_controloutbox"
+ index_together = (
+ (
+ "region_name",
+ "shard_scope",
+ "shard_identifier",
+ "category",
+ "object_identifier",
+ ),
+ (
+ "region_name",
+ "shard_scope",
+ "shard_identifier",
+ "scheduled_for",
+ ),
+ ("region_name", "shard_scope", "shard_identifier", "id"),
+ )
+ __repr__ = sane_repr(
+ "region_name", "shard_scope", "shard_identifier", "category", "object_identifier"
+ )
+ @classmethod
+ def for_webhook_update(
+ cls,
+ *,
+ webhook_identifier: WebhookProviderIdentifier,
+ region_names: List[str],
+ payload=Mapping[str, Any],
+ ) -> Iterable[ControlOutbox]:
+ for region_name in region_names:
+ result = cls()
+ result.shard_scope = OutboxScope.WEBHOOK_SCOPE
+ result.shard_identifier = webhook_identifier.value
+ result.object_identifier = cls._unique_object_identifier()
+ result.category = OutboxCategory.WEBHOOK_PROXY
+ result.region_name = region_name
+ result.payload = payload
+ yield result
+def _find_orgs_for_user(user_id: int) -> Set[int]:
+ # TODO: This must be changed to the org member mapping in the control silo eventually.
+ from sentry.models import OrganizationMember
+ return {
+ m["organization_id"]
+ for m in OrganizationMember.objects.filter(user_id=user_id).values("organization_id")
+ }
+def find_regions_for_user(user_id: int) -> Set[str]:
+ from sentry.models import OrganizationMapping
+ org_ids: Set[int]
+ if "pytest" in sys.modules:
+ from sentry.testutils.silo import exempt_from_silo_limits
+ with exempt_from_silo_limits():
+ org_ids = _find_orgs_for_user(user_id)
+ else:
+ org_ids = _find_orgs_for_user(user_id)
+ if SiloMode.get_current_mode() == SiloMode.MONOLITH:
+ return {
+ }
+ else:
+ return {
+ t["region_name"]
+ for t in OrganizationMapping.objects.filter(organization_id__in=org_ids).values(
+ "region_name"
+ )
+ }
+def outbox_silo_modes() -> List[SiloMode]:
+ cur = SiloMode.get_current_mode()
+ result: List[SiloMode] = []
+ if cur != SiloMode.REGION:
+ result.append(SiloMode.CONTROL)
+ if cur != SiloMode.CONTROL:
+ result.append(SiloMode.REGION)
+ return result
+process_region_outbox = Signal(providing_args=["payload", "object_identifier"])
+process_control_outbox = Signal(providing_args=["payload", "region_name", "object_identifier"])