test_tasks.py

from __future__ import absolute_import

import json
from copy import deepcopy
from uuid import uuid4

from confluent_kafka import Producer
from django.conf import settings
from django.test.utils import override_settings
from exam import fixture

from sentry.incidents.logic import create_alert_rule
from sentry.incidents.models import AlertRuleThresholdType, Incident, IncidentStatus, IncidentType
from sentry.incidents.tasks import INCIDENTS_SNUBA_SUBSCRIPTION_TYPE
from sentry.snuba.models import QueryAggregations
from sentry.snuba.query_subscription_consumer import QuerySubscriptionConsumer, subscriber_registry
from sentry.snuba.subscriptions import query_aggregation_to_snuba
from sentry.testutils import TestCase
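

# End-to-end test: produce a subscription update message onto a real Kafka
# topic and verify that the registered incidents callback turns it into an
# open `Incident`.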
class HandleSnubaQueryUpdateTest(TestCase):
    def setUp(self):
        super(HandleSnubaQueryUpdateTest, self).setUp()
        self.override_settings_cm = override_settings(
            KAFKA_TOPICS={self.topic: {"cluster": "default", "topic": self.topic}}
        )
        self.override_settings_cm.__enter__()
        self.orig_registry = deepcopy(subscriber_registry)

    def tearDown(self):
        super(HandleSnubaQueryUpdateTest, self).tearDown()
        self.override_settings_cm.__exit__(None, None, None)
        subscriber_registry.clear()
        subscriber_registry.update(self.orig_registry)
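
    # `exam.fixture` exposes each method below as a memoized property, so e.g.
    # `self.rule` is built once per test on first access.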
    @fixture
    def subscription(self):
        return self.rule.query_subscriptions.get()

    @fixture
    def rule(self):
        rule = create_alert_rule(
            self.organization,
            [self.project],
            "some rule",
            AlertRuleThresholdType.ABOVE,
            query="",
            aggregation=QueryAggregations.TOTAL,
            time_window=1,
            alert_threshold=100,
            resolve_threshold=10,
            threshold_period=1,
        )
        return rule
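
    # A real Kafka producer pointed at the cluster configured for our topic;
    # the test round-trips the message through Kafka rather than mocking it.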
    @fixture
    def producer(self):
        cluster_name = settings.KAFKA_TOPICS[self.topic]["cluster"]
        conf = {
            "bootstrap.servers": settings.KAFKA_CLUSTERS[cluster_name]["bootstrap.servers"],
            "session.timeout.ms": 6000,
        }
        return Producer(conf)
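
    # A random topic name per test, so runs don't pick up each other's messages.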
    @fixture
    def topic(self):
        return uuid4().hex

    def test(self):
        # Full integration test to ensure that when a subscription receives an
        # update the `QuerySubscriptionConsumer` successfully retrieves the
        # subscription and calls the correct callback, which should result in
        # an incident being created.
        callback = subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE]

        def exception_callback(*args, **kwargs):
            # We want to just error after the callback so that we can see the
            # result of processing. This means the offset won't be committed,
            # but that's fine, we can still check the results.
            callback(*args, **kwargs)
            raise KeyboardInterrupt()

        value_name = query_aggregation_to_snuba[QueryAggregations(self.subscription.aggregation)][2]
        subscriber_registry[INCIDENTS_SNUBA_SUBSCRIPTION_TYPE] = exception_callback
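
        # A subscription update in the version 1 payload format the consumer
        # expects; the value is just over the rule's alert threshold, so the
        # callback should open an incident.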
        message = {
            "version": 1,
            "payload": {
                "subscription_id": self.subscription.subscription_id,
                "values": {value_name: self.rule.alert_threshold + 1},
                "timestamp": 1235,
                "interval": 5,
                "partition": 50,
                "offset": 10,
            },
        }
        self.producer.produce(self.topic, json.dumps(message))
        self.producer.flush()
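
        # True exactly when an open, alert-triggered incident exists for our rule.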
        def active_incident_exists():
            return Incident.objects.filter(
                type=IncidentType.ALERT_TRIGGERED.value,
                status=IncidentStatus.OPEN.value,
                alert_rule=self.rule,
            ).exists()
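
        # `consumer.run()` blocks while consuming; the patched callback raises
        # KeyboardInterrupt after handling our message, which stops the run and
        # lets `assertChanges` check that the incident was created.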
        consumer = QuerySubscriptionConsumer("hi", topic=self.topic)
        with self.assertChanges(active_incident_exists, before=False, after=True):
            consumer.run()