Browse Source

ref(ddm): Switch correlations endpoint query parsing to use visitors (#63943)

Riccardo Busetti 1 year ago
parent
commit
73de44de26

+ 76 - 36
src/sentry/sentry_metrics/querying/metadata/metrics_correlations.py

@@ -17,19 +17,25 @@ from snuba_sdk import (
     Request,
 )
 from snuba_sdk.conditions import ConditionGroup
+from snuba_sdk.mql.mql import parse_mql
+from snuba_sdk.timeseries import Timeseries
 
 from sentry.exceptions import InvalidParams
 from sentry.models.environment import Environment
 from sentry.models.organization import Organization
 from sentry.models.project import Project
 from sentry.sentry_metrics.querying.common import SNUBA_QUERY_LIMIT
-from sentry.sentry_metrics.querying.errors import CorrelationsQueryExecutionError
-from sentry.sentry_metrics.querying.metadata.utils import (
-    add_environments_condition,
-    get_snuba_conditions_from_query,
-    transform_conditions_to_tags,
-    transform_conditions_with,
-    transform_latest_release_condition,
+from sentry.sentry_metrics.querying.errors import (
+    CorrelationsQueryExecutionError,
+    InvalidMetricsQueryError,
+)
+from sentry.sentry_metrics.querying.types import QueryCondition
+from sentry.sentry_metrics.querying.visitors import (
+    EnvironmentsInjectionVisitor,
+    LatestReleaseTransformationVisitor,
+    MappingTransformationVisitor,
+    QueryConditionVisitor,
+    TagsTransformationVisitor,
 )
 from sentry.snuba.dataset import Dataset, EntityKey
 from sentry.snuba.metrics.naming_layer.mri import (
@@ -129,6 +135,49 @@ class MetricCorrelations:
         return hash(self.metric_mri)
 
 
+class QueryConditions:
+    def __init__(self, conditions: List[QueryCondition]):
+        self._conditions = conditions
+        self._visitors: List[QueryConditionVisitor[QueryCondition]] = []
+
+    @classmethod
+    def build(cls, query: Optional[str], environments: Sequence[Environment]) -> "QueryConditions":
+        """
+        Returns a set of Snuba conditions from a query string which is assumed to contain filters in the MQL grammar.
+
+        Since MQL does not support parsing only filters, we have to create a phantom query to feed the parser,
+        in order for it to correctly resolve a `Timeseries` out of which we extract the `filters`.
+        """
+        # We want to create a phantom query to feed into the parser in order to be able to extract the conditions
+        # from the returned timeseries.
+        phantom_query = f"count(phantom){{{query or ''}}}"
+
+        parsed_phantom_query = parse_mql(phantom_query).query
+        if not isinstance(parsed_phantom_query, Timeseries):
+            # For now, we reuse data from `api` but we will soon lift out common components from that file.
+            raise InvalidMetricsQueryError("The supplied query is not valid")
+
+        if parsed_phantom_query.filters is None:
+            parsed_phantom_query = parsed_phantom_query.set_filters([])
+
+        # We inject the environments in the phantom query.
+        parsed_phantom_query = EnvironmentsInjectionVisitor(environments).visit(
+            parsed_phantom_query
+        )
+        return QueryConditions(cast(List[QueryCondition], parsed_phantom_query.filters))
+
+    def add_visitor(self, visitor: QueryConditionVisitor[QueryCondition]) -> "QueryConditions":
+        self._visitors.append(visitor)
+        return self
+
+    def get(self) -> List[QueryCondition]:
+        conditions = self._conditions
+        for visitor in self._visitors:
+            conditions = visitor.visit_group(conditions)
+
+        return conditions
+
+
 class CorrelationsSource(ABC):
     def __init__(
         self,
@@ -148,9 +197,9 @@ class CorrelationsSource(ABC):
         max_value: Optional[float],
         environments: Sequence[Environment],
     ) -> Sequence[Segment]:
-        conditions = get_snuba_conditions_from_query(query)
-        conditions = add_environments_condition(conditions, environments)
-        conditions = transform_latest_release_condition(conditions, self.projects)
+        conditions = QueryConditions.build(query, environments).add_visitor(
+            LatestReleaseTransformationVisitor(self.projects)
+        )
 
         return self._get_segments(
             metric_mri=metric_mri,
@@ -170,7 +219,7 @@ class CorrelationsSource(ABC):
     def _get_segments(
         self,
         metric_mri: str,
-        conditions: Optional[ConditionGroup],
+        conditions: QueryConditions,
         start: datetime,
         end: datetime,
         min_value: Optional[float],
@@ -183,7 +232,7 @@ class MetricsSummariesCorrelationsSource(CorrelationsSource):
     def _get_metrics_summaries_by_span(
         self,
         metric_mri: str,
-        conditions: Optional[ConditionGroup],
+        conditions: QueryConditions,
         start: datetime,
         end: datetime,
         min_value: Optional[float],
@@ -218,7 +267,7 @@ class MetricsSummariesCorrelationsSource(CorrelationsSource):
                 Condition(Column("end_timestamp"), Op.LT, end),
                 Condition(Column("metric_mri"), Op.EQ, metric_mri),
             ]
-            + (conditions or []),
+            + conditions.get(),
             having=having,
             groupby=[Column("span_id")],
             limit=Limit(SNUBA_QUERY_LIMIT),
@@ -294,18 +343,19 @@ class MetricsSummariesCorrelationsSource(CorrelationsSource):
     def _get_segments(
         self,
         metric_mri: str,
-        conditions: Optional[ConditionGroup],
+        conditions: QueryConditions,
         start: datetime,
         end: datetime,
         min_value: Optional[float],
         max_value: Optional[float],
     ) -> Sequence[Segment]:
-        transformed_conditions = transform_conditions_to_tags(conditions)
+        if conditions:
+            conditions.add_visitor(TagsTransformationVisitor(check_sentry_tags=False))
 
         # First, we fetch the spans we are interested in given the metric and the bounds.
         metric_summaries_by_span = self._get_metrics_summaries_by_span(
             metric_mri=metric_mri,
-            conditions=transformed_conditions,
+            conditions=conditions,
             start=start,
             end=end,
             min_value=min_value,
@@ -364,22 +414,17 @@ class TransactionDurationCorrelationsSource(CorrelationsSource):
     def _get_segments(
         self,
         metric_mri: str,
-        conditions: Optional[ConditionGroup],
+        conditions: QueryConditions,
         start: datetime,
         end: datetime,
         min_value: Optional[float],
         max_value: Optional[float],
     ) -> Sequence[Segment]:
-        where = []
+        where: List[QueryCondition] = []
 
-        transformed_conditions = transform_conditions_to_tags(
-            conditions=conditions, check_sentry_tags=True
-        )
-        transformed_conditions = transform_conditions_with(
-            conditions=transformed_conditions, mappings=SENTRY_TAG_TO_COLUMN_NAME
-        )
-        if transformed_conditions:
-            where += transformed_conditions
+        conditions.add_visitor(TagsTransformationVisitor(check_sentry_tags=True))
+        conditions.add_visitor(MappingTransformationVisitor(mappings=SENTRY_TAG_TO_COLUMN_NAME))
+        where += conditions.get()
 
         if min_value:
             where += [Condition(Column("duration"), Op.GTE, min_value)]
@@ -413,22 +458,17 @@ class MeasurementsCorrelationsSource(CorrelationsSource):
     def _get_segments(
         self,
         metric_mri: str,
-        conditions: Optional[ConditionGroup],
+        conditions: QueryConditions,
         start: datetime,
         end: datetime,
         min_value: Optional[float],
         max_value: Optional[float],
     ) -> Sequence[Segment]:
-        where = []
+        where: List[QueryCondition] = []
 
-        transformed_conditions = transform_conditions_to_tags(
-            conditions=conditions, check_sentry_tags=True
-        )
-        transformed_conditions = transform_conditions_with(
-            conditions=transformed_conditions, mappings=SENTRY_TAG_TO_COLUMN_NAME
-        )
-        if transformed_conditions:
-            where += transformed_conditions
+        conditions.add_visitor(TagsTransformationVisitor(check_sentry_tags=True))
+        conditions.add_visitor(MappingTransformationVisitor(mappings=SENTRY_TAG_TO_COLUMN_NAME))
+        where += conditions.get()
 
         measurement_name = self._extract_measurement_name(metric_mri)
         # We add this condition every time, since if a measurement is not set, Snuba will return 0, but it could also

+ 0 - 183
src/sentry/sentry_metrics/querying/metadata/utils.py

@@ -1,183 +0,0 @@
-from typing import Callable, Mapping, Optional, Sequence
-
-from snuba_sdk import Column, Condition, Timeseries
-from snuba_sdk.conditions import BooleanCondition, BooleanOp, ConditionGroup, Op
-from snuba_sdk.mql.mql import parse_mql
-
-from sentry.api.serializers import bulk_fetch_project_latest_releases
-from sentry.models.environment import Environment
-from sentry.models.project import Project
-from sentry.sentry_metrics.querying.errors import (
-    InvalidMetricsQueryError,
-    LatestReleaseNotFoundError,
-)
-
-
-def _visit_conditions(
-    conditions: ConditionGroup, block: Callable[[Condition], Optional[ConditionGroup]]
-) -> ConditionGroup:
-    """
-    Traverses a group of conditions, applies a function on each terminal condition and returns a transformed group.
-    """
-    transformed_conditions = []
-    for condition in conditions:
-        if isinstance(condition, BooleanCondition):
-            transformed_conditions.append(
-                BooleanCondition(
-                    op=condition.op,
-                    conditions=_visit_conditions(condition.conditions, block),
-                )
-            )
-        elif isinstance(condition, Condition):
-            if (conditions_to_replace := block(condition)) is not None:
-                transformed_conditions += conditions_to_replace
-            else:
-                transformed_conditions.append(condition)
-
-    return transformed_conditions
-
-
-def transform_conditions_to_tags(
-    conditions: Optional[ConditionGroup], check_sentry_tags: bool = False
-) -> Optional[ConditionGroup]:
-    """
-    Transforms all the conditions to work on tags, by wrapping each `Column` name with 'tags[x]' and `sentry_tags[x]`.
-
-    This function assumes that the query of a metric only refers to tags, since it can't be inferred that it's not
-    referring to tags by just looking at the string. The values that are not tags, are specific to the data layer.
-    """
-    if conditions is None:
-        return None
-
-    def _transform_to_tags(condition: Condition) -> Optional[ConditionGroup]:
-        if not isinstance(condition.lhs, Column):
-            return None
-
-        # We assume that all incoming conditions are on tags, since we do not allow filtering by project in the
-        # query filters.
-        tag_column = f"tags[{condition.lhs.name}]"
-        sentry_tag_column = f"sentry_tags[{condition.lhs.name}]"
-
-        if check_sentry_tags:
-            tag_column = f"tags[{condition.lhs.name}]"
-            # We might have tags across multiple nested structures such as `tags` and `sentry_tags` for this reason
-            # we want to emit a condition that spans both.
-            return [
-                BooleanCondition(
-                    op=BooleanOp.OR,
-                    conditions=[
-                        Condition(lhs=Column(name=tag_column), op=condition.op, rhs=condition.rhs),
-                        Condition(
-                            lhs=Column(name=sentry_tag_column),
-                            op=condition.op,
-                            rhs=condition.rhs,
-                        ),
-                    ],
-                )
-            ]
-        else:
-            return [Condition(lhs=Column(name=tag_column), op=condition.op, rhs=condition.rhs)]
-
-    return _visit_conditions(conditions, _transform_to_tags)
-
-
-def transform_conditions_with(
-    conditions: Optional[ConditionGroup], mappings: Optional[Mapping[str, str]]
-) -> Optional[ConditionGroup]:
-    """
-    Maps all the `Column`(s) whose `key` matches one of the supplied mappings. If found, replaces it with the mapped
-    value.
-    """
-    if conditions is None:
-        return None
-
-    if not mappings:
-        return conditions
-
-    def _transform_conditions_with(condition: Condition) -> Optional[ConditionGroup]:
-        if not isinstance(condition.lhs, Column):
-            return None
-
-        return [
-            Condition(
-                lhs=Column(name=mappings.get(condition.lhs.key, condition.lhs.name)),
-                op=condition.op,
-                rhs=condition.rhs,
-            )
-        ]
-
-    return _visit_conditions(conditions, _transform_conditions_with)
-
-
-def transform_latest_release_condition(
-    conditions: Optional[ConditionGroup], projects: Sequence[Project]
-):
-    """
-    Transforms all the conditions in the form `release:latest` in `release:[x, y...]` where `x` and `y` are
-    the latest releases of the supplied projects.
-    """
-    if conditions is None:
-        return None
-
-    def _transform_latest_release_condition(condition: Condition) -> Optional[ConditionGroup]:
-        # TODO: move to the visitors implementation by using `QueryCondition` visitors from `visitors.py`.
-        if not isinstance(condition.lhs, Column):
-            return None
-
-        if not (
-            condition.lhs.name == "release"
-            and isinstance(condition.rhs, str)
-            and condition.rhs == "latest"
-        ):
-            return None
-
-        latest_releases = bulk_fetch_project_latest_releases(projects)
-        if not latest_releases:
-            raise LatestReleaseNotFoundError(
-                "Latest release(s) not found for the supplied projects"
-            )
-
-        return [
-            Condition(
-                lhs=condition.lhs,
-                op=Op.IN,
-                rhs=[latest_release.version for latest_release in latest_releases],
-            )
-        ]
-
-    return _visit_conditions(conditions, _transform_latest_release_condition)
-
-
-def add_environments_condition(
-    conditions: Optional[ConditionGroup], environments: Sequence[Environment]
-) -> Optional[ConditionGroup]:
-    """
-    Adds the environment filter inside a condition group in the form (environment_condition AND existing_conditions).
-    """
-    if not environments:
-        return conditions
-
-    environments_names = [environment.name for environment in environments]
-    return [Condition(Column("environment"), Op.IN, environments_names)] + (conditions or [])
-
-
-def get_snuba_conditions_from_query(query: Optional[str]) -> Optional[ConditionGroup]:
-    """
-    Returns a set of Snuba conditions from a query string which is assumed to contain filters in the MQL grammar.
-
-    Since MQL does not support parsing only filters, we have to create a phantom query to feed the parser,
-    in order for it to correctly resolve a `Timeseries` out of which we extract the `filters`.
-    """
-    if not query:
-        return None
-
-    # We want to create a phantom query to feed into the parser in order to be able to extract the conditions
-    # from the returned timeseries.
-    phantom_query = f"count(phantom){{{query}}}"
-
-    parsed_phantom_query = parse_mql(phantom_query).query
-    if not isinstance(parsed_phantom_query, Timeseries):
-        # For now, we reuse data from `api` but we will soon lift out common components from that file.
-        raise InvalidMetricsQueryError("The supplied query is not valid")
-
-    return parsed_phantom_query.filters

+ 60 - 2
src/sentry/sentry_metrics/querying/visitors.py

@@ -1,5 +1,5 @@
 from abc import ABC
-from typing import Generic, Optional, Sequence, TypeVar
+from typing import Generic, Mapping, Optional, Sequence, TypeVar
 
 from snuba_sdk import BooleanCondition, BooleanOp, Column, Condition, Formula, Op, Timeseries
 from snuba_sdk.conditions import ConditionGroup
@@ -47,6 +47,9 @@ class QueryConditionVisitor(ABC, Generic[TVisited]):
     """
 
     def visit_group(self, condition_group: ConditionGroup) -> ConditionGroup:
+        if not condition_group:
+            return condition_group
+
         visited_conditions = []
         for condition in condition_group:
             visited_conditions.append(self.visit(condition))
@@ -82,7 +85,6 @@ class EnvironmentsInjectionVisitor(QueryExpressionVisitor[QueryExpression]):
         self._environment_names = [environment.name for environment in environments]
 
     def _visit_timeseries(self, timeseries: Timeseries) -> QueryExpression:
-        # TODO: inject the filter in the formula filters also.
         if self._environment_names:
             current_filters = timeseries.filters if timeseries.filters else []
             current_filters.extend(
@@ -181,3 +183,59 @@ class LatestReleaseTransformationVisitor(QueryConditionVisitor[QueryCondition]):
             op=Op.IN,
             rhs=[latest_release.version for latest_release in latest_releases],
         )
+
+
+class TagsTransformationVisitor(QueryConditionVisitor[QueryCondition]):
+    """
+    Visitor that recursively transforms all conditions to work on tags in the form `tags[x]`.
+    """
+
+    def __init__(self, check_sentry_tags: bool):
+        self._check_sentry_tags = check_sentry_tags
+
+    def _visit_condition(self, condition: Condition) -> QueryCondition:
+        if not isinstance(condition.lhs, Column):
+            return condition
+
+        # We assume that all incoming conditions are on tags, since we do not allow filtering by project in the
+        # query filters.
+        tag_column = f"tags[{condition.lhs.name}]"
+        sentry_tag_column = f"sentry_tags[{condition.lhs.name}]"
+
+        if self._check_sentry_tags:
+            tag_column = f"tags[{condition.lhs.name}]"
+            # We might have tags across multiple nested structures such as `tags` and `sentry_tags` for this reason
+            # we want to emit a condition that spans both.
+            return BooleanCondition(
+                op=BooleanOp.OR,
+                conditions=[
+                    Condition(lhs=Column(name=tag_column), op=condition.op, rhs=condition.rhs),
+                    Condition(
+                        lhs=Column(name=sentry_tag_column),
+                        op=condition.op,
+                        rhs=condition.rhs,
+                    ),
+                ],
+            )
+        else:
+            return Condition(lhs=Column(name=tag_column), op=condition.op, rhs=condition.rhs)
+
+
+class MappingTransformationVisitor(QueryConditionVisitor[QueryCondition]):
+    """
+    Visitor that recursively transforms all conditions whose `key` matches one of the supplied mappings. If found,
+    replaces it with the mapped value.
+    """
+
+    def __init__(self, mappings: Mapping[str, str]):
+        self._mappings = mappings
+
+    def _visit_condition(self, condition: Condition) -> QueryCondition:
+        if not isinstance(condition.lhs, Column):
+            return condition
+
+        return Condition(
+            lhs=Column(name=self._mappings.get(condition.lhs.key, condition.lhs.name)),
+            op=condition.op,
+            rhs=condition.rhs,
+        )