Browse Source

Enable queries using project slug as filter and groupby in Metrics API (#69111)

Simon Hellmayr 10 months ago
parent
commit
7780004636

+ 14 - 4
src/sentry/sentry_metrics/querying/data/api.py

@@ -1,6 +1,5 @@
 from collections.abc import Sequence
 from datetime import datetime
-from typing import cast
 
 from snuba_sdk import MetricsQuery, MetricsScope, Rollup
 
@@ -8,18 +7,25 @@ from sentry import features
 from sentry.models.environment import Environment
 from sentry.models.organization import Organization
 from sentry.models.project import Project
-from sentry.sentry_metrics.querying.data.execution import QueryExecutor, QueryResult
+from sentry.sentry_metrics.querying.data.execution import QueryExecutor
+from sentry.sentry_metrics.querying.data.mapping.mapper import MapperConfig, Project2ProjectIDMapper
 from sentry.sentry_metrics.querying.data.parsing import QueryParser
+from sentry.sentry_metrics.querying.data.postprocessing.base import run_post_processing_steps
+from sentry.sentry_metrics.querying.data.postprocessing.remapping import QueryRemappingStep
 from sentry.sentry_metrics.querying.data.preparation.base import (
     IntermediateQuery,
+    PreparationStep,
     run_preparation_steps,
 )
+from sentry.sentry_metrics.querying.data.preparation.mapping import QueryMappingStep
 from sentry.sentry_metrics.querying.data.preparation.units_normalization import (
     UnitsNormalizationStep,
 )
 from sentry.sentry_metrics.querying.data.query import MQLQueriesResult, MQLQuery
 from sentry.sentry_metrics.querying.types import QueryType
 
+DEFAULT_MAPPINGS: MapperConfig = MapperConfig().add(Project2ProjectIDMapper)
+
 
 def run_queries(
     mql_queries: Sequence[MQLQuery],
@@ -62,12 +68,15 @@ def run_queries(
             )
         )
 
-    preparation_steps = []
+    preparation_steps: list[PreparationStep] = []
+
     if features.has(
         "organizations:ddm-metrics-api-unit-normalization", organization=organization, actor=None
     ):
         preparation_steps.append(UnitsNormalizationStep())
 
+    preparation_steps.append(QueryMappingStep(projects, DEFAULT_MAPPINGS))
+
     # We run a series of preparation steps which operate on the entire list of queries.
     intermediate_queries = run_preparation_steps(intermediate_queries, *preparation_steps)
 
@@ -77,6 +86,7 @@ def run_queries(
         executor.schedule(intermediate_query=intermediate_query, query_type=query_type)
 
     results = executor.execute()
+    results = run_post_processing_steps(results, QueryRemappingStep(projects))
 
     # We wrap the result in a class that exposes some utils methods to operate on results.
-    return MQLQueriesResult(cast(list[QueryResult], results))
+    return MQLQueriesResult(results)

+ 24 - 5
src/sentry/sentry_metrics/querying/data/execution.py

@@ -1,5 +1,5 @@
 from collections.abc import Mapping, Sequence
-from dataclasses import dataclass, replace
+from dataclasses import dataclass, field, replace
 from datetime import datetime
 from enum import Enum
 from typing import Any, Union, cast
@@ -11,6 +11,7 @@ from snuba_sdk.conditions import BooleanCondition, BooleanOp, Condition, Op
 from sentry.models.organization import Organization
 from sentry.models.project import Project
 from sentry.sentry_metrics.querying.constants import SNUBA_QUERY_LIMIT
+from sentry.sentry_metrics.querying.data.mapping.mapper import Mapper
 from sentry.sentry_metrics.querying.data.preparation.base import IntermediateQuery
 from sentry.sentry_metrics.querying.data.utils import adjust_time_bounds_with_interval
 from sentry.sentry_metrics.querying.errors import (
@@ -145,6 +146,7 @@ class ScheduledQuery:
     unit_family: UnitFamily | None = None
     unit: MeasurementUnit | None = None
     scaling_factor: float | None = None
+    mappers: list[Mapper] = field(default_factory=list)
 
     def initialize(
         self,
@@ -318,7 +320,7 @@ class ScheduledQuery:
         return metrics_query, None
 
 
-@dataclass(frozen=True)
+@dataclass
 class QueryResult:
     """
     Represents the result of a ScheduledQuery containing its associated series and totals results.
@@ -445,12 +447,24 @@ class QueryResult:
 
     @property
     def series(self) -> Sequence[Mapping[str, Any]]:
+        if "series" not in self.result:
+            return []
         return self.result["series"]["data"]
 
+    @series.setter
+    def series(self, value: Sequence[Mapping[str, Any]]) -> None:
+        self.result["series"]["data"] = value
+
     @property
     def totals(self) -> Sequence[Mapping[str, Any]]:
+        if "totals" not in self.result:
+            return []
         return self.result["totals"]["data"]
 
+    @totals.setter
+    def totals(self, value: Sequence[Mapping[str, Any]]) -> None:
+        self.result["totals"]["data"] = value
+
     @property
     def meta(self) -> Sequence[Mapping[str, str]]:
         # By default, we extract the metadata from the totals query, if that is not there we extract from the series
@@ -464,7 +478,11 @@ class QueryResult:
         # that we can correctly render groups in case they are not returned from the db because of missing data.
         #
         # Sorting of the groups is done to maintain consistency across function calls.
-        return sorted(UsedGroupBysVisitor().visit(self._any_query().metrics_query.query))
+        scheduled_query = self._any_query()
+        mappers = [mapper for mapper in scheduled_query.mappers if mapper.applied_on_groupby]
+        return sorted(
+            UsedGroupBysVisitor(mappers=mappers).visit(scheduled_query.metrics_query.query)
+        )
 
     @property
     def interval(self) -> int | None:
@@ -774,7 +792,7 @@ class QueryExecutor:
         while continue_execution:
             continue_execution = self._bulk_execute()
 
-    def execute(self) -> Sequence[QueryResult]:
+    def execute(self) -> list[QueryResult]:
         """
         Executes the scheduled queries in the execution loop.
 
@@ -798,7 +816,7 @@ class QueryExecutor:
                     "Not all queries were executed in the execution loop"
                 )
 
-        return cast(Sequence[QueryResult], self._query_results)
+        return cast(list[QueryResult], self._query_results)
 
     def schedule(self, intermediate_query: IntermediateQuery, query_type: QueryType):
         """
@@ -813,6 +831,7 @@ class QueryExecutor:
             unit_family=intermediate_query.unit_family,
             unit=intermediate_query.unit,
             scaling_factor=intermediate_query.scaling_factor,
+            mappers=intermediate_query.mappers,
         )
 
         # In case the user chooses to run also a series query, we will duplicate the query and chain it after totals.

+ 0 - 0
src/sentry/sentry_metrics/querying/data/mapping/__init__.py


+ 94 - 0
src/sentry/sentry_metrics/querying/data/mapping/mapper.py

@@ -0,0 +1,94 @@
+import abc
+from collections.abc import Sequence
+from typing import Any, TypeVar
+
+from sentry.models.project import Project
+
+
class Mapper(abc.ABC):
    """
    Abstract base for bidirectional translations between a user-facing query
    key/value (``from_key``) and its database-side representation (``to_key``),
    e.g. project slug <-> project id.
    """

    # Key as written in the incoming query (e.g. "project").
    from_key: str = ""
    # Key as stored/queried in the database (e.g. "project_id").
    to_key: str = ""
    # Marks that this mapper was applied to a group by; checked later when
    # deciding which result columns to remap back.
    applied_on_groupby: bool = False

    def __init__(self):
        # This exists to satisfy mypy, which complains otherwise
        # Shared value cache used by subclasses' forward/backward lookups.
        self.map: dict[Any, Any] = {}

    def __hash__(self):
        # Hash by the key pair; NOTE(review): no matching __eq__ is defined,
        # so equality remains identity-based — confirm that is intended.
        return hash((self.from_key, self.to_key))

    @abc.abstractmethod
    def forward(self, projects: Sequence[Project], value: Any) -> Any:
        """Translate a query-side value into its database-side equivalent."""
        return value

    @abc.abstractmethod
    def backward(self, projects: Sequence[Project], value: Any) -> Any:
        """Translate a database-side value back into its query-side form."""
        return value


TMapper = TypeVar("TMapper", bound=Mapper)
+
+
class MapperConfig:
    """Registry of Mapper subclasses that may be applied to a query."""

    def __init__(self):
        self.mappers: set[type[Mapper]] = set()

    def add(self, mapper: type[Mapper]) -> "MapperConfig":
        """Registers a mapper class and returns self to allow chaining."""
        self.mappers.add(mapper)
        return self

    def get(self, from_key: str | None = None, to_key: str | None = None) -> type[Mapper] | None:
        """
        Returns a registered mapper class whose from_key or to_key matches the
        given keys, or None when no registered mapper matches.
        """
        matching = (
            mapper
            for mapper in self.mappers
            if mapper.from_key == from_key or mapper.to_key == to_key
        )
        return next(matching, None)
+
+
def get_or_create_mapper(
    mapper_config: MapperConfig,
    mappers: list[Mapper],
    from_key: str | None = None,
    to_key: str | None = None,
) -> Mapper | None:
    """
    Returns the mapper instance applicable to the given key(s), reusing an
    existing instance from `mappers` when possible.

    A newly created instance is appended to `mappers` as a side effect.
    Returns None when the config has no mapper for the given keys.
    """
    mapper_class = mapper_config.get(from_key=from_key, to_key=to_key)
    if mapper_class is None:
        # No mapper is configured for this key.
        return None

    # Reuse an already-instantiated mapper of the same type, if any.
    for existing in mappers:
        if type(existing) is mapper_class:
            return existing

    # Otherwise instantiate one and track it for later reuse/unmapping.
    new_mapper = mapper_class()
    mappers.append(new_mapper)
    return new_mapper
+
+
class Project2ProjectIDMapper(Mapper):
    """
    Maps project slugs ("project") to project ids ("project_id") and back.

    Resolved values are cached in `self.map` (slugs are str keys, ids are int
    keys, so the two directions cannot collide in the shared dict).
    """

    from_key: str = "project"
    to_key: str = "project_id"

    def forward(self, projects: Sequence[Project], value: str) -> int | None:
        """Returns the id of the project with the given slug, or None if unknown."""
        if value not in self.map:
            # Cache None up front so unknown slugs are not re-scanned.
            self.map[value] = None
            for project in projects:
                if project.slug == value:
                    self.map[value] = project.id
        return self.map[value]

    def backward(self, projects: Sequence[Project], value: int) -> str | None:
        """Returns the slug of the project with the given id, or None if unknown."""
        if value not in self.map:
            # Fix: cache None as in forward(); previously an unknown id left the
            # cache unset and the final lookup raised KeyError.
            self.map[value] = None
            for project in projects:
                if project.id == value:
                    self.map[value] = project.slug
        return self.map[value]

+ 0 - 0
src/sentry/sentry_metrics/querying/data/postprocessing/__init__.py


+ 37 - 0
src/sentry/sentry_metrics/querying/data/postprocessing/base.py

@@ -0,0 +1,37 @@
+from abc import ABC, abstractmethod
+
+from sentry.sentry_metrics.querying.data.execution import QueryResult
+
+
class PostProcessingStep(ABC):
    """
    Abstract step that post-processes a collection of QueryResult objects.

    A step may transform the results, or merely derive intermediate data that
    is needed to compute other things before the results are returned.
    """

    @abstractmethod
    def run(self, query_results: list[QueryResult]) -> list[QueryResult]:
        """
        Runs this post-processing step on the supplied query results.

        Returns:
            The list of post-processed query results.
        """
        raise NotImplementedError
+
+
def run_post_processing_steps(query_results: list[QueryResult], *steps) -> list[QueryResult]:
    """
    Applies the supplied post-processing steps to the query results, in the
    order in which they are passed.

    Arguments in `steps` that are not PostProcessingStep instances are
    silently skipped.

    Returns:
        The query results after all steps have run.
    """
    for candidate in steps:
        if not isinstance(candidate, PostProcessingStep):
            continue
        query_results = candidate.run(query_results=query_results)

    return query_results

+ 54 - 0
src/sentry/sentry_metrics/querying/data/postprocessing/remapping.py

@@ -0,0 +1,54 @@
+from collections.abc import Mapping, Sequence
+from copy import deepcopy
+from typing import Any, cast
+
+from sentry.models.project import Project
+from sentry.sentry_metrics.querying.data.execution import QueryResult
+from sentry.sentry_metrics.querying.data.mapping.mapper import Mapper
+from sentry.sentry_metrics.querying.data.postprocessing.base import PostProcessingStep
+
+
class QueryRemappingStep(PostProcessingStep):
    """
    Post-processing step that reverses the mappings applied during query
    preparation, so that results are expressed in the caller's original terms
    (e.g. project ids are turned back into project slugs).
    """

    def __init__(self, projects: Sequence[Project]):
        self.projects = projects

    def run(self, query_results: list[QueryResult]) -> list[QueryResult]:
        """Unmaps totals and series data of each result, in place."""
        for query_result in query_results:
            totals_present = (
                query_result.totals is not None
                and query_result.totals_query is not None
                and len(query_result.totals) > 0
            )
            if totals_present:
                query_result.totals = self._unmap_data(
                    query_result.totals, query_result.totals_query.mappers
                )

            series_present = (
                query_result.series is not None
                and query_result.series_query is not None
                and len(query_result.series) > 0
            )
            if series_present:
                query_result.series = self._unmap_data(
                    query_result.series, query_result.series_query.mappers
                )

        return query_results

    def _unmap_data(
        self, data: Sequence[Mapping[str, Any]], mappers: list[Mapper]
    ) -> Sequence[Mapping[str, Any]]:
        """
        Returns a deep copy of `data` in which every column produced by a
        group-by mapper is renamed to the mapper's from_key and its values are
        translated back via the mapper's backward function.
        """
        # Only mappers that were applied on a group by produce result columns.
        groupby_mappers = [mapper for mapper in mappers if mapper.applied_on_groupby]
        unmapped: list[dict[str, Any]] = cast(list[dict[str, Any]], deepcopy(data))

        for row in unmapped:
            replacements: dict[str, Any] = {}
            obsolete_keys = []
            for key in row.keys():
                for mapper in groupby_mappers:
                    if mapper.to_key == key:
                        replacements[mapper.from_key] = mapper.backward(self.projects, row[key])
                        obsolete_keys.append(key)

            for key in obsolete_keys:
                del row[key]
            row.update(replacements)

        return cast(Sequence[Mapping[str, Any]], unmapped)

+ 3 - 1
src/sentry/sentry_metrics/querying/data/preparation/base.py

@@ -1,8 +1,9 @@
 from abc import ABC, abstractmethod
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 
 from snuba_sdk import MetricsQuery
 
+from sentry.sentry_metrics.querying.data.mapping.mapper import Mapper
 from sentry.sentry_metrics.querying.types import QueryOrder
 from sentry.sentry_metrics.querying.units import MeasurementUnit, UnitFamily
 
@@ -27,6 +28,7 @@ class IntermediateQuery:
     unit_family: UnitFamily | None = None
     unit: MeasurementUnit | None = None
     scaling_factor: float | None = None
+    mappers: list[Mapper] = field(default_factory=list)
 
 
 class PreparationStep(ABC):

+ 35 - 0
src/sentry/sentry_metrics/querying/data/preparation/mapping.py

@@ -0,0 +1,35 @@
+from collections.abc import Sequence
+from dataclasses import replace
+
+from sentry.models.project import Project
+from sentry.sentry_metrics.querying.data.mapping.mapper import MapperConfig
+from sentry.sentry_metrics.querying.data.preparation.base import IntermediateQuery, PreparationStep
+from sentry.sentry_metrics.querying.visitors.query_expression import MapperVisitor
+
+
class QueryMappingStep(PreparationStep):
    """
    Preparation step that rewrites each query's expression through the
    configured mappers (e.g. a filter/group by on project slug becomes one on
    project id), recording the applied mappers for later unmapping.
    """

    def __init__(self, projects: Sequence[Project], mapper_config: MapperConfig):
        self.projects = projects
        self.mapper_config = mapper_config

    def _get_mapped_intermediate_query(
        self, intermediate_query: IntermediateQuery
    ) -> IntermediateQuery:
        """Returns a copy of the query with its expression mapped."""
        visitor = MapperVisitor(self.projects, self.mapper_config)
        mapped_query = visitor.visit(intermediate_query.metrics_query.query)

        # Carry the mappers the visitor applied so post-processing can undo them.
        return replace(
            intermediate_query,
            metrics_query=intermediate_query.metrics_query.set_query(mapped_query),
            mappers=visitor.mappers,
        )

    def run(self, intermediate_queries: list[IntermediateQuery]) -> list[IntermediateQuery]:
        """Maps every intermediate query and returns the new list."""
        return [
            self._get_mapped_intermediate_query(intermediate_query)
            for intermediate_query in intermediate_queries
        ]

+ 36 - 0
src/sentry/sentry_metrics/querying/visitors/query_condition.py

@@ -4,6 +4,11 @@ from snuba_sdk import BooleanCondition, BooleanOp, Column, Condition, Op
 
 from sentry.api.serializers import bulk_fetch_project_latest_releases
 from sentry.models.project import Project
+from sentry.sentry_metrics.querying.data.mapping.mapper import (
+    Mapper,
+    MapperConfig,
+    get_or_create_mapper,
+)
 from sentry.sentry_metrics.querying.errors import LatestReleaseNotFoundError
 from sentry.sentry_metrics.querying.types import QueryCondition
 from sentry.sentry_metrics.querying.visitors.base import QueryConditionVisitor
@@ -96,3 +101,34 @@ class MappingTransformationVisitor(QueryConditionVisitor[QueryCondition]):
             op=condition.op,
             rhs=condition.rhs,
         )
+
+
class MapperConditionVisitor(QueryConditionVisitor):
    """
    Visitor that rewrites conditions through the configured mappers: a column
    matching a mapper's from_key is renamed to its to_key, and the right-hand
    side value(s) are translated with the mapper's forward function.
    """

    def __init__(self, projects: Sequence[Project], mapper_config: MapperConfig):
        self.projects = projects
        self.mapper_config = mapper_config
        # Mapper instances created while visiting; shared across conditions.
        self.mappers: list[Mapper] = []

    def _visit_condition(self, condition: Condition) -> Condition:
        lhs = condition.lhs
        rhs = condition.rhs

        # Only plain column references can be mapped.
        if not isinstance(lhs, Column):
            return condition

        mapper = get_or_create_mapper(self.mapper_config, self.mappers, from_key=lhs.name)
        if mapper is None:
            return condition

        if isinstance(rhs, list):
            mapped_rhs = [mapper.forward(self.projects, element) for element in rhs]
        else:
            mapped_rhs = mapper.forward(self.projects, rhs)

        return Condition(lhs=Column(mapper.to_key), op=condition.op, rhs=mapped_rhs)

    def _visit_boolean_condition(self, boolean_condition: BooleanCondition) -> BooleanCondition:
        # Recurse into each sub-condition, preserving the boolean operator.
        visited = [self.visit(inner) for inner in boolean_condition.conditions]
        return BooleanCondition(op=boolean_condition.op, conditions=visited)

Some files were not shown because too many files changed in this diff