Browse Source

feat(crons): Implement fallback clock pulse task (#54647)

This is the last piece of GH-53661 to ensure tasks are triggered in
scenarios where there is not enough volume.
Evan Purkhiser 1 year ago
parent
commit
d33ae7ff11
3 changed files with 78 additions and 7 deletions
  1. 6 0
      src/sentry/conf/server.py
  2. 41 0
      src/sentry/monitors/tasks.py
  3. 31 7
      tests/sentry/monitors/test_tasks.py

+ 6 - 0
src/sentry/conf/server.py

@@ -962,6 +962,12 @@ CELERYBEAT_SCHEDULE_REGION = {
         "schedule": timedelta(seconds=30),
         "options": {"expires": 30},
     },
+    "monitors-clock-pulse": {
+        "task": "sentry.monitors.tasks.clock_pulse",
+        # Run every 1 minute
+        "schedule": crontab(minute="*/1"),
+        "options": {"expires": 60},
+    },
     "monitors-temp-task-dispatcher": {
         "task": "sentry.monitors.tasks.temp_task_dispatcher",
         # Run every 1 minute

+ 41 - 0
src/sentry/monitors/tasks.py

@@ -1,15 +1,21 @@
 import logging
 from datetime import datetime
 
+import msgpack
 import sentry_sdk
+from arroyo import Topic
+from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
 from django.conf import settings
 from django.utils import timezone
 
 from sentry import options
 from sentry.constants import ObjectStatus
+from sentry.monitors.types import ClockPulseMessage
 from sentry.silo import SiloMode
 from sentry.tasks.base import instrumented_task
 from sentry.utils import metrics, redis
+from sentry.utils.arroyo_producer import SingletonProducer
+from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
 
 from .models import (
     CheckInStatus,
@@ -48,6 +54,17 @@ SUBTITLE_DATETIME_FORMAT = "%b %d, %I:%M %p"
 MONITOR_TASKS_LAST_TRIGGERED_KEY = "sentry.monitors.last_tasks_ts"
 
 
+def _get_monitor_checkin_producer() -> KafkaProducer:
+    cluster_name = get_topic_definition(settings.KAFKA_INGEST_MONITORS)["cluster"]
+    producer_config = get_kafka_producer_cluster_options(cluster_name)
+    producer_config.pop("compression.type", None)
+    producer_config.pop("message.max.bytes", None)
+    return KafkaProducer(build_kafka_configuration(default_config=producer_config))
+
+
+_checkin_producer = SingletonProducer(_get_monitor_checkin_producer)
+
+
 def _dispatch_tasks(ts: datetime):
     """
     Dispatch monitor tasks triggered by the consumer clock.
@@ -129,6 +146,30 @@ def try_monitor_tasks_trigger(ts: datetime):
     _dispatch_tasks(ts)
 
 
+@instrumented_task(name="sentry.monitors.tasks.clock_pulse", silo_mode=SiloMode.REGION)
+def clock_pulse(current_datetime=None):
+    """
+    This task is run once a minute when to produce a 'clock pulse' into the
+    monitor ingest topic. This is to ensure there is always a message in the
+    topic that can drive the clock which dispatches the monitor tasks.
+    """
+    if current_datetime is None:
+        current_datetime = timezone.now()
+
+    if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
+        # Directly trigger try_monitor_tasks_trigger in dev
+        try_monitor_tasks_trigger(current_datetime)
+        return
+
+    message: ClockPulseMessage = {
+        "message_type": "clock_pulse",
+    }
+
+    # Produce the pulse into the topic
+    payload = KafkaPayload(None, msgpack.packb(message), [])
+    _checkin_producer.produce(Topic(settings.KAFKA_INGEST_MONITORS), payload)
+
+
 @instrumented_task(name="sentry.monitors.tasks.temp_task_dispatcher", silo_mode=SiloMode.REGION)
 def temp_task_dispatcher():
     """

+ 31 - 7
tests/sentry/monitors/test_tasks.py

@@ -1,6 +1,9 @@
 from datetime import datetime, timedelta
-from unittest.mock import patch
+from unittest import mock
 
+import msgpack
+from arroyo.backends.kafka import KafkaPayload
+from django.test import override_settings
 from django.utils import timezone
 
 from sentry.constants import ObjectStatus
@@ -13,7 +16,12 @@ from sentry.monitors.models import (
     MonitorType,
     ScheduleType,
 )
-from sentry.monitors.tasks import check_missing, check_timeout, try_monitor_tasks_trigger
+from sentry.monitors.tasks import (
+    check_missing,
+    check_timeout,
+    clock_pulse,
+    try_monitor_tasks_trigger,
+)
 from sentry.testutils.cases import TestCase
 
 
@@ -405,7 +413,7 @@ class CheckMonitorsTest(TestCase):
             id=monitor_environment.id, status=MonitorStatus.TIMEOUT
         ).exists()
 
-    @patch("sentry.monitors.tasks.logger")
+    @mock.patch("sentry.monitors.tasks.logger")
     def test_missed_exception_handling(self, logger):
         org = self.create_organization()
         project = self.create_project(organization=org)
@@ -458,7 +466,7 @@ class CheckMonitorsTest(TestCase):
             monitor_environment=monitor_environment.id, status=CheckInStatus.MISSED
         ).exists()
 
-    @patch("sentry.monitors.tasks.logger")
+    @mock.patch("sentry.monitors.tasks.logger")
     def test_timeout_exception_handling(self, logger):
         org = self.create_organization()
         project = self.create_project(organization=org)
@@ -548,7 +556,7 @@ class CheckMonitorsTest(TestCase):
             id=monitor_environment.id, status=MonitorStatus.TIMEOUT
         ).exists()
 
-    @patch("sentry.monitors.tasks._dispatch_tasks")
+    @mock.patch("sentry.monitors.tasks._dispatch_tasks")
     def test_monitor_task_trigger(self, dispatch_tasks):
         now = datetime.now().replace(second=0, microsecond=0)
 
@@ -569,13 +577,13 @@ class CheckMonitorsTest(TestCase):
         assert dispatch_tasks.call_count == 2
 
         # A skipped minute trigges the task AND captures an error
-        with patch("sentry_sdk.capture_message") as capture_message:
+        with mock.patch("sentry_sdk.capture_message") as capture_message:
             assert capture_message.call_count == 0
             try_monitor_tasks_trigger(ts=now + timedelta(minutes=3, seconds=5))
             assert dispatch_tasks.call_count == 3
             capture_message.assert_called_with("Monitor task dispatch minute skipped")
 
-    @patch("sentry.monitors.tasks._dispatch_tasks")
+    @mock.patch("sentry.monitors.tasks._dispatch_tasks")
     def test_monitor_task_trigger_partition_desync(self, dispatch_tasks):
         """
         When consumer partitions are not completely synchronized we may read
@@ -602,3 +610,19 @@ class CheckMonitorsTest(TestCase):
         # Fourth message moves past a new minute boundary, tick
         try_monitor_tasks_trigger(ts=now + timedelta(minutes=1, seconds=1))
         assert dispatch_tasks.call_count == 2
+
+    @override_settings(KAFKA_INGEST_MONITORS="monitors-test-topic")
+    @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
+    @mock.patch("sentry.monitors.tasks._checkin_producer")
+    def test_clock_pulse(self, _checkin_producer):
+        clock_pulse()
+
+        assert _checkin_producer.produce.call_count == 1
+        assert _checkin_producer.produce.mock_calls[0] == mock.call(
+            mock.ANY,
+            KafkaPayload(
+                None,
+                msgpack.packb({"message_type": "clock_pulse"}),
+                [],
+            ),
+        )