@@ -47,7 +47,7 @@ from sentry.issues.search import (
 )
 from sentry.models import Environment, Group, Organization, Project
 from sentry.search.events.filter import convert_search_filter_to_snuba_query, format_search_filter
-from sentry.search.utils import validate_cdc_search_filters
+from sentry.search.utils import SupportedConditions, validate_cdc_search_filters
 from sentry.snuba.dataset import Dataset
 from sentry.utils import json, metrics, snuba
 from sentry.utils.cursors import Cursor, CursorResult
@@ -1331,3 +1331,205 @@ class CdcPostgresSnubaQueryExecutor(PostgresSnubaQueryExecutor):
 
         # TODO: Add types to paginators and remove this
         return cast(CursorResult[Group], paginator_results)
+
+
+class GroupAttributesPostgresSnubaQueryExecutor(PostgresSnubaQueryExecutor):
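+    # Searches the events / group_attributes join in Snuba, then re-validates
+    # the returned groups against Postgres (see query() below).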
+    entities = {
+        "event": Entity("events", alias="e"),
+        "attrs": Entity("group_attributes", alias="g"),
+    }
+
+    supported_cdc_conditions = [
+        SupportedConditions("status", frozenset(["IN"])),
+    ]
+    supported_cdc_conditions_lookup = {
+        condition.field_name: condition for condition in supported_cdc_conditions
+    }
+
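+    # Sort key for results: the group's most recent event timestamp converted
+    # to milliseconds, defaulting to 0 when the group has no matching events.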
+    last_seen_aggregation = Function(
+        "ifNull",
+        [
+            Function(
+                "multiply",
+                [
+                    Function(
+                        "toUInt64", [Function("max", [Column("timestamp", entities["event"])])]
+                    ),
+                    1000,
+                ],
+            ),
+            0,
+        ],
+    )
+
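+    # Derives the effective [start, end) window from explicit dates, any `date`
+    # search filters, and the retention floor; retention_date is returned so
+    # query() can detect a window that was trimmed entirely out of retention.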
+    def calculate_start_end(
+        self,
+        retention_window_start: Optional[datetime],
+        search_filters: Optional[Sequence[SearchFilter]],
+        date_from: Optional[datetime],
+        date_to: Optional[datetime],
+    ) -> Tuple[datetime, datetime, datetime]:
+        now = timezone.now()
+        end = None
+        end_params = [_f for _f in [date_to, get_search_filter(search_filters, "date", "<")] if _f]
+        if end_params:
+            end = min(end_params)
+
+        if not end:
+            end = now + ALLOWED_FUTURE_DELTA
+
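+        # Never search further back than 90 days, even when the retention
+        # window starts earlier.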
+        retention_date = max(_f for _f in [retention_window_start, now - timedelta(days=90)] if _f)
+        start_params = [date_from, retention_date, get_search_filter(search_filters, "date", ">")]
+        start = max(_f for _f in start_params if _f)
+        end = max([retention_date, end])
+        return start, end, retention_date
+
+    def validate_cdc_search_filters(self, search_filters: Optional[Sequence[SearchFilter]]) -> bool:
+        """
+        Validates whether a set of search filters can be handled by this search backend.
+        """
+        for search_filter in search_filters or ():
+            supported_condition = self.supported_cdc_conditions_lookup.get(search_filter.key.name)
+            if not supported_condition:
+                return False
+            if (
+                supported_condition.operators
+                and search_filter.operator not in supported_condition.operators
+            ):
+                return False
+        return True
+
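+    # Overall flow: validate the filters, derive the time window, query the
+    # events/group_attributes join for group ids ordered by last_seen, then
+    # re-check the resulting groups against Postgres before returning them.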
+    def query(
+        self,
+        projects: Sequence[Project],
+        retention_window_start: Optional[datetime],
+        group_queryset: BaseQuerySet,
+        environments: Optional[Sequence[Environment]],
+        sort_by: str,
+        limit: int,
+        cursor: Optional[Cursor],
+        count_hits: bool,
+        paginator_options: Optional[Mapping[str, Any]],
+        search_filters: Optional[Sequence[SearchFilter]],
+        date_from: Optional[datetime],
+        date_to: Optional[datetime],
+        max_hits: Optional[int] = None,
+        referrer: Optional[str] = None,
+        actor: Optional[Any] = None,
+        aggregate_kwargs: Optional[PrioritySortWeights] = None,
+    ) -> CursorResult[Group]:
+        if not self.validate_cdc_search_filters(search_filters):
+            raise InvalidQueryForExecutor("Search filters invalid for this query executor")
+
+        start, end, retention_date = self.calculate_start_end(
+            retention_window_start, search_filters, date_from, date_to
+        )
+
+        if start == retention_date and end == retention_date:
+            # Both `start` and `end` must have been trimmed to `retention_date`,
+            # so this entire search was against a time range that is outside of
+            # retention. We'll return empty results to maintain backwards compatibility
+            # with Django search (for now).
+            return self.empty_result
+
+        if start >= end:
+            # TODO: This maintains backwards compatibility with Django search, but
+            # in the future we should find a way to notify the user that their search
+            # is invalid.
+            return self.empty_result
+
+        event_entity = self.entities["event"]
+        attr_entity = self.entities["attrs"]
+
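+        # Both sides of the join are filtered by project, and the events side
+        # additionally by the time window.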
+        where_conditions = [
+            Condition(Column("project_id", event_entity), Op.IN, [p.id for p in projects]),
+            Condition(Column("project_id", attr_entity), Op.IN, [p.id for p in projects]),
+            Condition(Column("timestamp", event_entity), Op.GTE, start),
+            Condition(Column("timestamp", event_entity), Op.LT, end),
+        ]
+        # TODO: This is still basically only handling status, handle this better once we introduce
+        # more conditions.
+        for search_filter in search_filters or ():
+            where_conditions.append(
+                Condition(
+                    Column(search_filter.key.name, attr_entity),
+                    Op.IN,
+                    search_filter.value.raw_value,
+                )
+            )
+
+        if environments:
+            # TODO: Should this be handled via filter_keys, once we have a snql compatible version?
+            where_conditions.append(
+                Condition(
+                    Column("environment", event_entity), Op.IN, [e.name for e in environments]
+                )
+            )
+
+        sort_func = self.last_seen_aggregation
+
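+        # The cursor bounds the aggregated sort value, so the condition belongs
+        # in HAVING rather than WHERE (it cannot be evaluated per row).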
+        having = []
+        if cursor is not None:
+            op = Op.GTE if cursor.is_prev else Op.LTE
+            having.append(Condition(sort_func, op, cursor.value))
+
+        tenant_ids = {"organization_id": projects[0].organization_id} if projects else None
+
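+        # One row per group: its id plus the last_seen score, newest first.
+        # limit + 1 fetches one extra row, presumably so the paginator can
+        # detect whether a further page exists.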
+        query = Query(
+            match=Join([Relationship(event_entity, "attributes", attr_entity)]),
+            select=[
+                Column("group_id", attr_entity),
+                replace(sort_func, alias="score"),
+            ],
+            where=where_conditions,
+            groupby=[Column("group_id", attr_entity)],
+            having=having,
+            orderby=[OrderBy(sort_func, direction=Direction.DESC)],
+            limit=Limit(limit + 1),
+        )
+        request = Request(
+            dataset="events",
+            app_id="group_attributes",
+            query=query,
+            tenant_ids=tenant_ids,
+        )
+        data = snuba.raw_snql_query(request, referrer="search.snuba.group_attributes_search.query")[
+            "data"
+        ]
+
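+        # Hit count: distinct groups matching the same conditions, without the
+        # cursor's HAVING clause or the page limit; only executed when the
+        # caller requested hit counts.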
+        hits_query = Query(
+            match=Join([Relationship(event_entity, "attributes", attr_entity)]),
+            select=[
+                Function("uniq", [Column("group_id", attr_entity)], alias="count"),
+            ],
+            where=where_conditions,
+        )
+        hits = None
+        if count_hits:
+            request = Request(
+                dataset="events", app_id="group_attributes", query=hits_query, tenant_ids=tenant_ids
+            )
+            hits = snuba.raw_snql_query(
+                request, referrer="search.snuba.group_attributes_search.hits"
+            )["data"][0]["count"]
+
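+        # Snuba qualifies join columns with the entity alias, hence
+        # "g.group_id" for the group_attributes entity aliased as "g" above.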
+        paginator_options = paginator_options or {}
+        paginator_results = SequencePaginator(
+            [(row["score"], row["g.group_id"]) for row in data],
+            reverse=True,
+            **paginator_options,
+        ).get_result(limit, cursor, known_hits=hits, max_hits=max_hits)
+
+        # We filter against `group_queryset` here so that we recheck all conditions in Postgres.
+        # Since replication between Postgres and Clickhouse can lag, we might get back results that
+        # have changed state in Postgres. By rechecking them we guarantee that any returned results
+        # have the correct state.
+        # TODO: This can result in us returning less than a full page of results, but shouldn't
+        # affect cursors. If we want to, we can iterate and query snuba until we manage to get a
+        # full page. In practice, this will likely only skip a couple of results at worst, and
+        # probably not be noticeable to the user, so holding off for now to reduce complexity.
+
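+        # in_bulk returns an unordered {id: Group} mapping; rebuild the list in
+        # Snuba's score order, dropping ids that no longer exist in Postgres.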
+        groups = group_queryset.in_bulk(paginator_results.results)
+        paginator_results.results = [groups[k] for k in paginator_results.results if k in groups]
+        # TODO: Add types to paginators and remove this
+        return cast(CursorResult[Group], paginator_results)