View source code

perf(replay): reduce postgres queries in recording consumer using get_from_cache (#77365)

@cmanallen pointed out the queries we make in _handle_breadcrumb, used
to check feature flags and project options, could overload Postgres as
we scale. To fix this we can do the query at the top-level and use
`get_from_cache`

---------

Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
Andrew Liu 5 months ago
Parent
Commit
f68f0d0477

+ 3 - 1
src/sentry/replays/consumers/recording_buffered.py

@@ -56,6 +56,7 @@ from sentry_kafka_schemas.codecs import Codec, ValidationError
 from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording
 
 from sentry.conf.types.kafka_definition import Topic, get_topic_codec
+from sentry.models.project import Project
 from sentry.replays.lib.storage import (
     RecordingSegmentStorageMeta,
     make_recording_filename,
@@ -318,8 +319,9 @@ def process_message(buffer: RecordingBuffer, message: bytes) -> None:
                 else None
             )
 
+        project = Project.objects.get_from_cache(id=decoded_message["project_id"])
         replay_actions = parse_replay_actions(
-            decoded_message["project_id"],
+            project,
             decoded_message["replay_id"],
             decoded_message["retention_days"],
             parsed_recording_data,

+ 2 - 1
src/sentry/replays/usecases/ingest/__init__.py

@@ -262,9 +262,10 @@ def recording_post_processor(
             op="replays.usecases.ingest.parse_and_emit_replay_actions",
             description="parse_and_emit_replay_actions",
         ):
+            project = Project.objects.get_from_cache(id=message.project_id)
             parse_and_emit_replay_actions(
                 retention_days=message.retention_days,
-                project_id=message.project_id,
+                project=project,
                 replay_id=message.replay_id,
                 segment_data=parsed_segment_data,
                 replay_event=parsed_replay_event,

+ 40 - 24
src/sentry/replays/usecases/ingest/dom_index.py

@@ -62,7 +62,7 @@ class ReplayActionsEvent(TypedDict):
 
 
 def parse_and_emit_replay_actions(
-    project_id: int,
+    project: Project,
     replay_id: str,
     retention_days: int,
     segment_data: list[dict[str, Any]],
@@ -70,7 +70,7 @@ def parse_and_emit_replay_actions(
 ) -> None:
     with metrics.timer("replays.usecases.ingest.dom_index.parse_and_emit_replay_actions"):
         message = parse_replay_actions(
-            project_id, replay_id, retention_days, segment_data, replay_event
+            project, replay_id, retention_days, segment_data, replay_event
         )
         if message is not None:
             emit_replay_actions(message)
@@ -82,19 +82,19 @@ def emit_replay_actions(action: ReplayActionsEvent) -> None:
 
 
 def parse_replay_actions(
-    project_id: int,
+    project: Project,
     replay_id: str,
     retention_days: int,
     segment_data: list[dict[str, Any]],
     replay_event: dict[str, Any] | None,
 ) -> ReplayActionsEvent | None:
     """Parse RRWeb payload to ReplayActionsEvent."""
-    actions = get_user_actions(project_id, replay_id, segment_data, replay_event)
+    actions = get_user_actions(project, replay_id, segment_data, replay_event)
     if len(actions) == 0:
         return None
 
     payload = create_replay_actions_payload(replay_id, actions)
-    return create_replay_actions_event(replay_id, project_id, retention_days, payload)
+    return create_replay_actions_event(replay_id, project.id, retention_days, payload)
 
 
 def create_replay_actions_event(
@@ -153,7 +153,7 @@ def log_canvas_size(
 
 
 def get_user_actions(
-    project_id: int,
+    project: Project,
     replay_id: str,
     events: list[dict[str, Any]],
     replay_event: dict[str, Any] | None,
@@ -177,6 +177,10 @@ def get_user_actions(
             "textContent": "Helloworld!"
         }
     """
+    # Feature flag and project option queries
+    should_report_rage = _should_report_rage_click_issue(project)
+    should_report_hydration = _should_report_hydration_error_issue(project)
+
     result: list[ReplayActionsEventPayloadClick] = []
     for event in _iter_custom_events(events):
         if len(result) == 20:
@@ -185,7 +189,14 @@ def get_user_actions(
         tag = event.get("data", {}).get("tag")
 
         if tag == "breadcrumb":
-            click = _handle_breadcrumb(event, project_id, replay_id, replay_event)
+            click = _handle_breadcrumb(
+                event,
+                project,
+                replay_id,
+                replay_event,
+                should_report_rage_click_issue=should_report_rage,
+                should_report_hydration_error_issue=should_report_hydration,
+            )
             if click is not None:
                 result.append(click)
         # look for request / response breadcrumbs and report metrics on them
@@ -193,7 +204,7 @@ def get_user_actions(
             _handle_resource_metric_event(event)
         # log the SDK options sent from the SDK 1/500 times
         if tag == "options" and random.randint(0, 499) < 1:
-            _handle_options_logging_event(project_id, replay_id, event)
+            _handle_options_logging_event(project.id, replay_id, event)
         # log large dom mutation breadcrumb events 1/100 times
 
         payload = event.get("data", {}).get("payload", {})
@@ -203,7 +214,7 @@ def get_user_actions(
             and payload.get("category") == "replay.mutations"
             and random.randint(0, 500) < 1
         ):
-            _handle_mutations_event(project_id, replay_id, event)
+            _handle_mutations_event(project.id, replay_id, event)
 
     return result
 
@@ -287,12 +298,10 @@ def _parse_classes(classes: str) -> list[str]:
     return list(filter(lambda n: n != "", classes.split(" ")))[:10]
 
 
-def _should_report_hydration_error_issue(project_id: int) -> bool:
-    project = Project.objects.get(id=project_id)
+def _should_report_hydration_error_issue(project: Project) -> bool:
     """
-    The feature is controlled by Sentry admins for release of the feature,
-    while the project option is controlled by the project owner, and is a
-    permanent setting
+    Checks the feature that's controlled by Sentry admins for release of the feature,
+    and the permanent project option, controlled by the project owner.
     """
     return features.has(
         "organizations:session-replay-hydration-error-issue-creation",
@@ -300,8 +309,10 @@ def _should_report_hydration_error_issue(project_id: int) -> bool:
     ) and project.get_option("sentry:replay_hydration_error_issues")
 
 
-def _should_report_rage_click_issue(project_id: int) -> bool:
-    project = Project.objects.get(id=project_id)
+def _should_report_rage_click_issue(project: Project) -> bool:
+    """
+    Checks the project option, controlled by a project owner.
+    """
     return project.get_option("sentry:replay_rage_click_issues")
 
 
@@ -374,7 +385,12 @@ def _handle_mutations_event(project_id: int, replay_id: str, event: dict[str, An
 
 
 def _handle_breadcrumb(
-    event: dict[str, Any], project_id: int, replay_id: str, replay_event: dict[str, Any] | None
+    event: dict[str, Any],
+    project: Project,
+    replay_id: str,
+    replay_event: dict[str, Any] | None,
+    should_report_rage_click_issue=False,
+    should_report_hydration_error_issue=False,
 ) -> ReplayActionsEventPayloadClick | None:
 
     click = None
@@ -399,15 +415,15 @@ def _handle_breadcrumb(
                 payload["data"].get("clickCount", 0) or payload["data"].get("clickcount", 0)
             ) >= 5
             click = create_click_event(
-                payload, replay_id, is_dead=True, is_rage=is_rage, project_id=project_id
+                payload, replay_id, is_dead=True, is_rage=is_rage, project_id=project.id
             )
             if click is not None:
                 if is_rage:
                     metrics.incr("replay.rage_click_detected")
-                    if _should_report_rage_click_issue(project_id):
+                    if should_report_rage_click_issue:
                         if replay_event is not None:
                             report_rage_click_issue_with_replay_event(
-                                project_id,
+                                project.id,
                                 replay_id,
                                 payload["timestamp"],
                                 payload["message"],
@@ -418,7 +434,7 @@ def _handle_breadcrumb(
                             )
         # Log the event for tracking.
         log = event["data"].get("payload", {}).copy()
-        log["project_id"] = project_id
+        log["project_id"] = project.id
         log["replay_id"] = replay_id
         log["dom_tree"] = log.pop("message")
 
@@ -426,16 +442,16 @@ def _handle_breadcrumb(
 
     elif category == "ui.click":
         click = create_click_event(
-            payload, replay_id, is_dead=False, is_rage=False, project_id=project_id
+            payload, replay_id, is_dead=False, is_rage=False, project_id=project.id
         )
         if click is not None:
             return click
 
     elif category == "replay.hydrate-error":
         metrics.incr("replay.hydration_error_breadcrumb")
-        if replay_event is not None and _should_report_hydration_error_issue(project_id):
+        if replay_event is not None and should_report_hydration_error_issue:
             report_hydration_error_issue_with_replay_event(
-                project_id,
+                project.id,
                 replay_id,
                 payload["timestamp"],
                 payload.get("data", {}).get("url"),

+ 41 - 31
tests/sentry/replays/unit/test_ingest_dom_index.py

@@ -3,9 +3,11 @@ from __future__ import annotations
 import uuid
 from typing import Any
 from unittest import mock
+from unittest.mock import Mock
 
 import pytest
 
+from sentry.models.project import Project
 from sentry.replays.testutils import mock_replay_event
 from sentry.replays.usecases.ingest.dom_index import (
     _get_testid,
@@ -27,7 +29,15 @@ def patch_rage_click_issue_with_replay_event():
         yield m
 
 
-def test_get_user_actions():
+@pytest.fixture(autouse=True)
+def mock_project() -> Project:
+    """Has id=1. Use for unit tests so we can skip @django_db"""
+    proj = Mock(spec=Project)
+    proj.id = 1
+    return proj
+
+
+def test_get_user_actions(mock_project):
     """Test "get_user_actions" function."""
     events = [
         {
@@ -63,7 +73,7 @@ def test_get_user_actions():
         }
     ]
 
-    user_actions = get_user_actions(1, uuid.uuid4().hex, events, None)
+    user_actions = get_user_actions(mock_project, uuid.uuid4().hex, events, None)
     assert len(user_actions) == 1
     assert user_actions[0]["node_id"] == 1
     assert user_actions[0]["tag"] == "div"
@@ -82,7 +92,7 @@ def test_get_user_actions():
     assert len(user_actions[0]["event_hash"]) == 36
 
 
-def test_get_user_actions_str_payload():
+def test_get_user_actions_str_payload(mock_project):
     """Test "get_user_actions" function."""
     events = [
         {
@@ -95,11 +105,11 @@ def test_get_user_actions_str_payload():
         }
     ]
 
-    user_actions = get_user_actions(1, uuid.uuid4().hex, events, None)
+    user_actions = get_user_actions(mock_project, uuid.uuid4().hex, events, None)
     assert len(user_actions) == 0
 
 
-def test_get_user_actions_missing_node():
+def test_get_user_actions_missing_node(mock_project):
     """Test "get_user_actions" function."""
     events = [
         {
@@ -117,11 +127,11 @@ def test_get_user_actions_missing_node():
         }
     ]
 
-    user_actions = get_user_actions(1, uuid.uuid4().hex, events, None)
+    user_actions = get_user_actions(mock_project, uuid.uuid4().hex, events, None)
     assert len(user_actions) == 0
 
 
-def test_get_user_actions_performance_spans():
+def test_get_user_actions_performance_spans(mock_project):
     """Test that "get_user_actions" doesn't error when collecting rsrc metrics, on various formats of performanceSpan"""
     # payloads are not realistic examples - only include the fields necessary for testing
     # TODO: does not test if metrics.distribution() is called downstream, with correct param types.
@@ -193,10 +203,10 @@ def test_get_user_actions_performance_spans():
             },
         },
     ]
-    get_user_actions(1, uuid.uuid4().hex, events, None)
+    get_user_actions(mock_project, uuid.uuid4().hex, events, None)
 
 
-def test_parse_replay_actions():
+def test_parse_replay_actions(mock_project):
     events = [
         {
             "type": 5,
@@ -231,7 +241,7 @@ def test_parse_replay_actions():
             },
         }
     ]
-    replay_actions = parse_replay_actions(1, "1", 30, events, None)
+    replay_actions = parse_replay_actions(mock_project, "1", 30, events, None)
 
     assert replay_actions is not None
     assert replay_actions["type"] == "replay_event"
@@ -375,7 +385,7 @@ def test_parse_replay_dead_click_actions(patch_rage_click_issue_with_replay_even
     ]
 
     default_project.update_option("sentry:replay_rage_click_issues", True)
-    replay_actions = parse_replay_actions(default_project.id, "1", 30, events, mock_replay_event())
+    replay_actions = parse_replay_actions(default_project, "1", 30, events, mock_replay_event())
     assert patch_rage_click_issue_with_replay_event.call_count == 2
     assert replay_actions is not None
     assert replay_actions["type"] == "replay_event"
@@ -528,7 +538,7 @@ def test_rage_click_issue_creation_no_component_name(
     ]
 
     default_project.update_option("sentry:replay_rage_click_issues", True)
-    parse_replay_actions(default_project.id, "1", 30, events, mock_replay_event())
+    parse_replay_actions(default_project, "1", 30, events, mock_replay_event())
 
     # test that 2 rage click issues are still created
     assert patch_rage_click_issue_with_replay_event.call_count == 2
@@ -575,7 +585,7 @@ def test_parse_replay_click_actions_not_dead(
         }
     ]
 
-    replay_actions = parse_replay_actions(default_project.id, "1", 30, events, None)
+    replay_actions = parse_replay_actions(default_project, "1", 30, events, None)
     assert patch_rage_click_issue_with_replay_event.delay.call_count == 0
     assert replay_actions is None
 
@@ -619,7 +629,7 @@ def test_parse_replay_rage_click_actions(default_project):
             },
         }
     ]
-    replay_actions = parse_replay_actions(default_project.id, "1", 30, events, None)
+    replay_actions = parse_replay_actions(default_project, "1", 30, events, None)
 
     assert replay_actions is not None
     assert replay_actions["type"] == "replay_event"
@@ -659,7 +669,7 @@ def test_encode_as_uuid():
     assert isinstance(uuid.UUID(a), uuid.UUID)
 
 
-def test_parse_request_response_latest():
+def test_parse_request_response_latest(mock_project):
     events = [
         {
             "type": 5,
@@ -698,14 +708,14 @@ def test_parse_request_response_latest():
         }
     ]
     with mock.patch("sentry.utils.metrics.distribution") as timing:
-        parse_replay_actions(1, "1", 30, events, None)
+        parse_replay_actions(mock_project, "1", 30, events, None)
         assert timing.call_args_list == [
             mock.call("replays.usecases.ingest.request_body_size", 2949, unit="byte"),
             mock.call("replays.usecases.ingest.response_body_size", 94, unit="byte"),
         ]
 
 
-def test_parse_request_response_no_info():
+def test_parse_request_response_no_info(mock_project):
     events = [
         {
             "type": 5,
@@ -726,11 +736,11 @@ def test_parse_request_response_no_info():
             },
         },
     ]
-    parse_replay_actions(1, "1", 30, events, None)
+    parse_replay_actions(mock_project, "1", 30, events, None)
     # just make sure we don't raise
 
 
-def test_parse_request_response_old_format_request_only():
+def test_parse_request_response_old_format_request_only(mock_project):
     events = [
         {
             "type": 5,
@@ -753,13 +763,13 @@ def test_parse_request_response_old_format_request_only():
         },
     ]
     with mock.patch("sentry.utils.metrics.distribution") as timing:
-        parse_replay_actions(1, "1", 30, events, None)
+        parse_replay_actions(mock_project, "1", 30, events, None)
         assert timing.call_args_list == [
             mock.call("replays.usecases.ingest.request_body_size", 1002, unit="byte"),
         ]
 
 
-def test_parse_request_response_old_format_response_only():
+def test_parse_request_response_old_format_response_only(mock_project):
     events = [
         {
             "type": 5,
@@ -781,13 +791,13 @@ def test_parse_request_response_old_format_response_only():
         },
     ]
     with mock.patch("sentry.utils.metrics.distribution") as timing:
-        parse_replay_actions(1, "1", 30, events, None)
+        parse_replay_actions(mock_project, "1", 30, events, None)
         assert timing.call_args_list == [
             mock.call("replays.usecases.ingest.response_body_size", 1002, unit="byte"),
         ]
 
 
-def test_parse_request_response_old_format_request_and_response():
+def test_parse_request_response_old_format_request_and_response(mock_project):
     events = [
         {
             "type": 5,
@@ -810,7 +820,7 @@ def test_parse_request_response_old_format_request_and_response():
         },
     ]
     with mock.patch("sentry.utils.metrics.distribution") as timing:
-        parse_replay_actions(1, "1", 30, events, None)
+        parse_replay_actions(mock_project, "1", 30, events, None)
         assert timing.call_args_list == [
             mock.call("replays.usecases.ingest.request_body_size", 1002, unit="byte"),
             mock.call("replays.usecases.ingest.response_body_size", 8001, unit="byte"),
@@ -930,7 +940,7 @@ def test_parse_replay_rage_clicks_with_replay_event(
     ]
 
     default_project.update_option("sentry:replay_rage_click_issues", True)
-    replay_actions = parse_replay_actions(default_project.id, "1", 30, events, mock_replay_event())
+    replay_actions = parse_replay_actions(default_project, "1", 30, events, mock_replay_event())
     assert patch_rage_click_issue_with_replay_event.call_count == 2
     assert replay_actions is not None
     assert replay_actions["type"] == "replay_event"
@@ -941,7 +951,7 @@ def test_parse_replay_rage_clicks_with_replay_event(
     assert isinstance(replay_actions["payload"], list)
 
 
-def test_log_sdk_options():
+def test_log_sdk_options(mock_project):
     events: list[dict[str, Any]] = [
         {
             "data": {
@@ -973,11 +983,11 @@ def test_log_sdk_options():
         mock.patch("random.randint") as randint,
     ):
         randint.return_value = 0
-        parse_replay_actions(1, "1", 30, events, None)
+        parse_replay_actions(mock_project, "1", 30, events, None)
         assert logger.info.call_args_list == [mock.call("sentry.replays.slow_click", extra=log)]
 
 
-def test_log_large_dom_mutations():
+def test_log_large_dom_mutations(mock_project):
     events: list[dict[str, Any]] = [
         {
             "type": 5,
@@ -1003,7 +1013,7 @@ def test_log_large_dom_mutations():
         mock.patch("random.randint") as randint,
     ):
         randint.return_value = 0
-        parse_replay_actions(1, "1", 30, events, None)
+        parse_replay_actions(mock_project, "1", 30, events, None)
         assert logger.info.call_args_list == [mock.call("Large DOM Mutations List:", extra=log)]
 
 
@@ -1080,7 +1090,7 @@ def test_log_canvas_size():
     log_canvas_size(1, 1, "a", [])
 
 
-def test_emit_click_negative_node_id():
+def test_emit_click_negative_node_id(mock_project):
     """Test "get_user_actions" function."""
     events = [
         {
@@ -1116,5 +1126,5 @@ def test_emit_click_negative_node_id():
         }
     ]
 
-    user_actions = get_user_actions(1, uuid.uuid4().hex, events, None)
+    user_actions = get_user_actions(mock_project, uuid.uuid4().hex, events, None)
     assert len(user_actions) == 0