@@ -4,12 +4,11 @@ import functools
 import logging
 import time
 from abc import ABCMeta, abstractmethod
-from copy import deepcopy
 from dataclasses import replace
 from datetime import datetime, timedelta
 from hashlib import md5
 from heapq import merge
-from typing import Any, Dict, List, Mapping, Sequence, Set, Tuple, cast
+from typing import Any, Iterable, List, Mapping, Optional, Sequence, Set, Tuple, cast

 import sentry_sdk
 from django.utils import timezone
@@ -30,21 +29,25 @@ from snuba_sdk.query import Query
 from snuba_sdk.relationships import Relationship

 from sentry import features, options
-from sentry.api.event_search import SearchFilter, SearchKey, SearchValue
+from sentry.api.event_search import SearchFilter
 from sentry.api.paginator import DateTimePaginator, Paginator, SequencePaginator
 from sentry.api.serializers.models.group import SKIP_SNUBA_FIELDS
 from sentry.constants import ALLOWED_FUTURE_DELTA
 from sentry.db.models.manager.base_query_set import BaseQuerySet
-from sentry.models import Environment, Group, Optional, Organization, Project
+from sentry.issues.search import MergeableRow, SearchQueryPartial, search_strategies_for_categories
+from sentry.models import Environment, Group, Project
 from sentry.search.events.fields import DateArg
 from sentry.search.events.filter import convert_search_filter_to_snuba_query
 from sentry.search.utils import validate_cdc_search_filters
 from sentry.types.issues import GROUP_TYPE_TO_CATEGORY, GroupCategory, GroupType
 from sentry.utils import json, metrics, snuba
 from sentry.utils.cursors import Cursor, CursorResult
+from sentry.utils.snuba import SnubaQueryParams, aliased_query_params, bulk_raw_query


-def get_search_filter(search_filters: Sequence[SearchFilter], name: str, operator: str) -> Any:
+def get_search_filter(
+    search_filters: Optional[Sequence[SearchFilter]], name: str, operator: str
+) -> Optional[Any]:
     """
     Finds the value of a search filter with the passed name and operator. If
     multiple values are found, returns the most restrictive value
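Worth pausing on this signature change: `search_filters` may now be `None` and the return type is `Optional[Any]`, so the executors below can call the helper without a pre-check. A minimal usage sketch with invented filter values; `SearchKey` and `SearchValue` are the same helpers this patch drops from the module-level import:

```python
# Hedged usage sketch -- the filter values are invented for illustration.
from datetime import datetime

from sentry.api.event_search import SearchFilter, SearchKey, SearchValue

date_filters = [
    SearchFilter(SearchKey("date"), "<", SearchValue(datetime(2022, 9, 1))),
    SearchFilter(SearchKey("date"), "<", SearchValue(datetime(2022, 8, 1))),
]

# For "<", the most restrictive value is the earliest bound, so callers can
# feed the result straight into min() when clamping the query window.
end = get_search_filter(date_filters, "date", "<")  # datetime(2022, 8, 1)
```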
@@ -69,7 +72,8 @@ def get_search_filter(search_filters: Sequence[SearchFilter], name: str, operator: str) -> Any:

 class AbstractQueryExecutor(metaclass=ABCMeta):
     """This class serves as a template for Query Executors.
-    We subclass it in order to implement query methods (we use it to implement two classes: joined Postgres+Snuba queries, and Snuba only queries)
+    We subclass it in order to implement query methods (we use it to implement two classes: joined
+    Postgres+Snuba queries, and Snuba only queries)
     It's used to keep the query logic out of the actual search backend,
     which can now just build query parameters and use the appropriate query executor to run the query
     """
@@ -134,43 +138,12 @@ class AbstractQueryExecutor(metaclass=ABCMeta):
         We usually return a paginator object, which contains the results and the number of hits"""
         raise NotImplementedError

-    def update_conditions(
-        self,
-        key: str,
-        operator: str,
-        value: str,
-        organization_id: int,
-        project_ids: Sequence[int],
-        environments: Sequence[Environment],
-        environment_ids: Sequence[int],
-        conditions: List[Any],
-    ) -> Sequence[Any]:
-        search_filter = SearchFilter(
-            key=SearchKey(name=key),
-            operator=operator,
-            value=SearchValue(raw_value=value),
-        )
-        converted_filter = convert_search_filter_to_snuba_query(
-            search_filter,
-            params={
-                "organization_id": organization_id,
-                "project_id": project_ids,
-                "environment": environments,
-            },
-        )
-        converted_filter = self._transform_converted_filter(
-            search_filter, converted_filter, project_ids, environment_ids
-        )
-        conditions = deepcopy(conditions)
-        conditions.append(converted_filter)
-        return conditions
-
     def snuba_search(
         self,
         start: datetime,
         end: datetime,
         project_ids: Sequence[int],
-        environment_ids: Sequence[int],
+        environment_ids: Optional[Sequence[int]],
         sort_field: str,
         organization_id: int,
         cursor: Optional[Cursor] = None,
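The deleted `update_conditions` above hard-wired one `event.type` condition per dataset; that role moves to the per-category strategies imported from `sentry.issues.search`. The sketch below is a hypothetical reconstruction of the shape such a strategy takes, inferred only from the call site later in this patch — the signature, the `dataset=` keyword, and the raw condition triple are assumptions, not that module's actual code:

```python
# Hypothetical strategy sketch -- inferred from the snuba_search call site,
# NOT copied from sentry.issues.search.
from typing import Any, Optional, Sequence, Set

from sentry.issues.search import SearchQueryPartial
from sentry.types.issues import GroupCategory
from sentry.utils import snuba
from sentry.utils.snuba import SnubaQueryParams


def error_search_strategy(
    group_categories: Set[GroupCategory],
    aggregations: Sequence[Any],
    query_partial: SearchQueryPartial,
    organization_id: int,
    project_ids: Sequence[int],
    environments: Optional[Sequence[Any]],
    conditions: Sequence[Any],
) -> Optional[SnubaQueryParams]:
    # Returning None lets the list(filter(None, ...)) in snuba_search drop
    # this dataset entirely.
    if group_categories and GroupCategory.ERROR not in group_categories:
        return None
    # The retired update_conditions() appended exactly this kind of condition
    # for the error dataset ("event.type != transaction").
    return query_partial(
        dataset=snuba.Dataset.Discover,
        conditions=[*conditions, ["event.type", "!=", "transaction"]],
        aggregations=aggregations,
    )
```

The key property is that a strategy returns `SnubaQueryParams` rather than executing anything, which is what lets `bulk_raw_query` fire all categories in a single round trip further down.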
@@ -202,7 +175,7 @@ class AbstractQueryExecutor(metaclass=ABCMeta):
         conditions = []
         having = []
         group_categories: Set[GroupCategory] = set()
-        for search_filter in search_filters:
+        for search_filter in search_filters or ():
             if search_filter.key.name in ("issue.category", "issue.type"):
                 group_categories.update(
                     GROUP_TYPE_TO_CATEGORY[GroupType(value)]
@@ -263,103 +236,87 @@ class AbstractQueryExecutor(metaclass=ABCMeta):
         else:
             # Get the top matching groups by score, i.e. the actual search results
             # in the order that we want them.
-            orderby = [
-                f"-{sort_field}",
-                "group_id",
-            ]  # ensure stable sort within the same score
+            orderby = [f"-{sort_field}", "group_id"]  # ensure stable sort within the same score
             referrer = "search"

-        query_partial = functools.partial(
-            snuba.aliased_query,
-            dataset=snuba.Dataset.Discover,
-            start=start,
-            end=end,
-            selected_columns=selected_columns,
-            groupby=["group_id"],
-            limit=limit,
-            offset=offset,
-            orderby=orderby,
-            referrer=referrer,
-            having=having,
-            filter_keys=filters,
-            totals=True,  # Needs to have totals_mode=after_having_exclusive so we get groups matching HAVING only
-            turbo=get_sample,  # Turn off FINAL when in sampling mode
-            sample=1,  # Don't use clickhouse sampling, even when in turbo mode.
+        query_partial: SearchQueryPartial = cast(
+            SearchQueryPartial,
+            functools.partial(
+                aliased_query_params,
+                start=start,
+                end=end,
+                selected_columns=selected_columns,
+                groupby=["group_id"],
+                limit=limit,
+                offset=offset,
+                orderby=orderby,
+                referrer=referrer,
+                having=having,
+                filter_keys=filters,
+                totals=True,  # Needs to have totals_mode=after_having_exclusive so we get groups matching HAVING only
+                turbo=get_sample,  # Turn off FINAL when in sampling mode
+                sample=1,  # Don't use clickhouse sampling, even when in turbo mode.
+            ),
         )

-        rows: Optional[List[Dict[str, int]]] = []
-        total = 0
-        row_length = 0
-        if not group_categories or GroupCategory.ERROR in group_categories:
-            error_conditions = self.update_conditions(
-                "event.type",
-                "!=",
-                "transaction",
-                organization_id,
-                project_ids,
-                environments,
-                environment_ids,
-                conditions,
-            )
-            snuba_error_results = query_partial(
-                conditions=error_conditions,
-                aggregations=aggregations,
-                condition_resolver=snuba.get_snuba_column_name,
+        query_params_for_categories: Sequence[SnubaQueryParams] = list(
+            filter(
+                None,
+                [
+                    fn_query_params(
+                        group_categories,
+                        aggregations,
+                        query_partial,
+                        organization_id,
+                        project_ids,
+                        environments,
+                        conditions,
+                    )
+                    for fn_query_params in search_strategies_for_categories(group_categories)
+                ],
             )
-            rows = snuba_error_results["data"]
-            total = snuba_error_results["totals"]["total"]
-            row_length = len(rows)
+        )

-        organization = Organization.objects.get(id=organization_id)
-        if features.has("organizations:performance-issues", organization) and (
-            not group_categories or GroupCategory.PERFORMANCE in group_categories
-        ):
-            transaction_conditions = self.update_conditions(
-                "event.type",
-                "=",
-                "transaction",
-                organization_id,
-                project_ids,
-                environments,
-                environment_ids,
-                conditions,
-            )
-            mod_agg = aggregations.copy() if aggregations else []
-            mod_agg.insert(0, ["arrayJoin", ["group_ids"], "group_id"])
-
-            snuba_transaction_results = query_partial(
-                conditions=transaction_conditions,
-                aggregations=mod_agg,
-                condition_resolver=functools.partial(
-                    snuba.get_snuba_column_name, dataset=snuba.Dataset.Transactions
+        bulk_query_results = bulk_raw_query(query_params_for_categories, referrer="search")
+
+        # [([row1a, row2a,], totala, row_lengtha), ([row1b, row2b,], totalb, row_lengthb), ...]
+        mapped_results: Sequence[Tuple[Iterable[MergeableRow], int, int]] = list(
+            map(
+                lambda bulk_result: (
+                    bulk_result["data"],
+                    bulk_result["totals"]["total"],
+                    len(bulk_result["data"]),
                 ),
+                filter(lambda bulk_result: bool(bulk_result), bulk_query_results),
             )
+        )

-            def keyfunc(row: Dict[str, int]) -> Optional[int]:
-                return row.get("group_id")
-
-            txn_rows = snuba_transaction_results["data"]
-            transaction_total = snuba_transaction_results["totals"]["total"]
-
-            if transaction_total:
-                total += transaction_total
+        merged_results: Tuple[Iterable[MergeableRow], int, int] = functools.reduce(
+            lambda left, right: (
+                merge(left[0], right[0], key=lambda row: row.get("group_id")),
+                left[1] + right[1],
+                left[2] + right[2],
+            ),
+            mapped_results,
+            (cast(Iterable[MergeableRow], []), 0, 0),
+        )

-            if txn_rows:
-                row_length += len(txn_rows)
-                rows = merge(rows, txn_rows, key=keyfunc)
+        rows: Sequence[MergeableRow] = list(merged_results[0])
+        total: int = merged_results[1]
+        row_length: int = merged_results[2]

         if not get_sample:
             metrics.timing("snuba.search.num_result_groups", row_length)

-        return [(row["group_id"], row[sort_field]) for row in rows], total
+        return [(row["group_id"], row[sort_field]) for row in rows], total  # type: ignore

+    @staticmethod
     def _transform_converted_filter(
-        self,
         search_filter: Sequence[SearchFilter],
-        converted_filter: Optional[Sequence[any]],
+        converted_filter: Optional[Sequence[Any]],
         project_ids: Sequence[int],
         environment_ids: Optional[Sequence[int]] = None,
-    ) -> Optional[Sequence[any]]:
+    ) -> Optional[Sequence[Any]]:
         """
         This method serves as a hook - after we convert the search_filter into a
         snuba compatible filter (which converts it in a general dataset
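Stepping back, the heart of the hunk above is the new result-merging flow: build `SnubaQueryParams` per issue category, issue them together via `bulk_raw_query`, then fold the per-category result sets into one stream. A self-contained sketch of that fold, with fake rows standing in for Snuba results:

```python
import functools
from heapq import merge
from typing import Any, Dict, Iterable, List, Tuple

# Fake per-category results (values invented); each list plays the role of one
# bulk_raw_query result's "data", already sorted by the merge key.
error_rows: List[Dict[str, Any]] = [{"group_id": 1}, {"group_id": 4}]
perf_rows: List[Dict[str, Any]] = [{"group_id": 2}, {"group_id": 3}]

mapped_results: List[Tuple[Iterable[Dict[str, Any]], int, int]] = [
    (error_rows, 10, len(error_rows)),  # (rows, totals["total"], row count)
    (perf_rows, 5, len(perf_rows)),
]

# The same reduce as the patch: heapq.merge lazily interleaves the row streams
# while the totals and row counts are summed.
rows, total, row_length = functools.reduce(
    lambda left, right: (
        merge(left[0], right[0], key=lambda row: row.get("group_id")),
        left[1] + right[1],
        left[2] + right[2],
    ),
    mapped_results,
    (iter(()), 0, 0),
)

assert list(rows) == [{"group_id": 1}, {"group_id": 2}, {"group_id": 3}, {"group_id": 4}]
assert (total, row_length) == (15, 4)
```

Note that `heapq.merge` is a lazy interleave and only yields a globally ordered stream when each input is already sorted by the merge key — the same contract the old two-way `merge(rows, txn_rows, key=keyfunc)` relied on.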
@@ -439,6 +396,7 @@ class PostgresSnubaQueryExecutor(AbstractQueryExecutor):
     ) -> CursorResult[Group]:
         now = timezone.now()
         end = None
+        paginator_options = {} if paginator_options is None else paginator_options
         end_params = [_f for _f in [date_to, get_search_filter(search_filters, "date", "<")] if _f]
         if end_params:
             end = min(end_params)
@@ -484,7 +442,7 @@ class PostgresSnubaQueryExecutor(AbstractQueryExecutor):
             # This handles tags and date parameters for search filters.
             not [
                 sf
-                for sf in search_filters
+                for sf in (search_filters or ())
                 if sf.key.name not in self.postgres_only_fields.union(["date"])
             ]
         ):
@@ -685,13 +643,13 @@ class PostgresSnubaQueryExecutor(AbstractQueryExecutor):
         projects: Sequence[Project],
         retention_window_start: Optional[datetime],
         group_queryset: Query,
-        environments: Sequence[Environment],
+        environments: Optional[Sequence[Environment]],
         sort_by: str,
         limit: int,
         cursor: Cursor | None,
         count_hits: bool,
         paginator_options: Mapping[str, Any],
-        search_filters: Sequence[SearchFilter],
+        search_filters: Optional[Sequence[SearchFilter]],
         start: datetime,
         end: datetime,
     ) -> Optional[int]:
@@ -764,6 +722,7 @@ class PostgresSnubaQueryExecutor(AbstractQueryExecutor):
             hit_ratio = filtered_count / float(snuba_count)
             hits = int(hit_ratio * snuba_total)
             return hits
+        return None


 class InvalidQueryForExecutor(Exception):
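Two things happen in this hunk: the trailing `return None` makes the fall-through explicit for the `Optional[int]` annotation, and the context above shows how hits are extrapolated from a sample — if `filtered_count` of `snuba_count` sampled candidates survive the Postgres filter, the same ratio is applied to the full `snuba_total`. A toy calculation with invented numbers:

```python
# Invented numbers: Snuba reports 1000 candidate groups overall; a sample of
# 100 was cross-checked against Postgres and 37 of those survived the filters.
snuba_total = 1000
snuba_count = 100
filtered_count = 37

hit_ratio = filtered_count / float(snuba_count)  # 0.37
hits = int(hit_ratio * snuba_total)  # 370 estimated total hits

assert hits == 370
```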
@@ -851,7 +810,7 @@ class CdcPostgresSnubaQueryExecutor(PostgresSnubaQueryExecutor):
     def calculate_start_end(
         self,
         retention_window_start: Optional[datetime],
-        search_filters: Sequence[SearchFilter],
+        search_filters: Optional[Sequence[SearchFilter]],
         date_from: Optional[datetime],
         date_to: Optional[datetime],
     ) -> Tuple[datetime, datetime, datetime]:
@@ -875,13 +834,13 @@ class CdcPostgresSnubaQueryExecutor(PostgresSnubaQueryExecutor):
         projects: Sequence[Project],
         retention_window_start: Optional[datetime],
         group_queryset: BaseQuerySet,
-        environments: Sequence[Environment],
+        environments: Optional[Sequence[Environment]],
         sort_by: str,
         limit: int,
         cursor: Optional[Cursor],
         count_hits: bool,
-        paginator_options: Mapping[str, Any],
-        search_filters: Sequence[SearchFilter],
+        paginator_options: Optional[Mapping[str, Any]],
+        search_filters: Optional[Sequence[SearchFilter]],
         date_from: Optional[datetime],
         date_to: Optional[datetime],
         max_hits: Optional[int] = None,
@@ -917,7 +876,7 @@ class CdcPostgresSnubaQueryExecutor(PostgresSnubaQueryExecutor):
         ]
         # TODO: This is still basically only handling status, handle this better once we introduce
         # more conditions.
-        for search_filter in search_filters:
+        for search_filter in search_filters or ():
             where_conditions.append(
                 Condition(
                     Column(search_filter.key.name, e_group), Op.IN, search_filter.value.raw_value
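For readers unfamiliar with snuba-sdk: each surviving filter becomes an `IN` condition against the CDC groups entity. A minimal, hypothetical example of one appended condition (the entity alias and status values are illustrative, not taken from this file):

```python
from snuba_sdk import Column, Condition, Entity, Op

# Stand-in for the e_group entity used by the surrounding CDC query.
e_group = Entity("groups", alias="g")

# A `status` filter whose raw_value is [0, 1] becomes: g.status IN (0, 1)
condition = Condition(Column("status", e_group), Op.IN, [0, 1])
```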
@@ -966,6 +925,7 @@ class CdcPostgresSnubaQueryExecutor(PostgresSnubaQueryExecutor):
             0
         ]["count"]

+        paginator_options = paginator_options or {}
         paginator_results = SequencePaginator(
             [(row["score"], row["g.id"]) for row in data],
             reverse=True,