Browse Source

fix(replays): Add custom replay paginator (#65724)

Adds a custom replay paginator to handle cases where the search and sort
query returns more results than the data query. We want to ensure that
the next page is always fetchable _if it has the possibility of
existing_.

Pagination works on the following logic:

- Fetch the `limit` + 1.
- If `limit` + 1 items found then the resource has a next page.
- Pop the last item and return the items to the client with a next page
header.
- If < `limit` + 1 items found then the resource has no next page.
    - Return the items to the client without a next page header.

Replays makes two queries: a search-and-sort query and a data query. The
data query has an assumption baked in that the *scalar* search-and-sort
query does not: there must be a segment 0. The scalar query does not
enforce this constraint because it's trying to find at least one varying
field, and that varying field might not be on segment 0.

The search-and-sort query will return `limit` + 1 items. But because any
of those replays may be missing their segment 0 entry, they will be
removed from the data query. The paginator then sees `< limit + 1`
results and assumes there must not be a second page.

We fix this by returning the next page header if the search and sort
query thinks there's a next page -- not the data query.
Colton Allen 1 year ago
parent
commit
3c5217eeec

+ 25 - 5
src/sentry/replays/endpoints/organization_replay_index.py

@@ -1,3 +1,5 @@
+from collections.abc import Callable
+
 from drf_spectacular.utils import extend_schema
 from rest_framework.exceptions import ParseError
 from rest_framework.request import Request
@@ -9,7 +11,6 @@ from sentry.api.api_publish_status import ApiPublishStatus
 from sentry.api.base import region_silo_endpoint
 from sentry.api.bases.organization import NoProjects, OrganizationEndpoint
 from sentry.api.event_search import parse_search_query
-from sentry.api.paginator import GenericOffsetPaginator
 from sentry.apidocs.constants import RESPONSE_BAD_REQUEST, RESPONSE_FORBIDDEN
 from sentry.apidocs.examples.replay_examples import ReplayExamples
 from sentry.apidocs.parameters import GlobalParams
@@ -17,9 +18,10 @@ from sentry.apidocs.utils import inline_sentry_response_serializer
 from sentry.exceptions import InvalidSearchQuery
 from sentry.models.organization import Organization
 from sentry.replays.post_process import ReplayDetailsResponse, process_raw_response
-from sentry.replays.query import query_replays_collection, replay_url_parser_config
+from sentry.replays.query import query_replays_collection_raw, replay_url_parser_config
 from sentry.replays.usecases.errors import handled_snuba_exceptions
 from sentry.replays.validators import ReplayValidator
+from sentry.utils.cursors import Cursor, CursorResult
 
 
 @region_silo_endpoint
@@ -76,7 +78,7 @@ class OrganizationReplayIndexEndpoint(OrganizationEndpoint):
             if key not in filter_params:
                 filter_params[key] = value
 
-        def data_fn(offset, limit):
+        def data_fn(offset: int, limit: int):
             try:
                 search_filters = parse_search_query(
                     request.query_params.get("query", ""), config=replay_url_parser_config
@@ -84,7 +86,7 @@ class OrganizationReplayIndexEndpoint(OrganizationEndpoint):
             except InvalidSearchQuery as e:
                 raise ParseError(str(e))
 
-            return query_replays_collection(
+            return query_replays_collection_raw(
                 project_ids=filter_params["project_id"],
                 start=filter_params["start"],
                 end=filter_params["end"],
@@ -100,7 +102,7 @@ class OrganizationReplayIndexEndpoint(OrganizationEndpoint):
 
         return self.paginate(
             request=request,
-            paginator=GenericOffsetPaginator(data_fn=data_fn),
+            paginator=ReplayPaginator(data_fn=data_fn),
             on_results=lambda results: {
                 "data": process_raw_response(
                     results,
@@ -108,3 +110,21 @@ class OrganizationReplayIndexEndpoint(OrganizationEndpoint):
                 )
             },
         )
+
+
+class ReplayPaginator:
+    """Defers all pagination decision making to the implementation."""
+
+    def __init__(self, data_fn: Callable[[int, int], tuple[list, bool]]) -> None:
+        self.data_fn = data_fn
+
+    def get_result(self, limit: int, cursor=None):
+        assert limit > 0
+        offset = int(cursor.offset) if cursor is not None else 0
+        data, has_more = self.data_fn(offset, limit + 1)
+
+        return CursorResult(
+            data,
+            prev=Cursor(0, max(0, offset - limit), True, offset > 0),
+            next=Cursor(0, max(0, offset + limit), False, has_more),
+        )

+ 10 - 5
src/sentry/replays/query.py

@@ -39,21 +39,26 @@ MAX_REPLAY_LENGTH_HOURS = 1
 ELIGIBLE_SUBQUERY_SORTS = {"started_at", "browser.name", "os.name"}
 
 
-def query_replays_collection(
+# Compatibility function for getsentry code.
+def query_replays_collection(*args, **kwargs):
+    return query_replays_collection_raw(*args, **kwargs)[0]
+
+
+def query_replays_collection_raw(
     project_ids: list[int],
     start: datetime,
     end: datetime,
     environment: list[str],
     fields: list[str],
     sort: str | None,
-    limit: str | None,
-    offset: str | None,
+    limit: int,
+    offset: int,
     search_filters: Sequence[SearchFilter],
     organization: Organization | None = None,
     actor: Any | None = None,
-) -> dict:
+):
     """Query aggregated replay collection."""
-    paginators = make_pagination_values(limit, offset)
+    paginators = Paginators(limit, offset)
 
     return query_using_optimized_search(
         fields=fields,

+ 19 - 8
src/sentry/replays/usecases/query/__init__.py

@@ -12,9 +12,9 @@ external source, makes decisions around what to query and when, and is responsib
 intelligible output for the "post_process" module.  More information on its implementation can be
 found in the function.
 """
+
 from __future__ import annotations
 
-from collections import namedtuple
 from collections.abc import Mapping, Sequence
 from datetime import datetime, timedelta
 from typing import Any, cast
@@ -124,6 +124,8 @@ def search_filter_to_condition(
 
 # Everything below here will move to replays/query.py once we deprecate the old query behavior.
 # Leaving it here for now so this is easier to review/remove.
+import dataclasses
+
 from sentry.replays.usecases.query.configs.aggregate import search_config as agg_search_config
 from sentry.replays.usecases.query.configs.aggregate_sort import sort_config as agg_sort_config
 from sentry.replays.usecases.query.configs.aggregate_sort import sort_is_scalar_compatible
@@ -132,7 +134,11 @@ from sentry.replays.usecases.query.configs.scalar import (
     scalar_search_config,
 )
 
-Paginators = namedtuple("Paginators", ("limit", "offset"))
+
+@dataclasses.dataclass
+class Paginators:
+    limit: int
+    offset: int
 
 
 def query_using_optimized_search(
@@ -140,7 +146,7 @@ def query_using_optimized_search(
     search_filters: Sequence[SearchFilter | str | ParenExpression],
     environments: list[str],
     sort: str | None,
-    pagination: Paginators | None,
+    pagination: Paginators,
     organization: Organization | None,
     project_ids: list[int],
     period_start: datetime,
@@ -178,16 +184,21 @@ def query_using_optimized_search(
         )
         referrer = "replays.query.browse_aggregated_conditions_subquery"
 
-    if pagination:
-        query = query.set_limit(pagination.limit)
-        query = query.set_offset(pagination.offset)
+    query = query.set_limit(pagination.limit)
+    query = query.set_offset(pagination.offset)
 
     subquery_response = execute_query(query, tenant_id, referrer)
 
+    # The query "has more rows" if the number of rows found matches the limit (which is
+    # the requested limit + 1).
+    has_more = len(subquery_response.get("data", [])) == pagination.limit
+    if has_more:
+        subquery_response["data"].pop()
+
     # These replay_ids are ordered by the OrderBy expression in the query above.
     replay_ids = [row["replay_id"] for row in subquery_response.get("data", [])]
     if not replay_ids:
-        return []
+        return [], has_more
 
     # The final aggregation step.  Here we pass the replay_ids as the only filter.  In this step
     # we select everything and use as much memory as we need to complete the operation.
@@ -206,7 +217,7 @@ def query_using_optimized_search(
         referrer="replays.query.browse_query",
     )["data"]
 
-    return _make_ordered(replay_ids, results)
+    return _make_ordered(replay_ids, results), has_more
 
 
 def make_scalar_search_conditions_query(

+ 51 - 1
tests/sentry/replays/test_organization_replay_index.py

@@ -475,6 +475,56 @@ class OrganizationReplayIndexTest(APITestCase, ReplaysSnubaTestCase):
             assert "data" in response_data
             assert len(response_data["data"]) == 0
 
+    def test_get_replays_pagination_missing_segment(self):
+        """Test replays can be paginated.
+
+        This test presents an interesting case where the first page is blank but the
+        second page contains data. This is not a bug. In normal operation there will
+        be 50 rows with possibly one missing. The hope is no one will notice the missing
+        row and the pagination buttons will work correctly.
+
+        To properly fix this bug, the scalar optimized query must filter by segment-id
+        0. However, doing this will significantly decrease the performance of large
+        customers. Until a solution is found for this problem this weird pagination
+        behavior should be left in place.
+        """
+        project = self.create_project(teams=[self.team])
+
+        replay1_id = uuid.uuid4().hex
+        replay2_id = uuid.uuid4().hex
+        replay1_timestamp0 = datetime.datetime.now() - datetime.timedelta(seconds=15)
+        replay1_timestamp1 = datetime.datetime.now() - datetime.timedelta(seconds=5)
+        replay2_timestamp1 = datetime.datetime.now() - datetime.timedelta(seconds=2)
+
+        self.store_replays(mock_replay(replay1_timestamp0, project.id, replay1_id, segment_id=0))
+        self.store_replays(mock_replay(replay1_timestamp1, project.id, replay1_id, segment_id=1))
+        # Missing segment 0
+        self.store_replays(mock_replay(replay2_timestamp1, project.id, replay2_id, segment_id=1))
+
+        with self.feature(REPLAYS_FEATURES):
+            # The first replay found does not have a starting segment. So it is removed from
+            # the response output. But the response still contains the next_page headers.
+            response = self.get_success_response(
+                self.organization.slug,
+                cursor=Cursor(0, 0),
+                per_page=1,
+            )
+            response_data = response.json()
+            assert "data" in response_data
+            assert len(response_data["data"]) == 0
+            assert 'rel="next"; results="true"; cursor="0:1:0"' in response.headers["Link"]
+
+            # The next page has a replay with a starting segment so it is returned.
+            response = self.get_success_response(
+                self.organization.slug,
+                cursor=Cursor(0, 1),
+                per_page=1,
+            )
+            response_data = response.json()
+            assert "data" in response_data
+            assert response_data["data"][0]["id"] == replay1_id
+            assert len(response_data["data"]) == 1
+
     def test_get_replays_user_filters(self):
         """Test replays conform to the interchange format."""
         project = self.create_project(teams=[self.team])
@@ -1321,7 +1371,7 @@ class OrganizationReplayIndexTest(APITestCase, ReplaysSnubaTestCase):
         with self.feature(REPLAYS_FEATURES):
             # Invalid field-names error regardless of ordering.
             with mock.patch(
-                "sentry.replays.endpoints.organization_replay_index.query_replays_collection",
+                "sentry.replays.endpoints.organization_replay_index.query_replays_collection_raw",
                 side_effect=QueryMemoryLimitExceeded("mocked error"),
             ):
                 response = self.client.get(self.url)