Browse Source

feat(crons): Switch to using sentry-kafka-schemas schema (#69622)

This uses the schema defined in sentry-kafka-schemas in the monitor
consumer.
Evan Purkhiser 10 months ago
parent
commit
d0443adc76

+ 1 - 1
requirements-base.txt

@@ -63,7 +63,7 @@ rfc3339-validator>=0.1.2
 rfc3986-validator>=0.1.1
 # [end] jsonschema format validators
 sentry-arroyo>=2.16.5
-sentry-kafka-schemas>=0.1.68
+sentry-kafka-schemas>=0.1.70
 sentry-ophio==0.2.7
 sentry-redis-tools>=0.1.7
 sentry-relay>=0.8.57

+ 1 - 1
requirements-dev-frozen.txt

@@ -180,7 +180,7 @@ sentry-cli==2.16.0
 sentry-devenv==1.6.2
 sentry-forked-django-stubs==4.2.7.post3
 sentry-forked-djangorestframework-stubs==3.14.5.post1
-sentry-kafka-schemas==0.1.68
+sentry-kafka-schemas==0.1.70
 sentry-ophio==0.2.7
 sentry-redis-tools==0.1.7
 sentry-relay==0.8.57

+ 1 - 1
requirements-frozen.txt

@@ -120,7 +120,7 @@ rpds-py==0.15.2
 rsa==4.8
 s3transfer==0.10.0
 sentry-arroyo==2.16.5
-sentry-kafka-schemas==0.1.68
+sentry-kafka-schemas==0.1.70
 sentry-ophio==0.2.7
 sentry-redis-tools==0.1.7
 sentry-relay==0.8.57

+ 17 - 3
src/sentry/monitors/consumers/monitor_consumer.py

@@ -18,6 +18,9 @@ from arroyo.processing.strategies.commit import CommitOffsets
 from arroyo.processing.strategies.run_task import RunTask
 from arroyo.types import BrokerValue, Commit, Message, Partition
 from django.db import router, transaction
+from sentry_kafka_schemas import get_codec
+from sentry_kafka_schemas.codecs import ValidationError
+from sentry_kafka_schemas.schema_types.ingest_monitors_v1 import IngestMonitorMessage
 from sentry_sdk.tracing import Span, Transaction
 
 from sentry import quotas, ratelimits
@@ -40,7 +43,7 @@ from sentry.monitors.models import (
     MonitorLimitsExceeded,
     MonitorType,
 )
-from sentry.monitors.types import CheckinItem, CheckinMessage, ClockPulseMessage
+from sentry.monitors.types import CheckinItem
 from sentry.monitors.utils import (
     get_new_timeout_at,
     get_timeout_at,
@@ -56,6 +59,8 @@ from sentry.utils.outcomes import Outcome, track_outcome
 
 logger = logging.getLogger(__name__)
 
+MONITOR_CODEC = get_codec("ingest-monitors")
+
 CHECKIN_QUOTA_LIMIT = 6
 CHECKIN_QUOTA_WINDOW = 60
 
@@ -859,7 +864,11 @@ def process_batch(executor: ThreadPoolExecutor, message: Message[ValuesBatch[Kaf
         assert isinstance(item, BrokerValue)
 
         try:
-            wrapper: CheckinMessage | ClockPulseMessage = msgpack.unpackb(item.payload.value)
+            try:
+                wrapper: IngestMonitorMessage = MONITOR_CODEC.decode(item.payload.value)
+            except ValidationError:
+                wrapper = msgpack.unpackb(item.payload.value)
+                logger.exception("Failed to unpack message payload via sentry_kafka_schemas")
         except Exception:
             logger.exception("Failed to unpack message payload")
             continue
@@ -904,7 +913,12 @@ def process_batch(executor: ThreadPoolExecutor, message: Message[ValuesBatch[Kaf
 def process_single(message: Message[KafkaPayload]):
     assert isinstance(message.value, BrokerValue)
     try:
-        wrapper = msgpack.unpackb(message.payload.value)
+        try:
+            wrapper: IngestMonitorMessage = MONITOR_CODEC.decode(message.payload.value)
+        except ValidationError:
+            logger.exception("Failed to unpack message payload via sentry_kafka_schemas")
+            wrapper = msgpack.unpackb(message.payload.value)
+
         ts = message.value.timestamp
         partition = message.value.partition.index
 

+ 2 - 2
src/sentry/monitors/tasks/clock_pulse.py

@@ -11,10 +11,10 @@ from arroyo import Topic as ArroyoTopic
 from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
 from confluent_kafka.admin import AdminClient, PartitionMetadata
 from django.conf import settings
+from sentry_kafka_schemas.schema_types.ingest_monitors_v1 import ClockPulse
 
 from sentry.conf.types.kafka_definition import Topic
 from sentry.monitors.clock_dispatch import try_monitor_tasks_trigger
-from sentry.monitors.types import ClockPulseMessage
 from sentry.silo.base import SiloMode
 from sentry.tasks.base import instrumented_task
 from sentry.utils.arroyo_producer import SingletonProducer
@@ -68,7 +68,7 @@ def clock_pulse(current_datetime=None):
             try_monitor_tasks_trigger(current_datetime, partition.id)
         return
 
-    message: ClockPulseMessage = {
+    message: ClockPulse = {
         "message_type": "clock_pulse",
     }
 

+ 3 - 14
src/sentry/monitors/types.py

@@ -4,22 +4,11 @@ from typing import Literal, NotRequired, TypedDict, Union
 
 from django.utils.functional import cached_property
 from django.utils.text import slugify
+from sentry_kafka_schemas.schema_types.ingest_monitors_v1 import CheckIn
 
 from sentry.monitors.constants import MAX_SLUG_LENGTH
 
 
-class CheckinMessage(TypedDict):
-    message_type: Literal["check_in"]
-    payload: str
-    start_time: float
-    project_id: str
-    sdk: str
-
-
-class ClockPulseMessage(TypedDict):
-    message_type: Literal["clock_pulse"]
-
-
 class CheckinTrace(TypedDict):
     trace_id: str
 
@@ -47,7 +36,7 @@ class CheckinItem:
     ts: datetime
     """
     The timestamp the check-in was produced into the kafka topic. This differs
-    from the start_time that is part of the CheckinMessage
+    from the start_time that is part of the CheckIn
     """
 
     partition: int
@@ -55,7 +44,7 @@ class CheckinItem:
     The kafka partition id the check-in was produced into.
     """
 
-    message: CheckinMessage
+    message: CheckIn
     """
     The original unpacked check-in message contents.
     """

+ 0 - 1
tests/sentry/conf/test_kafka_definition.py

@@ -20,7 +20,6 @@ def test_topic_definition() -> None:
         "ingest-transactions",
         "profiles",
         "ingest-occurrences",
-        "ingest-monitors",
     ]
 
     for topic in Topic:

+ 3 - 1
tests/sentry/monitors/test_monitor_consumer.py

@@ -10,6 +10,7 @@ from arroyo.processing.strategies import ProcessingStrategy
 from arroyo.types import BrokerValue, Message, Partition, Topic
 from django.conf import settings
 from django.test.utils import override_settings
+from sentry_kafka_schemas.schema_types.ingest_monitors_v1 import CheckIn
 
 from sentry import killswitches
 from sentry.constants import ObjectStatus
@@ -89,12 +90,13 @@ class MonitorConsumerTest(TestCase):
         }
         payload.update(overrides)
 
-        wrapper = {
+        wrapper: CheckIn = {
             "message_type": "check_in",
             "start_time": ts.timestamp(),
             "project_id": self.project.id,
             "payload": json.dumps(payload),
             "sdk": "test/1.0",
+            "retention_days": 90,
         }
 
         consumer.submit(