Browse Source

feat(statistical-detectors): Setup breakpoint analyzer task (#56175)

Sets up the new task for the breakpoint analyzer. For now, it simply emits a
message for each potential regression. Eventually, this will be hooked up to
the breakpoint detection to verify the regression.
Tony Xiao 1 year ago
parent
commit
0a1051ab47

+ 1 - 1
src/sentry/statistical_detectors/algorithm.py

@@ -94,7 +94,7 @@ class MovingAverageDetector(DetectorAlgorithm):
             timestamp=self.timestamp,
             count=self.count,
             moving_avg_short=self.moving_avg_short.value,
-            moving_avg_long=self.moving_avg_short.value,
+            moving_avg_long=self.moving_avg_long.value,
         )
 
 

+ 41 - 5
src/sentry/tasks/statistical_detectors.py

@@ -2,7 +2,7 @@ from __future__ import annotations
 
 import logging
 from datetime import datetime, timedelta
-from typing import Any, Dict, Generator, List, Set
+from typing import Any, Dict, Generator, List, Optional, Set, Tuple
 
 import sentry_sdk
 from django.utils import timezone
@@ -29,6 +29,7 @@ logger = logging.getLogger("sentry.tasks.statistical_detectors")
 
 
 FUNCTIONS_PER_PROJECT = 100
+FUNCTIONS_PER_BATCH = 1_000
 PROJECTS_PER_BATCH = 1_000
 
 
@@ -83,7 +84,7 @@ def run_detection() -> None:
     queue="performance.statistical_detector",
     max_retries=0,
 )
-def detect_transaction_trends(project_ids: List[int], start: datetime, **kwargs) -> None:
+def detect_transaction_trends(project_ids: List[int], start: datetime, *args, **kwargs) -> None:
     if not options.get("statistical_detectors.enable"):
         return
 
@@ -96,10 +97,45 @@ def detect_transaction_trends(project_ids: List[int], start: datetime, **kwargs)
     queue="profiling.statistical_detector",
     max_retries=0,
 )
-def detect_function_trends(project_ids: List[int], start: datetime, **kwargs) -> None:
+def detect_function_trends(project_ids: List[int], start: datetime, *args, **kwargs) -> None:
     if not options.get("statistical_detectors.enable"):
         return
 
+    regressions = filter(
+        lambda trend: trend[0] == TrendType.Regressed, _detect_function_trends(project_ids, start)
+    )
+
+    for trends in chunked(regressions, FUNCTIONS_PER_BATCH):
+        detect_function_change_points.delay(
+            [(payload.project_id, payload.group) for _, payload in trends],
+            start,
+        )
+
+
+@instrumented_task(
+    name="sentry.tasks.statistical_detectors.detect_function_change_points",
+    queue="profiling.statistical_detector",
+    max_retries=0,
+)
+def detect_function_change_points(
+    functions: List[Tuple[int, str | int]], start: datetime, *args, **kwargs
+) -> None:
+    for project_id, function_id in functions:
+        with sentry_sdk.push_scope() as scope:
+            scope.set_tag("project_id", project_id)
+            scope.set_tag("function_id", function_id)
+            scope.set_context(
+                "statistical_detectors",
+                {
+                    "timestamp": start.isoformat(),
+                },
+            )
+            sentry_sdk.capture_message("Potential Regression")
+
+
+def _detect_function_trends(
+    project_ids: List[int], start: datetime
+) -> Generator[Tuple[Optional[TrendType], DetectorPayload], None, None]:
     functions_count = 0
     regressed_count = 0
     improved_count = 0
@@ -141,6 +177,8 @@ def detect_function_trends(project_ids: List[int], start: datetime, **kwargs) ->
             elif trend_type == TrendType.Improved:
                 improved_count += 1
 
+            yield (trend_type, payload)
+
         detector_store.bulk_write_states(payloads, states)
 
     # This is the total number of functions examined in this iteration
@@ -164,8 +202,6 @@ def detect_function_trends(project_ids: List[int], start: datetime, **kwargs) ->
         sample_rate=1.0,
     )
 
-    # TODO: pass on the regressed/improved functions to the next task
-
 
 def all_function_payloads(
     project_ids: List[int],

+ 8 - 4
tests/sentry/tasks/test_statistical_detectors.py

@@ -217,13 +217,16 @@ def test_detect_function_trends_query_timerange(functions_query, timestamp, proj
 
 
 @mock.patch("sentry.tasks.statistical_detectors.query_functions")
+@mock.patch("sentry.tasks.statistical_detectors.detect_function_change_points")
 @django_db_all
 def test_detect_function_trends(
+    detect_function_change_points,
     query_functions,
     timestamp,
     project,
 ):
-    timestamps = [timestamp - timedelta(hours=i) for i in range(3, 0, -1)]
+    n = 20
+    timestamps = [timestamp - timedelta(hours=n - i) for i in range(n)]
 
     query_functions.side_effect = [
         [
@@ -231,16 +234,17 @@ def test_detect_function_trends(
                 project_id=project.id,
                 group=123,
                 count=100,
-                value=100,
+                value=100 if i < n / 2 else 200,
                 timestamp=ts,
             ),
         ]
-        for ts in timestamps
+        for i, ts in enumerate(timestamps)
     ]
 
-    with override_options({"statistical_detectors.enable": True}):
+    with override_options({"statistical_detectors.enable": True}), TaskRunner():
         for ts in timestamps:
             detect_function_trends([project.id], ts)
+    assert detect_function_change_points.delay.called
 
 
 @region_silo_test(stable=True)