|
@@ -1,6 +1,8 @@
|
|
|
|
+from __future__ import annotations
|
|
|
|
+
|
|
import time
|
|
import time
|
|
import uuid
|
|
import uuid
|
|
-from typing import Optional
|
|
|
|
|
|
+from typing import Any, Optional
|
|
|
|
|
|
from django.conf import settings
|
|
from django.conf import settings
|
|
|
|
|
|
@@ -8,7 +10,8 @@ from sentry.replays.lib.storage import FilestoreBlob, StorageBlob
|
|
from sentry.replays.models import ReplayRecordingSegment
|
|
from sentry.replays.models import ReplayRecordingSegment
|
|
from sentry.replays.usecases.reader import fetch_segments_metadata
|
|
from sentry.replays.usecases.reader import fetch_segments_metadata
|
|
from sentry.tasks.base import instrumented_task
|
|
from sentry.tasks.base import instrumented_task
|
|
-from sentry.utils import json, kafka_config
|
|
|
|
|
|
+from sentry.utils import json
|
|
|
|
+from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
|
|
from sentry.utils.pubsub import KafkaPublisher
|
|
from sentry.utils.pubsub import KafkaPublisher
|
|
|
|
|
|
replay_publisher: Optional[KafkaPublisher] = None
|
|
replay_publisher: Optional[KafkaPublisher] = None
|
|
@@ -28,22 +31,23 @@ def delete_recording_segments(project_id: int, replay_id: str, **kwargs: dict) -
|
|
|
|
|
|
def delete_replay_recording(project_id: int, replay_id: str) -> None:
|
|
def delete_replay_recording(project_id: int, replay_id: str) -> None:
|
|
"""Delete all recording-segments associated with a Replay."""
|
|
"""Delete all recording-segments associated with a Replay."""
|
|
- segments = fetch_segments_metadata(project_id, replay_id, offset=0, limit=10000)
|
|
|
|
- for segment in segments:
|
|
|
|
- driver = FilestoreBlob() if segment.file_id else StorageBlob()
|
|
|
|
- driver.delete(segment)
|
|
|
|
|
|
+ # Delete the segments which are now stored in clickhouse
|
|
|
|
+ segments_from_metadata = fetch_segments_metadata(project_id, replay_id, offset=0, limit=10000)
|
|
|
|
+ for segment_metadata in segments_from_metadata:
|
|
|
|
+ driver = FilestoreBlob() if segment_metadata.file_id else StorageBlob()
|
|
|
|
+ driver.delete(segment_metadata)
|
|
|
|
|
|
- # delete old rows from SQL too
|
|
|
|
- segment_sql_rows = ReplayRecordingSegment.objects.filter(
|
|
|
|
|
|
+ # Delete the ReplayRecordingSegment models that we previously stored using django models
|
|
|
|
+ segments_from_django_models = ReplayRecordingSegment.objects.filter(
|
|
replay_id=replay_id, project_id=project_id
|
|
replay_id=replay_id, project_id=project_id
|
|
).all()
|
|
).all()
|
|
- for segment in segment_sql_rows:
|
|
|
|
- segment.delete() # Three queries + one request to the message broker
|
|
|
|
|
|
+ for segment_model in segments_from_django_models:
|
|
|
|
+ segment_model.delete() # Three queries + one request to the message broker
|
|
|
|
|
|
|
|
|
|
def archive_replay(project_id: int, replay_id: str) -> None:
|
|
def archive_replay(project_id: int, replay_id: str) -> None:
|
|
"""Archive a Replay instance. The Replay is not deleted."""
|
|
"""Archive a Replay instance. The Replay is not deleted."""
|
|
- replay_payload = {
|
|
|
|
|
|
+ replay_payload: dict[str, Any] = {
|
|
"type": "replay_event",
|
|
"type": "replay_event",
|
|
"replay_id": replay_id,
|
|
"replay_id": replay_id,
|
|
"event_id": uuid.uuid4().hex,
|
|
"event_id": uuid.uuid4().hex,
|
|
@@ -77,9 +81,9 @@ def _initialize_publisher() -> KafkaPublisher:
|
|
global replay_publisher
|
|
global replay_publisher
|
|
|
|
|
|
if replay_publisher is None:
|
|
if replay_publisher is None:
|
|
- config = settings.KAFKA_TOPICS[settings.KAFKA_INGEST_REPLAY_EVENTS]
|
|
|
|
|
|
+ config = get_topic_definition(settings.KAFKA_INGEST_REPLAY_EVENTS)
|
|
replay_publisher = KafkaPublisher(
|
|
replay_publisher = KafkaPublisher(
|
|
- kafka_config.get_kafka_producer_cluster_options(config["cluster"]),
|
|
|
|
|
|
+ get_kafka_producer_cluster_options(config["cluster"]),
|
|
asynchronous=False,
|
|
asynchronous=False,
|
|
)
|
|
)
|
|
|
|
|