Browse Source

feat(stats-detectors): Emit escalation message for stats detectors (#62801)

When a regression issue is detected to have escalated, we need to emit a
message to the issue platform.
Tony Xiao 1 year ago
parent
commit
5bcec63ff3

+ 5 - 2
src/sentry/statistical_detectors/algorithm.py

@@ -27,6 +27,9 @@ class MovingAverageDetectorState(DetectorState):
     FIELD_MOVING_AVG_SHORT = "S"
     FIELD_MOVING_AVG_LONG = "L"
 
+    def get_moving_avg(self) -> float:
+        return self.moving_avg_long
+
     def to_redis_dict(self) -> Mapping[str | bytes, bytes | float | int | str]:
         d: MutableMapping[str | bytes, bytes | float | int | str] = {
             self.FIELD_COUNT: self.count,
@@ -54,7 +57,7 @@ class MovingAverageDetectorState(DetectorState):
         )
 
     def should_auto_resolve(self, target: float, rel_threshold: float) -> bool:
-        value = self.moving_avg_long
+        value = self.get_moving_avg()
 
         rel_change = (value - target) / target
         if rel_change < rel_threshold:
@@ -65,7 +68,7 @@ class MovingAverageDetectorState(DetectorState):
     def should_escalate(
         self, baseline: float, regressed: float, min_change: float, rel_threshold: float
     ) -> bool:
-        value = self.moving_avg_long
+        value = self.get_moving_avg()
 
         change = value - regressed
         if change < min_change:

+ 4 - 0
src/sentry/statistical_detectors/base.py

@@ -49,3 +49,7 @@ class DetectorState(ABC):
     @abstractmethod
     def empty(cls) -> DetectorState:
         ...
+
+    @abstractmethod
+    def get_moving_avg(self) -> float:
+        ...

+ 128 - 9
src/sentry/statistical_detectors/detector.py

@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import heapq
+import logging
 from abc import ABC, abstractmethod
 from collections import defaultdict
 from dataclasses import dataclass
@@ -11,7 +12,9 @@ import sentry_sdk
 
 from sentry import options
 from sentry.api.serializers.snuba import SnubaTSResultSerializer
+from sentry.issues.ingest import process_occurrence_data
 from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
+from sentry.issues.status_change_consumer import bulk_get_groups_from_fingerprints
 from sentry.issues.status_change_message import StatusChangeMessage
 from sentry.models.group import GroupStatus
 from sentry.models.project import Project
@@ -26,10 +29,13 @@ from sentry.statistical_detectors.algorithm import DetectorAlgorithm
 from sentry.statistical_detectors.base import DetectorPayload, DetectorState, TrendType
 from sentry.statistical_detectors.issue_platform_adapter import fingerprint_regression
 from sentry.statistical_detectors.store import DetectorStore
+from sentry.types.group import GroupSubStatus
 from sentry.utils import metrics
 from sentry.utils.iterators import chunked
 from sentry.utils.snuba import SnubaTSResult
 
+logger = logging.getLogger("sentry.statistical_detectorst.tasks")
+
 
 @dataclass(frozen=True)
 class TrendBundle:
@@ -50,6 +56,11 @@ class RegressionDetector(ABC):
     resolution_rel_threshold: float
     escalation_rel_threshold: float
 
+    @classmethod
+    def configure_tags(cls):
+        sentry_sdk.set_tag("regression.source", cls.source)
+        sentry_sdk.set_tag("regression.kind", cls.source)
+
     @classmethod
     @abstractmethod
     def detector_algorithm_factory(cls) -> DetectorAlgorithm:
@@ -320,7 +331,7 @@ class RegressionDetector(ABC):
                     and bundle.state.should_auto_resolve(
                         group.baseline, cls.resolution_rel_threshold
                     )
-                    # enforce a buffer window after which the issue cannot
+                    # enforce a buffer window within which the issue cannot
                     # auto resolve to avoid the issue state changing frequently
                     and group.date_regressed + cls.buffer_period <= timestamp
                 ):
@@ -357,7 +368,9 @@ class RegressionDetector(ABC):
         timestamp: datetime,
         batch_size=100,
     ) -> Generator[TrendBundle, None, None]:
-        groups_to_escalate = []
+        escalated = 0
+
+        candidates = []
 
         for bundle in bundles:
             group = bundle.regression_group
@@ -371,24 +384,120 @@ class RegressionDetector(ABC):
                         cls.min_change,
                         cls.escalation_rel_threshold,
                     )
+                    # enforce a buffer window within which the issue cannot
+                    # escalate to avoid the issue state changing frequently
+                    and group.date_regressed + cls.buffer_period <= timestamp
                 ):
-                    groups_to_escalate.append(group)
-
-                # For now, keep passing on the bundle.
-                # Eventually, should redirect these bundles to escalation
-                yield bundle
+                    candidates.append(bundle)
+                else:
+                    yield bundle
             except Exception as e:
                 sentry_sdk.capture_exception(e)
 
-        # TODO: mark the groups as escalated
+        """
+        escalated_groups = []
+        groups_to_escalate = []
+        """
+
+        for bundle in cls._filter_escalating_groups(candidates, batch_size=batch_size):
+            escalated += 1
+
+            if bundle.state is None or bundle.regression_group is None:
+                continue
+
+            state = bundle.state
+            group = bundle.regression_group
+
+            logger.info(
+                "statistical_detectors.detection.escalation",
+                extra={
+                    "project": group.project_id,
+                    "fingerprint": group.fingerprint,
+                    "version": group.version,
+                    "baseline": group.baseline,
+                    "regressed": group.regressed,
+                    "escalated": state.get_moving_avg(),
+                },
+            )
+
+            """
+            # mark the existing regression group as inactive
+            # as we want to create a new one for the escalation
+            group.active = False
+            group.date_resolved = timestamp
+            groups_to_escalate.append(group)
+
+            # the escalation will use the current timestamp and
+            # the current moving average as the new regression
+            escalated_groups.append(
+                RegressionGroup(
+                    type=cls.regression_type.value,
+                    date_regressed=timestamp,
+                    version=group.version + 1,
+                    active=True,
+                    project_id=group.project_id,
+                    fingerprint=group.fingerprint,
+                    baseline=group.regressed,
+                    regressed=bundle.state.get_moving_avg(),
+                )
+            )
+
+            status_change = cls.make_status_change_message(
+                bundle.payload, status=GroupStatus.UNRESOLVED, substatus=GroupSubStatus.ESCALATING
+            )
+            produce_occurrence_to_kafka(
+                payload_type=PayloadType.STATUS_CHANGE,
+                status_change=status_change,
+            )
+            """
+
+        """
+        RegressionGroup.objects.bulk_update(groups_to_escalate, ["active", "date_resolved"])
+        RegressionGroup.objects.bulk_create(escalated_groups)
+        """
 
         metrics.incr(
             "statistical_detectors.objects.escalated",
-            amount=len(groups_to_escalate),
+            amount=escalated,
             tags={"source": cls.source, "kind": cls.kind},
             sample_rate=1.0,
         )
 
+    @classmethod
+    def _filter_escalating_groups(
+        cls,
+        bundles_to_escalate: List[TrendBundle],
+        batch_size=100,
+    ) -> Generator[TrendBundle, None, None]:
+        for bundles in chunked(bundles_to_escalate, batch_size):
+            pairs = {
+                generate_issue_group_key(
+                    bundle.payload.project_id, cls.regression_type, bundle.payload.group
+                ): bundle
+                for bundle in bundles
+            }
+
+            issue_groups = bulk_get_groups_from_fingerprints(
+                [(project_id, [fingerprint]) for project_id, fingerprint in pairs]
+            )
+
+            for key, bundle in pairs.items():
+                issue_group = issue_groups.get(key)
+                if issue_group is None:
+                    sentry_sdk.capture_message("Missing issue group for regression issue")
+                    continue
+
+                if (
+                    issue_group.status == GroupStatus.UNRESOLVED
+                    and issue_group.substatus == GroupSubStatus.ONGOING
+                ):
+                    yield bundle
+                elif (
+                    issue_group.status == GroupStatus.IGNORED
+                    and issue_group.substatus == GroupSubStatus.UNTIL_ESCALATING
+                ):
+                    yield bundle
+
     @classmethod
     def get_regression_versions(
         cls,
@@ -483,3 +592,13 @@ def generate_fingerprint(regression_type: RegressionType, name: str | int) -> st
         return f"{int(name):x}"
     else:
         raise ValueError(f"Unsupported RegressionType: {regression_type}")
+
+
+def generate_issue_group_key(
+    project_id: int, regression_type: RegressionType, name: str | int
+) -> Tuple[int, str]:
+    data = {
+        "fingerprint": [generate_fingerprint(regression_type, name)],
+    }
+    process_occurrence_data(data)
+    return project_id, data["fingerprint"][0]

+ 8 - 0
src/sentry/tasks/statistical_detectors.py

@@ -252,6 +252,8 @@ def detect_transaction_trends(
     if not options.get("statistical_detectors.enable"):
         return
 
+    EndpointRegressionDetector.configure_tags()
+
     projects = get_detector_enabled_projects(
         project_ids,
         feature_name="organizations:performance-statistical-detectors-ema",
@@ -291,6 +293,8 @@ def detect_transaction_change_points(
     if not options.get("statistical_detectors.enable"):
         return
 
+    EndpointRegressionDetector.configure_tags()
+
     projects_by_id = {
         project.id: project
         for project in get_detector_enabled_projects(
@@ -331,6 +335,8 @@ def detect_function_trends(project_ids: List[int], start: datetime, *args, **kwa
     if not options.get("statistical_detectors.enable"):
         return
 
+    FunctionRegressionDetector.configure_tags()
+
     projects = get_detector_enabled_projects(
         project_ids,
         feature_name="organizations:profiling-statistical-detectors-ema",
@@ -369,6 +375,8 @@ def detect_function_change_points(
     if not options.get("statistical_detectors.enable"):
         return
 
+    FunctionRegressionDetector.configure_tags()
+
     projects_by_id = {
         project.id: project
         for project in get_detector_enabled_projects(

+ 205 - 0
tests/sentry/tasks/test_statistical_detectors.py

@@ -1,3 +1,4 @@
+import uuid
 from datetime import datetime, timedelta, timezone
 from typing import List
 from unittest import mock
@@ -6,6 +7,11 @@ import pytest
 from django.db.models import F
 
 from sentry.api.endpoints.project_performance_issue_settings import InternalProjectOptions
+from sentry.issues.grouptype import (
+    PerformanceP95EndpointRegressionGroupType,
+    ProfileFunctionRegressionType,
+)
+from sentry.issues.occurrence_consumer import _process_message
 from sentry.issues.producer import PayloadType
 from sentry.issues.status_change_message import StatusChangeMessage
 from sentry.models.group import GroupStatus
@@ -42,6 +48,7 @@ from sentry.testutils.helpers import Feature, override_options
 from sentry.testutils.helpers.datetime import before_now, freeze_time
 from sentry.testutils.pytest.fixtures import django_db_all
 from sentry.testutils.silo import region_silo_test
+from sentry.types.group import GroupSubStatus
 from sentry.utils.snuba import SnubaTSResult
 
 
@@ -1003,6 +1010,204 @@ def test_save_regressions_with_versions(
     assert len(list(regressions)) == 1
 
 
+@pytest.mark.parametrize(
+    ["detector_cls", "object_name", "baseline", "regressed", "escalated", "issue_type"],
+    [
+        pytest.param(
+            EndpointRegressionDetector,
+            "transaction_1",
+            100,
+            300,
+            500,
+            PerformanceP95EndpointRegressionGroupType,
+            id="endpoint",
+        ),
+        pytest.param(
+            FunctionRegressionDetector,
+            "123",
+            100_000_000,
+            300_000_000,
+            500_000_000,
+            ProfileFunctionRegressionType,
+            id="function",
+        ),
+    ],
+)
+@pytest.mark.parametrize(
+    ["status", "substatus", "should_escalate"],
+    [
+        pytest.param(GroupStatus.UNRESOLVED, GroupSubStatus.ESCALATING, False, id="escalating"),
+        pytest.param(GroupStatus.UNRESOLVED, GroupSubStatus.ONGOING, True, id="ongoing"),
+        pytest.param(GroupStatus.UNRESOLVED, GroupSubStatus.REGRESSED, False, id="regressed"),
+        pytest.param(GroupStatus.UNRESOLVED, GroupSubStatus.NEW, False, id="new"),
+        pytest.param(GroupStatus.RESOLVED, None, False, id="resolved"),
+        pytest.param(
+            GroupStatus.IGNORED, GroupSubStatus.UNTIL_ESCALATING, True, id="until escalating"
+        ),
+        pytest.param(
+            GroupStatus.IGNORED, GroupSubStatus.UNTIL_CONDITION_MET, False, id="until condition met"
+        ),
+        pytest.param(GroupStatus.IGNORED, GroupSubStatus.FOREVER, False, id="forever"),
+    ],
+)
+@mock.patch("sentry.statistical_detectors.detector.produce_occurrence_to_kafka")
+@django_db_all
+def test_redirect_escalations(
+    produce_occurrence_to_kafka,
+    detector_cls,
+    object_name,
+    baseline,
+    regressed,
+    escalated,
+    issue_type,
+    status,
+    substatus,
+    should_escalate,
+    project,
+    timestamp,
+    django_cache,  # the environment can persist in the cache otherwise
+):
+    fingerprint = generate_fingerprint(detector_cls.regression_type, object_name)
+
+    RegressionGroup.objects.create(
+        type=detector_cls.regression_type.value,
+        date_regressed=timestamp - timedelta(days=1),
+        version=1,
+        active=True,
+        project_id=project.id,
+        fingerprint=fingerprint,
+        baseline=baseline,
+        regressed=regressed,
+    )
+
+    event_id = uuid.uuid4().hex
+    message = {
+        "id": uuid.uuid4().hex,
+        "project_id": project.id,
+        "event_id": event_id,
+        "fingerprint": [fingerprint],
+        "issue_title": issue_type.description,
+        "subtitle": "",
+        "resource_id": None,
+        "evidence_data": {},
+        "evidence_display": [],
+        "type": issue_type.type_id,
+        "detection_time": timestamp.isoformat(),
+        "level": "info",
+        "culprit": "",
+        "payload_type": PayloadType.OCCURRENCE.value,
+        "event": {
+            "timestamp": timestamp.isoformat(),
+            "project_id": project.id,
+            "transaction": "",
+            "event_id": event_id,
+            "platform": "python",
+            "received": timestamp.isoformat(),
+            "tags": {},
+        },
+    }
+
+    result = _process_message(message)
+    assert result is not None
+    _, group_info = result
+    assert group_info is not None
+    group = group_info.group
+    group.status = status
+    group.substatus = substatus
+    group.save()
+
+    def get_trends(ts):
+        payload = DetectorPayload(
+            project_id=project.id,
+            group=object_name,
+            fingerprint="",  # this fingerprint isn't used so leave it blank
+            count=100,
+            value=escalated,
+            timestamp=ts - timedelta(hours=1),
+        )
+        state = MovingAverageDetectorState(
+            timestamp=ts - timedelta(hours=1),
+            count=100,
+            moving_avg_short=escalated,
+            moving_avg_long=escalated,
+        )
+        yield TrendBundle(
+            type=TrendType.Unchanged,
+            score=1,
+            payload=payload,
+            state=state,
+        )
+
+    trends = get_trends(timestamp)
+    trends = detector_cls.get_regression_groups(trends)
+    trends = detector_cls.redirect_escalations(trends, timestamp)
+
+    assert len(list(trends)) == 0
+
+    """
+    if should_escalate:
+        assert produce_occurrence_to_kafka.called
+        status_change = StatusChangeMessage(
+            fingerprint=[fingerprint],
+            project_id=project.id,
+            new_status=GroupStatus.UNRESOLVED,
+            new_substatus=GroupSubStatus.ESCALATING,
+        )
+        produce_occurrence_to_kafka.assert_has_calls(
+            [mock.call(payload_type=PayloadType.STATUS_CHANGE, status_change=status_change)]
+        )
+
+        # version 1 should be inactive now
+        assert (
+            RegressionGroup.objects.get(
+                type=detector_cls.regression_type.value,
+                version=1,
+                project_id=project.id,
+                fingerprint=fingerprint,
+                active=False,
+            )
+            is not None
+        )
+
+        # version 2 should be created and active
+        assert (
+            RegressionGroup.objects.get(
+                type=detector_cls.regression_type.value,
+                version=2,
+                project_id=project.id,
+                fingerprint=fingerprint,
+                active=True,
+            )
+            is not None
+        )
+    else:
+        assert not produce_occurrence_to_kafka.called
+
+        # version 1 should still be active
+        assert (
+            RegressionGroup.objects.get(
+                type=detector_cls.regression_type.value,
+                version=1,
+                project_id=project.id,
+                fingerprint=fingerprint,
+                active=True,
+            )
+            is not None
+        )
+
+        # version 2 should not exist
+        assert (
+            RegressionGroup.objects.filter(
+                type=detector_cls.regression_type.value,
+                version=2,
+                project_id=project.id,
+                fingerprint=fingerprint,
+            ).first()
+            is None
+        )
+    """
+
+
 @region_silo_test
 class FunctionsTasksTest(ProfilesSnubaTestCase):
     def setUp(self):