
feat(meta): Add functions to query metrics meta tables (#69803)

Generic metrics now have tables that store metadata about the metrics, notably
the unique metric IDs, tag keys, and tag values for any data received.

This adds functions to query those tables and return the relevant
values.
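
For illustration, a minimal usage sketch of the three new helpers (the org/project IDs are hypothetical, and the transactions use case and duration MRI are chosen only as examples):

    from sentry.sentry_metrics.use_case_id_registry import UseCaseID
    from sentry.snuba.metrics_layer.query import (
        fetch_metric_mris,
        fetch_metric_tag_keys,
        fetch_metric_tag_values,
    )

    org_id, project_id = 1, 42  # hypothetical IDs for illustration

    # {project_id: [mri, ...]} for all metrics received in the last 90 days
    mris = fetch_metric_mris(org_id, [project_id], UseCaseID.TRANSACTIONS)

    # {project_id: [tag_key, ...]} for one MRI
    tag_keys = fetch_metric_tag_keys(
        org_id, [project_id], UseCaseID.TRANSACTIONS, "d:transactions/duration@millisecond"
    )

    # [tag_value, ...] for one MRI and tag key, optionally filtered by prefix
    tag_values = fetch_metric_tag_values(
        org_id,
        project_id,
        UseCaseID.TRANSACTIONS,
        "d:transactions/duration@millisecond",
        "transaction",
        tag_value_prefix="/api",
    )
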
Evan Hicks · 10 months ago · commit 7b994e23ff
2 changed files with 276 additions and 3 deletions:
  1. src/sentry/snuba/metrics_layer/query.py (+181, -2)
  2. tests/snuba/test_metrics_layer.py (+95, -1)

+ 181 - 2
src/sentry/snuba/metrics_layer/query.py

@@ -3,7 +3,7 @@ from __future__ import annotations
 import logging
 from collections.abc import Mapping
 from dataclasses import replace
-from datetime import datetime
+from datetime import UTC, datetime, timedelta
 from typing import Any, Union, cast
 
 from snuba_sdk import (
@@ -11,9 +11,14 @@ from snuba_sdk import (
     Column,
     Condition,
     CurriedFunction,
+    Direction,
+    Entity,
     Formula,
     Metric,
     MetricsQuery,
+    Op,
+    OrderBy,
+    Query,
     Request,
     Timeseries,
 )
@@ -22,7 +27,13 @@ from snuba_sdk.mql.mql import parse_mql
 
 from sentry.exceptions import InvalidParams
 from sentry.sentry_metrics.use_case_id_registry import UseCaseID
-from sentry.sentry_metrics.utils import resolve_weak, reverse_resolve_weak, string_to_use_case_id
+from sentry.sentry_metrics.utils import (
+    bulk_reverse_resolve,
+    resolve_many_weak,
+    resolve_weak,
+    reverse_resolve_weak,
+    string_to_use_case_id,
+)
 from sentry.snuba.dataset import Dataset
 from sentry.snuba.metrics.naming_layer.mapping import get_mri
 from sentry.snuba.metrics.naming_layer.mri import parse_mri
@@ -519,3 +530,171 @@ def convert_snuba_result(
                     if reverse_resolve:
                         data_point[key] = reverse_resolve
     return snuba_result
+
+
+def fetch_metric_mris(
+    org_id: int, project_ids: list[int], use_case_id: UseCaseID, app_id: str = ""
+) -> dict[int, list[str]]:
+    """
+    Fetches all the metric MRIs for a set of projects and use case. This will reverse
+    resolve all the metric IDs into MRIs.
+    """
+    return _query_meta_table(org_id, project_ids, use_case_id, app_id=app_id)
+
+
+def fetch_metric_tag_keys(
+    org_id: int, project_ids: list[int], use_case_id: UseCaseID, mri: str, app_id: str = ""
+) -> dict[int, list[str]]:
+    """
+    Fetches the tag keys for a given metric MRI. This will reverse
+    resolve all the tag keys into strings.
+    """
+    return _query_meta_table(org_id, project_ids, use_case_id, mri, app_id)
+
+
+def _query_meta_table(
+    org_id: int,
+    project_ids: list[int],
+    use_case_id: UseCaseID,
+    mri: str | None = None,
+    app_id: str = "",
+) -> dict[int, list[str]]:
+    """
+    Helper function for querying the meta table. This will query across all four metric types, and resolve all the resulting
+    values. If an MRI is provided, it is assumed that this function should find unique tag keys for that MRI.
+    """
+
+    if mri:
+        column_name = "tag_key"
+        metric_id = resolve_weak(use_case_id, org_id, mri)
+        if metric_id == -1:
+            raise InvalidParams(f"Unknown metric: {mri}")
+        extra_condition = Condition(Column("metric_id"), Op.EQ, metric_id)
+    else:
+        column_name = "metric_id"
+        extra_condition = None
+
+    conditions = [
+        Condition(Column("org_id"), Op.EQ, org_id),
+        Condition(Column("project_id"), Op.IN, project_ids),
+        Condition(Column("use_case_id"), Op.EQ, use_case_id.value),
+        Condition(Column("timestamp"), Op.GTE, datetime.now(UTC) - timedelta(days=90)),
+        Condition(Column("timestamp"), Op.LT, datetime.now(UTC) + timedelta(days=1)),
+    ]
+    if extra_condition:
+        conditions.append(extra_condition)
+
+    counters_query = (
+        Query(Entity("generic_metrics_counters_meta"))
+        .set_select([Column("project_id"), Column(column_name)])
+        .set_groupby([Column("project_id"), Column(column_name)])
+        .set_where(conditions)
+        .set_orderby(
+            [
+                OrderBy(Column("project_id"), Direction.ASC),
+                OrderBy(Column(column_name), Direction.ASC),
+            ]
+        )
+        .set_limit(1000)
+    )
+
+    def build_request(query: Query) -> Request:
+        return Request(
+            dataset="generic_metrics",
+            app_id=use_case_id.value if app_id == "" else app_id,
+            query=query,
+            tenant_ids={
+                "organization_id": org_id,
+                "project_id": project_ids[0],
+                "referrer": f"generic_metrics_meta_{column_name}",
+            },
+        )
+
+    requests = [build_request(counters_query)]
+    for mtype in ["sets", "gauges", "distributions"]:
+        new_query = counters_query.set_match(Entity(f"generic_metrics_{mtype}_meta"))
+        new_request = build_request(new_query)
+        requests.append(new_request)
+
+    results = bulk_snuba_queries(requests, f"generic_metrics_meta_{column_name}")
+    indexed_ids = []
+    for result in results:
+        indexed_ids.extend([row[column_name] for row in result["data"]])
+
+    resolved_ids = bulk_reverse_resolve(use_case_id, org_id, indexed_ids)
+    # Group by project ID
+    grouped_results: dict[int, list[str]] = {}
+    for result in results:
+        for row in result["data"]:
+            mri = resolved_ids[row[column_name]]
+            grouped_results.setdefault(row["project_id"], list()).append(mri)
+
+    return grouped_results
+
+
+def fetch_metric_tag_values(
+    org_id: int,
+    project_id: int,
+    use_case_id: UseCaseID,
+    mri: str,
+    tag_key: str,
+    tag_value_prefix: str = "",
+    app_id: str = "",
+) -> list[str]:
+    """
+    Find all the unique tag values for a given MRI and tag key. This will reverse resolve
+    all the values.
+    """
+    parsed_mri = parse_mri(mri)
+    if parsed_mri is None:
+        raise InvalidParams(f"'{mri}' is not a valid MRI")
+
+    entity = {
+        "c": "counters",
+        "d": "distributions",
+        "g": "gauges",
+        "s": "sets",
+    }[parsed_mri.entity]
+
+    resolved = resolve_many_weak(use_case_id, org_id, [mri, tag_key])
+    if len(resolved) != 2:
+        raise InvalidParams("Unknown metric or tag key")
+    metric_id, tag_key_id = resolved
+
+    conditions = [
+        Condition(Column("project_id"), Op.EQ, project_id),
+        Condition(Column("metric_id"), Op.EQ, metric_id),
+        Condition(Column("tag_key"), Op.EQ, tag_key_id),
+        Condition(Column("timestamp"), Op.GTE, datetime.now(UTC) - timedelta(days=90)),
+        Condition(Column("timestamp"), Op.LT, datetime.now(UTC) + timedelta(days=1)),
+    ]
+
+    if tag_value_prefix:
+        conditions.append(Condition(Column("tag_value"), Op.LIKE, f"{tag_value_prefix}%"))
+
+    tag_values_query = (
+        Query(Entity(f"generic_metrics_{entity}_meta_tag_values"))
+        .set_select([Column("tag_value")])
+        .set_groupby([Column("tag_value")])
+        .set_where(conditions)
+        .set_orderby([OrderBy(Column("tag_value"), Direction.ASC)])
+        .set_limit(1000)
+    )
+
+    request = Request(
+        dataset="generic_metrics",
+        app_id=use_case_id.value if app_id == "" else app_id,
+        query=tag_values_query,
+        tenant_ids={
+            "organization_id": org_id,
+            "project_id": project_id,
+            "referrer": "generic_metrics_meta_tag_values",
+        },
+    )
+
+    results = bulk_snuba_queries([request], "generic_metrics_meta_tag_values")
+    values = []
+    for result in results:
+        values.extend([row["tag_value"] for row in result["data"]])
+
+    return values
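
As a quick illustration of the entity selection in fetch_metric_tag_values above: the first character of the MRI encodes the metric type, parse_mri exposes it as the "entity" field, and the function maps it onto the four *_meta_tag_values Snuba entities. A standalone sketch of that mapping, reusing only names that appear in the diff (ENTITY_BY_TYPE is a local dict introduced here for illustration):

    from sentry.snuba.metrics.naming_layer.mri import parse_mri

    ENTITY_BY_TYPE = {"c": "counters", "d": "distributions", "g": "gauges", "s": "sets"}

    parsed = parse_mri("g:transactions/test_gauge@none")
    assert parsed is not None and parsed.entity == "g"

    # Snuba entity queried for this MRI's tag values:
    entity = f"generic_metrics_{ENTITY_BY_TYPE[parsed.entity]}_meta_tag_values"
    # -> "generic_metrics_gauges_meta_tag_values"
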

+ 95 - 1
tests/snuba/test_metrics_layer.py

@@ -25,7 +25,13 @@ from sentry.exceptions import InvalidParams
 from sentry.sentry_metrics.use_case_id_registry import UseCaseID
 from sentry.snuba.metrics.naming_layer import SessionMRI, TransactionMRI
 from sentry.snuba.metrics.naming_layer.public import TransactionStatusTagValue, TransactionTagsKey
-from sentry.snuba.metrics_layer.query import bulk_run_query, run_query
+from sentry.snuba.metrics_layer.query import (
+    bulk_run_query,
+    fetch_metric_mris,
+    fetch_metric_tag_keys,
+    fetch_metric_tag_values,
+    run_query,
+)
 from sentry.testutils.cases import BaseMetricsTestCase, TestCase
 
 pytestmark = pytest.mark.sentry_metrics
@@ -872,3 +878,91 @@ class MQLTest(TestCase, BaseMetricsTestCase):
         assert len(result["data"]) == 10
         for row in result["data"]:
             assert row["aggregate_value"] >= 86400
+
+
+class MQLMetaTest(TestCase, BaseMetricsTestCase):
+    def ts(self, dt: datetime) -> int:
+        return int(dt.timestamp())
+
+    def setUp(self) -> None:
+        super().setUp()
+
+        self.generic_metrics: Mapping[str, Literal["counter", "set", "distribution", "gauge"]] = {
+            TransactionMRI.DURATION.value: "distribution",
+            TransactionMRI.USER.value: "set",
+            TransactionMRI.COUNT_PER_ROOT_PROJECT.value: "counter",
+            "g:transactions/test_gauge@none": "gauge",
+        }
+        self.now = datetime.now(tz=timezone.utc).replace(microsecond=0)
+        self.hour_ago = self.now - timedelta(hours=1)
+        self.org_id = self.project.organization_id
+        for mri, metric_type in self.generic_metrics.items():
+            assert metric_type in {"counter", "distribution", "set", "gauge"}
+            for i in range(2):
+                value: int | dict[str, int]
+                if metric_type == "gauge":
+                    value = {
+                        "min": i,
+                        "max": i,
+                        "sum": i,
+                        "count": i,
+                        "last": i,
+                    }
+                else:
+                    value = i
+                self.store_metric(
+                    self.org_id,
+                    self.project.id,
+                    metric_type,
+                    mri,
+                    {
+                        "transaction": f"transaction_{i % 2}",
+                        "status_code": "500" if i % 2 == 0 else "200",
+                        "device": "BlackBerry" if i % 2 == 0 else "Nokia",
+                    },
+                    self.ts(self.hour_ago + timedelta(minutes=1 * i)),
+                    value,
+                    UseCaseID.TRANSACTIONS,
+                )
+
+    def test_fetch_metric_mris(self) -> None:
+        metric_mris = fetch_metric_mris(self.org_id, [self.project.id], UseCaseID.TRANSACTIONS)
+        assert len(metric_mris) == 1
+        assert len(metric_mris[self.project.id]) == 4
+        assert metric_mris[self.project.id] == [
+            "c:transactions/count_per_root_project@none",
+            "s:transactions/user@none",
+            "g:transactions/test_gauge@none",
+            "d:transactions/duration@millisecond",
+        ]
+
+    def test_fetch_metric_tag_keys(self) -> None:
+        tag_keys = fetch_metric_tag_keys(
+            self.org_id, [self.project.id], UseCaseID.TRANSACTIONS, "g:transactions/test_gauge@none"
+        )
+        assert len(tag_keys) == 1
+        assert len(tag_keys[self.project.id]) == 3
+        assert tag_keys[self.project.id] == ["status_code", "device", "transaction"]
+
+    def test_fetch_metric_tag_values(self) -> None:
+        tag_values = fetch_metric_tag_values(
+            self.org_id,
+            self.project.id,
+            UseCaseID.TRANSACTIONS,
+            "g:transactions/test_gauge@none",
+            "transaction",
+        )
+        assert len(tag_values) == 2
+        assert tag_values == ["transaction_0", "transaction_1"]
+
+    def test_fetch_metric_tag_values_with_prefix(self) -> None:
+        tag_values = fetch_metric_tag_values(
+            self.org_id,
+            self.project.id,
+            UseCaseID.TRANSACTIONS,
+            "g:transactions/test_gauge@none",
+            "status_code",
+            "5",
+        )
+        assert len(tag_values) == 1
+        assert tag_values == ["500"]