from copy import deepcopy from functools import cached_property from unittest import mock from uuid import uuid4 from arroyo.utils import metrics from confluent_kafka import Producer from confluent_kafka.admin import AdminClient from django.conf import settings from django.core import mail from django.test.utils import override_settings from sentry.incidents.action_handlers import ( EmailActionHandler, generate_incident_trigger_email_context, ) from sentry.incidents.logic import ( CRITICAL_TRIGGER_LABEL, create_alert_rule_trigger, create_alert_rule_trigger_action, ) from sentry.incidents.models import ( AlertRuleTriggerAction, Incident, IncidentActivity, IncidentStatus, IncidentType, TriggerStatus, ) from sentry.incidents.tasks import INCIDENTS_SNUBA_SUBSCRIPTION_TYPE from sentry.runner.commands.run import DEFAULT_BLOCK_SIZE from sentry.snuba.dataset import Dataset from sentry.snuba.query_subscriptions.constants import topic_to_dataset from sentry.snuba.query_subscriptions.consumer import subscriber_registry from sentry.snuba.query_subscriptions.run import get_query_subscription_consumer from sentry.testutils.cases import TestCase from sentry.testutils.helpers.datetime import freeze_time from sentry.testutils.skips import requires_kafka from sentry.utils import json, kafka_config from sentry.utils.batching_kafka_consumer import create_topics pytestmark = [requires_kafka] @freeze_time() class HandleSnubaQueryUpdateTest(TestCase): def setUp(self): super().setUp() self.override_settings_cm = override_settings( KAFKA_TOPICS={self.topic: {"cluster": "default"}} ) self.override_settings_cm.__enter__() self.orig_registry = deepcopy(subscriber_registry) cluster_options = kafka_config.get_kafka_admin_cluster_options( "default", {"allow.auto.create.topics": "true"} ) self.admin_client = AdminClient(cluster_options) kafka_cluster = kafka_config.get_topic_definition(self.topic)["cluster"] create_topics(kafka_cluster, [self.topic]) def tearDown(self): super().tearDown() self.override_settings_cm.__exit__(None, None, None) subscriber_registry.clear() subscriber_registry.update(self.orig_registry) self.admin_client.delete_topics([self.topic]) metrics._metrics_backend = None @cached_property def subscription(self): return self.rule.snuba_query.subscriptions.get() @cached_property def rule(self): with self.tasks(): rule = self.create_alert_rule( name="some rule", query="", aggregate="count()", time_window=1, threshold_period=1, resolve_threshold=10, ) trigger = create_alert_rule_trigger(rule, CRITICAL_TRIGGER_LABEL, 100) create_alert_rule_trigger_action( trigger, AlertRuleTriggerAction.Type.EMAIL, AlertRuleTriggerAction.TargetType.USER, str(self.user.id), ) return rule @cached_property def trigger(self): return self.rule.alertruletrigger_set.get() @cached_property def action(self): return self.trigger.alertruletriggeraction_set.get() @cached_property def producer(self): cluster_name = kafka_config.get_topic_definition(self.topic)["cluster"] conf = { "bootstrap.servers": settings.KAFKA_CLUSTERS[cluster_name]["common"][ "bootstrap.servers" ], "session.timeout.ms": 6000, } return Producer(conf) @cached_property def topic(self): return uuid4().hex def run_test(self, consumer): # Full integration test to ensure that when a subscription receives an update # the `QuerySubscriptionConsumer` successfully retries the subscription and # calls the correct callback, which should result in an incident being created. message = { "version": 3, "payload": { "subscription_id": self.subscription.subscription_id, "result": { "data": [{"some_col": 101}], "meta": [{"name": "count", "type": "UInt64"}], }, "request": { "some": "data", "query": """MATCH (metrics_counters) SELECT sum(value) AS value BY tags[3] WHERE org_id = 1 AND project_id IN tuple(1) AND metric_id = 16 AND tags[3] IN tuple(13, 4)""", }, "entity": "metrics_counters", "timestamp": "2020-01-01T01:23:45.1234", }, } self.producer.produce(self.topic, json.dumps(message)) self.producer.flush() def active_incident(): return Incident.objects.filter( type=IncidentType.ALERT_TRIGGERED.value, alert_rule=self.rule ).exclude(status=IncidentStatus.CLOSED.value) original_callback = subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE] def shutdown_callback(*args, **kwargs): # We want to just exit after the callback so that we can see the result of # processing. original_callback(*args, **kwargs) consumer.signal_shutdown() subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE] = shutdown_callback with self.feature(["organizations:incidents", "organizations:performance-view"]): assert not active_incident().exists() with self.tasks(), self.capture_on_commit_callbacks(execute=True): consumer.run() assert active_incident().exists() assert len(mail.outbox) == 1 handler = EmailActionHandler(self.action, active_incident().get(), self.project) incident_activity = ( IncidentActivity.objects.filter(incident=handler.incident).order_by("-id").first() ) message_builder = handler.build_message( generate_incident_trigger_email_context( handler.project, handler.incident, handler.action.alert_rule_trigger, TriggerStatus.ACTIVE, IncidentStatus.CRITICAL, notification_uuid=str(incident_activity.notification_uuid), ), TriggerStatus.ACTIVE, self.user.id, ) out = mail.outbox[0] assert isinstance(out, mail.EmailMultiAlternatives) assert out.to == [self.user.email] assert out.subject == message_builder.subject built_message = message_builder.build(self.user.email) assert out.body == built_message.body def test_arroyo(self): with mock.patch.dict(topic_to_dataset, {self.topic: Dataset.Metrics}): consumer = get_query_subscription_consumer( self.topic, "hi", True, "earliest", 1, 1, 1, DEFAULT_BLOCK_SIZE, DEFAULT_BLOCK_SIZE, multi_proc=False, ) self.run_test(consumer)