test_tasks.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. from copy import deepcopy
  2. from functools import cached_property
  3. from uuid import uuid4
  4. from arroyo.utils import metrics
  5. from confluent_kafka import Producer
  6. from confluent_kafka.admin import AdminClient
  7. from django.conf import settings
  8. from django.core import mail
  9. from django.test.utils import override_settings
  10. from freezegun import freeze_time
  11. from sentry.incidents.action_handlers import (
  12. EmailActionHandler,
  13. generate_incident_trigger_email_context,
  14. )
  15. from sentry.incidents.logic import (
  16. CRITICAL_TRIGGER_LABEL,
  17. create_alert_rule_trigger,
  18. create_alert_rule_trigger_action,
  19. )
  20. from sentry.incidents.models import (
  21. AlertRuleTriggerAction,
  22. Incident,
  23. IncidentStatus,
  24. IncidentType,
  25. TriggerStatus,
  26. )
  27. from sentry.incidents.tasks import INCIDENTS_SNUBA_SUBSCRIPTION_TYPE
  28. from sentry.snuba.query_subscription_consumer import (
  29. QuerySubscriptionConsumer,
  30. get_query_subscription_consumer,
  31. subscriber_registry,
  32. )
  33. from sentry.testutils import TestCase
  34. from sentry.utils import json, kafka_config
  35. from sentry.utils.batching_kafka_consumer import create_topics
  36. @freeze_time()
  37. class HandleSnubaQueryUpdateTest(TestCase):
  38. def setUp(self):
  39. super().setUp()
  40. self.override_settings_cm = override_settings(
  41. KAFKA_TOPICS={self.topic: {"cluster": "default"}}
  42. )
  43. self.override_settings_cm.__enter__()
  44. self.orig_registry = deepcopy(subscriber_registry)
  45. cluster_options = kafka_config.get_kafka_admin_cluster_options(
  46. "default", {"allow.auto.create.topics": "true"}
  47. )
  48. self.admin_client = AdminClient(cluster_options)
  49. kafka_cluster = settings.KAFKA_TOPICS[self.topic]["cluster"]
  50. create_topics(kafka_cluster, [self.topic])
  51. def tearDown(self):
  52. super().tearDown()
  53. self.override_settings_cm.__exit__(None, None, None)
  54. subscriber_registry.clear()
  55. subscriber_registry.update(self.orig_registry)
  56. self.admin_client.delete_topics([self.topic])
  57. metrics._metrics_backend = None
  58. @cached_property
  59. def subscription(self):
  60. return self.rule.snuba_query.subscriptions.get()
  61. @cached_property
  62. def rule(self):
  63. with self.tasks():
  64. rule = self.create_alert_rule(
  65. name="some rule",
  66. query="",
  67. aggregate="count()",
  68. time_window=1,
  69. threshold_period=1,
  70. resolve_threshold=10,
  71. )
  72. trigger = create_alert_rule_trigger(rule, CRITICAL_TRIGGER_LABEL, 100)
  73. create_alert_rule_trigger_action(
  74. trigger,
  75. AlertRuleTriggerAction.Type.EMAIL,
  76. AlertRuleTriggerAction.TargetType.USER,
  77. str(self.user.id),
  78. )
  79. return rule
  80. @cached_property
  81. def trigger(self):
  82. return self.rule.alertruletrigger_set.get()
  83. @cached_property
  84. def action(self):
  85. return self.trigger.alertruletriggeraction_set.get()
  86. @cached_property
  87. def producer(self):
  88. cluster_name = settings.KAFKA_TOPICS[self.topic]["cluster"]
  89. conf = {
  90. "bootstrap.servers": settings.KAFKA_CLUSTERS[cluster_name]["common"][
  91. "bootstrap.servers"
  92. ],
  93. "session.timeout.ms": 6000,
  94. }
  95. return Producer(conf)
  96. @cached_property
  97. def topic(self):
  98. return uuid4().hex
  99. def run_test(self, consumer):
  100. # Full integration test to ensure that when a subscription receives an update
  101. # the `QuerySubscriptionConsumer` successfully retries the subscription and
  102. # calls the correct callback, which should result in an incident being created.
  103. message = {
  104. "version": 3,
  105. "payload": {
  106. "subscription_id": self.subscription.subscription_id,
  107. "result": {"data": [{"some_col": 101}]},
  108. "request": {
  109. "some": "data",
  110. "query": """MATCH (metrics_counters) SELECT sum(value) AS value BY
  111. tags[3] WHERE org_id = 1 AND project_id IN tuple(1) AND metric_id = 16
  112. AND tags[3] IN tuple(13, 4)""",
  113. },
  114. "entity": "metrics_counters",
  115. "timestamp": "2020-01-01T01:23:45.1234",
  116. },
  117. }
  118. self.producer.produce(self.topic, json.dumps(message))
  119. self.producer.flush()
  120. def active_incident():
  121. return Incident.objects.filter(
  122. type=IncidentType.ALERT_TRIGGERED.value, alert_rule=self.rule
  123. ).exclude(status=IncidentStatus.CLOSED.value)
  124. original_callback = subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE]
  125. def shutdown_callback(*args, **kwargs):
  126. # We want to just exit after the callback so that we can see the result of
  127. # processing.
  128. original_callback(*args, **kwargs)
  129. consumer.signal_shutdown()
  130. subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE] = shutdown_callback
  131. with self.feature(["organizations:incidents", "organizations:performance-view"]):
  132. assert not active_incident().exists()
  133. with self.tasks(), self.capture_on_commit_callbacks(execute=True):
  134. consumer.run()
  135. assert active_incident().exists()
  136. assert len(mail.outbox) == 1
  137. handler = EmailActionHandler(self.action, active_incident().get(), self.project)
  138. message = handler.build_message(
  139. generate_incident_trigger_email_context(
  140. handler.project,
  141. handler.incident,
  142. handler.action.alert_rule_trigger,
  143. TriggerStatus.ACTIVE,
  144. IncidentStatus.CRITICAL,
  145. ),
  146. TriggerStatus.ACTIVE,
  147. self.user.id,
  148. )
  149. out = mail.outbox[0]
  150. assert out.to == [self.user.email]
  151. assert out.subject == message.subject
  152. built_message = message.build(self.user.email)
  153. assert out.body == built_message.body
  154. def test(self):
  155. consumer = QuerySubscriptionConsumer("hi", topic=self.topic)
  156. self.run_test(consumer)
  157. def test_arroyo(self):
  158. consumer = get_query_subscription_consumer(self.topic, "hi", True, "earliest")
  159. self.run_test(consumer)