test_tasks.py

from copy import deepcopy
from functools import cached_property
from unittest import mock
from uuid import uuid4

from arroyo.utils import metrics
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
from django.conf import settings
from django.core import mail
from django.test.utils import override_settings

from sentry.incidents.action_handlers import (
    EmailActionHandler,
    generate_incident_trigger_email_context,
)
from sentry.incidents.logic import (
    CRITICAL_TRIGGER_LABEL,
    create_alert_rule_trigger,
    create_alert_rule_trigger_action,
)
from sentry.incidents.models import (
    AlertRuleTriggerAction,
    Incident,
    IncidentActivity,
    IncidentStatus,
    IncidentType,
    TriggerStatus,
)
from sentry.incidents.tasks import INCIDENTS_SNUBA_SUBSCRIPTION_TYPE
from sentry.runner.commands.run import DEFAULT_BLOCK_SIZE
from sentry.snuba.dataset import Dataset
from sentry.snuba.query_subscriptions.constants import topic_to_dataset
from sentry.snuba.query_subscriptions.consumer import subscriber_registry
from sentry.snuba.query_subscriptions.run import get_query_subscription_consumer
from sentry.testutils.cases import TestCase
from sentry.testutils.helpers.datetime import freeze_time
from sentry.testutils.skips import requires_kafka
from sentry.utils import json, kafka_config
from sentry.utils.batching_kafka_consumer import create_topics

pytestmark = [requires_kafka]


@freeze_time()
class HandleSnubaQueryUpdateTest(TestCase):
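    """End-to-end test of the query subscription consumer path: a
    subscription update produced to Kafka should reach the incidents
    callback, create an incident, and send the triggered-alert email.
    """
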
    def setUp(self):
        super().setUp()
        self.override_settings_cm = override_settings(
            KAFKA_TOPICS={self.topic: {"cluster": "default"}}
        )
        self.override_settings_cm.__enter__()
        self.orig_registry = deepcopy(subscriber_registry)
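
        # Create the test topic up front so producing and consuming don't
        # race topic auto-creation.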
        cluster_options = kafka_config.get_kafka_admin_cluster_options(
            "default", {"allow.auto.create.topics": "true"}
        )
        self.admin_client = AdminClient(cluster_options)
        kafka_cluster = kafka_config.get_topic_definition(self.topic)["cluster"]
        create_topics(kafka_cluster, [self.topic])
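
    # Undo the global state touched in setUp: the settings override, the
    # subscriber registry, the Kafka topic, and the cached metrics backend.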
    def tearDown(self):
        super().tearDown()
        self.override_settings_cm.__exit__(None, None, None)
        subscriber_registry.clear()
        subscriber_registry.update(self.orig_registry)
        self.admin_client.delete_topics([self.topic])
        metrics._metrics_backend = None
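
    # The Snuba query subscription created as a side effect of building `rule`.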
    @cached_property
    def subscription(self):
        return self.rule.snuba_query.subscriptions.get()
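
    # An alert rule with a critical trigger and an email action targeting
    # `self.user`; creating it also registers the Snuba subscription.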
    @cached_property
    def rule(self):
        with self.tasks():
            rule = self.create_alert_rule(
                name="some rule",
                query="",
                aggregate="count()",
                time_window=1,
                threshold_period=1,
                resolve_threshold=10,
            )
            trigger = create_alert_rule_trigger(rule, CRITICAL_TRIGGER_LABEL, 100)
            create_alert_rule_trigger_action(
                trigger,
                AlertRuleTriggerAction.Type.EMAIL,
                AlertRuleTriggerAction.TargetType.USER,
                str(self.user.id),
            )
            return rule

    @cached_property
    def trigger(self):
        return self.rule.alertruletrigger_set.get()

    @cached_property
    def action(self):
        return self.trigger.alertruletriggeraction_set.get()
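
    # A producer pointed at the same cluster the test topic was created on.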
    @cached_property
    def producer(self):
        cluster_name = kafka_config.get_topic_definition(self.topic)["cluster"]
        conf = {
            "bootstrap.servers": settings.KAFKA_CLUSTERS[cluster_name]["common"][
                "bootstrap.servers"
            ],
            "session.timeout.ms": 6000,
        }
        return Producer(conf)
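
    # A unique topic name per test run, so concurrent runs can't collide.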
    @cached_property
    def topic(self):
        return uuid4().hex

    def run_test(self, consumer):
        # Full integration test: when a subscription update arrives on the
        # topic, the `QuerySubscriptionConsumer` should look up the subscription
        # and call the registered callback, which should result in an incident
        # being created.
        message = {
            "version": 3,
            "payload": {
                "subscription_id": self.subscription.subscription_id,
                "result": {
                    "data": [{"some_col": 101}],
                    "meta": [{"name": "count", "type": "UInt64"}],
                },
                "request": {
                    "some": "data",
                    "query": """MATCH (metrics_counters) SELECT sum(value) AS value BY
                    tags[3] WHERE org_id = 1 AND project_id IN tuple(1) AND metric_id = 16
                    AND tags[3] IN tuple(13, 4)""",
                },
                "entity": "metrics_counters",
                "timestamp": "2020-01-01T01:23:45.1234",
            },
        }
        self.producer.produce(self.topic, json.dumps(message))
        self.producer.flush()
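
        # Any open (non-closed) incident attached to our rule.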
        def active_incident():
            return Incident.objects.filter(
                type=IncidentType.ALERT_TRIGGERED.value, alert_rule=self.rule
            ).exclude(status=IncidentStatus.CLOSED.value)

        original_callback = subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE]

        def shutdown_callback(*args, **kwargs):
            # Shut the consumer down once the callback has run so the test can
            # inspect the result of processing.
            original_callback(*args, **kwargs)
            consumer.signal_shutdown()

        subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE] = shutdown_callback

        with self.feature(["organizations:incidents", "organizations:performance-view"]):
            assert not active_incident().exists()
            with self.tasks(), self.capture_on_commit_callbacks(execute=True):
                consumer.run()
            assert active_incident().exists()

            assert len(mail.outbox) == 1
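
            # Rebuild the expected email through the same handler and context
            # the task used, then compare it with what landed in the outbox.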
            handler = EmailActionHandler(self.action, active_incident().get(), self.project)
            incident_activity = (
                IncidentActivity.objects.filter(incident=handler.incident).order_by("-id").first()
            )
            message_builder = handler.build_message(
                generate_incident_trigger_email_context(
                    handler.project,
                    handler.incident,
                    handler.action.alert_rule_trigger,
                    TriggerStatus.ACTIVE,
                    IncidentStatus.CRITICAL,
                    notification_uuid=str(incident_activity.notification_uuid),
                ),
                TriggerStatus.ACTIVE,
                self.user.id,
            )

            out = mail.outbox[0]
            assert isinstance(out, mail.EmailMultiAlternatives)
            assert out.to == [self.user.email]
            assert out.subject == message_builder.subject
            built_message = message_builder.build(self.user.email)
            assert out.body == built_message.body

    def test_arroyo(self):
        with mock.patch.dict(topic_to_dataset, {self.topic: Dataset.Metrics}):
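            # The positional arguments follow get_query_subscription_consumer's
            # signature (assumed here): group_id, strict_offset_reset,
            # initial_offset_reset, max batch size/time, processes, and the
            # multiprocessing input/output block sizes.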
            consumer = get_query_subscription_consumer(
                self.topic,
                "hi",
                True,
                "earliest",
                1,
                1,
                1,
                DEFAULT_BLOCK_SIZE,
                DEFAULT_BLOCK_SIZE,
                multi_proc=False,
            )
            self.run_test(consumer)