
feat(outcomes): Dispatch billing outcomes to a dedicated topic (#31175)

Outcomes are used for stats, debugging, quota enforcement, and spike protection.
The last two run exclusively in Sentry SaaS and require timely, consistent
handling.

To ensure this, this PR adds the option to send ACCEPTED and RATE_LIMITED
outcomes to a dedicated topic. Since this is optional, it is not used in
development or in self-hosted Sentry.
Jan Michael Auer · 3 years ago · commit 00bf8260de
4 changed files with 234 additions and 18 deletions
  1. mypy.ini (+1 -0)
  2. src/sentry/conf/server.py (+4 -0)
  3. src/sentry/utils/outcomes.py (+49 -18)
  4. tests/sentry/utils/test_outcomes.py (+180 -0)

mypy.ini (+1 -0)

@@ -75,6 +75,7 @@ files = src/sentry/analytics/,
         src/sentry/utils/email/,
         src/sentry/utils/jwt.py,
         src/sentry/utils/kvstore,
+        src/sentry/utils/outcomes.py,
         src/sentry/utils/time_window.py,
         src/sentry/web/decorators.py,
         tests/sentry/lang/native/test_appconnect.py,

src/sentry/conf/server.py (+4 -0)

@@ -2206,6 +2206,7 @@ KAFKA_CLUSTERS = {
 
 KAFKA_EVENTS = "events"
 KAFKA_OUTCOMES = "outcomes"
+KAFKA_OUTCOMES_BILLING = "outcomes-billing"
 KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS = "events-subscription-results"
 KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS = "transactions-subscription-results"
 KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS = "sessions-subscription-results"
@@ -2225,6 +2226,9 @@ KAFKA_SNUBA_METRICS = "snuba-metrics"
 KAFKA_TOPICS = {
     KAFKA_EVENTS: {"cluster": "default", "topic": KAFKA_EVENTS},
     KAFKA_OUTCOMES: {"cluster": "default", "topic": KAFKA_OUTCOMES},
+    # When OUTCOMES_BILLING is None, it inherits from OUTCOMES and does not
+    # create a separate producer. Check ``track_outcome`` for details.
+    KAFKA_OUTCOMES_BILLING: None,
     KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS: {
         "cluster": "default",
         "topic": KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,

src/sentry/utils/outcomes.py (+49 -18)

@@ -1,6 +1,7 @@
 import time
 from datetime import datetime
 from enum import IntEnum
+from typing import Optional
 
 from django.conf import settings
 
@@ -27,22 +28,25 @@ class Outcome(IntEnum):
     def parse(cls, name: str) -> "Outcome":
         return Outcome[name.upper()]
 
+    def is_billing(self) -> bool:
+        return self in (Outcome.ACCEPTED, Outcome.RATE_LIMITED)
+
 
-outcomes = settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]
 outcomes_publisher = None
+billing_publisher = None
 
 
 def track_outcome(
-    org_id,
-    project_id,
-    key_id,
-    outcome,
-    reason=None,
-    timestamp=None,
-    event_id=None,
-    category=None,
-    quantity=None,
-):
+    org_id: int,
+    project_id: int,
+    key_id: Optional[int],
+    outcome: Outcome,
+    reason: Optional[str] = None,
+    timestamp: Optional[datetime] = None,
+    event_id: Optional[str] = None,
+    category: Optional[DataCategory] = None,
+    quantity: Optional[int] = None,
+) -> None:
     """
     This is a central point to track org/project counters per incoming event.
     NB: This should only ever be called once per incoming event, which means
@@ -54,11 +58,7 @@ def track_outcome(
     events.
     """
     global outcomes_publisher
-    if outcomes_publisher is None:
-        cluster_name = outcomes["cluster"]
-        outcomes_publisher = KafkaPublisher(
-            kafka_config.get_kafka_producer_cluster_options(cluster_name)
-        )
+    global billing_publisher
 
     if quantity is None:
         quantity = 1
@@ -71,11 +71,41 @@ def track_outcome(
     assert isinstance(category, (type(None), DataCategory))
     assert isinstance(quantity, int)
 
+    outcomes_config = settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]
+    billing_config = settings.KAFKA_TOPICS.get(settings.KAFKA_OUTCOMES_BILLING) or outcomes_config
+
+    # Create a second producer instance only if the cluster differs. Otherwise,
+    # reuse the same producer and just send to the other topic.
+    if outcome.is_billing() and billing_config["cluster"] != outcomes_config["cluster"]:
+        if billing_publisher is None:
+            cluster_name = billing_config["cluster"]
+            billing_publisher = KafkaPublisher(
+                kafka_config.get_kafka_producer_cluster_options(cluster_name)
+            )
+        publisher = billing_publisher
+
+    else:
+        if outcomes_publisher is None:
+            cluster_name = outcomes_config["cluster"]
+            outcomes_publisher = KafkaPublisher(
+                kafka_config.get_kafka_producer_cluster_options(cluster_name)
+            )
+        publisher = outcomes_publisher
+
     timestamp = timestamp or to_datetime(time.time())
 
+    # Send billing outcomes to a dedicated topic if there is a separate
+    # configuration for it. Otherwise, fall back to the regular outcomes topic.
+    # This does NOT switch the producer if both topics are on the same cluster.
+    #
+    # In Sentry, there is no significant difference between the classes of
+    # outcome. In Sentry SaaS, they have elevated stability requirements as they
+    # are used for spike protection and quota enforcement.
+    topic_name = billing_config["topic"] if outcome.is_billing() else outcomes_config["topic"]
+
     # Send a snuba metrics payload.
-    outcomes_publisher.publish(
-        outcomes["topic"],
+    publisher.publish(
+        topic_name,
         json.dumps(
             {
                 "timestamp": timestamp,
@@ -98,5 +128,6 @@ def track_outcome(
             "outcome": outcome.name.lower(),
             "reason": reason,
             "category": category.api_name() if category is not None else "null",
+            "topic": topic_name,
         },
     )
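
Callers are unaffected by this change: track_outcome remains the single entry
point, and routing happens internally based on the outcome class. A minimal
usage sketch (all IDs and the reason string are illustrative):

from sentry.utils.outcomes import Outcome, track_outcome

# RATE_LIMITED is a billing outcome, so this call is routed to the billing
# topic if one is configured, and to the shared outcomes topic otherwise.
track_outcome(
    org_id=1,
    project_id=42,
    key_id=None,
    outcome=Outcome.RATE_LIMITED,
    reason="quota_exceeded",
)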

tests/sentry/utils/test_outcomes.py (+180 -0)

@@ -0,0 +1,180 @@
+import copy
+from unittest.mock import Mock
+
+import pytest
+
+from sentry.utils import json, kafka_config, outcomes
+from sentry.utils.outcomes import Outcome, track_outcome
+
+
+@pytest.fixture(autouse=True)
+def setup(monkeypatch, settings):
+    # Rely on the fact that the publisher is initialized lazily
+    monkeypatch.setattr(kafka_config, "get_kafka_producer_cluster_options", Mock())
+    monkeypatch.setattr(outcomes, "KafkaPublisher", Mock())
+
+    # Reset internals of the outcomes module
+    monkeypatch.setattr(outcomes, "outcomes_publisher", None)
+    monkeypatch.setattr(outcomes, "billing_publisher", None)
+
+    # Settings fixture does not restore nested mutable attributes
+    settings.KAFKA_TOPICS = copy.deepcopy(settings.KAFKA_TOPICS)
+
+
+@pytest.mark.parametrize(
+    "outcome, is_billing",
+    [
+        (Outcome.ACCEPTED, True),
+        (Outcome.FILTERED, False),
+        (Outcome.RATE_LIMITED, True),
+        (Outcome.INVALID, False),
+        (Outcome.ABUSE, False),
+        (Outcome.CLIENT_DISCARD, False),
+    ],
+)
+def test_outcome_is_billing(outcome: Outcome, is_billing: bool):
+    """
+    Tests the complete behavior of ``is_billing``, used for routing outcomes to
+    different Kafka topics. This is more of a sanity check to prevent
+    unintentional changes.
+    """
+    assert outcome.is_billing() is is_billing
+
+
+@pytest.mark.parametrize(
+    "name, outcome",
+    [
+        ("rate_limited", Outcome.RATE_LIMITED),
+        ("RATE_LIMITED", Outcome.RATE_LIMITED),
+    ],
+)
+def test_parse_outcome(name, outcome):
+    """
+    Asserts *case insensitive* parsing of outcomes from their canonical names,
+    as used in the API and queries.
+    """
+    assert Outcome.parse(name) == outcome
+
+
+def test_track_outcome_default(settings):
+    """
+    Asserts an outcomes serialization roundtrip with defaults.
+
+    Additionally checks that non-billing outcomes are routed to the DEFAULT
+    outcomes cluster and topic, even if there is a separate cluster for billing
+    outcomes.
+    """
+
+    # Provide a billing cluster config that should be ignored
+    settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES_BILLING] = {
+        "cluster": "different",
+        "topic": "new-topic",
+    }
+
+    track_outcome(
+        org_id=1,
+        project_id=2,
+        key_id=3,
+        outcome=Outcome.INVALID,
+        reason="project_id",
+    )
+
+    cluster_args, _ = kafka_config.get_kafka_producer_cluster_options.call_args
+    assert cluster_args == (settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["cluster"],)
+
+    assert outcomes.outcomes_publisher
+    (topic_name, payload), _ = outcomes.outcomes_publisher.publish.call_args
+    assert topic_name == settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["topic"]
+
+    data = json.loads(payload)
+    del data["timestamp"]
+    assert data == {
+        "org_id": 1,
+        "project_id": 2,
+        "key_id": 3,
+        "outcome": Outcome.INVALID.value,
+        "reason": "project_id",
+        "event_id": None,
+        "category": None,
+        "quantity": 1,
+    }
+
+    assert outcomes.billing_publisher is None
+
+
+def test_track_outcome_billing(settings):
+    """
+    Checks that billing outcomes are routed to the SHARED outcomes topic in
+    the default configuration.
+    """
+
+    track_outcome(
+        org_id=1,
+        project_id=1,
+        key_id=1,
+        outcome=Outcome.ACCEPTED,
+    )
+
+    cluster_args, _ = kafka_config.get_kafka_producer_cluster_options.call_args
+    assert cluster_args == (settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["cluster"],)
+
+    assert outcomes.outcomes_publisher
+    (topic_name, _), _ = outcomes.outcomes_publisher.publish.call_args
+    assert topic_name == settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["topic"]
+
+    assert outcomes.billing_publisher is None
+
+
+def test_track_outcome_billing_topic(settings):
+    """
+    Checks that billing outcomes are routed to the DEDICATED billing topic
+    when it is configured on the same cluster.
+    """
+
+    settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES_BILLING] = {
+        "cluster": settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["cluster"],
+        "topic": "new-topic",
+    }
+
+    track_outcome(
+        org_id=1,
+        project_id=1,
+        key_id=1,
+        outcome=Outcome.ACCEPTED,
+    )
+
+    cluster_args, _ = kafka_config.get_kafka_producer_cluster_options.call_args
+    assert cluster_args == (settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["cluster"],)
+
+    assert outcomes.outcomes_publisher
+    (topic_name, _), _ = outcomes.outcomes_publisher.publish.call_args
+    assert topic_name == "new-topic"
+
+    assert outcomes.billing_publisher is None
+
+
+def test_track_outcome_billing_cluster(settings):
+    """
+    Checks that billing outcomes are routed to the dedicated cluster and topic.
+    """
+
+    settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES_BILLING] = {
+        "cluster": "different",
+        "topic": "new-topic",
+    }
+
+    track_outcome(
+        org_id=1,
+        project_id=1,
+        key_id=1,
+        outcome=Outcome.ACCEPTED,
+    )
+
+    cluster_args, _ = kafka_config.get_kafka_producer_cluster_options.call_args
+    assert cluster_args == ("different",)
+
+    assert outcomes.billing_publisher
+    (topic_name, _), _ = outcomes.billing_publisher.publish.call_args
+    assert topic_name == "new-topic"
+
+    assert outcomes.outcomes_publisher is None
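
The autouse fixture resets outcomes_publisher and billing_publisher because
both are created lazily on first use and cached in module globals. A
standalone sketch of that pattern (hypothetical code, not part of the module):

# Lazily initialize and cache a module-level publisher; tests must reset the
# global to force re-initialization with patched dependencies.
_publisher = None

def get_publisher():
    global _publisher
    if _publisher is None:
        _publisher = object()  # stand-in for KafkaPublisher(cluster_options)
    return _publisher

assert get_publisher() is get_publisher()  # the cached instance is reused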