# test_tasks.py
  1. from copy import deepcopy
  2. from functools import cached_property
  3. from uuid import uuid4
  4. from confluent_kafka import Producer
  5. from django.conf import settings
  6. from django.core import mail
  7. from django.test.utils import override_settings
  8. from freezegun import freeze_time
  9. from sentry.incidents.action_handlers import (
  10. EmailActionHandler,
  11. generate_incident_trigger_email_context,
  12. )
  13. from sentry.incidents.logic import (
  14. CRITICAL_TRIGGER_LABEL,
  15. create_alert_rule_trigger,
  16. create_alert_rule_trigger_action,
  17. )
  18. from sentry.incidents.models import (
  19. AlertRuleTriggerAction,
  20. Incident,
  21. IncidentStatus,
  22. IncidentType,
  23. TriggerStatus,
  24. )
  25. from sentry.incidents.tasks import INCIDENTS_SNUBA_SUBSCRIPTION_TYPE
  26. from sentry.snuba.query_subscription_consumer import QuerySubscriptionConsumer, subscriber_registry
  27. from sentry.testutils import TestCase
  28. from sentry.utils import json
  29. @freeze_time()
  30. class HandleSnubaQueryUpdateTest(TestCase):
  31. def setUp(self):
  32. super().setUp()
  33. self.override_settings_cm = override_settings(
  34. KAFKA_TOPICS={self.topic: {"cluster": "default"}}
  35. )
  36. self.override_settings_cm.__enter__()
  37. self.orig_registry = deepcopy(subscriber_registry)
  38. def tearDown(self):
  39. super().tearDown()
  40. self.override_settings_cm.__exit__(None, None, None)
  41. subscriber_registry.clear()
  42. subscriber_registry.update(self.orig_registry)
  43. @cached_property
  44. def subscription(self):
  45. return self.rule.snuba_query.subscriptions.get()
  46. @cached_property
  47. def rule(self):
  48. with self.tasks():
  49. rule = self.create_alert_rule(
  50. name="some rule",
  51. query="",
  52. aggregate="count()",
  53. time_window=1,
  54. threshold_period=1,
  55. resolve_threshold=10,
  56. )
  57. trigger = create_alert_rule_trigger(rule, CRITICAL_TRIGGER_LABEL, 100)
  58. create_alert_rule_trigger_action(
  59. trigger,
  60. AlertRuleTriggerAction.Type.EMAIL,
  61. AlertRuleTriggerAction.TargetType.USER,
  62. str(self.user.id),
  63. )
  64. return rule
  65. @cached_property
  66. def trigger(self):
  67. return self.rule.alertruletrigger_set.get()
  68. @cached_property
  69. def action(self):
  70. return self.trigger.alertruletriggeraction_set.get()
  71. @cached_property
  72. def producer(self):
  73. cluster_name = settings.KAFKA_TOPICS[self.topic]["cluster"]
  74. conf = {
  75. "bootstrap.servers": settings.KAFKA_CLUSTERS[cluster_name]["common"][
  76. "bootstrap.servers"
  77. ],
  78. "session.timeout.ms": 6000,
  79. }
  80. return Producer(conf)
  81. @cached_property
  82. def topic(self):
  83. return uuid4().hex
  84. def test(self):
  85. # Full integration test to ensure that when a subscription receives an update
  86. # the `QuerySubscriptionConsumer` successfully retries the subscription and
  87. # calls the correct callback, which should result in an incident being created.
  88. message = {
  89. "version": 3,
  90. "payload": {
  91. "subscription_id": self.subscription.subscription_id,
  92. "result": {"data": [{"some_col": 101}]},
  93. "request": {
  94. "some": "data",
  95. "query": """MATCH (metrics_counters) SELECT sum(value) AS value BY
  96. tags[3] WHERE org_id = 1 AND project_id IN tuple(1) AND metric_id = 16
  97. AND tags[3] IN tuple(13, 4)""",
  98. },
  99. "entity": "metrics_counters",
  100. "timestamp": "2020-01-01T01:23:45.1234",
  101. },
  102. }
  103. self.producer.produce(self.topic, json.dumps(message))
  104. self.producer.flush()
  105. def active_incident():
  106. return Incident.objects.filter(
  107. type=IncidentType.ALERT_TRIGGERED.value, alert_rule=self.rule
  108. ).exclude(status=IncidentStatus.CLOSED.value)
  109. consumer = QuerySubscriptionConsumer("hi", topic=self.topic)
  110. original_callback = subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE]
  111. def shutdown_callback(*args, **kwargs):
  112. # We want to just exit after the callback so that we can see the result of
  113. # processing.
  114. original_callback(*args, **kwargs)
  115. consumer.signal_shutdown()
  116. subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE] = shutdown_callback
  117. with self.feature(["organizations:incidents", "organizations:performance-view"]):
  118. assert not active_incident().exists()
  119. with self.tasks(), self.capture_on_commit_callbacks(execute=True):
  120. consumer.run()
  121. assert active_incident().exists()
  122. assert len(mail.outbox) == 1
  123. handler = EmailActionHandler(self.action, active_incident().get(), self.project)
  124. message = handler.build_message(
  125. generate_incident_trigger_email_context(
  126. handler.project,
  127. handler.incident,
  128. handler.action.alert_rule_trigger,
  129. TriggerStatus.ACTIVE,
  130. IncidentStatus.CRITICAL,
  131. ),
  132. TriggerStatus.ACTIVE,
  133. self.user.id,
  134. )
  135. out = mail.outbox[0]
  136. assert out.to == [self.user.email]
  137. assert out.subject == message.subject
  138. built_message = message.build(self.user.email)
  139. assert out.body == built_message.body