Browse Source

feat(ddm): Add support for spans correlation source (#64503)

This implements the simplest approach to correlate the span duration and
span exclusive time metrics to sample events by looking up the segment
id in the spans entity and then fetching the segment info.
Tony Xiao 1 year ago
parent
commit
f8ad4ec1d5

+ 123 - 0
src/sentry/sentry_metrics/querying/metadata/metrics_correlations.py

@@ -40,6 +40,7 @@ from sentry.sentry_metrics.querying.visitors import (
 from sentry.snuba.dataset import Dataset, EntityKey
 from sentry.snuba.metrics.naming_layer.mri import (
     ParsedMRI,
+    SpanMRI,
     TransactionMRI,
     is_measurement,
     is_mri,
@@ -498,6 +499,127 @@ class MeasurementsCorrelationsSource(CorrelationsSource):
         )
 
 
+class SpansDurationCorrelationsSource(CorrelationsSource):
+    """
+    Correlations source for the span duration and span self time metrics.
+
+    Matching spans are first looked up in the spans entity, then the segments they
+    belong to are fetched with the module-level `_get_segments` helper, using the
+    `transaction_id` of each span as the lookup key.
+    """
+
+    @classmethod
+    def supports(cls, metric_mri: str) -> bool:
+        """Returns `True` if the metric maps to a column of the spans entity."""
+        return cls.get_span_column(metric_mri) is not None
+
+    @classmethod
+    def get_span_column(cls, metric_mri: str) -> Column | None:
+        """
+        Returns the spans column that stores the value of the supplied metric, or
+        `None` if the metric is not handled by this source.
+        """
+        if metric_mri == SpanMRI.SELF_TIME.value:
+            return Column("exclusive_time")
+
+        if metric_mri == SpanMRI.DURATION.value:
+            return Column("duration")
+
+        return None
+
+    def _get_segments(
+        self,
+        metric_mri: str,
+        conditions: QueryConditions,
+        start: datetime,
+        end: datetime,
+        min_value: float | None,
+        max_value: float | None,
+    ) -> Sequence[Segment]:
+        """
+        Returns the segments containing at least one span matching the supplied
+        filters, each extended with one metric summary and one span detail per
+        matching span.
+        """
+        segments_spans = self._get_segments_spans(
+            metric_mri, conditions, start, end, min_value, max_value
+        )
+
+        # No span matched, so there is no segment to look up.
+        if not segments_spans:
+            return []
+
+        # The bare name resolves to the module-level `_get_segments` function, not to
+        # this method.
+        segments = _get_segments(
+            where=[Condition(Column("transaction_id"), Op.IN, list(segments_spans))],
+            start=start,
+            end=end,
+            organization=self.organization,
+            projects=self.projects,
+        )
+
+        extended_segments = []
+        for segment in segments:
+            metric_summaries = []
+            spans_details = []
+            for span_id, value, timestamp in segments_spans.get(segment.segment_id, []):
+                # The span duration and self time metrics are emitted once per span, so
+                # the metric summary can be hard coded from the single span value.
+                metric_summaries.append(
+                    MetricSummary(
+                        span_id=span_id,
+                        min=value,
+                        max=value,
+                        sum=value,
+                        count=1,
+                    )
+                )
+
+                # NOTE(review): `value` comes from the column selected for the metric, so
+                # for the self time metric `span_duration` receives the exclusive time -
+                # confirm this is intended.
+                spans_details.append(
+                    SpanDetail(span_id=span_id, span_duration=value, span_timestamp=timestamp)
+                )
+
+            extended_segments.append(
+                segment.add_metric_summaries(metric_summaries).add_spans_details(spans_details)
+            )
+
+        return extended_segments
+
+    def _get_segments_spans(
+        self,
+        metric_mri: str,
+        conditions: QueryConditions,
+        start: datetime,
+        end: datetime,
+        min_value: float | None,
+        max_value: float | None,
+    ) -> Mapping[str, Sequence[tuple[str, int, datetime]]]:
+        """
+        Queries the spans entity for the spans matching the supplied filters.
+
+        Returns a mapping from `transaction_id` to the list of
+        `(span_id, value, timestamp)` tuples of its matching spans, where `value` is
+        read from the column returned by `get_span_column` for the metric.
+        """
+        column = self.get_span_column(metric_mri)
+        assert column is not None
+
+        where: list[QueryCondition] = [
+            Condition(Column("project_id"), Op.IN, [project.id for project in self.projects]),
+            Condition(Column("timestamp"), Op.GTE, start),
+            Condition(Column("timestamp"), Op.LT, end),
+        ]
+
+        where.extend(conditions.get())
+
+        # Compare against `None` explicitly: a bound of 0 is a valid filter and must
+        # not be dropped by a truthiness check.
+        if min_value is not None:
+            where.append(Condition(column, Op.GTE, min_value))
+        if max_value is not None:
+            where.append(Condition(column, Op.LTE, max_value))
+
+        query = Query(
+            match=Entity(EntityKey.Spans.value),
+            select=[
+                Column("transaction_id"),
+                Column("span_id"),
+                column,
+                Column("timestamp"),
+            ],
+            where=where,
+            limit=Limit(SNUBA_QUERY_LIMIT),
+        )
+
+        request = Request(
+            dataset=Dataset.SpansIndexed.value,
+            app_id="metrics",
+            query=query,
+            tenant_ids={"organization_id": self.organization.id},
+        )
+
+        data = raw_snql_query(request, Referrer.API_DDM_FETCH_SPANS.value, use_cache=True)["data"]
+
+        # Group the returned spans by the transaction they belong to.
+        segments_spans: dict[str, list[tuple[str, int, datetime]]] = {}
+        for value in data:
+            segments_spans.setdefault(value["transaction_id"], []).append(
+                (value["span_id"], value[column.name], value["timestamp"])
+            )
+
+        return segments_spans
+
+
 def _get_segments_aggregates_query(
     where: ConditionGroup | None,
     start: datetime,
@@ -657,6 +779,7 @@ def _get_segments(
+# NOTE(review): `MetricsSummariesCorrelationsSource` is kept last; the tests treat it as
+# the fallback that supports every valid MRI. Presumably the sources are tried in list
+# order - confirm against the code that consumes this list.
 CORRELATIONS_SOURCES = [
     MeasurementsCorrelationsSource,
     TransactionDurationCorrelationsSource,
+    SpansDurationCorrelationsSource,
     MetricsSummariesCorrelationsSource,
 ]
 

+ 1 - 1
src/sentry/snuba/metrics/naming_layer/mri.py

@@ -325,7 +325,7 @@ def is_custom_measurement(parsed_mri: ParsedMRI) -> bool:
         and parsed_mri.name.startswith("measurements.")
         and
         # Iterate through the transaction MRI and check that this parsed_mri isn't in there
-        parsed_mri.mri_string not in [mri.value for mri in TransactionMRI.__members__.values()]
+        all(parsed_mri.mri_string != mri.value for mri in TransactionMRI.__members__.values())
     )
 
 

+ 2 - 1
src/sentry/testutils/cases.py

@@ -1438,6 +1438,7 @@ class BaseSpansTestCase(SnubaTestCase):
         profile_id: str | None = None,
         transaction: str | None = None,
         duration: int = 10,
+        exclusive_time: int = 5,
         tags: Mapping[str, Any] | None = None,
         measurements: Mapping[str, int | float] | None = None,
         timestamp: datetime | None = None,
@@ -1452,7 +1453,7 @@ class BaseSpansTestCase(SnubaTestCase):
             "span_id": span_id,
             "trace_id": trace_id,
             "duration_ms": int(duration),
-            "exclusive_time_ms": 5,
+            "exclusive_time_ms": int(exclusive_time),
             "is_segment": True,
             "received": datetime.now(tz=timezone.utc).timestamp(),
             "start_timestamp_ms": int(timestamp.timestamp() * 1000),

+ 44 - 0
tests/sentry/api/endpoints/test_organization_ddm_meta.py

@@ -1088,3 +1088,47 @@ class OrganizationDDMEndpointTest(APITestCase, BaseSpansTestCase):
         metric_spans = response.data["metricSpans"]
         # We expect to only have returned that span with that measurement, even if the value is 0.
         assert len(metric_spans) == 1
+
+    def test_get_metric_span_self_time(self):
+        """Requesting metric spans for the span self time metric returns the stored segment."""
+        metric_mri = "d:spans/exclusive_time@millisecond"
+        project = self.project
+
+        self.store_segment(
+            project_id=project.id,
+            timestamp=before_now(minutes=5),
+            trace_id=uuid.uuid4().hex,
+            transaction_id=uuid.uuid4().hex,
+        )
+
+        result = self.get_success_response(
+            self.organization.slug,
+            metric=[metric_mri],
+            project=[project.id],
+            statsPeriod="1d",
+            metricSpans="true",
+        )
+
+        # A single segment was stored, so exactly one metric span is expected back.
+        assert len(result.data["metricSpans"]) == 1
+
+    def test_get_metric_span_duration(self):
+        """Requesting metric spans for the span duration metric returns the stored segment."""
+        metric_mri = "d:spans/duration@millisecond"
+        project = self.project
+
+        self.store_segment(
+            project_id=project.id,
+            timestamp=before_now(minutes=5),
+            trace_id=uuid.uuid4().hex,
+            transaction_id=uuid.uuid4().hex,
+        )
+
+        result = self.get_success_response(
+            self.organization.slug,
+            metric=[metric_mri],
+            project=[project.id],
+            statsPeriod="1d",
+            metricSpans="true",
+        )
+
+        # A single segment was stored, so exactly one metric span is expected back.
+        assert len(result.data["metricSpans"]) == 1

+ 91 - 0
tests/sentry/sentry_metrics/querying/metadata/test_metrics_correlations.py

@@ -0,0 +1,91 @@
+import pytest
+
+from sentry.sentry_metrics.querying.metadata.metrics_correlations import (
+    MeasurementsCorrelationsSource,
+    MetricsSummariesCorrelationsSource,
+    SpansDurationCorrelationsSource,
+    TransactionDurationCorrelationsSource,
+)
+from sentry.snuba.metrics.naming_layer.mri import (
+    ErrorsMRI,
+    SessionMRI,
+    SpanMRI,
+    TransactionMRI,
+    parse_mri,
+)
+
+
+def assign_correlation_source_for_transaction_mri(mri):
+    """
+    Returns the correlations source class expected for a `TransactionMRI` member, or
+    `None` for members in the `spans` namespace other than span self time and span
+    duration.
+
+    Raises `ValueError` if `mri` is not a `TransactionMRI` or if its value cannot be
+    parsed as an MRI.
+    """
+    if not isinstance(mri, TransactionMRI):
+        raise ValueError(f"Non TransactionMRI: {mri.value}")
+
+    if mri == TransactionMRI.DURATION:
+        return TransactionDurationCorrelationsSource
+
+    parsed = parse_mri(mri.value)
+    if parsed is None:
+        raise ValueError(f"Illegal MRI: {mri.value}")
+
+    if parsed.name.startswith("measurements."):
+        return MeasurementsCorrelationsSource
+
+    # Everything outside of the spans namespace falls back to metrics summaries.
+    if parsed.namespace != "spans":
+        return MetricsSummariesCorrelationsSource
+
+    is_span_timing = mri == TransactionMRI.SPAN_SELF_TIME or mri == TransactionMRI.SPAN_DURATION
+    return SpansDurationCorrelationsSource if is_span_timing else None
+
+
+# Every correlations source listed in the first parametrization is checked against every
+# MRI listed in the second one.
+@pytest.mark.parametrize(
+    ["correlation_source"],
+    [
+        pytest.param(MeasurementsCorrelationsSource, id="measurements"),
+        pytest.param(SpansDurationCorrelationsSource, id="span duration"),
+        pytest.param(TransactionDurationCorrelationsSource, id="transaction duration"),
+    ],
+)
+@pytest.mark.parametrize(
+    ["mri", "expected_source"],
+    [
+        # ========== SessionMRI ==========
+        *[pytest.param(mri.value, MetricsSummariesCorrelationsSource) for mri in SessionMRI],
+        # ==========  Transaction MRI ==========
+        *[
+            pytest.param(
+                mri.value,
+                assign_correlation_source_for_transaction_mri(mri),
+            )
+            for mri in TransactionMRI
+        ],
+        # ==========  Span MRI ==========
+        *[
+            pytest.param(
+                mri.value,
+                SpansDurationCorrelationsSource
+                if mri is SpanMRI.SELF_TIME or mri is SpanMRI.DURATION
+                else MetricsSummariesCorrelationsSource,
+                # MRIs in the `spans_light` namespace are skipped rather than asserted on.
+                marks=pytest.mark.skipif(
+                    mri.value.startswith("e:spans_light/"),
+                    reason="Unexpected namespace: spans_light",
+                ),
+            )
+            for mri in SpanMRI
+        ],
+        # ==========  Error MRI ==========
+        pytest.param(ErrorsMRI.EVENT_INGESTED.value, MetricsSummariesCorrelationsSource),
+        # ==========  Custom MRI ==========
+        pytest.param(
+            "d:custom/sentry.process_profile.track_outcome@second",
+            MetricsSummariesCorrelationsSource,
+        ),
+    ],
+)
+def test_correlation_source_supports_mri(correlation_source, mri, expected_source):
+    """
+    Checks that `correlation_source.supports(mri)` is `True` only when the source is the
+    one expected for the MRI, and that `MetricsSummariesCorrelationsSource` supports
+    every MRI in the matrix.
+    """
+    # When `expected_source` is `None`, no source under test is expected to support the MRI.
+    supported = expected_source is correlation_source
+    assert correlation_source.supports(mri) == supported
+
+    # Treat metrics summary as a fallback, ie it should support all valid MRIs
+    assert MetricsSummariesCorrelationsSource.supports(mri)