Browse Source

feat(hybridcloud) Add method to fetch sentryapp installs with caching (#71783)

We frequently see spikes of traffic going to this RPC call. These calls
should benefit from caching: orgs occasionally run hot with errors, and
caching would help reduce the call volume.

I've included cache expiration for when a sentryapp changes and when an
install changes, as waiting 1 hour for updates is too long. Once this
lands, I'll start using the new cached method behind an option to
gradually warm the cache and observe changes in throughput.

---------

Co-authored-by: Gabe Villalobos <GabeVillalobos@users.noreply.github.com>
Mark Story 9 months ago
parent
commit
a20e75fe7d

+ 1 - 1
src/sentry/db/models/outboxes.py

@@ -331,7 +331,7 @@ class ControlOutboxProducingManager(BaseManager[_CM]):
 
 class ReplicatedControlModel(ControlOutboxProducingModel):
     """
-    An extension of RegionOutboxProducingModel that provides a default implementation for `outboxes_for_update`
+    An extension of ControlOutboxProducingModel that provides a default implementation for `outboxes_for_update`
     based on the category and outbox type configured as class variables.  It also provides a default signal handler
     that invokes either of handle_async_replication or handle_async_deletion based on whether the object has
     been deleted or not.  Subclasses can and often should override these methods to configure outbox processing.

+ 12 - 1
src/sentry/models/integrations/sentry_app.py

@@ -190,7 +190,18 @@ class SentryApp(ParanoidModel, HasApiScopes, Model):
 
     def save(self, *args, **kwargs):
         self.date_updated = timezone.now()
-        return super().save(*args, **kwargs)
+        with outbox_context(transaction.atomic(using=router.db_for_write(SentryApp)), flush=False):
+            result = super().save(*args, **kwargs)
+            for outbox in self.outboxes_for_update():
+                outbox.save()
+            return result
+
+    def update(self, *args, **kwargs):
+        with outbox_context(transaction.atomic(using=router.db_for_write(SentryApp)), flush=False):
+            result = super().update(*args, **kwargs)
+            for outbox in self.outboxes_for_update():
+                outbox.save()
+            return result
 
     def is_installed_on(self, organization):
         from sentry.models.integrations.sentry_app_installation import SentryAppInstallation

+ 6 - 0
src/sentry/models/integrations/sentry_app_installation.py

@@ -199,11 +199,17 @@ class SentryAppInstallation(ReplicatedControlModel, ParanoidModel):
         )
 
     def handle_async_replication(self, region_name: str, shard_identifier: int) -> None:
+        from sentry.hybridcloud.rpc.services.caching import region_caching_service
+        from sentry.services.hybrid_cloud.app.service import get_installation
+
         if self.api_token is not None:
             # ApiTokens replicate the organization_id they are associated with.
             with outbox_context(flush=False):
                 for ob in self.api_token.outboxes_for_update():
                     ob.save()
+        region_caching_service.clear_key(
+            key=get_installation.key_from(self.id), region_name=region_name
+        )
 
     @classmethod
     def handle_async_deletion(

+ 28 - 1
src/sentry/receivers/outbox/control.py

@@ -8,16 +8,21 @@ and perform RPC calls to propagate changes to relevant region(s).
 from __future__ import annotations
 
 import logging
+from collections import defaultdict
 from collections.abc import Mapping
 from typing import Any
 
 from django.dispatch import receiver
 
+from sentry.hybridcloud.rpc.services.caching import region_caching_service
 from sentry.models.apiapplication import ApiApplication
 from sentry.models.integrations.integration import Integration
 from sentry.models.integrations.sentry_app import SentryApp
+from sentry.models.integrations.sentry_app_installation import SentryAppInstallation
+from sentry.models.organizationmapping import OrganizationMapping
 from sentry.models.outbox import OutboxCategory, process_control_outbox
 from sentry.receivers.outbox import maybe_process_tombstone
+from sentry.services.hybrid_cloud.app.service import get_installation
 from sentry.services.hybrid_cloud.issue import issue_service
 from sentry.services.hybrid_cloud.organization import RpcOrganizationSignal, organization_service
 
@@ -37,13 +42,35 @@ def process_integration_updates(object_identifier: int, region_name: str, **kwds
 
 @receiver(process_control_outbox, sender=OutboxCategory.SENTRY_APP_UPDATE)
 def process_sentry_app_updates(object_identifier: int, region_name: str, **kwds: Any):
+
     if (
         sentry_app := maybe_process_tombstone(
             model=SentryApp, object_identifier=object_identifier, region_name=region_name
         )
     ) is None:
         return
-    sentry_app  # Currently we do not sync any other sentry_app changes, but if we did, you can use this variable.
+
+    # When a sentry app's definition changes, purge the cache for all of its installations.
+    # This could get slow for large applications, but generally big applications don't change often.
+    install_query = SentryAppInstallation.objects.filter(sentry_app=sentry_app).values(
+        "id", "organization_id"
+    )
+    # There isn't a constraint on org : sentryapp so we have to handle lists
+    install_map: dict[int, list[int]] = defaultdict(list)
+    for row in install_query:
+        install_map[row["organization_id"]].append(row["id"])
+
+    # Limit our operations to the region this outbox is for.
+    # This could be a single query if we use raw_sql.
+    region_query = OrganizationMapping.objects.filter(
+        organization_id__in=list(install_map.keys()), region_name=region_name
+    ).values("organization_id")
+    for row in region_query:
+        installs = install_map[row["organization_id"]]
+        for install_id in installs:
+            region_caching_service.clear_key(
+                key=get_installation.key_from(install_id), region_name=region_name
+            )
 
 
 @receiver(process_control_outbox, sender=OutboxCategory.API_APPLICATION_UPDATE)

+ 14 - 0
src/sentry/services/hybrid_cloud/app/service.py

@@ -7,6 +7,7 @@ import abc
 from collections.abc import Mapping
 from typing import Any
 
+from sentry.hybridcloud.rpc.services.caching.service import back_with_silo_cache
 from sentry.services.hybrid_cloud.app import (
     RpcAlertRuleActionResult,
     RpcSentryApp,
@@ -82,6 +83,14 @@ class AppService(RpcService):
     def get_installation_by_id(self, *, id: int) -> RpcSentryAppInstallation | None:
         pass
 
+    def installation_by_id(self, *, id: int) -> RpcSentryAppInstallation | None:
+        """
+        Get a sentryapp install by id
+
+        This method is a cached wrapper around get_installation_by_id()
+        """
+        return get_installation(id)
+
     @rpc_method
     @abc.abstractmethod
     def get_installation(
@@ -172,4 +181,9 @@ class AppService(RpcService):
         pass
 
 
+@back_with_silo_cache("app_service.get_installation", SiloMode.REGION, RpcSentryAppInstallation)
+def get_installation(id: int) -> RpcSentryAppInstallation | None:
+    return app_service.get_installation_by_id(id=id)
+
+
 app_service = AppService.create_delegation()

+ 2 - 0
src/sentry/tasks/sentry_apps.py

@@ -293,6 +293,7 @@ def build_comment_webhook(installation_id, issue_id, type, user_id, *args, **kwa
 
 def get_webhook_data(installation_id, issue_id, user_id):
     extra = {"installation_id": installation_id, "issue_id": issue_id}
+    # TODO(hybridcloud) Use installation_by_id to reduce RPC traffic.
     install = app_service.get_installation_by_id(id=installation_id)
     if not install:
         logger.info("workflow_notification.missing_installation", extra=extra)
@@ -316,6 +317,7 @@ def get_webhook_data(installation_id, issue_id, user_id):
 @instrumented_task("sentry.tasks.send_process_resource_change_webhook", **TASK_OPTIONS)
 @retry_decorator
 def send_resource_change_webhook(installation_id, event, data, *args, **kwargs):
+    # TODO(hybridcloud) Use installation_by_id to reduce RPC traffic
     installation = app_service.get_installation_by_id(id=installation_id)
     if not installation:
         logger.info(

+ 11 - 0
tests/sentry/models/test_sentryapp.py

@@ -1,6 +1,7 @@
 from sentry.constants import SentryAppStatus
 from sentry.models.apiapplication import ApiApplication
 from sentry.models.integrations.sentry_app import SentryApp
+from sentry.models.outbox import ControlOutbox, OutboxCategory
 from sentry.testutils.cases import TestCase
 from sentry.testutils.silo import control_silo_test
 
@@ -72,3 +73,13 @@ class SentryAppTest(TestCase):
             organization=other_org, slug=self.sentry_app.slug, prevent_token_exchange=True
         )
         assert not self.sentry_app.is_installed_on(self.org)
+
+    def test_save_outbox_update(self):
+        # Clear the outbox created in setup()
+        ControlOutbox.objects.filter(category=OutboxCategory.SENTRY_APP_UPDATE).delete()
+
+        self.sentry_app.update(name="NoneDB")
+        outboxes = ControlOutbox.objects.filter(category=OutboxCategory.SENTRY_APP_UPDATE).all()
+        assert len(outboxes) == 1
+        assert outboxes[0].shard_identifier == self.sentry_app.id
+        assert outboxes[0].region_name

+ 12 - 0
tests/sentry/models/test_sentryappinstallation.py

@@ -1,8 +1,12 @@
+from unittest import mock
+
+import sentry.hybridcloud.rpc.services.caching as caching_module
 from sentry.models.apiapplication import ApiApplication
 from sentry.models.integrations.sentry_app import SentryApp
 from sentry.models.integrations.sentry_app_installation import SentryAppInstallation
 from sentry.testutils.cases import TestCase
 from sentry.testutils.silo import control_silo_test
+from sentry.types.region import get_region_for_organization
 
 
 @control_silo_test
@@ -44,3 +48,11 @@ class SentryAppInstallationTest(TestCase):
         assert self.install in SentryAppInstallation.objects.filter(
             organization_id=self.install.organization_id
         )
+
+    def test_handle_async_replication_clears_region_cache(self):
+        with mock.patch.object(caching_module, "region_caching_service") as mock_caching_service:
+            self.install.save()
+            region = get_region_for_organization(self.org.slug)
+            mock_caching_service.clear_key.assert_any_call(
+                key=f"app_service.get_installation:{self.install.id}", region_name=region.name
+            )

+ 24 - 0
tests/sentry/receivers/outbox/test_control.py

@@ -5,6 +5,7 @@ from sentry.models.integrations.integration import Integration
 from sentry.receivers.outbox.control import (
     process_api_application_updates,
     process_integration_updates,
+    process_sentry_app_updates,
 )
 from sentry.testutils.cases import TestCase
 from sentry.testutils.silo import control_silo_test
@@ -34,3 +35,26 @@ class ProcessControlOutboxTest(TestCase):
         mock_maybe_process.assert_called_with(
             ApiApplication, self.identifier, region_name=_TEST_REGION.name
         )
+
+    @patch("sentry.receivers.outbox.control.region_caching_service")
+    def test_process_sentry_app_updates(self, mock_caching):
+        org = self.create_organization()
+        sentry_app = self.create_sentry_app()
+        install = self.create_sentry_app_installation(slug=sentry_app.slug, organization=org)
+        install_dupe = self.create_sentry_app_installation(slug=sentry_app.slug, organization=org)
+
+        org_two = self.create_organization()
+        install_two = self.create_sentry_app_installation(
+            slug=sentry_app.slug, organization=org_two
+        )
+
+        process_sentry_app_updates(object_identifier=sentry_app.id, region_name=_TEST_REGION.name)
+        mock_caching.clear_key.assert_any_call(
+            key=f"app_service.get_installation:{install.id}", region_name=_TEST_REGION.name
+        )
+        mock_caching.clear_key.assert_any_call(
+            key=f"app_service.get_installation:{install_dupe.id}", region_name=_TEST_REGION.name
+        )
+        mock_caching.clear_key.assert_any_call(
+            key=f"app_service.get_installation:{install_two.id}", region_name=_TEST_REGION.name
+        )