
feat(snuba): snuba support issue platform for snql (#68374)

This PR adds a parallel query for the issue platform dataset so we can
query tags on those events.
Stephen Cefali committed 11 months ago
parent commit: d912f6ca3a

+ 131 - 97
src/sentry/search/snuba/executors.py
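
The core change in this file: instead of issuing a single events query, the executor threads a `joined_entity` through its condition builders, builds one joined query per entity (errors and issue platform), and sends both to Snuba in one batch. A minimal sketch of that pattern, condensed from the diff below (the project ids, time window, and tenant ids are illustrative; search-filter, sort, and pagination handling is omitted):

    from datetime import datetime, timedelta

    from snuba_sdk import Column, Condition, Entity, Join, Op, Query, Relationship, Request

    from sentry.utils import snuba

    now = datetime.now()
    attr_entity = Entity("group_attributes", alias="g")

    queries = []
    for joined_entity in (Entity("events", alias="e"), Entity("search_issues", alias="si")):
        # each query joins an event-bearing entity against group_attributes
        query = Query(
            match=Join([Relationship(joined_entity, "attributes", attr_entity)]),
            select=[Column("group_id", attr_entity)],
            where=[
                Condition(Column("project_id", joined_entity), Op.IN, [1]),
                Condition(Column("timestamp", joined_entity), Op.GTE, now - timedelta(days=1)),
                Condition(Column("timestamp", joined_entity), Op.LT, now),
            ],
            groupby=[Column("group_id", attr_entity)],
        )
        queries.append(
            Request(
                dataset="events",
                app_id="group_attributes",
                query=query,
                tenant_ids={"organization_id": 1},
            )
        )

    # one round trip for both datasets; results come back in request order
    results = snuba.bulk_snql_query(queries, referrer="search.snuba.group_attributes_search.query")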

@@ -1118,7 +1118,9 @@ class InvalidQueryForExecutor(Exception):
 
 
 class GroupAttributesPostgresSnubaQueryExecutor(PostgresSnubaQueryExecutor):
-    def get_basic_group_snuba_condition(self, search_filter: SearchFilter) -> Condition:
+    def get_basic_group_snuba_condition(
+        self, search_filter: SearchFilter, joined_entity: Entity
+    ) -> Condition:
         """
         Returns the basic lookup for a search filter.
         """
@@ -1128,18 +1130,23 @@ class GroupAttributesPostgresSnubaQueryExecutor(PostgresSnubaQueryExecutor):
             search_filter.value.raw_value,
         )
 
-    def get_basic_event_snuba_condition(self, search_filter: SearchFilter) -> Condition:
+    def get_basic_event_snuba_condition(
+        self, search_filter: SearchFilter, joined_entity: Entity
+    ) -> Condition:
         """
         Returns the basic lookup for a search filter.
         """
+
+        dataset = Dataset.Events if joined_entity.alias == "e" else Dataset.IssuePlatform
+
         query_builder = UnresolvedQuery(
-            dataset=Dataset.Events,
-            entity=self.entities["event"],
+            dataset=dataset,
+            entity=joined_entity,
             params={},
         )
         return query_builder.default_filter_converter(search_filter)
 
-    def get_assigned(self, search_filter: SearchFilter) -> Condition:
+    def get_assigned(self, search_filter: SearchFilter, joined_entity: Entity) -> Condition:
         """
         Returns the assigned lookup for a search filter.
         """
@@ -1175,7 +1182,7 @@ class GroupAttributesPostgresSnubaQueryExecutor(PostgresSnubaQueryExecutor):
 
         return BooleanCondition(op=BooleanOp.OR, conditions=conditions)
 
-    def get_suggested(self, search_filter: SearchFilter) -> Condition:
+    def get_suggested(self, search_filter: SearchFilter, joined_entity: Entity) -> Condition:
         """
         Returns the suggested lookup for a search filter.
         """
@@ -1245,20 +1252,42 @@ class GroupAttributesPostgresSnubaQueryExecutor(PostgresSnubaQueryExecutor):
             conditions=conditions,
         )
 
-    def get_assigned_or_suggested(self, search_filter: SearchFilter) -> Condition:
+    def get_assigned_or_suggested(
+        self, search_filter: SearchFilter, joined_entity: Entity
+    ) -> Condition:
         return BooleanCondition(
             op=BooleanOp.OR,
             conditions=[
-                self.get_assigned(search_filter),
-                self.get_suggested(search_filter),
+                self.get_assigned(search_filter, joined_entity),
+                self.get_suggested(search_filter, joined_entity),
             ],
         )
 
+    def get_last_seen_aggregation(self, joined_entity: Entity) -> Function:
+        return Function(
+            "ifNull",
+            [
+                Function(
+                    "multiply",
+                    [
+                        Function(
+                            "toUInt64",
+                            [Function("max", [Column("timestamp", joined_entity)])],
+                        ),
+                        1000,
+                    ],
+                ),
+                0,
+            ],
+            alias="score",
+        )
+
     ISSUE_FIELD_NAME = "group_id"
 
     entities = {
         "event": Entity("events", alias="e"),
         "attrs": Entity("group_attributes", alias="g"),
+        "search_issues": Entity("search_issues", alias="si"),
     }
 
     group_conditions_lookup = {
@@ -1268,31 +1297,16 @@ class GroupAttributesPostgresSnubaQueryExecutor(PostgresSnubaQueryExecutor):
         "assigned_to": get_assigned,
     }
 
-    last_seen_aggregation = Function(
-        "ifNull",
-        [
-            Function(
-                "multiply",
-                [
-                    Function(
-                        "toUInt64", [Function("max", [Column("timestamp", entities["event"])])]
-                    ),
-                    1000,
-                ],
-            ),
-            0,
-        ],
-        alias="score",
-    )
     first_seen = Column("group_first_seen", entities["attrs"])
     times_seen_aggregation = Function("count", [], alias="times_seen")
 
-    sort_defs = {
-        "date": last_seen_aggregation,
-        "new": first_seen,
-        "freq": times_seen_aggregation,
-        "user": Function("uniq", [Column("tags[sentry:user]", entities["event"])], "user_count"),
-    }
+    def get_sort_defs(self, entity: Entity) -> dict[str, Function | Column]:
+        return {
+            "date": self.get_last_seen_aggregation(entity),
+            "new": self.first_seen,
+            "freq": self.times_seen_aggregation,
+            "user": Function("uniq", [Column("tags[sentry:user]", entity)], "user_count"),
+        }
 
     sort_strategies = {
         "new": "g.group_first_seen",
@@ -1362,80 +1376,100 @@ class GroupAttributesPostgresSnubaQueryExecutor(PostgresSnubaQueryExecutor):
 
         event_entity = self.entities["event"]
         attr_entity = self.entities["attrs"]
+        search_issues_entity = self.entities["search_issues"]
+
+        queries = []
+        entities_to_check = [event_entity, search_issues_entity]
+        for joined_entity in entities_to_check:
+            where_conditions = [
+                Condition(Column("project_id", joined_entity), Op.IN, [p.id for p in projects]),
+                Condition(Column("project_id", attr_entity), Op.IN, [p.id for p in projects]),
+                Condition(Column("timestamp", joined_entity), Op.GTE, start),
+                Condition(Column("timestamp", joined_entity), Op.LT, end),
+            ]
+            for search_filter in search_filters or ():
+                # use the stored function if it exists in our mapping, otherwise use the basic lookup
+                fn = self.group_conditions_lookup.get(search_filter.key.name)
+                if fn:
+                    where_conditions.append(fn(self, search_filter, joined_entity))
+                else:
+                    where_conditions.append(
+                        self.get_basic_event_snuba_condition(search_filter, joined_entity)
+                    )
 
-        where_conditions = [
-            Condition(Column("project_id", event_entity), Op.IN, [p.id for p in projects]),
-            Condition(Column("project_id", attr_entity), Op.IN, [p.id for p in projects]),
-            Condition(Column("timestamp", event_entity), Op.GTE, start),
-            Condition(Column("timestamp", event_entity), Op.LT, end),
-        ]
-        for search_filter in search_filters or ():
-            # use the stored function if it exists in our mapping, otherwise use the basic lookup
-            fn = self.group_conditions_lookup.get(search_filter.key.name)
-            if fn:
-                where_conditions.append(fn(self, search_filter))
-            else:
-                where_conditions.append(self.get_basic_event_snuba_condition(search_filter))
-
-        if environments:
-            # TODO: Should this be handled via filter_keys, once we have a snql compatible version?
-            where_conditions.append(
-                Condition(
-                    Column("environment", event_entity), Op.IN, [e.name for e in environments]
+            if environments:
+                # TODO: Should this be handled via filter_keys, once we have a snql compatible version?
+                where_conditions.append(
+                    Condition(
+                        Column("environment", joined_entity), Op.IN, [e.name for e in environments]
+                    )
                 )
-            )
 
-        sort_func = self.sort_defs[sort_by]
+            sort_func = self.get_sort_defs(joined_entity)[sort_by]
 
-        having = []
-        if cursor is not None:
-            op = Op.GTE if cursor.is_prev else Op.LTE
-            having.append(Condition(sort_func, op, cursor.value))
-
-        tenant_ids = {"organization_id": projects[0].organization_id} if projects else None
-        groupby = [Column("group_id", attr_entity)]
-        select = [Column("group_id", attr_entity)]
-        if sort_by == "new":
-            groupby.append(Column("group_first_seen", attr_entity))
-            select.append(Column("group_first_seen", attr_entity))
-
-        select.append(sort_func)
-
-        query = Query(
-            match=Join([Relationship(event_entity, "attributes", attr_entity)]),
-            select=select,
-            where=where_conditions,
-            groupby=groupby,
-            having=having,
-            orderby=[OrderBy(sort_func, direction=Direction.DESC)],
-            limit=Limit(limit + 1),
-        )
-        request = Request(
-            dataset="events",
-            app_id="group_attributes",
-            query=query,
-            tenant_ids=tenant_ids,
-        )
-        data = snuba.raw_snql_query(request, referrer="search.snuba.group_attributes_search.query")[
-            "data"
-        ]
+            having = []
+            if cursor is not None:
+                op = Op.GTE if cursor.is_prev else Op.LTE
+                having.append(Condition(sort_func, op, cursor.value))
 
-        hits_query = Query(
-            match=Join([Relationship(event_entity, "attributes", attr_entity)]),
-            select=[
-                Function("uniq", [Column("group_id", attr_entity)], alias="count"),
-            ],
-            where=where_conditions,
-        )
-        hits = None
-        if count_hits:
+            tenant_ids = {"organization_id": projects[0].organization_id} if projects else None
+            groupby = [Column("group_id", attr_entity)]
+            select = [Column("group_id", attr_entity)]
+            if sort_by == "new":
+                groupby.append(Column("group_first_seen", attr_entity))
+                select.append(Column("group_first_seen", attr_entity))
+
+            select.append(sort_func)
+
+            query = Query(
+                match=Join([Relationship(joined_entity, "attributes", attr_entity)]),
+                select=select,
+                where=where_conditions,
+                groupby=groupby,
+                having=having,
+                orderby=[OrderBy(sort_func, direction=Direction.DESC)],
+                limit=Limit(limit + 1),
+            )
             request = Request(
-                dataset="events", app_id="group_attributes", query=hits_query, tenant_ids=tenant_ids
+                dataset="events",
+                app_id="group_attributes",
+                query=query,
+                tenant_ids=tenant_ids,
             )
-            hits = snuba.raw_snql_query(
-                request, referrer="search.snuba.group_attributes_search.hits"
-            )["data"][0]["count"]
+            queries.append(request)
+
+            if count_hits:
+                hits_query = Query(
+                    match=Join([Relationship(joined_entity, "attributes", attr_entity)]),
+                    select=[
+                        Function("uniq", [Column("group_id", attr_entity)], alias="count"),
+                    ],
+                    where=where_conditions,
+                )
+                request = Request(
+                    dataset="events",
+                    app_id="group_attributes",
+                    query=hits_query,
+                    tenant_ids=tenant_ids,
+                )
+                queries.append(request)
+
+        bulk_result = snuba.bulk_snql_query(
+            queries, referrer="search.snuba.group_attributes_search.query"
+        )
 
+        data = []
+        count = 0
+        # bulk_result interleaves each entity's data query with its
+        # optional hits query, so walk the batch in that order
+        k = 0
+        for _ in range(len(entities_to_check)):
+            data.extend(bulk_result[k]["data"])
+            if count_hits:
+                k += 1
+                count += bulk_result[k]["data"][0]["count"]
+            k += 1
+
+        hits = count if count_hits else None
         paginator_results = SequencePaginator(
             [(row[self.sort_strategies[sort_by]], row["g.group_id"]) for row in data],
             reverse=True,

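The unpacking loop at the end of this hunk walks `bulk_result` in interleaved order: with `count_hits` enabled the batch is [entity 1 data, entity 1 hits, entity 2 data, entity 2 hits]. A standalone illustration of the same indexing, with made-up placeholder rows:

    # hypothetical results for two entities with count_hits=True
    bulk_result = [
        {"data": [{"g.group_id": 1, "score": 1000}]},  # events: data
        {"data": [{"count": 1}]},                      # events: hits
        {"data": [{"g.group_id": 2, "score": 2000}]},  # search_issues: data
        {"data": [{"count": 1}]},                      # search_issues: hits
    ]
    count_hits = True

    data = []
    count = 0
    k = 0
    for _ in range(2):  # len(entities_to_check)
        data.extend(bulk_result[k]["data"])  # rows for this entity
        if count_hits:
            k += 1  # step over to this entity's hits query
            count += bulk_result[k]["data"][0]["count"]
        k += 1  # advance to the next entity's data query

    assert [row["g.group_id"] for row in data] == [1, 2]
    assert count == 2
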
+ 70 - 4
tests/sentry/issues/endpoints/test_organization_group_index.py

@@ -1,15 +1,19 @@
 import functools
 from datetime import UTC, datetime, timedelta
+from time import sleep
 from unittest.mock import Mock, call, patch
 from uuid import uuid4
 
-import pytest
 from dateutil.parser import parse as parse_datetime
 from django.urls import reverse
 from django.utils import timezone
 
 from sentry import options
-from sentry.issues.grouptype import PerformanceNPlusOneGroupType, PerformanceSlowDBQueryGroupType
+from sentry.issues.grouptype import (
+    PerformanceNPlusOneGroupType,
+    PerformanceRenderBlockingAssetSpanGroupType,
+    PerformanceSlowDBQueryGroupType,
+)
 from sentry.models.activity import Activity
 from sentry.models.apitoken import ApiToken
 from sentry.models.group import Group, GroupStatus
@@ -55,9 +59,10 @@ from sentry.testutils.silo import assume_test_silo_mode
 from sentry.types.activity import ActivityType
 from sentry.types.group import GroupSubStatus, PriorityLevel
 from sentry.utils import json
+from tests.sentry.issues.test_utils import SearchIssueTestMixin
 
 
-class GroupListTest(APITestCase, SnubaTestCase):
+class GroupListTest(APITestCase, SnubaTestCase, SearchIssueTestMixin):
     endpoint = "sentry-api-0-organization-group-index"
 
     def setUp(self):
@@ -2666,7 +2671,6 @@ class GroupListTest(APITestCase, SnubaTestCase):
                 )
                 assert mock_query.call_count == 0
 
-    @pytest.mark.skip(reason="Need to fix")
     @patch(
         "sentry.search.snuba.executors.GroupAttributesPostgresSnubaQueryExecutor.query",
         side_effect=GroupAttributesPostgresSnubaQueryExecutor.query,
@@ -2684,6 +2688,8 @@ class GroupListTest(APITestCase, SnubaTestCase):
             project_id=self.project.id,
         )
         self.login_as(user=self.user)
+        # give time for consumers to run and propagate changes to ClickHouse
+        sleep(1)
         response = self.get_success_response(
             sort="new",
             useGroupSnubaDataset=1,
@@ -2693,6 +2699,66 @@ class GroupListTest(APITestCase, SnubaTestCase):
         assert int(response.data[0]["id"]) == event1.group.id
         assert mock_query.call_count == 1
 
+    @patch(
+        "sentry.search.snuba.executors.GroupAttributesPostgresSnubaQueryExecutor.query",
+        side_effect=GroupAttributesPostgresSnubaQueryExecutor.query,
+        autospec=True,
+    )
+    @override_options({"issues.group_attributes.send_kafka": True})
+    @with_feature("organizations:issue-platform")
+    def test_snuba_perf_issue(self, mock_query):
+        self.project = self.create_project(organization=self.organization)
+        # create a performance issue
+        _, _, group_info = self.store_search_issue(
+            self.project.id,
+            233,
+            [f"{PerformanceRenderBlockingAssetSpanGroupType.type_id}-group1"],
+            user={"email": "myemail@example.com"},
+            event_data={
+                "type": "transaction",
+                "start_timestamp": iso_format(datetime.now() - timedelta(minutes=1)),
+                "contexts": {"trace": {"trace_id": "b" * 32, "span_id": "c" * 16, "op": ""}},
+            },
+        )
+
+        # make mypy happy
+        perf_group_id = group_info.group.id if group_info else None
+
+        # create an error issue with the same tag
+        error_event = self.store_event(
+            data={
+                "fingerprint": ["error-issue"],
+                "event_id": "e" * 32,
+                "user": {"email": "myemail@example.com"},
+            },
+            project_id=self.project.id,
+        )
+        # another error issue with a different tag
+        self.store_event(
+            data={
+                "fingerprint": ["error-issue-2"],
+                "event_id": "e" * 32,
+                "user": {"email": "different@example.com"},
+            },
+            project_id=self.project.id,
+        )
+
+        assert Group.objects.filter(id=perf_group_id).exists()
+        self.login_as(user=self.user)
+        # give time for consumers to run and propagate changes to ClickHouse
+        sleep(1)
+        response = self.get_success_response(
+            sort="new",
+            useGroupSnubaDataset=1,
+            query="user.email:myemail@example.com",
+        )
+        assert len(response.data) == 2
+        assert {r["id"] for r in response.data} == {
+            str(perf_group_id),
+            str(error_event.group.id),
+        }
+        assert mock_query.call_count == 1
+
 
 class GroupUpdateTest(APITestCase, SnubaTestCase):
     endpoint = "sentry-api-0-organization-group-index"

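Both snuba-dataset tests patch the executor's `query` with `side_effect` set to the real method and `autospec=True`, which keeps the real query running while the mock records the call for the `call_count` assertions. A self-contained sketch of that pattern:

    from unittest.mock import patch

    class Executor:
        def query(self, x: int) -> int:
            return x * 2

    # autospec=True preserves the signature (including self), so the
    # side_effect delegation receives the instance and behavior is
    # unchanged while the mock still counts calls.
    with patch.object(Executor, "query", side_effect=Executor.query, autospec=True) as mock_query:
        assert Executor().query(3) == 6
        assert mock_query.call_count == 1
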
+ 2 - 0
tests/sentry/issues/test_utils.py

@@ -94,6 +94,7 @@ class SearchIssueTestMixin(OccurrenceTestMixin):
         tags: Sequence[tuple[str, Any]] | None = None,
         release: str | None = None,
         user: dict[str, Any] | None = None,
+        event_data: dict[str, Any] | None = None,
     ) -> tuple[Event, IssueOccurrence, GroupInfo | None]:
         from sentry.utils import snuba
 
@@ -103,6 +104,7 @@ class SearchIssueTestMixin(OccurrenceTestMixin):
         event_data = {
             "tags": [("sentry:user", user_id_val)],
             "timestamp": iso_format(insert_timestamp),
+            **(event_data or {}),
         }
         if tags:
             event_data["tags"].extend(tags)

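The new `event_data` parameter is unpacked after the defaults, so caller-supplied keys (including `timestamp`) override them. In isolation, with illustrative values:

    # the caller's keys win because the unpack comes last
    base = {
        "tags": [("sentry:user", "user-1")],
        "timestamp": "2024-04-05T00:00:00",
    }
    event_data = {"type": "transaction", "timestamp": "2024-04-05T00:01:00"}

    merged = {**base, **(event_data or {})}
    assert merged["type"] == "transaction"               # added by the caller
    assert merged["timestamp"] == "2024-04-05T00:01:00"  # caller overrides the default
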
+ 1 - 1
tests/snuba/search/test_backend.py

@@ -2602,7 +2602,7 @@ class EventsJoinedGroupAttributesSnubaSearchTest(TransactionTestCase, EventsSnub
         assert set(results) == {self.group1}
 
         # introduce a slight delay so the async future has time to run and log the metric
-        time.sleep(0.10)
+        time.sleep(1)
 
         metrics_incr_called = False
         for call in metrics_incr.call_args_list:
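
Both this change and the new `sleep(1)` calls in the group index tests wait a fixed interval for async work to land. A hypothetical polling helper (not part of this commit) would make such waits faster on the happy path and less flaky under load:

    import time
    from collections.abc import Callable

    def wait_until(predicate: Callable[[], bool], timeout: float = 5.0, interval: float = 0.1) -> bool:
        """Poll predicate until it returns True or timeout seconds elapse."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if predicate():
                return True
            time.sleep(interval)
        return False

    # e.g. instead of time.sleep(1):
    # assert wait_until(lambda: metrics_incr.call_count > 0)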