Browse Source

feat(hybridcloud) Partition busy webhook integrations to increase concurrency (#67669)

For high volume jira server instances we can get higher concurrency
delivery by splitting into multiple mailboxes. I've arbitrarily chosen
3000 messages in 1 hour, and 100 buckets based on vibes. There is only 1
customer integration that will fall into this code path currently.
Mark Story 11 months ago
parent
commit
059706c03c

+ 1 - 0
fixtures/integrations/jira/stubs/edit_issue_assignee_payload.json

@@ -14,6 +14,7 @@
     "id": "10172"
   },
   "issue": {
+    "id": 101,
     "fields": {
       "assignee": {
         "emailAddress": "jess@sentry.io",

+ 1 - 1
src/sentry/hybridcloud/models/webhookpayload.py

@@ -76,7 +76,7 @@ class WebhookPayload(Model):
         *,
         region: str,
         provider: str,
-        identifier: int,
+        identifier: int | str,
         request: HttpRequest,
         integration_id: int | None = None,
     ) -> Self:

+ 16 - 20
src/sentry/middleware/integrations/parsers/base.py

@@ -142,49 +142,45 @@ class BaseRequestParser(abc.ABC):
 
         return region_to_response_map
 
-    def get_response_from_outbox_creation(
-        self, regions: Sequence[Region], shard_identifier_override: int | None = None
+    def get_response_from_webhookpayload(
+        self,
+        regions: Sequence[Region],
+        identifier: int | str | None = None,
+        shard_identifier_override: int | None = None,  # deprecated used in getsentry
+        integration_id: int | None = None,
     ):
         """
-        DEPRECATED: use get_response_from_outbox_creation_for_integration
-
-        Used to create outboxes for provided regions to handle the webhooks asynchronously.
+        Used to create webhookpayloads for provided regions to handle the webhooks asynchronously.
         Responds to the webhook provider with a 202 Accepted status.
         """
         if len(regions) < 1:
             return HttpResponse(status=status.HTTP_202_ACCEPTED)
 
-        shard_identifier = shard_identifier_override or self.webhook_identifier.value
+        shard_identifier = identifier or shard_identifier_override or self.webhook_identifier.value
         for region in regions:
             WebhookPayload.create_from_request(
                 region=region.name,
                 provider=self.provider,
                 identifier=shard_identifier,
+                integration_id=integration_id,
                 request=self.request,
             )
 
         return HttpResponse(status=status.HTTP_202_ACCEPTED)
 
-    def get_response_from_outbox_creation_for_integration(
+    # Alias to prop up getsentry
+    get_response_from_outbox_creation = get_response_from_webhookpayload
+
+    def get_response_from_webhookpayload_for_integration(
         self, regions: Sequence[Region], integration: Integration | RpcIntegration
     ):
         """
         Used to create outboxes for provided regions to handle the webhooks asynchronously.
         Responds to the webhook provider with a 202 Accepted status.
         """
-        if not regions:
-            return HttpResponse(status=status.HTTP_202_ACCEPTED)
-
-        identifier = integration.id
-        for region in regions:
-            WebhookPayload.create_from_request(
-                region=region.name,
-                provider=self.provider,
-                identifier=identifier,
-                request=self.request,
-                integration_id=identifier,
-            )
-        return HttpResponse(status=status.HTTP_202_ACCEPTED)
+        return self.get_response_from_webhookpayload(
+            regions=regions, identifier=integration.id, integration_id=integration.id
+        )
 
     def get_response_from_first_region(self):
         regions = self.get_regions_from_organizations()

+ 1 - 1
src/sentry/middleware/integrations/parsers/bitbucket.py

@@ -45,7 +45,7 @@ class BitbucketRequestParser(BaseRequestParser):
             logging_extra["mapping_id"] = mapping.id
             logger.info("%s.no_region", self.provider, extra=logging_extra)
             return self.get_response_from_control_silo()
-        return self.get_response_from_outbox_creation(
+        return self.get_response_from_webhookpayload(
             regions=[region], shard_identifier_override=mapping.organization_id
         )
 

+ 1 - 1
src/sentry/middleware/integrations/parsers/github.py

@@ -65,6 +65,6 @@ class GithubRequestParser(BaseRequestParser):
         except (Integration.DoesNotExist, OrganizationIntegration.DoesNotExist):
             return self.get_default_missing_integration_response()
 
-        return self.get_response_from_outbox_creation_for_integration(
+        return self.get_response_from_webhookpayload_for_integration(
             regions=regions, integration=integration
         )

+ 1 - 1
src/sentry/middleware/integrations/parsers/gitlab.py

@@ -76,7 +76,7 @@ class GitlabRequestParser(BaseRequestParser, GitlabWebhookMixin):
         except (Integration.DoesNotExist, OrganizationIntegration.DoesNotExist):
             return self.get_default_missing_integration_response()
 
-        return self.get_response_from_outbox_creation_for_integration(
+        return self.get_response_from_webhookpayload_for_integration(
             regions=regions, integration=integration
         )
 

+ 1 - 1
src/sentry/middleware/integrations/parsers/jira.py

@@ -81,7 +81,7 @@ class JiraRequestParser(BaseRequestParser):
                 return self.get_response_from_control_silo()
 
         if self.view_class in self.outbox_response_region_classes:
-            return self.get_response_from_outbox_creation_for_integration(
+            return self.get_response_from_webhookpayload_for_integration(
                 regions=regions, integration=integration
             )
 

+ 32 - 2
src/sentry/middleware/integrations/parsers/jira_server.py

@@ -1,15 +1,20 @@
 from __future__ import annotations
 
 import logging
+from collections.abc import Mapping
+from typing import Any
 
 from django.http import HttpResponse
 
+from sentry import options
 from sentry.integrations.jira_server.webhooks import (
     JiraServerIssueUpdatedWebhook,
     get_integration_from_token,
 )
 from sentry.middleware.integrations.parsers.base import BaseRequestParser
 from sentry.models.outbox import WebhookProviderIdentifier
+from sentry.ratelimits import backend as ratelimiter
+from sentry.services.hybrid_cloud.integration.model import RpcIntegration
 from sentry.utils import json
 
 logger = logging.getLogger(__name__)
@@ -40,10 +45,35 @@ class JiraServerRequestParser(BaseRequestParser):
             logger.info("missing-changelog", extra={"integration_id": integration.id})
             return HttpResponse(status=200)
 
-        return self.get_response_from_outbox_creation_for_integration(
-            regions=regions, integration=integration
+        identifier = self.get_mailbox_identifier(integration, data)
+        return self.get_response_from_webhookpayload(
+            regions=regions,
+            identifier=identifier,
+            integration_id=integration.id,
         )
 
+    def get_mailbox_identifier(self, integration: RpcIntegration, data: Mapping[str, Any]) -> str:
+        """
+        Some Jira server instances send us high volumes of hooks.
+        Splitting these hooks across multiple mailboxes allows us to deliver messages in parallel
+        without sacrificing linearization that customers care about.
+        """
+        enabled = options.get("hybridcloud.webhookpayload.use_mailbox_buckets")
+        issue_id = data.get("issue", {}).get("id", None)
+        if not issue_id or not enabled:
+            return str(integration.id)
+
+        # If we get fewer than 3000 in 1 hour we don't need to split into buckets
+        ratelimit_key = f"webhookpayload:{self.provider}:{integration.id}"
+        if not ratelimiter.is_limited(key=ratelimit_key, window=60 * 60, limit=3000):
+            return str(integration.id)
+
+        # Split high volume integrations into 100 buckets.
+        # 100 is arbitrary but we can't leave it unbounded.
+        bucket_number = issue_id % 100
+
+        return f"{integration.id}:{bucket_number}"
+
     def get_response(self):
         if self.view_class == JiraServerIssueUpdatedWebhook:
             return self.get_response_from_issue_update_webhook()

+ 1 - 1
src/sentry/middleware/integrations/parsers/msteams.py

@@ -125,6 +125,6 @@ class MsTeamsRequestParser(BaseRequestParser, MsTeamsWebhookMixin):
             "Scheduling event for request",
             extra={"request_data": self.request_data},
         )
-        return self.get_response_from_outbox_creation_for_integration(
+        return self.get_response_from_webhookpayload_for_integration(
             regions=regions, integration=integration
         )

+ 1 - 1
src/sentry/middleware/integrations/parsers/plugin.py

@@ -54,6 +54,6 @@ class PluginRequestParser(BaseRequestParser):
 
         # Because outboxes are now sharded by integration and plugins don't have one,
         # we use the org ID as the shard ID to batch these changes.
-        return self.get_response_from_outbox_creation(
+        return self.get_response_from_webhookpayload(
             regions=[region], shard_identifier_override=mapping.organization_id
         )

Some files were not shown because too many files changed in this diff