Browse Source

feat(crons): Add ability dispatch clock ticks to kafka (#69896)

Evan Purkhiser 10 months ago
parent
commit
058f5a2ea7
2 changed files with 47 additions and 5 deletions
  1. 27 4
      src/sentry/monitors/clock_dispatch.py
  2. 20 1
      tests/sentry/monitors/test_clock_dispatch.py

+ 27 - 4
src/sentry/monitors/clock_dispatch.py

@@ -4,12 +4,18 @@ import logging
 from datetime import datetime, timezone
 
 import sentry_sdk
+from arroyo import Topic as ArroyoTopic
+from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
 from django.conf import settings
+from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick
 
 from sentry import options
+from sentry.conf.types.kafka_definition import Topic
 from sentry.monitors.tasks.check_missed import check_missing
 from sentry.monitors.tasks.check_timeout import check_timeout
-from sentry.utils import metrics, redis
+from sentry.utils import json, metrics, redis
+from sentry.utils.arroyo_producer import SingletonProducer
+from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
 
 logger = logging.getLogger("sentry")
 # This key is used to store the last timestamp that the tasks were triggered.
@@ -26,6 +32,17 @@ def _int_or_none(s: str | None) -> int | None:
         return int(s)
 
 
+def _get_producer() -> KafkaProducer:
+    cluster_name = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["cluster"]
+    producer_config = get_kafka_producer_cluster_options(cluster_name)
+    producer_config.pop("compression.type", None)
+    producer_config.pop("message.max.bytes", None)
+    return KafkaProducer(build_kafka_configuration(default_config=producer_config))
+
+
+_clock_tick_producer = SingletonProducer(_get_producer)
+
+
 def _dispatch_tick(ts: datetime):
     """
     Dispatch a clock tick which will trigger monitor tasks.
@@ -43,9 +60,15 @@ def _dispatch_tick(ts: datetime):
     skip any tasks it missed)
     """
     if options.get("crons.use_clock_pulse_consumer"):
-        # TODO(epurkhiser): This should dispatch the pulse as a message on the
-        # monitors-clock-pulse topic
-        pass
+        if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
+            # XXX(epurkhiser): Unclear what we want to do if we're not using kafka
+            return
+
+        message: ClockTick = {"ts": ts.timestamp()}
+        payload = KafkaPayload(None, json.dumps(message).encode("utf-8"), [])
+
+        topic = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["real_topic_name"]
+        _clock_tick_producer.produce(ArroyoTopic(topic), payload)
     else:
         check_missing.delay(current_datetime=ts)
         check_timeout.delay(current_datetime=ts)

+ 20 - 1
tests/sentry/monitors/test_clock_dispatch.py

@@ -1,9 +1,14 @@
 from datetime import timedelta
 from unittest import mock
 
+from arroyo import Topic
+from arroyo.backends.kafka import KafkaPayload
+from django.test.utils import override_settings
 from django.utils import timezone
 
-from sentry.monitors.clock_dispatch import try_monitor_clock_tick
+from sentry.monitors.clock_dispatch import _dispatch_tick, try_monitor_clock_tick
+from sentry.testutils.helpers.options import override_options
+from sentry.utils import json
 
 
 @mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
@@ -141,3 +146,17 @@ def test_monitor_task_trigger_partition_tick_skip(dispatch_tick):
 
     assert dispatch_tick.call_count == 2
     assert dispatch_tick.mock_calls[1] == mock.call(now + timedelta(minutes=2))
+
+
+@override_settings(KAFKA_TOPIC_OVERRIDES={"monitors-clock-tick": "clock-tick-test-topic"})
+@override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
+@override_options({"crons.use_clock_pulse_consumer": True})
+@mock.patch("sentry.monitors.clock_dispatch._clock_tick_producer")
+def test_dispatch_to_kafka(clock_tick_producer_mock):
+    now = timezone.now().replace(second=0, microsecond=0)
+    _dispatch_tick(now)
+
+    clock_tick_producer_mock.produce.assert_called_with(
+        Topic("clock-tick-test-topic"),
+        KafkaPayload(None, json.dumps({"ts": now.timestamp()}).encode("utf-8"), []),
+    )