Browse Source

feat: Use sentry_kafka_schemas in ingest replays consumer (#48867)

Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
Markus Unterwaditzer 1 year ago
parent
commit
76492586b8

+ 1 - 1
requirements-base.txt

@@ -59,7 +59,7 @@ rfc3339-validator>=0.1.2
 rfc3986-validator>=0.1.1
 # [end] jsonschema format validators
 sentry-arroyo>=2.10.3
-sentry-kafka-schemas>=0.1.6
+sentry-kafka-schemas>=0.1.7
 sentry-redis-tools>=0.1.5
 sentry-relay>=0.8.22
 sentry-sdk>=1.22.2

+ 1 - 1
requirements-dev-frozen.txt

@@ -166,7 +166,7 @@ s3transfer==0.5.2
 selenium==4.3.0
 sentry-arroyo==2.10.3
 sentry-cli==2.16.0
-sentry-kafka-schemas==0.1.6
+sentry-kafka-schemas==0.1.7
 sentry-redis-tools==0.1.5
 sentry-relay==0.8.22
 sentry-sdk==1.22.2

+ 1 - 1
requirements-frozen.txt

@@ -117,7 +117,7 @@ rsa==4.8
 s3transfer==0.5.2
 selenium==4.3.0
 sentry-arroyo==2.10.3
-sentry-kafka-schemas==0.1.6
+sentry-kafka-schemas==0.1.7
 sentry-redis-tools==0.1.5
 sentry-relay==0.8.22
 sentry-sdk==1.22.2

+ 9 - 12
src/sentry/replays/consumers/recording.py

@@ -1,9 +1,8 @@
 import dataclasses
 import logging
 import random
-from typing import Any, Dict, Mapping, cast
+from typing import Any, Mapping
 
-import msgpack
 import sentry_sdk
 from arroyo.backends.kafka.consumer import KafkaPayload
 from arroyo.processing.strategies import RunTaskInThreads, TransformStep
@@ -11,16 +10,20 @@ from arroyo.processing.strategies.abstract import ProcessingStrategyFactory
 from arroyo.processing.strategies.commit import CommitOffsets
 from arroyo.types import Commit, Message, Partition
 from django.conf import settings
+from sentry_kafka_schemas import get_codec
+from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording
 from sentry_sdk.tracing import Span
 
-from sentry.replays.usecases.ingest import RecordingMessage, ingest_recording
+from sentry.replays.usecases.ingest import ingest_recording
 
 logger = logging.getLogger(__name__)
 
+RECORDINGS_CODEC = get_codec("ingest-replay-recordings")
+
 
 @dataclasses.dataclass
 class MessageContext:
-    message: Dict[str, Any]
+    message: ReplayRecording
     transaction: Span
     current_hub: sentry_sdk.Hub
 
@@ -63,7 +66,7 @@ def initialize_message_context(message: Message[KafkaPayload]) -> MessageContext
         < getattr(settings, "SENTRY_REPLAY_RECORDINGS_CONSUMER_APM_SAMPLING", 0),
     )
     current_hub = sentry_sdk.Hub(sentry_sdk.Hub.current)
-    message_dict = msgpack.unpackb(message.payload.value)
+    message_dict = RECORDINGS_CODEC.decode(message.payload.value)
     return MessageContext(message_dict, transaction, current_hub)
 
 
@@ -71,11 +74,5 @@ def move_replay_to_permanent_storage(message: Message[MessageContext]) -> Any:
     """Move the replay payload to permanent storage."""
     context: MessageContext = message.payload
     message_dict = context.message
-    message_type = message_dict["type"]
 
-    if message_type == "replay_recording_not_chunked":
-        ingest_recording(
-            cast(RecordingMessage, message_dict), context.transaction, context.current_hub
-        )
-    else:
-        raise ValueError(f"Invalid replays recording message type specified: {message_type}")
+    ingest_recording(message_dict, context.transaction, context.current_hub)

+ 4 - 13
src/sentry/replays/usecases/ingest/__init__.py

@@ -4,9 +4,10 @@ import dataclasses
 import logging
 import zlib
 from datetime import datetime, timezone
-from typing import Optional, TypedDict
+from typing import Optional, TypedDict, cast
 
 from django.conf import settings
+from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording
 from sentry_sdk import Hub
 from sentry_sdk.tracing import Span
 
@@ -45,16 +46,6 @@ class RecordingSegmentMessage(TypedDict):
     replay_recording: ReplayRecordingSegment
 
 
-class RecordingMessage(TypedDict):
-    retention_days: int
-    replay_id: str
-    key_id: int | None
-    org_id: int
-    project_id: int
-    received: int
-    payload: bytes
-
-
 class MissingRecordingSegmentHeaders(ValueError):
     pass
 
@@ -71,7 +62,7 @@ class RecordingIngestMessage:
 
 
 @metrics.wraps("replays.usecases.ingest.ingest_recording")
-def ingest_recording(message_dict: RecordingMessage, transaction: Span, current_hub: Hub) -> None:
+def ingest_recording(message_dict: ReplayRecording, transaction: Span, current_hub: Hub) -> None:
     """Ingest non-chunked recording messages."""
     with current_hub:
         with transaction.start_child(
@@ -85,7 +76,7 @@ def ingest_recording(message_dict: RecordingMessage, transaction: Span, current_
                 project_id=message_dict["project_id"],
                 received=message_dict["received"],
                 retention_days=message_dict["retention_days"],
-                payload_with_headers=message_dict["payload"],
+                payload_with_headers=cast(bytes, message_dict["payload"]),
             )
             _ingest_recording(message, transaction)
 

+ 4 - 3
tests/sentry/replays/consumers/test_recording.py

@@ -2,12 +2,13 @@ import time
 import uuid
 import zlib
 from datetime import datetime
-from typing import Any, Dict, List
+from typing import List
 from unittest.mock import ANY, patch
 
 import msgpack
 from arroyo.backends.kafka import KafkaPayload
 from arroyo.types import BrokerValue, Message, Partition, Topic
+from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording
 
 from sentry import options
 from sentry.models import File
@@ -52,7 +53,7 @@ class RecordingTestCaseMixin:
         message: bytes = b'[{"hello":"world"}]',
         segment_id: int = 0,
         compressed: bool = False,
-    ) -> List[Dict[str, Any]]:
+    ) -> List[ReplayRecording]:
         message = zlib.compress(message) if compressed else message
         return [
             {
@@ -61,7 +62,7 @@ class RecordingTestCaseMixin:
                 "org_id": self.organization.id,
                 "key_id": 123,
                 "project_id": self.project.id,
-                "received": time.time(),
+                "received": int(time.time()),
                 "retention_days": 30,
                 "payload": f'{{"segment_id":{segment_id}}}\n'.encode() + message,
             }