Browse Source

chore(sentry apps): Create forward shims for sentry apps tasks (#78344)

Christinarlong 5 months ago
parent
commit
c4fe51a9b0

+ 0 - 1
pyproject.toml

@@ -343,7 +343,6 @@ module = [
     "sentry.tasks.auth",
     "sentry.tasks.base",
     "sentry.tasks.process_buffer",
-    "sentry.tasks.sentry_apps",
     "sentry.templatetags.sentry_assets",
     "sentry.templatetags.sentry_helpers",
     "sentry.templatetags.sentry_plugins",

+ 1 - 0
src/sentry/conf/server.py

@@ -752,6 +752,7 @@ CELERY_IMPORTS = (
     "sentry.integrations.github.tasks.pr_comment",
     "sentry.integrations.jira.tasks",
     "sentry.integrations.opsgenie.tasks",
+    "sentry.sentry_apps.tasks",
     "sentry.snuba.tasks",
     "sentry.replays.tasks",
     "sentry.monitors.tasks.clock_pulse",

+ 23 - 0
src/sentry/sentry_apps/tasks/__init__.py

@@ -0,0 +1,23 @@
+from .sentry_apps import (
+    build_comment_webhook,
+    clear_region_cache,
+    create_or_update_service_hooks_for_sentry_app,
+    installation_webhook,
+    process_resource_change_bound,
+    send_alert_event,
+    send_resource_change_webhook,
+    workflow_notification,
+)
+from .service_hooks import process_service_hook
+
+__all__ = (
+    "send_alert_event",
+    "build_comment_webhook",
+    "clear_region_cache",
+    "create_or_update_service_hooks_for_sentry_app",
+    "installation_webhook",
+    "process_resource_change_bound",
+    "send_resource_change_webhook",
+    "workflow_notification",
+    "process_service_hook",
+)

+ 120 - 0
src/sentry/sentry_apps/tasks/sentry_apps.py

@@ -0,0 +1,120 @@
+from collections.abc import Mapping
+from typing import Any
+
+from celery import Task
+
+from sentry.eventstore.models import Event
+from sentry.tasks.base import instrumented_task
+from sentry.tasks.sentry_apps import CONTROL_TASK_OPTIONS, TASK_OPTIONS
+from sentry.tasks.sentry_apps import build_comment_webhook as old_build_comment_webhook
+from sentry.tasks.sentry_apps import clear_region_cache as old_clear_region_cache
+from sentry.tasks.sentry_apps import (
+    create_or_update_service_hooks_for_sentry_app as old_create_or_update_service_hooks_for_sentry_app,
+)
+from sentry.tasks.sentry_apps import installation_webhook as old_installation_webhook
+from sentry.tasks.sentry_apps import (
+    process_resource_change_bound as old_process_resource_change_bound,
+)
+from sentry.tasks.sentry_apps import retry_decorator
+from sentry.tasks.sentry_apps import send_alert_event as old_send_alert_event
+from sentry.tasks.sentry_apps import (
+    send_resource_change_webhook as old_send_resource_change_webhook,
+)
+from sentry.tasks.sentry_apps import workflow_notification as old_workflow_notification
+
+
+@instrumented_task(name="sentry.sentry_apps.tasks.sentry_apps.send_alert_event", **TASK_OPTIONS)
+@retry_decorator
+def send_alert_event(
+    event: Event,
+    rule: str,
+    sentry_app_id: int,
+    additional_payload_key: str | None = None,
+    additional_payload: Mapping[str, Any] | None = None,
+) -> None:
+    old_send_alert_event(
+        event=event,
+        rule=rule,
+        sentry_app_id=sentry_app_id,
+        additional_payload_key=additional_payload_key,
+        additional_payload=additional_payload,
+    )
+
+
+@instrumented_task(
+    "sentry.sentry_apps.tasks.sentry_apps.process_resource_change_bound", bind=True, **TASK_OPTIONS
+)
+@retry_decorator
+def process_resource_change_bound(
+    self: Task, action: str, sender: str, instance_id: int, **kwargs: Any
+) -> None:
+    old_process_resource_change_bound(
+        self=self, action=action, sender=sender, instance_id=instance_id, **kwargs
+    )
+
+
+@instrumented_task(
+    name="sentry.sentry_apps.tasks.sentry_apps.installation_webhook", **CONTROL_TASK_OPTIONS
+)
+@retry_decorator
+def installation_webhook(installation_id, user_id, *args, **kwargs):
+    old_installation_webhook(installation_id=installation_id, user_id=user_id, *args, **kwargs)
+
+
+@instrumented_task(
+    name="sentry.sentry_apps.tasks.sentry_apps.clear_region_cache", **CONTROL_TASK_OPTIONS
+)
+def clear_region_cache(sentry_app_id: int, region_name: str) -> None:
+    old_clear_region_cache(sentry_app_id=sentry_app_id, region_name=region_name)
+
+
+@instrumented_task(
+    name="sentry.sentry_apps.tasks.sentry_apps.workflow_notification", **TASK_OPTIONS
+)
+@retry_decorator
+def workflow_notification(installation_id, issue_id, type, user_id, *args, **kwargs):
+    old_workflow_notification(
+        installation_id=installation_id,
+        issue_id=issue_id,
+        type=type,
+        user_id=user_id,
+        *args,
+        **kwargs,
+    )
+
+
+@instrumented_task(
+    name="sentry.sentry_apps.tasks.sentry_apps.build_comment_webhook", **TASK_OPTIONS
+)
+@retry_decorator
+def build_comment_webhook(installation_id, issue_id, type, user_id, *args, **kwargs):
+    old_build_comment_webhook(
+        installation_id=installation_id,
+        issue_id=issue_id,
+        type=type,
+        user_id=user_id,
+        *args,
+        **kwargs,
+    )
+
+
+@instrumented_task(
+    "sentry.sentry_apps.tasks.sentry_apps.send_resource_change_webhook", **TASK_OPTIONS
+)
+@retry_decorator
+def send_resource_change_webhook(installation_id, event, data, *args, **kwargs):
+    old_send_resource_change_webhook(
+        installation_id=installation_id, event=event, data=data, *args, **kwargs
+    )
+
+
+@instrumented_task(
+    "sentry.sentry_apps.tasks.sentry_apps.create_or_update_service_hooks_for_sentry_app",
+    **CONTROL_TASK_OPTIONS,
+)
+def create_or_update_service_hooks_for_sentry_app(
+    sentry_app_id: int, webhook_url: str, events: list[str], **kwargs: dict
+) -> None:
+    old_create_or_update_service_hooks_for_sentry_app(
+        sentry_app_id=sentry_app_id, webhook_url=webhook_url, events=events, **kwargs
+    )

+ 14 - 0
src/sentry/sentry_apps/tasks/service_hooks.py

@@ -0,0 +1,14 @@
+from sentry.silo.base import SiloMode
+from sentry.tasks.base import instrumented_task, retry
+from sentry.tasks.servicehooks import process_service_hook as old_process_service_hook
+
+
+@instrumented_task(
+    name="sentry.sentry_apps.tasks.service_hooks.process_service_hook",
+    default_retry_delay=60 * 5,
+    max_retries=5,
+    silo_mode=SiloMode.REGION,
+)
+@retry
+def process_service_hook(servicehook_id, event, **kwargs):
+    old_process_service_hook(servicehook_id=servicehook_id, event=event, **kwargs)

+ 77 - 46
src/sentry/tasks/sentry_apps.py

@@ -5,14 +5,15 @@ from collections import defaultdict
 from collections.abc import Mapping
 from typing import Any
 
-from celery import current_task
+from celery import Task, current_task
 from django.urls import reverse
 from requests.exceptions import RequestException
 
 from sentry import analytics
 from sentry.api.serializers import serialize
 from sentry.constants import SentryAppInstallationStatus
-from sentry.eventstore.models import Event, GroupEvent
+from sentry.db.models.base import Model
+from sentry.eventstore.models import BaseEvent, Event, GroupEvent
 from sentry.hybridcloud.rpc.caching import region_caching_service
 from sentry.models.activity import Activity
 from sentry.models.group import Group
@@ -23,6 +24,7 @@ from sentry.sentry_apps.api.serializers.app_platform_event import AppPlatformEve
 from sentry.sentry_apps.models.sentry_app import VALID_EVENTS, SentryApp
 from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation
 from sentry.sentry_apps.models.servicehook import ServiceHook, ServiceHookProject
+from sentry.sentry_apps.services.app.model import RpcSentryAppInstallation
 from sentry.sentry_apps.services.app.service import (
     app_service,
     get_by_application_id,
@@ -31,6 +33,7 @@ from sentry.sentry_apps.services.app.service import (
 from sentry.shared_integrations.exceptions import ApiHostError, ApiTimeoutError, ClientError
 from sentry.silo.base import SiloMode
 from sentry.tasks.base import instrumented_task, retry
+from sentry.users.services.user.model import RpcUser
 from sentry.users.services.user.service import user_service
 from sentry.utils import metrics
 from sentry.utils.http import absolute_uri
@@ -68,7 +71,9 @@ RESOURCE_RENAMES = {"Group": "issue"}
 TYPES = {"Group": Group, "Error": Event, "Comment": Activity}
 
 
-def _webhook_event_data(event, group_id, project_id):
+def _webhook_event_data(
+    event: Event | GroupEvent, group_id: int, project_id: int
+) -> dict[str, Any]:
     project = Project.objects.get_from_cache(id=project_id)
     organization = Organization.objects.get_from_cache(id=project.organization_id)
 
@@ -113,6 +118,7 @@ def send_alert_event(
     :return:
     """
     group = event.group
+    assert group, "Group must exist to get related attributes"
     project = Project.objects.get_from_cache(id=group.project_id)
     organization = Organization.objects.get_from_cache(id=project.organization_id)
 
@@ -164,9 +170,17 @@ def send_alert_event(
         )
 
 
-def _process_resource_change(action, sender, instance_id, retryer=None, *args, **kwargs):
+def _process_resource_change(
+    *,
+    action: str,
+    sender: str,
+    instance_id: int,
+    retryer: Task | None = None,
+    **kwargs: Any,
+) -> None:
     # The class is serialized as a string when enqueueing the class.
-    model = TYPES[sender]
+    model: type[Event] | type[Model] = TYPES[sender]
+    instance: Event | Model | None = None
     # The Event model has different hooks for the different event types. The sender
     # determines which type eg. Error and therefore the 'name' eg. error
     if issubclass(model, Event):
@@ -186,19 +200,19 @@ def _process_resource_change(action, sender, instance_id, retryer=None, *args, *
 
     # We may run into a race condition where this task executes before the
     # transaction that creates the Group has committed.
-    try:
-        if issubclass(model, Event):
-            # XXX:(Meredith): Passing through the entire event was an intentional choice
-            # to avoid having to query NodeStore again for data we had previously in
-            # post_process. While this is not ideal, changing this will most likely involve
-            # an overhaul of how we do things in post_process, not just this task alone.
-            instance = kwargs.get("instance")
-        else:
+    if issubclass(model, Event):
+        # XXX:(Meredith): Passing through the entire event was an intentional choice
+        # to avoid having to query NodeStore again for data we had previously in
+        # post_process. While this is not ideal, changing this will most likely involve
+        # an overhaul of how we do things in post_process, not just this task alone.
+        instance = kwargs.get("instance")
+    else:
+        try:
             instance = model.objects.get(id=instance_id)
-    except model.DoesNotExist as e:
-        # Explicitly requeue the task, so we don't report this to Sentry until
-        # we hit the max number of retries.
-        return retryer.retry(exc=e)
+        except model.DoesNotExist as e:
+            # Explicitly requeue the task, so we don't report this to Sentry until
+            # we hit the max number of retries.
+            return retryer.retry(exc=e)
 
     event = f"{name}.{action}"
 
@@ -211,32 +225,40 @@ def _process_resource_change(action, sender, instance_id, retryer=None, *args, *
         org = Organization.objects.get_from_cache(
             id=Project.objects.get_from_cache(id=instance.project_id).organization_id
         )
-
-    installations = filter(
-        lambda i: event in i.sentry_app.events,
-        app_service.get_installed_for_organization(organization_id=org.id),
-    )
-
-    for installation in installations:
-        data = {}
-        if isinstance(instance, Event) or isinstance(instance, GroupEvent):
-            data[name] = _webhook_event_data(instance, instance.group_id, instance.project_id)
-        else:
-            data[name] = serialize(instance)
-
-        # Trigger a new task for each webhook
-        send_resource_change_webhook.delay(installation_id=installation.id, event=event, data=data)
+        assert org, "organization must exist to get related sentry app installations"
+        installations: list[RpcSentryAppInstallation] = [
+            installation
+            for installation in app_service.get_installed_for_organization(organization_id=org.id)
+            if event in installation.sentry_app.events
+        ]
+
+        for installation in installations:
+            data = {}
+            if isinstance(instance, (Event, GroupEvent)):
+                assert instance.group_id, "group id is required to create webhook event data"
+                data[name] = _webhook_event_data(instance, instance.group_id, instance.project_id)
+            else:
+                data[name] = serialize(instance)
+
+            # Trigger a new task for each webhook
+            send_resource_change_webhook.delay(
+                installation_id=installation.id, event=event, data=data
+            )
 
 
 @instrumented_task("sentry.tasks.process_resource_change_bound", bind=True, **TASK_OPTIONS)
 @retry_decorator
-def process_resource_change_bound(self, action, sender, instance_id, *args, **kwargs):
-    _process_resource_change(action, sender, instance_id, retryer=self, *args, **kwargs)
+def process_resource_change_bound(
+    self: Task, action: str, sender: str, instance_id: int, **kwargs: Any
+) -> None:
+    _process_resource_change(
+        action=action, sender=sender, instance_id=instance_id, retryer=self, **kwargs
+    )
 
 
 @instrumented_task(name="sentry.tasks.sentry_apps.installation_webhook", **CONTROL_TASK_OPTIONS)
 @retry_decorator
-def installation_webhook(installation_id, user_id, *args, **kwargs):
+def installation_webhook(installation_id: int, user_id: int, *args: Any, **kwargs: Any) -> None:
     from sentry.mediators.sentry_app_installations.installation_notifier import InstallationNotifier
 
     extra = {"installation_id": installation_id, "user_id": user_id}
@@ -295,7 +317,9 @@ def clear_region_cache(sentry_app_id: int, region_name: str) -> None:
 
 @instrumented_task(name="sentry.tasks.sentry_apps.workflow_notification", **TASK_OPTIONS)
 @retry_decorator
-def workflow_notification(installation_id, issue_id, type, user_id, *args, **kwargs):
+def workflow_notification(
+    installation_id: int, issue_id: int, type: str, user_id: int, *args: Any, **kwargs: Any
+) -> None:
     webhook_data = get_webhook_data(installation_id, issue_id, user_id)
     if not webhook_data:
         return
@@ -313,10 +337,12 @@ def workflow_notification(installation_id, issue_id, type, user_id, *args, **kwa
 
 @instrumented_task(name="sentry.tasks.sentry_apps.build_comment_webhook", **TASK_OPTIONS)
 @retry_decorator
-def build_comment_webhook(installation_id, issue_id, type, user_id, *args, **kwargs):
+def build_comment_webhook(
+    installation_id: int, issue_id: int, type: str, user_id: int, *args: Any, **kwargs: Any
+) -> None:
     webhook_data = get_webhook_data(installation_id, issue_id, user_id)
     if not webhook_data:
-        return
+        return None
     install, _, user = webhook_data
     data = kwargs.get("data", {})
     project_slug = data.get("project_slug")
@@ -340,18 +366,20 @@ def build_comment_webhook(installation_id, issue_id, type, user_id, *args, **kwa
     )
 
 
-def get_webhook_data(installation_id, issue_id, user_id):
+def get_webhook_data(
+    installation_id: int, issue_id: int, user_id: int
+) -> tuple[RpcSentryAppInstallation, Group, RpcUser | None] | None:
     extra = {"installation_id": installation_id, "issue_id": issue_id}
     install = app_service.installation_by_id(id=installation_id)
     if not install:
         logger.info("workflow_notification.missing_installation", extra=extra)
-        return
+        return None
 
     try:
         issue = Group.objects.get(id=issue_id)
     except Group.DoesNotExist:
         logger.info("workflow_notification.missing_issue", extra=extra)
-        return
+        return None
 
     user = None
     if user_id:
@@ -364,7 +392,9 @@ def get_webhook_data(installation_id, issue_id, user_id):
 
 @instrumented_task("sentry.tasks.send_process_resource_change_webhook", **TASK_OPTIONS)
 @retry_decorator
-def send_resource_change_webhook(installation_id, event, data, *args, **kwargs):
+def send_resource_change_webhook(
+    installation_id: int, event: str, data: dict[str, Any], *args: Any, **kwargs: Any
+) -> None:
     installation = app_service.installation_by_id(id=installation_id)
     if not installation:
         logger.info(
@@ -378,12 +408,12 @@ def send_resource_change_webhook(installation_id, event, data, *args, **kwargs):
     metrics.incr("resource_change.processed", sample_rate=1.0, tags={"change_event": event})
 
 
-def notify_sentry_app(event, futures):
+def notify_sentry_app(event: BaseEvent, futures):
     for f in futures:
         if not f.kwargs.get("sentry_app"):
             continue
 
-        extra_kwargs = {
+        extra_kwargs: dict[str, Any] = {
             "additional_payload_key": None,
             "additional_payload": None,
         }
@@ -406,7 +436,8 @@ def notify_sentry_app(event, futures):
         )
 
 
-def send_webhooks(installation, event, **kwargs):
+def send_webhooks(installation: RpcSentryAppInstallation, event: str, **kwargs: Any) -> None:
+    servicehook: ServiceHook
     try:
         servicehook = ServiceHook.objects.get(
             organization_id=installation.organization_id, actor_id=installation.id
@@ -452,7 +483,7 @@ def send_webhooks(installation, event, **kwargs):
         send_and_save_webhook_request(
             installation.sentry_app,
             request_data,
-            servicehook.sentry_app.webhook_url,
+            installation.sentry_app.webhook_url,
         )