# test_tasks.py
from copy import deepcopy
from functools import cached_property
from uuid import uuid4

from arroyo.utils import metrics
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
from django.conf import settings
from django.core import mail

from sentry.conf.types.kafka_definition import Topic
from sentry.incidents.action_handlers import (
    EmailActionHandler,
    generate_incident_trigger_email_context,
)
from sentry.incidents.logic import (
    CRITICAL_TRIGGER_LABEL,
    create_alert_rule_trigger,
    create_alert_rule_trigger_action,
)
from sentry.incidents.models.alert_rule import AlertRuleTriggerAction
from sentry.incidents.models.incident import (
    Incident,
    IncidentActivity,
    IncidentStatus,
    IncidentType,
    TriggerStatus,
)
from sentry.incidents.utils.constants import INCIDENTS_SNUBA_SUBSCRIPTION_TYPE
from sentry.snuba.query_subscriptions.consumer import subscriber_registry
from sentry.testutils.cases import TestCase
from sentry.testutils.helpers.datetime import freeze_time
from sentry.testutils.skips import requires_kafka
from sentry.utils import json, kafka_config
from sentry.utils.batching_kafka_consumer import create_topics
  34. pytestmark = [requires_kafka]
  35. @freeze_time()
  36. class HandleSnubaQueryUpdateTest(TestCase):
  37. def setUp(self):
  38. super().setUp()
  39. self.topic = Topic.METRICS_SUBSCRIPTIONS_RESULTS
  40. self.orig_registry = deepcopy(subscriber_registry)
  41. cluster_options = kafka_config.get_kafka_admin_cluster_options(
  42. "default", {"allow.auto.create.topics": "true"}
  43. )
  44. self.admin_client = AdminClient(cluster_options)
  45. topic_defn = kafka_config.get_topic_definition(self.topic)
  46. self.real_topic = topic_defn["real_topic_name"]
  47. self.cluster = topic_defn["cluster"]
  48. create_topics(self.cluster, [self.real_topic])
  49. def tearDown(self):
  50. super().tearDown()
  51. subscriber_registry.clear()
  52. subscriber_registry.update(self.orig_registry)
  53. self.admin_client.delete_topics([self.real_topic])
  54. metrics._metrics_backend = None
  55. @cached_property
  56. def subscription(self):
  57. return self.rule.snuba_query.subscriptions.get()
  58. @cached_property
  59. def rule(self):
  60. with self.tasks():
  61. rule = self.create_alert_rule(
  62. name="some rule",
  63. query="",
  64. aggregate="count()",
  65. time_window=1,
  66. threshold_period=1,
  67. resolve_threshold=10,
  68. )
  69. trigger = create_alert_rule_trigger(rule, CRITICAL_TRIGGER_LABEL, 100)
  70. create_alert_rule_trigger_action(
  71. trigger,
  72. AlertRuleTriggerAction.Type.EMAIL,
  73. AlertRuleTriggerAction.TargetType.USER,
  74. str(self.user.id),
  75. )
  76. return rule
  77. @cached_property
  78. def trigger(self):
  79. return self.rule.alertruletrigger_set.get()
  80. @cached_property
  81. def action(self):
  82. return self.trigger.alertruletriggeraction_set.get()
  83. @cached_property
  84. def producer(self):
  85. conf = {
  86. "bootstrap.servers": settings.KAFKA_CLUSTERS[self.cluster]["common"][
  87. "bootstrap.servers"
  88. ],
  89. "session.timeout.ms": 6000,
  90. }
  91. return Producer(conf)
  92. @cached_property
  93. def topic(self):
  94. return uuid4().hex
  95. def run_test(self, consumer):
  96. # Full integration test to ensure that when a subscription receives an update
  97. # the `QuerySubscriptionConsumer` successfully retries the subscription and
  98. # calls the correct callback, which should result in an incident being created.
  99. message = {
  100. "version": 3,
  101. "payload": {
  102. "subscription_id": self.subscription.subscription_id,
  103. "result": {
  104. "data": [{"some_col": 101}],
  105. "meta": [{"name": "count", "type": "UInt64"}],
  106. },
  107. "request": {
  108. "some": "data",
  109. "query": """MATCH (metrics_counters) SELECT sum(value) AS value BY
  110. tags[3] WHERE org_id = 1 AND project_id IN tuple(1) AND metric_id = 16
  111. AND tags[3] IN tuple(13, 4)""",
  112. },
  113. "entity": "metrics_counters",
  114. "timestamp": "2020-01-01T01:23:45.1234",
  115. },
  116. }
  117. self.producer.produce(self.real_topic, json.dumps(message))
  118. self.producer.flush()
  119. def active_incident():
  120. return Incident.objects.filter(
  121. type=IncidentType.ALERT_TRIGGERED.value, alert_rule=self.rule
  122. ).exclude(status=IncidentStatus.CLOSED.value)
  123. original_callback = subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE]
  124. def shutdown_callback(*args, **kwargs):
  125. # We want to just exit after the callback so that we can see the result of
  126. # processing.
  127. original_callback(*args, **kwargs)
  128. consumer.signal_shutdown()
  129. subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE] = shutdown_callback
  130. with self.feature(["organizations:incidents", "organizations:performance-view"]):
  131. assert not active_incident().exists()
  132. with self.tasks(), self.capture_on_commit_callbacks(execute=True):
  133. consumer.run()
  134. assert active_incident().exists()
  135. assert len(mail.outbox) == 1
  136. handler = EmailActionHandler(self.action, active_incident().get(), self.project)
  137. incident_activity = IncidentActivity.objects.filter(incident=handler.incident).order_by(
  138. "-id"
  139. )[0]
  140. message_builder = handler.build_message(
  141. generate_incident_trigger_email_context(
  142. handler.project,
  143. handler.incident,
  144. handler.action.alert_rule_trigger,
  145. TriggerStatus.ACTIVE,
  146. IncidentStatus.CRITICAL,
  147. notification_uuid=str(incident_activity.notification_uuid),
  148. ),
  149. TriggerStatus.ACTIVE,
  150. self.user.id,
  151. )
  152. out = mail.outbox[0]
  153. assert isinstance(out, mail.EmailMultiAlternatives)
  154. assert out.to == [self.user.email]
  155. assert out.subject == message_builder.subject
  156. built_message = message_builder.build(self.user.email)
  157. assert out.body == built_message.body
  158. def test_arroyo(self):
  159. from sentry.consumers import get_stream_processor
  160. consumer = get_stream_processor(
  161. "metrics-subscription-results",
  162. consumer_args=["--max-batch-size=1", "--max-batch-time-ms=1000", "--processes=1"],
  163. topic=None,
  164. cluster=None,
  165. group_id="hi",
  166. strict_offset_reset=True,
  167. auto_offset_reset="earliest",
  168. enforce_schema=True,
  169. )
  170. self.run_test(consumer)