|
@@ -7,13 +7,15 @@ import sentry_sdk
|
|
|
from arroyo import Topic as ArroyoTopic
|
|
|
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
|
|
|
from django.conf import settings
|
|
|
+from sentry_kafka_schemas import get_codec
|
|
|
+from sentry_kafka_schemas.codecs import Codec
|
|
|
from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick
|
|
|
|
|
|
from sentry import options
|
|
|
from sentry.conf.types.kafka_definition import Topic
|
|
|
from sentry.monitors.tasks.check_missed import check_missing
|
|
|
from sentry.monitors.tasks.check_timeout import check_timeout
|
|
|
-from sentry.utils import json, metrics, redis
|
|
|
+from sentry.utils import metrics, redis
|
|
|
from sentry.utils.arroyo_producer import SingletonProducer
|
|
|
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
|
|
|
|
|
@@ -24,6 +26,8 @@ MONITOR_TASKS_LAST_TRIGGERED_KEY = "sentry.monitors.last_tasks_ts"
|
|
|
# This key is used to store the hashmap of Mapping[PartitionKey, Timestamp]
|
|
|
MONITOR_TASKS_PARTITION_CLOCKS = "sentry.monitors.partition_clocks"
|
|
|
|
|
|
+CLOCK_TICK_CODEC: Codec[ClockTick] = get_codec("monitors-clock-tick")
|
|
|
+
|
|
|
|
|
|
def _int_or_none(s: str | None) -> int | None:
|
|
|
if s is None:
|
|
@@ -65,7 +69,7 @@ def _dispatch_tick(ts: datetime):
|
|
|
return
|
|
|
|
|
|
message: ClockTick = {"ts": ts.timestamp()}
|
|
|
- payload = KafkaPayload(None, json.dumps(message).encode("utf-8"), [])
|
|
|
+ payload = KafkaPayload(None, CLOCK_TICK_CODEC.encode(message), [])
|
|
|
|
|
|
topic = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["real_topic_name"]
|
|
|
_clock_tick_producer.produce(ArroyoTopic(topic), payload)
|