
feat(replays): modify ingestion to emit to kafka topic if replay_id exists on event (#54986)

This PR allows us to get accurate error counts associated with session
replays.

This is done by emitting a Kafka message in post_process, which our Snuba consumer
writes to ClickHouse, recording the association between an event and a replay.
Previously this was done on the SDK, so it did not account for events that were
filtered, rate limited, etc.
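
For orientation, the message published to the "ingest-replay-events" topic looks roughly like this (a sketch derived from the new event_linking module in this diff; the concrete values are illustrative):

    {
        "type": "replay_event",
        "start_time": 1693000000,          # event timestamp, in seconds
        "replay_id": "<replay uuid hex>",
        "project_id": 42,
        "segment_id": None,
        "retention_days": 90,
        # the event_link record, JSON-encoded and shipped as a list of byte values
        "payload": [123, 34, ...],
    }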

We will use an option to ramp this up slowly to ensure stability, and we will also
collect telemetry on how frequently we emit messages. Since we produce
asynchronously, there is a chance that a message does not get emitted. This is
acceptable for now; we'll work toward near-100% delivery, but if a very small
percentage of messages is dropped, it is not the end of the world.
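
A rough sketch of how such an option-based gate might look at the top of process_replay_link (the option name here is illustrative, not the real one; the metric name matches the one used in post_process below):

    # illustrative only: gate emission on a slowly-ramped sample-rate option
    if random.random() >= options.get("replay.event-linking.sample-rate"):  # hypothetical option name
        return
    metrics.incr("post_process.process_replay_link.id_sampled")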

part of https://github.com/getsentry/team-replay/issues/145
Josh Ferge · 1 year ago · parent commit 560f3ea65a

+ 141 - 0
src/sentry/replays/lib/event_linking.py

@@ -0,0 +1,141 @@
+from __future__ import annotations
+
+import uuid
+from hashlib import md5
+from typing import TYPE_CHECKING, TypedDict, Union
+
+from sentry.utils import json
+
+if TYPE_CHECKING:
+    from sentry.eventstore.models import BaseEvent
+
+
+class EventLinkKafkaMessage(TypedDict):
+    type: str
+    start_time: int
+    replay_id: str
+    project_id: int
+    segment_id: None
+    payload: list[int]
+    retention_days: int
+
+
+class EventLinkPayload(TypedDict):
+    type: str
+    replay_id: str
+    timestamp: int
+    event_hash: str
+
+
+class EventLinkPayloadDebugId(EventLinkPayload):
+    debug_id: str
+
+
+class EventLinkPayloadInfoId(EventLinkPayload):
+    info_id: str
+
+
+class EventLinkPayloadWarningId(EventLinkPayload):
+    warning_id: str
+
+
+class EventLinkPayloadErrorId(EventLinkPayload):
+    error_id: str
+
+
+class EventLinkPayloadFatalId(EventLinkPayload):
+    fatal_id: str
+
+
+PayloadUnionType = Union[
+    EventLinkPayloadDebugId,
+    EventLinkPayloadInfoId,
+    EventLinkPayloadWarningId,
+    EventLinkPayloadErrorId,
+    EventLinkPayloadFatalId,
+]
+
+
+def get_level_key(
+    type: str,
+    replay_id: str,
+    event_hash: str,
+    timestamp: int,
+    level: str | None,
+    event_id: str,
+) -> PayloadUnionType:
+
+    if level == "debug":
+        return EventLinkPayloadDebugId(
+            type=type,
+            replay_id=replay_id,
+            event_hash=event_hash,
+            timestamp=timestamp,
+            debug_id=event_id,
+        )
+    elif level == "info":
+        return EventLinkPayloadInfoId(
+            type=type,
+            replay_id=replay_id,
+            event_hash=event_hash,
+            timestamp=timestamp,
+            info_id=event_id,
+        )
+    elif level == "warning":
+        return EventLinkPayloadWarningId(
+            type=type,
+            replay_id=replay_id,
+            event_hash=event_hash,
+            timestamp=timestamp,
+            warning_id=event_id,
+        )
+    elif level == "error":
+        return EventLinkPayloadErrorId(
+            type=type,
+            replay_id=replay_id,
+            event_hash=event_hash,
+            timestamp=timestamp,
+            error_id=event_id,
+        )
+    elif level == "fatal":
+        return EventLinkPayloadFatalId(
+            type=type,
+            replay_id=replay_id,
+            event_hash=event_hash,
+            timestamp=timestamp,
+            fatal_id=event_id,
+        )
+    else:
+        # note that this in theory should never happen, but we want to be careful
+        raise ValueError(f"Invalid level {level}")
+
+
+def transform_event_for_linking_payload(replay_id: str, event: BaseEvent) -> EventLinkKafkaMessage:
+    def _make_json_binary_payload() -> PayloadUnionType:
+        level: str | None = event.data.get("level")
+
+        payload_with_level = get_level_key(
+            type="event_link",
+            replay_id=replay_id,
+            event_hash=_make_event_hash(event.event_id),
+            timestamp=int(event.datetime.timestamp()),
+            level=level,
+            event_id=event.event_id,
+        )
+
+        return payload_with_level
+
+    return {
+        "type": "replay_event",
+        "start_time": int(event.datetime.timestamp()),
+        "replay_id": replay_id,
+        "project_id": event.project.id,
+        "segment_id": None,
+        "retention_days": 90,
+        "payload": list(bytes(json.dumps(_make_json_binary_payload()).encode())),
+    }
+
+
+def _make_event_hash(event_id: str) -> str:
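+    # md5 the event_id and reinterpret the 128-bit digest as a UUID string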
+    md5_hash = md5(event_id.encode("utf-8")).hexdigest()
+    return str(uuid.UUID(md5_hash))
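
A minimal usage sketch of the new helper (replay_id and event are placeholders; decoding the payload mirrors what the tests below do):

    from sentry.replays.lib.event_linking import transform_event_for_linking_payload
    from sentry.utils import json

    message = transform_event_for_linking_payload(replay_id, event)  # event: BaseEvent
    # "payload" is a list of ints; rebuild the bytes to inspect the inner event_link record
    inner = json.loads(bytes(message["payload"]).decode("utf-8"))
    assert inner["type"] == "event_link"
    assert inner["replay_id"] == replay_id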

+ 14 - 1
src/sentry/tasks/post_process.py

@@ -16,12 +16,14 @@ from sentry.exceptions import PluginError
 from sentry.issues.grouptype import GroupCategory
 from sentry.issues.issue_occurrence import IssueOccurrence
 from sentry.killswitches import killswitch_matches_context
+from sentry.replays.lib.event_linking import transform_event_for_linking_payload
+from sentry.replays.lib.kafka import initialize_replays_publisher
 from sentry.sentry_metrics.client import generic_metrics_backend
 from sentry.sentry_metrics.use_case_id_registry import UseCaseID
 from sentry.signals import event_processed, issue_unignored, transaction_processed
 from sentry.silo import SiloMode
 from sentry.tasks.base import instrumented_task
-from sentry.utils import metrics
+from sentry.utils import json, metrics
 from sentry.utils.cache import cache
 from sentry.utils.event_frames import get_sdk_name
 from sentry.utils.locking import UnableToAcquireLock
@@ -873,6 +875,17 @@ def process_replay_link(job: PostProcessJob) -> None:
 
     metrics.incr("post_process.process_replay_link.id_exists")
 
+    publisher = initialize_replays_publisher(is_async=True)
+    try:
+        kafka_payload = transform_event_for_linking_payload(replay_id, group_event)
+    except ValueError:
+        metrics.incr("post_process.process_replay_link.id_invalid")
+        return  # kafka_payload is unbound when the level is invalid; skip publishing
+
+    publisher.publish(
+        "ingest-replay-events",
+        json.dumps(kafka_payload),
+    )
+
 
 def process_rules(job: PostProcessJob) -> None:
     if job["is_reprocessed"]:

+ 0 - 0
tests/sentry/replays/lib/__init__.py


+ 23 - 0
tests/sentry/replays/lib/test_event_linking.py

@@ -0,0 +1,23 @@
+import uuid
+
+from sentry.replays.lib.event_linking import transform_event_for_linking_payload
+from sentry.testutils.cases import ReplaysSnubaTestCase
+
+
+class TestEventLink(ReplaysSnubaTestCase):
+    def test_event_link_types(self):
+        replay_id = uuid.uuid4().hex
+
+        for level in ["debug", "info", "warning", "error", "fatal"]:
+            event = self.store_event(
+                data={
+                    "level": level,
+                    "message": "testing",
+                    "contexts": {"replay": {"replay_id": replay_id}},
+                },
+                project_id=self.project.id,
+            )
+            stored = transform_event_for_linking_payload(replay_id, event)
+            # make sure Snuba returns a 200, meaning the payload was successfully written to ClickHouse
+            # (the method will raise if it doesn't)
+            self.store_replays(stored)

+ 32 - 18
tests/sentry/tasks/test_post_process.py

@@ -4,6 +4,7 @@ import abc
 import time
 import uuid
 from datetime import datetime, timedelta, timezone
+from hashlib import md5
 from typing import Any
 from unittest import mock
 from unittest.mock import Mock, patch
@@ -43,6 +44,7 @@ from sentry.models.groupowner import (
 from sentry.models.projectownership import ProjectOwnership
 from sentry.models.projectteam import ProjectTeam
 from sentry.ownership.grammar import Matcher, Owner, Rule, dump_schema
+from sentry.replays.lib import kafka as replays_kafka
 from sentry.rules import init_registry
 from sentry.services.hybrid_cloud.user.service import user_service
 from sentry.silo import unguarded_write
@@ -61,6 +63,7 @@ from sentry.testutils.performance_issues.store_transaction import PerfIssueTrans
 from sentry.testutils.silo import region_silo_test
 from sentry.types.activity import ActivityType
 from sentry.types.group import GroupSubStatus
+from sentry.utils import json
 from sentry.utils.cache import cache
 from tests.sentry.issues.test_utils import OccurrenceTestMixin
 
@@ -1644,9 +1647,11 @@ class SDKCrashMonitoringTestMixin(BasePostProgressGroupMixin):
         mock_sdk_crash_detection.detect_sdk_crash.assert_not_called()
 
 
+@mock.patch.object(replays_kafka, "get_kafka_producer_cluster_options")
+@mock.patch.object(replays_kafka, "KafkaPublisher")
 @mock.patch("sentry.utils.metrics.incr")
 class ReplayLinkageTestMixin(BasePostProgressGroupMixin):
-    def test_replay_linkage(self, incr):
+    def test_replay_linkage(self, incr, kafka_producer, kafka_publisher):
         replay_id = uuid.uuid4().hex
         event = self.create_event(
             data={"message": "testing", "contexts": {"replay": {"replay_id": replay_id}}},
@@ -1660,27 +1665,33 @@ class ReplayLinkageTestMixin(BasePostProgressGroupMixin):
                 is_new_group_environment=True,
                 event=event,
             )
-            incr.assert_any_call("post_process.process_replay_link.id_sampled")
-            incr.assert_any_call("post_process.process_replay_link.id_exists")
+            assert kafka_producer.return_value.publish.call_count == 1
+            assert kafka_producer.return_value.publish.call_args[0][0] == "ingest-replay-events"
 
-    def test_replay_linkage_with_tag(self, incr):
-        replay_id = uuid.uuid4().hex
-        event = self.create_event(
-            data={"message": "testing", "tags": {"replayId": replay_id}},
-            project_id=self.project.id,
-        )
+            ret_value = json.loads(kafka_producer.return_value.publish.call_args[0][1])
+
+            assert ret_value["type"] == "replay_event"
+            assert ret_value["start_time"] == int(event.datetime.timestamp())
+            assert ret_value["replay_id"] == replay_id
+            assert ret_value["project_id"] == self.project.id
+            assert ret_value["segment_id"] is None
+            assert ret_value["retention_days"] == 90
+
+            # ret_value["payload"] is a list of byte values; rebuild the bytes and parse the JSON
+            ret_value_payload = json.loads(bytes(ret_value["payload"]).decode("utf-8"))
+
+            assert ret_value_payload == {
+                "type": "event_link",
+                "replay_id": replay_id,
+                "error_id": event.event_id,
+                "timestamp": int(event.datetime.timestamp()),
+                "event_hash": str(uuid.UUID(md5((event.event_id).encode("utf-8")).hexdigest())),
+            }
 
-        with self.feature({"organizations:session-replay-event-linking": True}):
-            self.call_post_process_group(
-                is_new=True,
-                is_regression=False,
-                is_new_group_environment=True,
-                event=event,
-            )
             incr.assert_any_call("post_process.process_replay_link.id_sampled")
             incr.assert_any_call("post_process.process_replay_link.id_exists")
 
-    def test_no_replay(self, incr):
+    def test_no_replay(self, incr, kafka_producer, kafka_publisher):
         event = self.create_event(
             data={"message": "testing"},
             project_id=self.project.id,
@@ -1693,9 +1704,11 @@ class ReplayLinkageTestMixin(BasePostProgressGroupMixin):
                 is_new_group_environment=True,
                 event=event,
             )
+            assert kafka_producer.return_value.publish.call_count == 0
             incr.assert_called_with("post_process.process_replay_link.id_sampled")
 
-    def test_0_sample_rate_replays(self, incr):
+    def test_0_sample_rate_replays(self, incr, kafka_producer, kafka_publisher):
+
         event = self.create_event(
             data={"message": "testing"},
             project_id=self.project.id,
@@ -1708,6 +1721,7 @@ class ReplayLinkageTestMixin(BasePostProgressGroupMixin):
                 is_new_group_environment=True,
                 event=event,
             )
+            assert kafka_producer.return_value.publish.call_count == 0
             for args, _ in incr.call_args_list:
                 self.assertNotEqual(args, ("post_process.process_replay_link.id_sampled"))