Browse Source

ref(metrics api): Update the metrics API to send queries to Snuba (#56958)

This PR was initially just meant to start sending requests to Snuba.
Doing that work, though, uncovered some missing pieces that needed to be
added.

Add code to resolve the columns used in the groupby and filter,
particularly
tag names. Also add some E2E tests for the metrics layer that
successfully
fetch results from Snuba.

---------

Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
Evan Hicks 1 year ago
parent
commit
43ddfc143b

+ 1 - 1
requirements-base.txt

@@ -63,7 +63,7 @@ sentry-kafka-schemas>=0.1.29
 sentry-redis-tools>=0.1.7
 sentry-relay>=0.8.30
 sentry-sdk>=1.31.0
-snuba-sdk>=2.0.2
+snuba-sdk>=2.0.3
 simplejson>=3.17.6
 sqlparse>=0.4.4
 statsd>=3.3

+ 1 - 1
requirements-dev-frozen.txt

@@ -177,7 +177,7 @@ sentry-sdk==1.31.0
 simplejson==3.17.6
 six==1.16.0
 sniffio==1.2.0
-snuba-sdk==2.0.2
+snuba-sdk==2.0.3
 sortedcontainers==2.4.0
 soupsieve==2.3.2.post1
 sqlparse==0.4.4

+ 1 - 1
requirements-frozen.txt

@@ -117,7 +117,7 @@ sentry-relay==0.8.30
 sentry-sdk==1.31.0
 simplejson==3.17.6
 six==1.16.0
-snuba-sdk==2.0.2
+snuba-sdk==2.0.3
 soupsieve==2.3.2.post1
 sqlparse==0.4.4
 statsd==3.3

+ 87 - 9
src/sentry/snuba/metrics_layer/query.py

@@ -1,16 +1,30 @@
-from typing import Sequence
-
-from snuba_sdk import Request
-from snuba_sdk.metrics_query import MetricsQuery
-
-from sentry.models import Project
+from __future__ import annotations
+
+from dataclasses import replace
+from typing import Any, Mapping, Sequence, Union
+
+from snuba_sdk import (
+    AliasedExpression,
+    BooleanCondition,
+    Column,
+    Condition,
+    CurriedFunction,
+    MetricsQuery,
+    Request,
+)
+
+from sentry.models.project import Project
+from sentry.sentry_metrics.use_case_id_registry import UseCaseID
 from sentry.sentry_metrics.utils import resolve_weak, string_to_use_case_id
 from sentry.snuba.metrics.fields.base import _get_entity_of_metric_mri, org_id_from_projects
 from sentry.snuba.metrics.naming_layer.mapping import get_mri, get_public_name_from_mri
 from sentry.snuba.metrics.utils import to_intervals
+from sentry.utils.snuba import raw_snql_query
 
+FilterTypes = Union[Column, CurriedFunction, Condition, BooleanCondition]
 
-def run_query(request: Request) -> None:
+
+def run_query(request: Request) -> Mapping[str, Any]:
     """
     Entrypoint for executing a metrics query in Snuba.
 
@@ -44,7 +58,7 @@ def run_query(request: Request) -> None:
     resolved_metrics_query = resolve_metrics_query(metrics_query)
     request.query = resolved_metrics_query
 
-    # TODO: executing MetricQuery validation and serialization, result formatting, etc.
+    return raw_snql_query(request, request.tenant_ids["referrer"], use_cache=True)
 
 
 def resolve_metrics_query(metrics_query: MetricsQuery) -> MetricsQuery:
@@ -64,9 +78,10 @@ def resolve_metrics_query(metrics_query: MetricsQuery) -> MetricsQuery:
         )
 
     projects = get_projects(scope.project_ids)
+    org_id = org_id_from_projects(projects)
     use_case_id = string_to_use_case_id(scope.use_case_id)
     metric_id = resolve_weak(
-        use_case_id, org_id_from_projects(projects), metrics_query.query.metric.mri
+        use_case_id, org_id, metrics_query.query.metric.mri
     )  # only support raw metrics for now
     metrics_query = metrics_query.set_query(
         metrics_query.query.set_metric(metrics_query.query.metric.set_id(metric_id))
@@ -79,9 +94,72 @@ def resolve_metrics_query(metrics_query: MetricsQuery) -> MetricsQuery:
         metrics_query = metrics_query.set_query(
             metrics_query.query.set_metric(metrics_query.query.metric.set_entity(entity.value))
         )
+
+    new_groupby = resolve_groupby(metrics_query.query.groupby, use_case_id, org_id)
+    metrics_query = metrics_query.set_query(metrics_query.query.set_groupby(new_groupby))
+    new_groupby = resolve_groupby(metrics_query.groupby, use_case_id, org_id)
+    metrics_query = metrics_query.set_groupby(new_groupby)
+
+    metrics_query = metrics_query.set_query(
+        metrics_query.query.set_filters(
+            resolve_filters(metrics_query.query.filters, use_case_id, org_id)
+        )
+    )
+    metrics_query = metrics_query.set_filters(
+        resolve_filters(metrics_query.filters, use_case_id, org_id)
+    )
     return metrics_query
 
 
+def resolve_groupby(
+    groupby: list[Column] | None, use_case_id: UseCaseID, org_id: int
+) -> list[Column] | None:
+    """
+    Go through the groupby columns and resolve any that need to be resolved.
+    Each resolved column is wrapped in an AliasedExpression whose alias is the
+    original column name, so the query results can be mapped back to the
+    original (unresolved) columns.
+    """
+    if not groupby:
+        return groupby
+
+    new_groupby = []
+    for col in groupby:
+        resolved = resolve_weak(use_case_id, org_id, col.name)
+        if resolved > -1:
+            # TODO: This currently assumes the use of `tags_raw` but that might not always be correct
+            # It also doesn't take into account mapping indexed tag values back to their original values
+            new_groupby.append(
+                AliasedExpression(exp=replace(col, name=f"tags_raw[{resolved}]"), alias=col.name)
+            )
+        else:
+            new_groupby.append(col)
+
+    return new_groupby
+
+
+def resolve_filters(
+    filters: list[Condition | BooleanCondition], use_case_id: UseCaseID, org_id: int
+) -> list[Condition | BooleanCondition] | None:
+    if not filters:
+        return filters
+
+    def resolve_exp(exp: FilterTypes) -> FilterTypes:
+        if isinstance(exp, Column):
+            resolved = resolve_weak(use_case_id, org_id, exp.name)
+            if resolved > -1:
+                return replace(exp, name=f"tags_raw[{resolved}]")
+        elif isinstance(exp, CurriedFunction):
+            return replace(exp, parameters=[resolve_exp(p) for p in exp.parameters])
+        elif isinstance(exp, BooleanCondition):
+            return replace(exp, conditions=[resolve_exp(c) for c in exp.conditions])
+        elif isinstance(exp, Condition):
+            return replace(exp, lhs=resolve_exp(exp.lhs))
+        return exp
+
+    new_filters = [resolve_exp(exp) for exp in filters]
+    return new_filters
+
+
 def get_projects(project_ids: Sequence[int]) -> Sequence[Project]:
     try:
         projects = list(Project.objects.filter(id__in=project_ids))

+ 7 - 2
src/sentry/testutils/pytest/metrics.py

@@ -21,6 +21,8 @@ STRINGS_THAT_LOOK_LIKE_TAG_VALUES = (
 
 @pytest.fixture(autouse=True)
 def control_metrics_access(monkeypatch, request, set_sentry_option):
+    from snuba_sdk import MetricsQuery
+
     from sentry.sentry_metrics import indexer
     from sentry.sentry_metrics.indexer.mock import MockIndexer
     from sentry.snuba import tasks
@@ -60,8 +62,11 @@ def control_metrics_access(monkeypatch, request, set_sentry_option):
                 # We only support snql queries, and metrics only go through snql
                 return old_build_results(*args, **kwargs)
             query = args[0][0][0].query
-            is_performance_metrics = query.match.name.startswith("generic_metrics")
-            is_metrics = "metrics" in query.match.name
+            if isinstance(query, MetricsQuery):
+                is_performance_metrics = is_metrics = False
+            else:
+                is_performance_metrics = query.match.name.startswith("generic_metrics")
+                is_metrics = "metrics" in query.match.name
 
             if is_performance_metrics:
                 _validate_query(query, True)

+ 104 - 2
tests/sentry/snuba/metrics/test_metrics_query_layer/test_metrics_query_layer.py

@@ -3,8 +3,17 @@ Metrics Service Layer Tests for Performance
 """
 
 import pytest
-from snuba_sdk.metrics_query import MetricsQuery
-from snuba_sdk.timeseries import Metric, MetricsScope, Timeseries
+from snuba_sdk import (
+    AliasedExpression,
+    Column,
+    Condition,
+    Metric,
+    MetricsQuery,
+    MetricsScope,
+    Op,
+    Or,
+    Timeseries,
+)
 
 from sentry.sentry_metrics import indexer
 from sentry.sentry_metrics.use_case_id_registry import UseCaseID
@@ -45,3 +54,96 @@ class MetricsQueryLayerTest(BaseMetricsLayerTestCase, TestCase):
             self.project.organization_id,
             TransactionMRI.DURATION.value,
         )
+
+    def test_resolve_metrics_query_with_groupby(self):
+        self.store_performance_metric(
+            name=TransactionMRI.DURATION.value,
+            project_id=self.project.id,
+            tags={"transaction": "/checkout"},
+            value=1,
+        )
+        metrics_query = MetricsQuery(
+            query=Timeseries(Metric(mri=TransactionMRI.DURATION.value), aggregate="count"),
+            scope=MetricsScope(
+                org_ids=[self.project.organization_id],
+                project_ids=[self.project.id],
+                use_case_id=UseCaseID.TRANSACTIONS.value,
+            ),
+            groupby=[Column("transaction")],
+        )
+        expected_metric_id = indexer.resolve(
+            UseCaseID.TRANSACTIONS,
+            self.project.organization_id,
+            TransactionMRI.DURATION.value,
+        )
+        expected_transaction_id = indexer.resolve(
+            UseCaseID.TRANSACTIONS,
+            self.project.organization_id,
+            "transaction",
+        )
+
+        resolved_metrics_query = resolve_metrics_query(metrics_query)
+        assert resolved_metrics_query.query.metric.public_name == "transaction.duration"
+        assert resolved_metrics_query.query.metric.id == expected_metric_id
+        assert resolved_metrics_query.groupby == [
+            AliasedExpression(Column(f"tags_raw[{expected_transaction_id}]"), "transaction")
+        ]
+
+    def test_resolve_metrics_query_with_filters(self):
+        self.store_performance_metric(
+            name=TransactionMRI.DURATION.value,
+            project_id=self.project.id,
+            tags={"transaction": "/checkout", "device": "BlackBerry"},
+            value=1,
+        )
+        metrics_query = MetricsQuery(
+            query=Timeseries(
+                Metric(mri=TransactionMRI.DURATION.value),
+                aggregate="count",
+                filters=[Condition(Column("transaction"), Op.EQ, "/checkout")],
+            ),
+            scope=MetricsScope(
+                org_ids=[self.project.organization_id],
+                project_ids=[self.project.id],
+                use_case_id=UseCaseID.TRANSACTIONS.value,
+            ),
+            groupby=[Column("transaction")],
+            filters=[
+                Or(
+                    [
+                        Condition(Column("device"), Op.EQ, "BlackBerry"),
+                        Condition(Column("device"), Op.EQ, "Nokia"),
+                    ]
+                )
+            ],
+        )
+        expected_metric_id = indexer.resolve(
+            UseCaseID.TRANSACTIONS,
+            self.project.organization_id,
+            TransactionMRI.DURATION.value,
+        )
+        expected_transaction_id = indexer.resolve(
+            UseCaseID.TRANSACTIONS,
+            self.project.organization_id,
+            "transaction",
+        )
+        expected_device_id = indexer.resolve(
+            UseCaseID.TRANSACTIONS,
+            self.project.organization_id,
+            "device",
+        )
+
+        resolved_metrics_query = resolve_metrics_query(metrics_query)
+        assert resolved_metrics_query.query.metric.public_name == "transaction.duration"
+        assert resolved_metrics_query.query.metric.id == expected_metric_id
+        assert resolved_metrics_query.query.filters == [
+            Condition(Column(f"tags_raw[{expected_transaction_id}]"), Op.EQ, "/checkout")
+        ]
+        assert resolved_metrics_query.filters == [
+            Or(
+                [
+                    Condition(Column(f"tags_raw[{expected_device_id}]"), Op.EQ, "BlackBerry"),
+                    Condition(Column(f"tags_raw[{expected_device_id}]"), Op.EQ, "Nokia"),
+                ]
+            )
+        ]

+ 255 - 0
tests/snuba/test_metrics_layer.py

@@ -0,0 +1,255 @@
+from datetime import datetime, timedelta, timezone
+from typing import Literal, Mapping
+
+import pytest
+
+# from django.utils import timezone
+from snuba_sdk import (
+    Column,
+    Condition,
+    Direction,
+    Metric,
+    MetricsQuery,
+    MetricsScope,
+    Op,
+    Request,
+    Rollup,
+    Timeseries,
+)
+
+from sentry.sentry_metrics.use_case_id_registry import UseCaseID
+from sentry.snuba.metrics.naming_layer import TransactionMRI
+from sentry.snuba.metrics_layer.query import run_query
+from sentry.testutils.cases import BaseMetricsTestCase, TestCase
+
+pytestmark = pytest.mark.sentry_metrics
+
+
+class SnQLTest(TestCase, BaseMetricsTestCase):
+    def ts(self, dt: datetime) -> int:
+        return int(dt.timestamp())
+
+    def setUp(self):
+        super().setUp()
+
+        self.metrics: Mapping[str, Literal["counter", "set", "distribution"]] = {
+            TransactionMRI.DURATION.value: "distribution",
+            TransactionMRI.USER.value: "set",
+            TransactionMRI.COUNT_PER_ROOT_PROJECT.value: "counter",
+        }
+        self.now = datetime.now(tz=timezone.utc)
+        self.hour_ago = self.now - timedelta(hours=1)
+        self.org_id = self.project.organization_id
+        # Store a data point every 10 seconds for an hour
+        for mri, metric_type in self.metrics.items():
+            assert metric_type in {"counter", "distribution", "set"}
+            for i in range(360):
+                self.store_metric(
+                    self.org_id,
+                    self.project.id,
+                    metric_type,
+                    mri,
+                    {
+                        "transaction": f"transaction_{i % 2}",
+                        "status_code": "500" if i % 10 == 0 else "200",
+                        "device": "BlackBerry" if i % 3 == 0 else "Nokia",
+                    },
+                    self.ts(self.hour_ago + timedelta(minutes=1 * i)),
+                    i,
+                    UseCaseID.TRANSACTIONS,
+                )
+
+    def test_basic(self) -> None:
+        query = MetricsQuery(
+            query=Timeseries(
+                metric=Metric(
+                    "transaction.duration",
+                    TransactionMRI.DURATION.value,
+                ),
+                aggregate="max",
+            ),
+            start=self.hour_ago,
+            end=self.now,
+            rollup=Rollup(interval=60, granularity=60),
+            scope=MetricsScope(
+                org_ids=[self.org_id],
+                project_ids=[self.project.id],
+                use_case_id=UseCaseID.TRANSACTIONS.value,
+            ),
+        )
+
+        request = Request(
+            dataset="generic_metrics",
+            app_id="tests",
+            query=query,
+            tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
+        )
+        result = run_query(request)
+        assert len(result["data"]) == 61
+        rows = result["data"]
+        for i in range(61):
+            assert rows[i]["aggregate_value"] == i
+            assert (
+                rows[i]["time"]
+                == (
+                    self.hour_ago.replace(second=0, microsecond=0) + timedelta(minutes=1 * i)
+                ).isoformat()
+            )
+
+    def test_groupby(self) -> None:
+        query = MetricsQuery(
+            query=Timeseries(
+                metric=Metric(
+                    "transaction.duration",
+                    TransactionMRI.DURATION.value,
+                ),
+                aggregate="quantiles",
+                aggregate_params=[0.5, 0.99],
+                groupby=[Column("transaction")],
+            ),
+            start=self.hour_ago,
+            end=self.now,
+            rollup=Rollup(interval=60, granularity=60),
+            scope=MetricsScope(
+                org_ids=[self.org_id],
+                project_ids=[self.project.id],
+                use_case_id=UseCaseID.TRANSACTIONS.value,
+            ),
+        )
+
+        request = Request(
+            dataset="generic_metrics",
+            app_id="tests",
+            query=query,
+            tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
+        )
+        result = run_query(request)
+        assert len(result["data"]) == 61
+        rows = result["data"]
+        for i in range(61):
+            assert rows[i]["aggregate_value"] == [i, i]
+            assert rows[i]["transaction"] == f"transaction_{i % 2}"
+            assert (
+                rows[i]["time"]
+                == (
+                    self.hour_ago.replace(second=0, microsecond=0) + timedelta(minutes=1 * i)
+                ).isoformat()
+            )
+
+    def test_filters(self) -> None:
+        query = MetricsQuery(
+            query=Timeseries(
+                metric=Metric(
+                    "transaction.duration",
+                    TransactionMRI.DURATION.value,
+                ),
+                aggregate="quantiles",
+                aggregate_params=[0.5],
+                filters=[Condition(Column("status_code"), Op.EQ, "500")],
+            ),
+            filters=[Condition(Column("device"), Op.EQ, "BlackBerry")],
+            start=self.hour_ago,
+            end=self.now,
+            rollup=Rollup(interval=60, granularity=60),
+            scope=MetricsScope(
+                org_ids=[self.org_id],
+                project_ids=[self.project.id],
+                use_case_id=UseCaseID.TRANSACTIONS.value,
+            ),
+        )
+
+        request = Request(
+            dataset="generic_metrics",
+            app_id="tests",
+            query=query,
+            tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
+        )
+        result = run_query(request)
+        assert len(result["data"]) == 3
+        rows = result["data"]
+        for i in range(3):  # 500 status codes on BlackBerry are sparse
+            assert rows[i]["aggregate_value"] == [i * 30]
+            assert (
+                rows[i]["time"]
+                == (
+                    self.hour_ago.replace(second=0, microsecond=0) + timedelta(minutes=30 * i)
+                ).isoformat()
+            )
+
+    def test_complex(self) -> None:
+        query = MetricsQuery(
+            query=Timeseries(
+                metric=Metric(
+                    "transaction.duration",
+                    TransactionMRI.DURATION.value,
+                ),
+                aggregate="quantiles",
+                aggregate_params=[0.5],
+                filters=[Condition(Column("status_code"), Op.EQ, "500")],
+                groupby=[Column("transaction")],
+            ),
+            filters=[Condition(Column("device"), Op.EQ, "BlackBerry")],
+            start=self.hour_ago,
+            end=self.now,
+            rollup=Rollup(interval=60, granularity=60),
+            scope=MetricsScope(
+                org_ids=[self.org_id],
+                project_ids=[self.project.id],
+                use_case_id=UseCaseID.TRANSACTIONS.value,
+            ),
+        )
+
+        request = Request(
+            dataset="generic_metrics",
+            app_id="tests",
+            query=query,
+            tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
+        )
+        result = run_query(request)
+        assert len(result["data"]) == 3
+        rows = result["data"]
+        for i in range(3):  # 500 status codes on BB are sparse
+            assert rows[i]["aggregate_value"] == [i * 30]
+            assert rows[i]["transaction"] == "transaction_0"
+            assert (
+                rows[i]["time"]
+                == (
+                    self.hour_ago.replace(second=0, microsecond=0) + timedelta(minutes=30 * i)
+                ).isoformat()
+            )
+
+    def test_totals(self) -> None:
+        query = MetricsQuery(
+            query=Timeseries(
+                metric=Metric(
+                    "transaction.duration",
+                    TransactionMRI.DURATION.value,
+                ),
+                aggregate="max",
+                filters=[Condition(Column("status_code"), Op.EQ, "200")],
+                groupby=[Column("transaction")],
+            ),
+            start=self.hour_ago,
+            end=self.now,
+            rollup=Rollup(totals=True, granularity=60, orderby=Direction.ASC),
+            scope=MetricsScope(
+                org_ids=[self.org_id],
+                project_ids=[self.project.id],
+                use_case_id=UseCaseID.TRANSACTIONS.value,
+            ),
+        )
+
+        request = Request(
+            dataset="generic_metrics",
+            app_id="tests",
+            query=query,
+            tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
+        )
+        result = run_query(request)
+        assert len(result["data"]) == 2
+        rows = result["data"]
+
+        assert rows[0]["aggregate_value"] == 58
+        assert rows[0]["transaction"] == "transaction_0"
+        assert rows[1]["aggregate_value"] == 59
+        assert rows[1]["transaction"] == "transaction_1"