
feat(statistical-detectors): Pass breakpoints to vroom to produce occurrences (#56974)

To pull all the data needed to produce an occurrence, we need to pass the
breakpoint along with a profile ID to vroom, where the data can be
extracted. This change queries for an example profile and passes the
payload to vroom to create issue occurrences.
Tony Xiao, 1 year ago · commit 60d18c1a93
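
For reference, a minimal sketch of one regression payload that ends up POSTed to vroom's /regressed endpoint. The field names are taken from the diff below; the values (IDs, timestamps, statistics) are illustrative only:

# One entry per detected breakpoint; profile_id comes from the worst() example query
payload = {
    "organization_id": 1,
    "project_id": 1,
    "profile_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",  # example profile id (illustrative)
    "fingerprint": 123,                                # function fingerprint (illustrative)
    "absolute_percentage_change": 1.23,
    "aggregate_range_1": 100_000_000,
    "aggregate_range_2": 200_000_000,
    "breakpoint": 1696000000,                          # unix timestamp of the change point
    "trend_difference": 1.23,
    "trend_percentage": 1.23,
    "unweighted_p_value": 1.23,
    "unweighted_t_value": 1.23,
}
# get_from_profiling_service(method="POST", path="/regressed", json_data=[payload])
# is expected to respond with a JSON body like {"occurrences": <count>}.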

+ 3 - 0
src/sentry/snuba/functions.py

@@ -33,6 +33,7 @@ def query(
     auto_fields: bool = False,
     auto_aggregations: bool = False,
     use_aggregate_conditions: bool = False,
+    conditions=None,
     allow_metric_aggregates: bool = False,
     transform_alias_to_input_format: bool = False,
     has_metrics: bool = False,
@@ -61,6 +62,8 @@ def query(
             functions_acl=functions_acl,
         ),
     )
+    if conditions is not None:
+        builder.add_conditions(conditions)
     result = builder.process_results(builder.run_query(referrer))
     result["meta"]["tips"] = transform_tips(builder.tips)
     return result
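
For context, a minimal usage sketch of the new conditions kwarg, mirroring how the detector task below builds it. The (project_id, fingerprint) pairs and the surrounding setup (params, limit) are illustrative, not part of this file:

from snuba_sdk import And, Column, Condition, Op, Or

from sentry.snuba import functions

# Pin the query to specific (project_id, fingerprint) pairs; illustrative values
pairs = [(1, 123), (2, 456)]
conditions = [
    And(
        [
            Condition(Column("project_id"), Op.EQ, project_id),
            Condition(Column("fingerprint"), Op.EQ, fingerprint),
        ]
    )
    for project_id, fingerprint in pairs
]

# A single And is passed as-is; multiple pairs get wrapped in one Or
result = functions.query(
    selected_columns=["project.id", "fingerprint", "worst()"],
    query="is_application:1",
    params=params,  # assumed to be built elsewhere with start/end/project objects
    limit=len(pairs),
    referrer="api.profiling.functions.statistical-detector.example",
    conditions=conditions if len(conditions) <= 1 else [Or(conditions)],
)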

+ 3 - 0
src/sentry/snuba/referrer.py

@@ -346,6 +346,9 @@ class Referrer(Enum):
     API_PROFILING_FUNCTIONS_STATISTICAL_DETECTOR_STATS = (
         "api.profiling.functions.statistical-detector.stats"
     )
+    API_PROFILING_FUNCTIONS_STATISTICAL_DETECTOR_EXAMPLE = (
+        "api.profiling.functions.statistical-detector.example"
+    )
     API_PROJECT_EVENTS = "api.project-events"
     API_RELEASES_RELEASE_DETAILS_CHART = "api.releases.release-details-chart"
     API_REPLAY_DETAILS_PAGE = "api.replay.details-page"

+ 117 - 22
src/sentry/tasks/statistical_detectors.py

@@ -1,12 +1,13 @@
 from __future__ import annotations
 
 import logging
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from typing import Any, Dict, Generator, List, Optional, Set, Tuple
 
 import sentry_sdk
-from django.utils import timezone
+from django.utils import timezone as django_timezone
 from snuba_sdk import (
+    And,
     Column,
     Condition,
     CurriedFunction,
@@ -17,6 +18,7 @@ from snuba_sdk import (
     Limit,
     LimitBy,
     Op,
+    Or,
     OrderBy,
     Query,
     Request,
@@ -26,6 +28,7 @@ from sentry import options
 from sentry.api.serializers.snuba import SnubaTSResultSerializer
 from sentry.constants import ObjectStatus
 from sentry.models.project import Project
+from sentry.profiles.utils import get_from_profiling_service
 from sentry.search.events.builder import ProfileTopFunctionsTimeseriesQueryBuilder
 from sentry.search.events.fields import get_function_alias
 from sentry.search.events.types import QueryBuilderConfig
@@ -44,7 +47,7 @@ from sentry.statistical_detectors.algorithm import (
 )
 from sentry.statistical_detectors.detector import DetectorPayload, TrendType
 from sentry.tasks.base import instrumented_task
-from sentry.utils import metrics
+from sentry.utils import json, metrics
 from sentry.utils.iterators import chunked
 from sentry.utils.math import ExponentialMovingAverage
 from sentry.utils.query import RangeQuerySetWrapper
@@ -70,7 +73,7 @@ def run_detection() -> None:
     if not options.get("statistical_detectors.enable"):
         return
 
-    now = timezone.now()
+    now = django_timezone.now()
 
     enabled_performance_projects: Set[int] = set(
         options.get("statistical_detectors.enable.projects.performance")
@@ -263,27 +266,15 @@ def detect_function_change_points(
     functions_list: List[Tuple[int, int]], start: datetime, *args, **kwargs
 ) -> None:
     breakpoint_count = 0
+    emitted_count = 0
 
     breakpoints = _detect_function_change_points(functions_list, start)
 
-    for breakpoint_chunk in chunked(breakpoints, 100):
-        for entry in breakpoint_chunk:
-            breakpoint_count += 1
-
-            with sentry_sdk.push_scope() as scope:
-                scope.set_tag("regressed_project_id", entry["project"])
-                # the service was originally meant for transactions so this
-                # naming is a result of this
-                scope.set_tag("regressed_function_id", entry["transaction"])
-                scope.set_context(
-                    "statistical_detectors",
-                    {
-                        **entry,
-                        "timestamp": start.isoformat(),
-                        "breakpoint": datetime.fromtimestamp(entry["breakpoint"]),
-                    },
-                )
-                sentry_sdk.capture_message("Potential Regression")
+    chunk_size = 100
+
+    for breakpoint_chunk in chunked(breakpoints, chunk_size):
+        breakpoint_count += len(breakpoint_chunk)
+        emitted_count += emit_function_regression_issue(breakpoint_chunk, start)
 
     metrics.incr(
         "statistical_detectors.breakpoint.functions",
@@ -291,6 +282,12 @@ def detect_function_change_points(
         sample_rate=1.0,
     )
 
+    metrics.incr(
+        "statistical_detectors.emitted.functions",
+        amount=emitted_count,
+        sample_rate=1.0,
+    )
+
 
 def _detect_function_trends(
     project_ids: List[int], start: datetime
@@ -402,6 +399,104 @@ def _detect_function_change_points(
             continue
 
 
+def emit_function_regression_issue(
+    breakpoints: List[BreakpointData],
+    start: datetime,
+) -> int:
+    start = start - timedelta(hours=1)
+    start = start.replace(minute=0, second=0, microsecond=0)
+
+    project_ids = [int(entry["project"]) for entry in breakpoints]
+    projects = Project.objects.filter(id__in=project_ids)
+    projects_by_id = {project.id: project for project in projects}
+
+    params: Dict[str, Any] = {
+        "start": start,
+        "end": start + timedelta(minutes=1),
+        "project_id": project_ids,
+        "project_objects": projects,
+    }
+
+    conditions = [
+        And(
+            [
+                Condition(Column("project_id"), Op.EQ, int(entry["project"])),
+                Condition(Column("fingerprint"), Op.EQ, int(entry["transaction"])),
+            ]
+        )
+        for entry in breakpoints
+    ]
+
+    result = functions.query(
+        selected_columns=["project.id", "fingerprint", "worst()"],
+        query="is_application:1",
+        params=params,
+        orderby=["project.id"],
+        limit=len(breakpoints),
+        referrer=Referrer.API_PROFILING_FUNCTIONS_STATISTICAL_DETECTOR_EXAMPLE.value,
+        auto_aggregations=True,
+        use_aggregate_conditions=True,
+        transform_alias_to_input_format=True,
+        conditions=conditions if len(conditions) <= 1 else [Or(conditions)],
+    )
+
+    examples = {(row["project.id"], row["fingerprint"]): row["worst()"] for row in result["data"]}
+
+    payloads = []
+
+    for entry in breakpoints:
+        with sentry_sdk.push_scope() as scope:
+            scope.set_tag("regressed_project_id", entry["project"])
+            # the service was originally meant for transactions so this
+            # naming is a result of this
+            scope.set_tag("regressed_function_id", entry["transaction"])
+
+            breakpoint_ts = datetime.fromtimestamp(entry["breakpoint"], tz=timezone.utc)
+            scope.set_context(
+                "statistical_detectors",
+                {
+                    **entry,
+                    "timestamp": start.isoformat(),
+                    "breakpoint_timestamp": breakpoint_ts.isoformat(),
+                },
+            )
+            sentry_sdk.capture_message("Potential Regression")
+
+        project_id = int(entry["project"])
+        fingerprint = int(entry["transaction"])
+        example = examples.get((project_id, fingerprint))
+        if example is None:
+            continue
+
+        project = projects_by_id.get(project_id)
+        if project is None:
+            continue
+
+        payloads.append(
+            {
+                "organization_id": project.organization_id,
+                "project_id": project_id,
+                "profile_id": example,
+                "fingerprint": fingerprint,
+                "absolute_percentage_change": entry["absolute_percentage_change"],
+                "aggregate_range_1": entry["aggregate_range_1"],
+                "aggregate_range_2": entry["aggregate_range_2"],
+                "breakpoint": int(entry["breakpoint"]),
+                "trend_difference": entry["trend_difference"],
+                "trend_percentage": entry["trend_percentage"],
+                "unweighted_p_value": entry["unweighted_p_value"],
+                "unweighted_t_value": entry["unweighted_t_value"],
+            }
+        )
+
+    response = get_from_profiling_service(method="POST", path="/regressed", json_data=payloads)
+    if response.status != 200:
+        return 0
+
+    data = json.loads(response.data)
+    return data.get("occurrences")
+
+
 def all_function_payloads(
     project_ids: List[int],
     start: datetime,

+ 39 - 4
tests/sentry/tasks/test_statistical_detectors.py

@@ -1,10 +1,12 @@
 from datetime import datetime, timedelta, timezone
+from typing import List
 from unittest import mock
 
 import pytest
 from django.db.models import F
 
 from sentry.models import Project
+from sentry.seer.utils import BreakpointData
 from sentry.sentry_metrics.use_case_id_registry import UseCaseID
 from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
 from sentry.statistical_detectors.detector import DetectorPayload
@@ -12,6 +14,7 @@ from sentry.tasks.statistical_detectors import (
     detect_function_change_points,
     detect_function_trends,
     detect_transaction_trends,
+    emit_function_regression_issue,
     query_functions,
     query_transactions,
     run_detection,
@@ -292,11 +295,16 @@ def test_detect_function_trends(
     assert detect_function_change_points.delay.called
 
 
+@mock.patch("sentry.tasks.statistical_detectors.emit_function_regression_issue")
 @mock.patch("sentry.tasks.statistical_detectors.detect_breakpoints")
 @mock.patch("sentry.tasks.statistical_detectors.raw_snql_query")
 @django_db_all
 def test_detect_function_change_points(
-    mock_raw_snql_query, mock_detect_breakpoints, timestamp, project
+    mock_raw_snql_query,
+    mock_detect_breakpoints,
+    mock_emit_function_regression_issue,
+    timestamp,
+    project,
 ):
     start_of_hour = timestamp.replace(minute=0, second=0, microsecond=0, tzinfo=timezone.utc)
 
@@ -334,6 +342,7 @@ def test_detect_function_change_points(
     }
 
     detect_function_change_points([(project.id, fingerprint)], timestamp)
+    assert mock_emit_function_regression_issue.called
 
 
 @region_silo_test(stable=True)
@@ -350,9 +359,6 @@ class FunctionsTasksTest(ProfilesSnubaTestCase):
             self.create_project(organization=self.organization, teams=[self.team], name="Bar"),
         ]
 
-    @mock.patch("sentry.tasks.statistical_detectors.FUNCTIONS_PER_PROJECT", 1)
-    def test_functions_query(self):
-
         for project in self.projects:
             self.store_functions(
                 [
@@ -386,6 +392,8 @@ class FunctionsTasksTest(ProfilesSnubaTestCase):
                 timestamp=self.hour_ago,
             )
 
+    @mock.patch("sentry.tasks.statistical_detectors.FUNCTIONS_PER_PROJECT", 1)
+    def test_functions_query(self):
         results = query_functions(self.projects, self.now)
         assert results == [
             DetectorPayload(
@@ -398,6 +406,33 @@ class FunctionsTasksTest(ProfilesSnubaTestCase):
             for project in self.projects
         ]
 
+    @mock.patch("sentry.tasks.statistical_detectors.get_from_profiling_service")
+    def test_emit_function_regression_issue(self, mock_get_from_profiling_service):
+        mock_value = mock.MagicMock()
+        mock_value.status = 200
+        mock_value.data = b'{"occurrences":5}'
+        mock_get_from_profiling_service.return_value = mock_value
+
+        breakpoints: List[BreakpointData] = [
+            {
+                "project": str(project.id),
+                "transaction": str(
+                    self.function_fingerprint({"package": "foo", "function": "foo"})
+                ),
+                "aggregate_range_1": 100_000_000,
+                "aggregate_range_2": 200_000_000,
+                "unweighted_t_value": 1.23,
+                "unweighted_p_value": 1.23,
+                "trend_percentage": 1.23,
+                "absolute_percentage_change": 1.23,
+                "trend_difference": 1.23,
+                "breakpoint": (self.hour_ago - timedelta(hours=12)).timestamp(),
+            }
+            for project in self.projects
+        ]
+        emitted = emit_function_regression_issue(breakpoints, self.now)
+        assert emitted == 5
+
 
 @region_silo_test(stable=True)
 @pytest.mark.sentry_metrics