123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- from __future__ import annotations
- from datetime import datetime, timedelta, timezone
- from typing import Any, Callable, Literal, Mapping
- import pytest
- from snuba_sdk import (
- ArithmeticOperator,
- Column,
- Condition,
- Direction,
- Formula,
- Metric,
- MetricsQuery,
- MetricsScope,
- Op,
- Request,
- Rollup,
- Timeseries,
- )
- from sentry.exceptions import InvalidParams
- from sentry.sentry_metrics.use_case_id_registry import UseCaseID
- from sentry.snuba.metrics.naming_layer import SessionMRI, TransactionMRI
- from sentry.snuba.metrics.naming_layer.public import TransactionStatusTagValue, TransactionTagsKey
- from sentry.snuba.metrics_layer.query import run_query as layer_run_query
- from sentry.testutils.cases import BaseMetricsTestCase, TestCase
- pytestmark = pytest.mark.sentry_metrics
- # TODO: This is only needed while we support SnQL and MQL. Once SnQL is removed, this can be removed.
- LayerQuery = Callable[[Request], Mapping[str, Any]]
- class MQLTest(TestCase, BaseMetricsTestCase):
- @property
- def run_query(self) -> LayerQuery:
- def mql_query_fn(request: Request) -> Mapping[str, Any]:
- with self.options({"snuba.use-mql-endpoint": 1.0}):
- return layer_run_query(request)
- return mql_query_fn
- def ts(self, dt: datetime) -> int:
- return int(dt.timestamp())
- def setUp(self) -> None:
- super().setUp()
- self.metrics: Mapping[str, Literal["counter", "set", "distribution", "gauge"]] = {
- TransactionMRI.DURATION.value: "distribution",
- TransactionMRI.USER.value: "set",
- TransactionMRI.COUNT_PER_ROOT_PROJECT.value: "counter",
- "g:transactions/test_gauge@none": "gauge",
- }
- self.now = datetime.now(tz=timezone.utc).replace(microsecond=0)
- self.hour_ago = self.now - timedelta(hours=1)
- self.org_id = self.project.organization_id
- for mri, metric_type in self.metrics.items():
- assert metric_type in {"counter", "distribution", "set", "gauge"}
- for i in range(10):
- value: int | dict[str, int]
- if metric_type == "gauge":
- value = {
- "min": i,
- "max": i,
- "sum": i,
- "count": i,
- "last": i,
- }
- else:
- value = i
- self.store_metric(
- self.org_id,
- self.project.id,
- metric_type,
- mri,
- {
- "transaction": f"transaction_{i % 2}",
- "status_code": "500" if i % 3 == 0 else "200",
- "device": "BlackBerry" if i % 2 == 0 else "Nokia",
- },
- self.ts(self.hour_ago + timedelta(minutes=1 * i)),
- value,
- UseCaseID.TRANSACTIONS,
- )
- def test_basic(self) -> None:
- query = MetricsQuery(
- query=Timeseries(
- metric=Metric(
- "transaction.duration",
- TransactionMRI.DURATION.value,
- ),
- aggregate="max",
- ),
- start=self.hour_ago,
- end=self.now,
- rollup=Rollup(interval=60, granularity=60),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- use_case_id=UseCaseID.TRANSACTIONS.value,
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- result = self.run_query(request)
- assert len(result["data"]) == 10
- rows = result["data"]
- for i in range(10):
- assert rows[i]["aggregate_value"] == i
- assert (
- rows[i]["time"]
- == (
- self.hour_ago.replace(second=0, microsecond=0) + timedelta(minutes=1 * i)
- ).isoformat()
- )
- def test_groupby(self) -> None:
- query = MetricsQuery(
- query=Timeseries(
- metric=Metric(
- "transaction.duration",
- TransactionMRI.DURATION.value,
- ),
- aggregate="quantiles",
- aggregate_params=[0.5, 0.99],
- groupby=[Column("transaction")],
- ),
- start=self.hour_ago,
- end=self.now,
- rollup=Rollup(interval=60, granularity=60),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- use_case_id=UseCaseID.TRANSACTIONS.value,
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- result = self.run_query(request)
- assert len(result["data"]) == 10
- rows = result["data"]
- for i in range(10):
- assert rows[i]["aggregate_value"] == [i, i]
- assert rows[i]["transaction"] == f"transaction_{i % 2}"
- assert (
- rows[i]["time"]
- == (
- self.hour_ago.replace(second=0, microsecond=0) + timedelta(minutes=1 * i)
- ).isoformat()
- )
- def test_filters(self) -> None:
- query = MetricsQuery(
- query=Timeseries(
- metric=Metric(
- "transaction.duration",
- TransactionMRI.DURATION.value,
- ),
- aggregate="quantiles",
- aggregate_params=[0.5],
- filters=[
- Condition(Column("status_code"), Op.EQ, "500"),
- Condition(Column("device"), Op.EQ, "BlackBerry"),
- ],
- ),
- start=self.hour_ago,
- end=self.now,
- rollup=Rollup(interval=60, granularity=60),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- use_case_id=UseCaseID.TRANSACTIONS.value,
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- result = self.run_query(request)
- assert len(result["data"]) == 2
- rows = result["data"]
- assert rows[0]["aggregate_value"] == [0]
- assert rows[1]["aggregate_value"] == [6.0]
- def test_complex(self) -> None:
- query = MetricsQuery(
- query=Timeseries(
- metric=Metric(
- "transaction.duration",
- TransactionMRI.DURATION.value,
- ),
- aggregate="quantiles",
- aggregate_params=[0.5],
- filters=[
- Condition(Column("status_code"), Op.EQ, "500"),
- Condition(Column("device"), Op.EQ, "BlackBerry"),
- ],
- groupby=[Column("transaction")],
- ),
- start=self.hour_ago,
- end=self.now,
- rollup=Rollup(interval=60, granularity=60),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- use_case_id=UseCaseID.TRANSACTIONS.value,
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- result = self.run_query(request)
- assert len(result["data"]) == 2
- rows = result["data"]
- assert rows[0]["aggregate_value"] == [0]
- assert rows[0]["transaction"] == "transaction_0"
- assert rows[1]["aggregate_value"] == [6.0]
- assert rows[1]["transaction"] == "transaction_0"
- def test_totals(self) -> None:
- query = MetricsQuery(
- query=Timeseries(
- metric=Metric(
- "transaction.duration",
- TransactionMRI.DURATION.value,
- ),
- aggregate="max",
- filters=[Condition(Column("status_code"), Op.EQ, "200")],
- groupby=[Column("transaction")],
- ),
- start=self.hour_ago,
- end=self.now,
- rollup=Rollup(totals=True, granularity=60, orderby=Direction.ASC),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- use_case_id=UseCaseID.TRANSACTIONS.value,
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- result = self.run_query(request)
- assert len(result["data"]) == 2
- rows = result["data"]
- assert rows[0]["aggregate_value"] == 7.0
- assert rows[1]["aggregate_value"] == 8.0
- def test_meta_data_in_response(self) -> None:
- query = MetricsQuery(
- query=Timeseries(
- metric=Metric(
- "transaction.duration",
- TransactionMRI.DURATION.value,
- ),
- aggregate="max",
- filters=[Condition(Column("status_code"), Op.EQ, "200")],
- groupby=[Column("transaction")],
- ),
- start=self.hour_ago.replace(minute=16, second=59),
- end=self.now.replace(minute=16, second=59),
- rollup=Rollup(interval=60, granularity=60),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- use_case_id=UseCaseID.TRANSACTIONS.value,
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- result = self.run_query(request)
- assert result["modified_start"] == self.hour_ago.replace(minute=16, second=0)
- assert result["modified_end"] == self.now.replace(minute=17, second=0)
- assert result["indexer_mappings"] == {
- "d:transactions/duration@millisecond": 9223372036854775909,
- "status_code": 10000,
- "transaction": 9223372036854776020,
- }
- def test_bad_query(self) -> None:
- query = MetricsQuery(
- query=Timeseries(
- metric=Metric(
- "transaction.duration",
- "not a real MRI",
- ),
- aggregate="max",
- ),
- start=self.hour_ago.replace(minute=16, second=59),
- end=self.now.replace(minute=16, second=59),
- rollup=Rollup(interval=60, granularity=60),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- use_case_id=UseCaseID.TRANSACTIONS.value,
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- with pytest.raises(InvalidParams):
- self.run_query(request)
- def test_interval_with_totals(self) -> None:
- query = MetricsQuery(
- query=Timeseries(
- metric=Metric(
- "transaction.duration",
- TransactionMRI.DURATION.value,
- ),
- aggregate="max",
- filters=[Condition(Column("status_code"), Op.EQ, "200")],
- groupby=[Column("transaction")],
- ),
- start=self.hour_ago,
- end=self.now,
- rollup=Rollup(interval=60, totals=True, granularity=60),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- use_case_id=UseCaseID.TRANSACTIONS.value,
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- result = self.run_query(request)
- assert len(result["data"]) == 6
- assert result["totals"]["aggregate_value"] == 8.0
- def test_automatic_granularity(self) -> None:
- query = MetricsQuery(
- query=Timeseries(
- metric=Metric(
- "transaction.duration",
- TransactionMRI.DURATION.value,
- ),
- aggregate="max",
- ),
- start=self.hour_ago,
- end=self.now,
- rollup=Rollup(interval=120),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- result = self.run_query(request)
- # There's a flaky off by one error here that is very difficult to track down
- # TODO: figure out why this is flaky and assert to one specific value
- assert len(result["data"]) in [5, 6]
- def test_automatic_dataset(self) -> None:
- query = MetricsQuery(
- query=Timeseries(
- metric=Metric(
- None,
- SessionMRI.RAW_DURATION.value,
- ),
- aggregate="max",
- ),
- start=self.hour_ago,
- end=self.now,
- rollup=Rollup(interval=60, granularity=60),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- use_case_id=UseCaseID.SESSIONS.value,
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- result = self.run_query(request)
- assert request.dataset == "metrics"
- assert len(result["data"]) == 0
- def test_gauges(self) -> None:
- query = MetricsQuery(
- query=Timeseries(
- metric=Metric(
- None,
- "g:transactions/test_gauge@none",
- ),
- aggregate="last",
- ),
- start=self.hour_ago,
- end=self.now,
- rollup=Rollup(interval=60, totals=True, granularity=60),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- result = self.run_query(request)
- assert len(result["data"]) == 10
- assert result["totals"]["aggregate_value"] == 9.0
- @pytest.mark.skip(reason="This is not implemented in MQL")
- def test_failure_rate(self) -> None:
- query = MetricsQuery(
- query=Formula(
- ArithmeticOperator.DIVIDE,
- [
- Timeseries(
- metric=Metric(
- mri=TransactionMRI.DURATION.value,
- ),
- aggregate="count",
- filters=[
- Condition(
- Column(TransactionTagsKey.TRANSACTION_STATUS.value),
- Op.NOT_IN,
- [
- TransactionStatusTagValue.OK.value,
- TransactionStatusTagValue.CANCELLED.value,
- TransactionStatusTagValue.UNKNOWN.value,
- ],
- )
- ],
- ),
- Timeseries(
- metric=Metric(
- mri=TransactionMRI.DURATION.value,
- ),
- aggregate="count",
- ),
- ],
- ),
- start=self.hour_ago,
- end=self.now,
- rollup=Rollup(interval=60, totals=True, granularity=60),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- result = self.run_query(request)
- assert len(result["data"]) == 10
- assert result["totals"]["aggregate_value"] == 1.0
- def test_aggregate_aliases(self) -> None:
- query = MetricsQuery(
- query=Timeseries(
- metric=Metric(
- "transaction.duration",
- TransactionMRI.DURATION.value,
- ),
- aggregate="p95",
- ),
- start=self.hour_ago,
- end=self.now,
- rollup=Rollup(interval=60, granularity=60),
- scope=MetricsScope(
- org_ids=[self.org_id],
- project_ids=[self.project.id],
- use_case_id=UseCaseID.TRANSACTIONS.value,
- ),
- )
- request = Request(
- dataset="generic_metrics",
- app_id="tests",
- query=query,
- tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
- )
- result = self.run_query(request)
- assert len(result["data"]) == 10
- rows = result["data"]
- for i in range(10):
- assert rows[i]["aggregate_value"] == [i]
- assert (
- rows[i]["time"]
- == (
- self.hour_ago.replace(second=0, microsecond=0) + timedelta(minutes=1 * i)
- ).isoformat()
- )
- class SnQLTest(MQLTest):
- @property
- def run_query(self) -> LayerQuery:
- def snql_query_fn(request: Request) -> Mapping[str, Any]:
- with self.options({"snuba.use-mql-endpoint": 0}):
- return layer_run_query(request)
- return snql_query_fn
- def test_failure_rate(self) -> None:
- super().test_failure_rate()
|