|
@@ -2,6 +2,7 @@ from __future__ import annotations
|
|
|
|
|
|
import dataclasses
|
|
import dataclasses
|
|
import logging
|
|
import logging
|
|
|
|
+import zlib
|
|
from datetime import datetime, timezone
|
|
from datetime import datetime, timezone
|
|
from typing import TypedDict, Union
|
|
from typing import TypedDict, Union
|
|
|
|
|
|
@@ -158,6 +159,22 @@ def ingest_recording(message: RecordingIngestMessage, transaction: Span) -> None
|
|
driver = make_storage_driver(message.org_id)
|
|
driver = make_storage_driver(message.org_id)
|
|
driver.set(segment_data, recording_segment)
|
|
driver.set(segment_data, recording_segment)
|
|
|
|
|
|
|
|
+ # Decompress and load the recording JSON. This is a performance test. We don't care about the
|
|
|
|
+ # result but knowing its performance characteristics and the failure rate of this operation
|
|
|
|
+ # will inform future releases.
|
|
|
|
+ try:
|
|
|
|
+ with metrics.timer("replays.usecases.ingest.decompress_and_parse"):
|
|
|
|
+ json.loads(decompress(recording_segment))
|
|
|
|
+ except Exception:
|
|
|
|
+ logging.exception(
|
|
|
|
+ "Failed to parse recording org={}, project={}, replay={}, segment={}".format(
|
|
|
|
+ message.org_id,
|
|
|
|
+ message.project_id,
|
|
|
|
+ message.replay_id,
|
|
|
|
+ headers["segment_id"],
|
|
|
|
+ )
|
|
|
|
+ )
|
|
|
|
+
|
|
# The first segment records an accepted outcome. This is for billing purposes. Subsequent
|
|
# The first segment records an accepted outcome. This is for billing purposes. Subsequent
|
|
# segments are not billed.
|
|
# segments are not billed.
|
|
if headers["segment_id"] == 0:
|
|
if headers["segment_id"] == 0:
|
|
@@ -231,3 +248,11 @@ def process_headers(bytes_with_headers: bytes) -> tuple[RecordingSegmentHeaders,
|
|
|
|
|
|
def replay_recording_segment_cache_id(project_id: int, replay_id: str, segment_id: str) -> str:
|
|
def replay_recording_segment_cache_id(project_id: int, replay_id: str, segment_id: str) -> str:
|
|
return f"{project_id}:{replay_id}:{segment_id}"
|
|
return f"{project_id}:{replay_id}:{segment_id}"
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def decompress(data: bytes) -> bytes:
|
|
|
|
+ """Return decompressed bytes."""
|
|
|
|
+ if data.startswith(b"["):
|
|
|
|
+ return data
|
|
|
|
+ else:
|
|
|
|
+ return zlib.decompress(data, zlib.MAX_WBITS | 32)
|