fix(alerts): Update snuba query data for Seer (#78292)

Another piece of https://github.com/getsentry/sentry/pull/78143/ after
it was reformatted. This updates the snuba query data when necessary,
which changes what is sent to Seer, and updates the rule status as
needed.

Closes https://getsentry.atlassian.net/browse/ALRT-281
Colleen O'Rourke 5 months ago
Parent
Commit
6a07174802
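
For orientation, the call chain this commit threads snuba_query through, sketched as commented Python (signatures abbreviated from the diffs below):

# create_alert_rule / update_alert_rule (src/sentry/incidents/logic.py)
#   -> send_new_rule_data(alert_rule, project, snuba_query)
#      or update_rule_data(alert_rule, project, snuba_query, updated_fields, updated_query_fields)
#     -> handle_send_historical_data_to_seer(alert_rule, snuba_query, project, method, event_types)
#       -> send_historical_data_to_seer(alert_rule, project, snuba_query, event_types)
#         -> fetch_historical_data(..., event_types=event_types)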

+ 11 - 3
src/sentry/incidents/logic.py

@@ -653,7 +653,7 @@ def create_alert_rule(
                     "Your organization does not have access to this feature."
                 )
             # NOTE: if adding a new metric alert type, take care to check that it's handled here
-            send_new_rule_data(alert_rule, projects[0])
+            send_new_rule_data(alert_rule, projects[0], snuba_query)
 
         if user:
             create_audit_entry_from_user(
@@ -915,7 +915,7 @@ def update_alert_rule(
                 )
             # NOTE: if adding a new metric alert type, take care to check that it's handled here
             project = projects[0] if projects else alert_rule.projects.get()
-            update_rule_data(alert_rule, project, updated_fields, updated_query_fields)
+            update_rule_data(alert_rule, project, snuba_query, updated_fields, updated_query_fields)
         else:
             # if this was a dynamic rule, delete the data in Seer
             if alert_rule.detection_type == AlertRuleDetectionType.DYNAMIC:
@@ -949,7 +949,15 @@ def update_alert_rule(
                 "time_window", timedelta(seconds=snuba_query.time_window)
             )
             updated_query_fields.setdefault("event_types", None)
-            updated_query_fields.setdefault("resolution", timedelta(seconds=snuba_query.resolution))
+            if (
+                detection_type == AlertRuleDetectionType.DYNAMIC
+                and alert_rule.detection_type == AlertRuleDetectionType.DYNAMIC
+            ):
+                updated_query_fields.setdefault("resolution", snuba_query.resolution)
+            else:
+                updated_query_fields.setdefault(
+                    "resolution", timedelta(seconds=snuba_query.resolution)
+                )
             update_snuba_query(snuba_query, environment=environment, **updated_query_fields)
 
         existing_subs: Iterable[QuerySubscription] = ()
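
A minimal sketch of the resolution branch added above, using a hypothetical string stand-in for AlertRuleDetectionType: only a rule that is dynamic both before and after the update keeps snuba_query.resolution as raw seconds; every other case wraps it in a timedelta, as the code did before.

from datetime import timedelta

DYNAMIC = "dynamic"  # stand-in for AlertRuleDetectionType.DYNAMIC

def default_resolution(new_type: str, existing_type: str, resolution_secs: int):
    # Mirrors the setdefault value chosen in update_alert_rule.
    if new_type == DYNAMIC and existing_type == DYNAMIC:
        return resolution_secs
    return timedelta(seconds=resolution_secs)

assert default_resolution(DYNAMIC, DYNAMIC, 300) == 300
assert default_resolution("static", DYNAMIC, 300) == timedelta(minutes=5)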

+ 65 - 13
src/sentry/seer/anomaly_detection/store_data.py

@@ -1,5 +1,6 @@
 import logging
 from datetime import datetime, timedelta
+from enum import StrEnum
 from typing import Any
 
 from django.conf import settings
@@ -22,11 +23,12 @@ from sentry.seer.anomaly_detection.types import (
 from sentry.seer.anomaly_detection.utils import (
     fetch_historical_data,
     format_historical_data,
+    get_dataset_from_label,
+    get_event_types,
     translate_direction,
 )
 from sentry.seer.signed_seer_api import make_signed_seer_api_request
-from sentry.snuba.models import SnubaQuery
-from sentry.snuba.utils import get_dataset
+from sentry.snuba.models import SnubaQuery, SnubaQueryEventType
 from sentry.utils import json, metrics
 from sentry.utils.json import JSONDecodeError
 
@@ -36,7 +38,12 @@ seer_anomaly_detection_connection_pool = connection_from_url(
     settings.SEER_ANOMALY_DETECTION_URL,
     timeout=settings.SEER_ANOMALY_DETECTION_TIMEOUT,
 )
-NUM_DAYS = 28
+MIN_DAYS = 7
+
+
+class SeerMethod(StrEnum):
+    CREATE = "create"
+    UPDATE = "update"
 
 
 def _get_start_and_end_indices(data: list[TimeSeriesPoint]) -> tuple[int, int]:
@@ -58,15 +65,28 @@ def _get_start_and_end_indices(data: list[TimeSeriesPoint]) -> tuple[int, int]:
     return start, end
 
 
-def handle_send_historical_data_to_seer(alert_rule: AlertRule, project: Project, method: str):
+def handle_send_historical_data_to_seer(
+    alert_rule: AlertRule,
+    snuba_query: SnubaQuery,
+    project: Project,
+    method: str,
+    event_types: list[SnubaQueryEventType.EventType] | None = None,
+):
+    event_types_param = event_types or snuba_query.event_types
     try:
         rule_status = send_historical_data_to_seer(
             alert_rule=alert_rule,
             project=project,
+            snuba_query=snuba_query,
+            event_types=event_types_param,
         )
         if rule_status == AlertRuleStatus.NOT_ENOUGH_DATA:
             # if we don't have at least seven days worth of data, then the dynamic alert won't fire
             alert_rule.update(status=AlertRuleStatus.NOT_ENOUGH_DATA.value)
+        elif (
+            rule_status == AlertRuleStatus.PENDING and alert_rule.status != AlertRuleStatus.PENDING
+        ):
+            alert_rule.update(status=AlertRuleStatus.PENDING.value)
     except (TimeoutError, MaxRetryError):
         raise TimeoutError(f"Failed to send data to Seer - cannot {method} alert rule.")
     except ParseError:
@@ -75,9 +95,9 @@ def handle_send_historical_data_to_seer(alert_rule: AlertRule, project: Project,
         raise ValidationError(f"Failed to send data to Seer - cannot {method} alert rule.")
 
 
-def send_new_rule_data(alert_rule: AlertRule, project: Project) -> None:
+def send_new_rule_data(alert_rule: AlertRule, project: Project, snuba_query: SnubaQuery) -> None:
     try:
-        handle_send_historical_data_to_seer(alert_rule, project, "create")
+        handle_send_historical_data_to_seer(alert_rule, snuba_query, project, SeerMethod.CREATE)
     except (TimeoutError, MaxRetryError, ParseError, ValidationError):
         alert_rule.delete()
         raise
@@ -88,6 +108,7 @@ def send_new_rule_data(alert_rule: AlertRule, project: Project) -> None:
 def update_rule_data(
     alert_rule: AlertRule,
     project: Project,
+    snuba_query: SnubaQuery,
     updated_fields: dict[str, Any],
     updated_query_fields: dict[str, Any],
 ) -> None:
@@ -103,19 +124,51 @@ def update_rule_data(
         for k, v in updated_fields.items():
             setattr(alert_rule, k, v)
 
-        handle_send_historical_data_to_seer(alert_rule, project, "update")
+        for k, v in updated_query_fields.items():
+            if k == "dataset":
+                v = v.value
+            elif k == "time_window":
+                time_window = updated_query_fields.get("time_window")
+                v = (
+                    int(time_window.total_seconds())
+                    if time_window is not None
+                    else snuba_query.time_window
+                )
+            elif k == "event_types":
+                continue
+            setattr(alert_rule.snuba_query, k, v)
 
+        assert alert_rule.snuba_query
+        handle_send_historical_data_to_seer(
+            alert_rule,
+            alert_rule.snuba_query,
+            project,
+            SeerMethod.UPDATE,
+            updated_query_fields.get("event_types"),
+        )
 
-def send_historical_data_to_seer(alert_rule: AlertRule, project: Project) -> AlertRuleStatus:
+
+def send_historical_data_to_seer(
+    alert_rule: AlertRule,
+    project: Project,
+    snuba_query: SnubaQuery | None = None,
+    event_types: list[SnubaQueryEventType.EventType] | None = None,
+) -> AlertRuleStatus:
     """
     Get 28 days of historical data and pass it to Seer to be used for predicting anomalies on the alert.
     """
-    snuba_query = SnubaQuery.objects.get(id=alert_rule.snuba_query_id)
+    if not snuba_query:
+        snuba_query = SnubaQuery.objects.get(id=alert_rule.snuba_query_id)
     window_min = int(snuba_query.time_window / 60)
-    dataset = get_dataset(snuba_query.dataset)
-    query_columns = get_query_columns([snuba_query.aggregate], snuba_query.time_window)
+    dataset = get_dataset_from_label(snuba_query.dataset)
+    query_columns = get_query_columns([snuba_query.aggregate], window_min)
+    event_types = get_event_types(snuba_query, event_types)
     historical_data = fetch_historical_data(
-        alert_rule=alert_rule, snuba_query=snuba_query, query_columns=query_columns, project=project
+        alert_rule=alert_rule,
+        snuba_query=snuba_query,
+        query_columns=query_columns,
+        project=project,
+        event_types=event_types,
     )
 
     if not historical_data:
@@ -231,7 +284,6 @@ def send_historical_data_to_seer(alert_rule: AlertRule, project: Project) -> Ale
         )
         raise Exception(message)
 
-    MIN_DAYS = 7
     data_start_index, data_end_index = _get_start_and_end_indices(formatted_data)
     if data_start_index == -1:
         return AlertRuleStatus.NOT_ENOUGH_DATA
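
The new elif in handle_send_historical_data_to_seer closes a one-way door: previously a rule that dropped to NOT_ENOUGH_DATA stayed there even after Seer later reported success. A simplified transition helper, using strings as stand-ins for AlertRuleStatus (illustrative only, not the actual implementation):

def next_status(seer_status: str, current_status: str) -> str:
    # NOT_ENOUGH_DATA from Seer always wins.
    if seer_status == "not_enough_data":
        return "not_enough_data"
    # A PENDING result now upgrades a previously starved rule.
    if seer_status == "pending" and current_status != "pending":
        return "pending"
    return current_status

assert next_status("pending", "not_enough_data") == "pending"
assert next_status("not_enough_data", "pending") == "not_enough_data"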

+ 41 - 14
src/sentry/seer/anomaly_detection/utils.py

@@ -3,6 +3,7 @@ from typing import Any
 
 from django.utils import timezone
 from django.utils.datastructures import MultiValueDict
+from rest_framework.exceptions import ParseError
 
 from sentry import release_health
 from sentry.api.bases.organization_events import resolve_axis_column
@@ -17,11 +18,17 @@ from sentry.snuba.metrics.extraction import MetricSpecType
 from sentry.snuba.models import SnubaQuery, SnubaQueryEventType
 from sentry.snuba.referrer import Referrer
 from sentry.snuba.sessions_v2 import QueryDefinition
-from sentry.snuba.utils import get_dataset
+from sentry.snuba.utils import DATASET_OPTIONS, get_dataset
 from sentry.utils.snuba import SnubaTSResult
 
 NUM_DAYS = 28
 
+SNUBA_QUERY_EVENT_TYPE_TO_STRING = {
+    SnubaQueryEventType.EventType.ERROR: "error",
+    SnubaQueryEventType.EventType.DEFAULT: "default",
+    SnubaQueryEventType.EventType.TRANSACTION: "transaction",
+}
+
 
 def translate_direction(direction: int) -> str:
     """
@@ -35,27 +42,32 @@ def translate_direction(direction: int) -> str:
     return direction_map[AlertRuleThresholdType(direction)]
 
 
-def get_snuba_query_string(snuba_query: SnubaQuery) -> str:
+def get_event_types(
+    snuba_query: SnubaQuery, event_types: list[SnubaQueryEventType.EventType] | None = None
+) -> list[SnubaQueryEventType.EventType]:
+    if not event_types:
+        event_types = snuba_query.event_types or []
+    return event_types
+
+
+def get_snuba_query_string(
+    snuba_query: SnubaQuery, event_types: list[SnubaQueryEventType.EventType] | None = None
+) -> str:
     """
     Generate a query string that matches what the OrganizationEventsStatsEndpoint does
     """
-    SNUBA_QUERY_EVENT_TYPE_TO_STRING = {
-        SnubaQueryEventType.EventType.ERROR: "error",
-        SnubaQueryEventType.EventType.DEFAULT: "default",
-        SnubaQueryEventType.EventType.TRANSACTION: "transaction",
-    }
-
+    event_types = get_event_types(snuba_query, event_types)
     if len(snuba_query.event_types) > 1:
-        # e.g. (is:unresolved) AND (event.type:[error, default])
+        # e.g. '(is:unresolved) AND (event.type:[error, default])'
         event_types_list = [
-            SNUBA_QUERY_EVENT_TYPE_TO_STRING[event_type] for event_type in snuba_query.event_types
+            SNUBA_QUERY_EVENT_TYPE_TO_STRING[event_type] for event_type in event_types
         ]
         event_types_string = "(event.type:["
         event_types_string += ", ".join(event_types_list)
         event_types_string += "])"
     else:
-        # e.g. (is:unresolved) AND (event.type:error)
-        snuba_query_event_type_string = SNUBA_QUERY_EVENT_TYPE_TO_STRING[snuba_query.event_types[0]]
+        # e.g. '(is:unresolved) AND (event.type:error)'
+        snuba_query_event_type_string = SNUBA_QUERY_EVENT_TYPE_TO_STRING[event_types[0]]
         event_types_string = f"(event.type:{snuba_query_event_type_string})"
     if snuba_query.query:
         snuba_query_string = f"({snuba_query.query}) AND {event_types_string}"
@@ -161,6 +173,19 @@ def format_historical_data(
     return format_snuba_ts_data(data, query_columns, organization)
 
 
+def get_dataset_from_label(dataset_label: str):
+    if dataset_label == "events":
+        # DATASET_OPTIONS expects the name 'errors'
+        dataset_label = "errors"
+    elif dataset_label in ["generic_metrics", "transactions"]:
+        # XXX: performance alerts dataset differs locally vs in prod
+        dataset_label = "metricsEnhanced"
+    dataset = get_dataset(dataset_label)
+    if dataset is None:
+        raise ParseError(detail=f"dataset must be one of: {', '.join(DATASET_OPTIONS.keys())}")
+    return dataset
+
+
 def fetch_historical_data(
     alert_rule: AlertRule,
     snuba_query: SnubaQuery,
@@ -168,6 +193,7 @@ def fetch_historical_data(
     project: Project,
     start: datetime | None = None,
     end: datetime | None = None,
+    event_types: list[SnubaQueryEventType.EventType] | None = None,
 ) -> SnubaTSResult | None:
     """
     Fetch 28 days of historical data from Snuba to pass to Seer to build the anomaly detection model
@@ -191,7 +217,7 @@ def fetch_historical_data(
     elif dataset_label in ["generic_metrics", "transactions"]:
         # XXX: performance alerts dataset differs locally vs in prod
         dataset_label = "metricsEnhanced"
-    dataset = get_dataset(dataset_label)
+    dataset = get_dataset_from_label(dataset_label)
 
     if not project or not dataset or not alert_rule.organization:
         return None
@@ -214,7 +240,8 @@ def fetch_historical_data(
             start, end, project, alert_rule.organization, granularity
         )
     else:
-        snuba_query_string = get_snuba_query_string(snuba_query)
+        event_types = get_event_types(snuba_query, event_types)
+        snuba_query_string = get_snuba_query_string(snuba_query, event_types)
         historical_data = dataset.timeseries_query(
             selected_columns=query_columns,
             query=snuba_query_string,
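
The comments in get_snuba_query_string pin down the expected output; below is a standalone re-implementation of the string building, using plain strings instead of SnubaQueryEventType (assumed faithful to the diff above, including the empty-query case):

def build_query_string(query: str, event_types: list[str]) -> str:
    if len(event_types) > 1:
        event_types_string = "(event.type:[" + ", ".join(event_types) + "])"
    else:
        event_types_string = f"(event.type:{event_types[0]})"
    return f"({query}) AND {event_types_string}" if query else event_types_string

assert build_query_string("is:unresolved", ["error", "default"]) == "(is:unresolved) AND (event.type:[error, default])"
assert build_query_string("is:unresolved", ["error"]) == "(is:unresolved) AND (event.type:error)"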

+ 95 - 1
tests/sentry/incidents/test_logic.py

@@ -101,7 +101,7 @@ from sentry.silo.base import SiloMode
 from sentry.snuba.dataset import Dataset
 from sentry.snuba.models import QuerySubscription, SnubaQuery, SnubaQueryEventType
 from sentry.testutils.cases import BaseIncidentsTest, BaseMetricsTestCase, TestCase
-from sentry.testutils.helpers.datetime import before_now, freeze_time
+from sentry.testutils.helpers.datetime import before_now, freeze_time, iso_format
 from sentry.testutils.helpers.features import with_feature
 from sentry.testutils.helpers.options import override_options
 from sentry.testutils.silo import assume_test_silo_mode, assume_test_silo_mode_of
@@ -1000,6 +1000,21 @@ class UpdateAlertRuleTest(TestCase, BaseIncidentsTest):
     def alert_rule(self):
         return self.create_alert_rule(name="hello")
 
+    def create_error_event(self, **kwargs):
+        two_weeks_ago = before_now(days=14).replace(hour=10, minute=0, second=0, microsecond=0)
+        data = {
+            "event_id": "a" * 32,
+            "message": "super bad",
+            "timestamp": iso_format(two_weeks_ago + timedelta(minutes=1)),
+            "tags": {"sentry:user": self.user.email},
+            "exception": [{"value": "BadError"}],
+        }
+        data.update(**kwargs)
+        self.store_event(
+            data=data,
+            project_id=self.project.id,
+        )
+
     def test(self):
         name = "uh oh"
         query = "level:warning"
@@ -1763,6 +1778,85 @@ class UpdateAlertRuleTest(TestCase, BaseIncidentsTest):
         assert mock_seer_request.call_count == 1
         assert alert_rule.status == AlertRuleStatus.PENDING.value
 
+    @with_feature("organizations:anomaly-detection-alerts")
+    @patch(
+        "sentry.seer.anomaly_detection.store_data.seer_anomaly_detection_connection_pool.urlopen"
+    )
+    def test_update_dynamic_alert_not_enough_to_pending(self, mock_seer_request):
+        """
+        Update a dynamic rule's aggregate so the rule's status changes from not enough data to enough/pending
+        """
+        seer_return_value: StoreDataResponse = {"success": True}
+        mock_seer_request.return_value = HTTPResponse(orjson.dumps(seer_return_value), status=200)
+
+        dynamic_rule = self.create_alert_rule(
+            sensitivity=AlertRuleSensitivity.HIGH,
+            seasonality=AlertRuleSeasonality.AUTO,
+            time_window=60,
+            detection_type=AlertRuleDetectionType.DYNAMIC,
+        )
+        assert mock_seer_request.call_count == 1
+        assert dynamic_rule.status == AlertRuleStatus.NOT_ENOUGH_DATA.value
+        mock_seer_request.reset_mock()
+
+        two_weeks_ago = before_now(days=14).replace(hour=10, minute=0, second=0, microsecond=0)
+        with self.options({"issues.group_attributes.send_kafka": True}):
+            self.create_error_event(timestamp=iso_format(two_weeks_ago + timedelta(minutes=1)))
+            self.create_error_event(
+                timestamp=iso_format(two_weeks_ago + timedelta(days=10))
+            )  # 4 days ago
+
+        # update aggregate
+        update_alert_rule(
+            dynamic_rule,
+            aggregate="count_unique(user)",
+            time_window=60,
+            detection_type=AlertRuleDetectionType.DYNAMIC,
+        )
+        assert mock_seer_request.call_count == 1
+        assert dynamic_rule.status == AlertRuleStatus.PENDING.value
+
+    @with_feature("organizations:anomaly-detection-alerts")
+    @patch(
+        "sentry.seer.anomaly_detection.store_data.seer_anomaly_detection_connection_pool.urlopen"
+    )
+    def test_update_dynamic_alert_pending_to_not_enough(self, mock_seer_request):
+        """
+        Update a dynamic rule's aggregate so the rule's status changes from enough/pending to not enough data
+        """
+        seer_return_value: StoreDataResponse = {"success": True}
+        mock_seer_request.return_value = HTTPResponse(orjson.dumps(seer_return_value), status=200)
+
+        two_weeks_ago = before_now(days=14).replace(hour=10, minute=0, second=0, microsecond=0)
+        with self.options({"issues.group_attributes.send_kafka": True}):
+            self.create_error_event(timestamp=iso_format(two_weeks_ago + timedelta(minutes=1)))
+            self.create_error_event(
+                timestamp=iso_format(two_weeks_ago + timedelta(days=10))
+            )  # 4 days ago
+
+        dynamic_rule = self.create_alert_rule(
+            sensitivity=AlertRuleSensitivity.HIGH,
+            seasonality=AlertRuleSeasonality.AUTO,
+            time_window=60,
+            detection_type=AlertRuleDetectionType.DYNAMIC,
+        )
+        assert mock_seer_request.call_count == 1
+        assert dynamic_rule.status == AlertRuleStatus.PENDING.value
+
+        mock_seer_request.reset_mock()
+
+        # update aggregate
+        update_alert_rule(
+            dynamic_rule,
+            aggregate="p95(measurements.fid)",  # first input delay data we don't have stored
+            dataset=Dataset.Transactions,
+            event_types=[SnubaQueryEventType.EventType.TRANSACTION],
+            query="",
+            detection_type=AlertRuleDetectionType.DYNAMIC,
+        )
+        assert mock_seer_request.call_count == 1
+        assert dynamic_rule.status == AlertRuleStatus.NOT_ENOUGH_DATA.value
+
     @with_feature("organizations:anomaly-detection-alerts")
     @patch(
         "sentry.seer.anomaly_detection.store_data.seer_anomaly_detection_connection_pool.urlopen"