@@ -1,24 +1,21 @@
-import copy
-from unittest.mock import Mock
+from unittest import mock
 
 import pytest
+from django.conf import settings
 
 from sentry.utils import json, kafka_config, outcomes
 from sentry.utils.outcomes import Outcome, track_outcome
 
 
 @pytest.fixture(autouse=True)
-def setup(monkeypatch, settings):
+def setup(monkeypatch):
     # Rely on the fact that the publisher is initialized lazily
-    monkeypatch.setattr(kafka_config, "get_kafka_producer_cluster_options", Mock())
-    monkeypatch.setattr(outcomes, "KafkaPublisher", Mock())
-
-    # Reset internals of the outcomes module
-    monkeypatch.setattr(outcomes, "outcomes_publisher", None)
-    monkeypatch.setattr(outcomes, "billing_publisher", None)
-
-    # Settings fixture does not restore nested mutable attributes
-    settings.KAFKA_TOPICS = copy.deepcopy(settings.KAFKA_TOPICS)
+    with mock.patch.object(kafka_config, "get_kafka_producer_cluster_options"):
+        with mock.patch.object(outcomes, "KafkaPublisher"):
+            # Reset internals of the outcomes module
+            with mock.patch.object(outcomes, "outcomes_publisher", None):
+                with mock.patch.object(outcomes, "billing_publisher", None):
+                    yield
 
 
 @pytest.mark.parametrize(
@@ -56,7 +53,7 @@ def test_parse_outcome(name, outcome):
     assert Outcome.parse(name) == outcome
 
 
-def test_track_outcome_default(settings):
+def test_track_outcome_default():
    """
     Asserts an outcomes serialization roundtrip with defaults.
 
@@ -66,40 +63,43 @@ def test_track_outcome_default(settings):
     """
 
     # Provide a billing cluster config that should be ignored
-    settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES_BILLING] = {"cluster": "different"}
-
-    track_outcome(
-        org_id=1,
-        project_id=2,
-        key_id=3,
-        outcome=Outcome.INVALID,
-        reason="project_id",
-    )
-
-    cluster_args, _ = kafka_config.get_kafka_producer_cluster_options.call_args
-    assert cluster_args == (settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["cluster"],)
-
-    assert outcomes.outcomes_publisher
-    (topic_name, payload), _ = outcomes.outcomes_publisher.publish.call_args
-    assert topic_name == settings.KAFKA_OUTCOMES
-
-    data = json.loads(payload)
-    del data["timestamp"]
-    assert data == {
-        "org_id": 1,
-        "project_id": 2,
-        "key_id": 3,
-        "outcome": Outcome.INVALID.value,
-        "reason": "project_id",
-        "event_id": None,
-        "category": None,
-        "quantity": 1,
-    }
-
-    assert outcomes.billing_publisher is None
-
-
-def test_track_outcome_billing(settings):
+    with mock.patch.dict(
+        settings.KAFKA_TOPICS, {settings.KAFKA_OUTCOMES_BILLING: {"cluster": "different"}}
+    ):
+        track_outcome(
+            org_id=1,
+            project_id=2,
+            key_id=3,
+            outcome=Outcome.INVALID,
+            reason="project_id",
+        )
+
+        cluster_args, _ = kafka_config.get_kafka_producer_cluster_options.call_args
+        assert cluster_args == (
+            kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"],
+        )
+
+        assert outcomes.outcomes_publisher
+        (topic_name, payload), _ = outcomes.outcomes_publisher.publish.call_args
+        assert topic_name == settings.KAFKA_OUTCOMES
+
+        data = json.loads(payload)
+        del data["timestamp"]
+        assert data == {
+            "org_id": 1,
+            "project_id": 2,
+            "key_id": 3,
+            "outcome": Outcome.INVALID.value,
+            "reason": "project_id",
+            "event_id": None,
+            "category": None,
+            "quantity": 1,
+        }
+
+        assert outcomes.billing_publisher is None
+
+
+def test_track_outcome_billing():
     """
     Checks that outcomes are routed to the SHARED topic within the same cluster
     in default configuration.
@@ -113,7 +113,7 @@ def test_track_outcome_billing(settings):
     )
 
     cluster_args, _ = kafka_config.get_kafka_producer_cluster_options.call_args
-    assert cluster_args == (settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["cluster"],)
+    assert cluster_args == (kafka_config.get_topic_definition(settings.KAFKA_OUTCOMES)["cluster"],)
 
     assert outcomes.outcomes_publisher
     (topic_name, _), _ = outcomes.outcomes_publisher.publish.call_args
@@ -122,31 +122,35 @@ def test_track_outcome_billing(settings):
     assert outcomes.billing_publisher is None
 
 
-def test_track_outcome_billing_topic(settings):
+def test_track_outcome_billing_topic():
     """
     Checks that outcomes are routed to the DEDICATED billing topic within the
     same cluster in default configuration.
     """
 
-    settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES_BILLING] = {
-        "cluster": settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["cluster"],
-    }
+    with mock.patch.dict(
+        settings.KAFKA_TOPICS,
+        {
+            settings.KAFKA_OUTCOMES_BILLING: {
+                "cluster": settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["cluster"],
+            }
+        },
+    ):
+        track_outcome(
+            org_id=1,
+            project_id=1,
+            key_id=1,
+            outcome=Outcome.ACCEPTED,
+        )
 
-    track_outcome(
-        org_id=1,
-        project_id=1,
-        key_id=1,
-        outcome=Outcome.ACCEPTED,
-    )
+        cluster_args, _ = kafka_config.get_kafka_producer_cluster_options.call_args
+        assert cluster_args == (settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["cluster"],)
 
-    cluster_args, _ = kafka_config.get_kafka_producer_cluster_options.call_args
-    assert cluster_args == (settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES]["cluster"],)
+        assert outcomes.outcomes_publisher
+        (topic_name, _), _ = outcomes.outcomes_publisher.publish.call_args
+        assert topic_name == settings.KAFKA_OUTCOMES_BILLING
 
-    assert outcomes.outcomes_publisher
-    (topic_name, _), _ = outcomes.outcomes_publisher.publish.call_args
-    assert topic_name == settings.KAFKA_OUTCOMES_BILLING
-
-    assert outcomes.billing_publisher is None
+        assert outcomes.billing_publisher is None
 
 
 def test_track_outcome_billing_cluster(settings):
@@ -154,20 +158,21 @@ def test_track_outcome_billing_cluster(settings):
     Checks that outcomes are routed to the dedicated cluster and topic.
     """
 
-    settings.KAFKA_TOPICS[settings.KAFKA_OUTCOMES_BILLING] = {"cluster": "different"}
+    with mock.patch.dict(
+        settings.KAFKA_TOPICS, {settings.KAFKA_OUTCOMES_BILLING: {"cluster": "different"}}
+    ):
+        track_outcome(
+            org_id=1,
+            project_id=1,
+            key_id=1,
+            outcome=Outcome.ACCEPTED,
+        )
 
-    track_outcome(
-        org_id=1,
-        project_id=1,
-        key_id=1,
-        outcome=Outcome.ACCEPTED,
-    )
-
-    cluster_args, _ = kafka_config.get_kafka_producer_cluster_options.call_args
-    assert cluster_args == ("different",)
+        cluster_args, _ = kafka_config.get_kafka_producer_cluster_options.call_args
+        assert cluster_args == ("different",)
 
-    assert outcomes.billing_publisher
-    (topic_name, _), _ = outcomes.billing_publisher.publish.call_args
-    assert topic_name == settings.KAFKA_OUTCOMES_BILLING
+        assert outcomes.billing_publisher
+        (topic_name, _), _ = outcomes.billing_publisher.publish.call_args
+        assert topic_name == settings.KAFKA_OUTCOMES_BILLING
 
-    assert outcomes.outcomes_publisher is None
+        assert outcomes.outcomes_publisher is None
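
Note (not part of the patch above): a minimal standalone sketch of the two unittest.mock helpers this change switches to, in case the semantics are unfamiliar. FAKE_SETTINGS and target are hypothetical stand-ins for django.conf.settings and the sentry.utils.outcomes module; the real tests patch those objects directly.

from types import SimpleNamespace
from unittest import mock

# Hypothetical stand-in for a settings object with a nested mutable dict.
FAKE_SETTINGS = SimpleNamespace(KAFKA_TOPICS={"outcomes": {"cluster": "default"}})


def demo():
    # mock.patch.dict temporarily updates the dict and restores its original
    # contents on exit, which is why the manual deepcopy of KAFKA_TOPICS in
    # the old fixture is no longer needed.
    with mock.patch.dict(FAKE_SETTINGS.KAFKA_TOPICS, {"outcomes-billing": {"cluster": "different"}}):
        assert "outcomes-billing" in FAKE_SETTINGS.KAFKA_TOPICS
    assert "outcomes-billing" not in FAKE_SETTINGS.KAFKA_TOPICS

    # mock.patch.object replaces an attribute with a MagicMock (or a given
    # value such as None) for the duration of the with-block, then restores it.
    target = SimpleNamespace(publisher=None)
    with mock.patch.object(target, "publisher") as fake_publisher:
        target.publisher.publish("topic", b"{}")
        fake_publisher.publish.assert_called_once_with("topic", b"{}")
    assert target.publisher is None


if __name__ == "__main__":
    demo()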