Просмотр исходного кода

feat(alerts): Send historical snuba data to Seer (#74614)

When an anomaly detection metric alert is created, query Snuba for 28
days of historical data and send it to Seer.

TODO (as separate follow ups):
[ ] [Add to edited anomaly detection
alert](https://getsentry.atlassian.net/browse/ALRT-168)
[ ] [Warn users if less than a week of historical data is
available](https://getsentry.atlassian.net/browse/ALRT-169)
[ ] [Support async Slack channel lookup _and_ anomaly detection alert -
currently it's one or the
other](https://getsentry.atlassian.net/browse/ALRT-170)

Closes https://getsentry.atlassian.net/browse/ALRT-140
Colleen O'Rourke 7 месяцев назад
Родитель
Commit
553b533817

+ 3 - 0
src/sentry/conf/server.py

@@ -3437,6 +3437,9 @@ SEER_HASH_GROUPING_RECORDS_DELETE_URL = (
 )
 SEER_SIMILARITY_CIRCUIT_BREAKER_KEY = "seer.similarity"
 
+SEER_ANOMALY_DETECTION_VERSION = "v1"
+SEER_ANOMALY_DETECTION_STORE_DATA_URL = f"/{SEER_ANOMALY_DETECTION_VERSION}/anomaly-detection/store"
+
 SIMILARITY_BACKFILL_COHORT_MAP: dict[str, list[int]] = {}
 
 # Devserver configuration overrides.

+ 9 - 1
src/sentry/incidents/endpoints/organization_alert_rule_index.py

@@ -37,7 +37,7 @@ from sentry.incidents.endpoints.serializers.alert_rule import (
 )
 from sentry.incidents.endpoints.utils import parse_team_params
 from sentry.incidents.logic import get_slack_actions_with_async_lookups
-from sentry.incidents.models.alert_rule import AlertRule
+from sentry.incidents.models.alert_rule import AlertRule, AlertRuleDetectionType
 from sentry.incidents.models.incident import Incident, IncidentStatus
 from sentry.incidents.serializers import AlertRuleSerializer as DrfAlertRuleSerializer
 from sentry.incidents.utils.sentry_apps import trigger_sentry_app_action_creators_for_incidents
@@ -46,6 +46,7 @@ from sentry.models.organizationmemberteam import OrganizationMemberTeam
 from sentry.models.project import Project
 from sentry.models.rule import Rule, RuleSource
 from sentry.models.team import Team
+from sentry.seer.anomaly_detection.store_data import send_historical_data_to_seer
 from sentry.sentry_apps.services.app import app_service
 from sentry.signals import alert_rule_created
 from sentry.snuba.dataset import Dataset
@@ -118,8 +119,15 @@ class AlertRuleIndexMixin(Endpoint):
                 }
                 find_channel_id_for_alert_rule.apply_async(kwargs=task_args)
                 return Response({"uuid": client.uuid}, status=202)
+                # TODO handle slack channel lookup AND anomaly detection
             else:
                 alert_rule = serializer.save()
+                if alert_rule.detection_type == AlertRuleDetectionType.DYNAMIC.value:
+                    resp = send_historical_data_to_seer(alert_rule=alert_rule, user=request.user)
+                    if resp.status != 200:
+                        alert_rule.delete()
+                        return Response({"detail": resp.reason}, status=status.HTTP_400_BAD_REQUEST)
+
                 referrer = request.query_params.get("referrer")
                 session_id = request.query_params.get("sessionId")
                 duplicate_rule = request.query_params.get("duplicateRule")

+ 188 - 0
src/sentry/seer/anomaly_detection/store_data.py

@@ -0,0 +1,188 @@
+import logging
+from datetime import timedelta
+
+from django.conf import settings
+from django.utils import timezone
+from rest_framework import status
+from urllib3 import BaseHTTPResponse
+from urllib3.exceptions import MaxRetryError, TimeoutError
+
+from sentry import features
+from sentry.conf.server import SEER_ANOMALY_DETECTION_STORE_DATA_URL
+from sentry.incidents.models.alert_rule import AlertRule, AlertRuleThresholdType
+from sentry.models.user import User
+from sentry.net.http import connection_from_url
+from sentry.seer.anomaly_detection.types import (
+    AlertInSeer,
+    AnomalyDetectionConfig,
+    StoreDataRequest,
+    TimeSeriesPoint,
+)
+from sentry.seer.signed_seer_api import make_signed_seer_api_request
+from sentry.snuba.models import SnubaQuery
+from sentry.snuba.referrer import Referrer
+from sentry.snuba.utils import get_dataset
+from sentry.utils import json
+from sentry.utils.snuba import SnubaTSResult
+
+logger = logging.getLogger(__name__)
+
+seer_anomaly_detection_connection_pool = connection_from_url(
+    settings.SEER_ANOMALY_DETECTION_URL,
+    timeout=settings.SEER_ANOMALY_DETECTION_TIMEOUT,
+)
+
+
def format_historical_data(data: SnubaTSResult) -> list[TimeSeriesPoint]:
    """
    Convert a Snuba timeseries result into the shape the Seer API expects.

    Rows with no events carry only the timestamp, e.g.
    {'time': 1719012000}, {'time': 1719018000}, {'time': 1719024000}

    Rows with events also carry a count, e.g.
    {'time': 1721300400, 'count': 2}

    Missing counts are reported to Seer as 0.
    """
    rows = data.data.get("data", [])
    return [
        TimeSeriesPoint(timestamp=row.get("time"), value=row.get("count", 0))
        for row in rows
    ]
+
+
def translate_direction(direction: int) -> str:
    """
    Temporary translation map to Seer's expected values.

    Maps an AlertRuleThresholdType value onto the direction string Seer
    understands ("up", "down", or "both").
    """
    threshold_type = AlertRuleThresholdType(direction)
    seer_directions = {
        AlertRuleThresholdType.ABOVE: "up",
        AlertRuleThresholdType.BELOW: "down",
        AlertRuleThresholdType.ABOVE_AND_BELOW: "both",
    }
    return seer_directions[threshold_type]
+
+
def send_historical_data_to_seer(alert_rule: AlertRule, user: User) -> BaseHTTPResponse:
    """
    Get 28 days of historical data and pass it to Seer to be used for prediction anomalies on the alert.

    Returns Seer's response on success. On any failure (feature flag off,
    missing project, no historical data, incomplete dynamic-alert config, or
    a Seer timeout) returns a synthetic error response whose ``reason``
    describes the problem so the caller can surface it to the user.
    """
    # Shared error response; each failure path overrides ``reason`` (and, for
    # timeouts, ``status``) before returning it.
    # NOTE(review): constructing BaseHTTPResponse directly leans on urllib3
    # internals — a small result dataclass may be sturdier. Confirm against
    # the urllib3 version in use.
    base_error_response = BaseHTTPResponse(
        status=status.HTTP_400_BAD_REQUEST,
        reason="Something went wrong!",
        version=0,
        version_string="HTTP/?",
        decode_content=True,
        request_url=SEER_ANOMALY_DETECTION_STORE_DATA_URL,
    )
    if not features.has(
        "organizations:anomaly-detection-alerts", alert_rule.organization, actor=user
    ):
        base_error_response.reason = "You do not have the anomaly detection alerts feature enabled."
        return base_error_response

    # Use first() rather than get(): get() raises DoesNotExist when no project
    # is attached (and MultipleObjectsReturned when several are), which would
    # bypass the error handling below instead of returning None.
    project = alert_rule.projects.first()
    if not project:
        logger.error(
            "No project associated with alert_rule. Skipping sending historical data to Seer",
            extra={
                "rule_id": alert_rule.id,
            },
        )
        base_error_response.reason = (
            "No project associated with alert_rule. Cannot create alert_rule."
        )
        return base_error_response

    snuba_query = SnubaQuery.objects.get(id=alert_rule.snuba_query_id)
    # time_window is stored in seconds; Seer's time_period is in minutes.
    window_min = int(snuba_query.time_window / 60)
    historical_data = fetch_historical_data(alert_rule, snuba_query)

    if not historical_data:
        base_error_response.reason = "No historical data available. Cannot create alert_rule."
        return base_error_response

    formatted_data = format_historical_data(historical_data)

    if (
        not alert_rule.sensitivity
        or not alert_rule.seasonality
        or alert_rule.threshold_type is None
    ):
        # this won't happen because we've already gone through the serializer, but mypy insists
        base_error_response.reason = (
            "Cannot create alert_rule - missing expected configuration for a dynamic alert."
        )
        return base_error_response

    anomaly_detection_config = AnomalyDetectionConfig(
        time_period=window_min,
        sensitivity=alert_rule.sensitivity,
        direction=translate_direction(alert_rule.threshold_type),
        expected_seasonality=alert_rule.seasonality,
    )
    alert = AlertInSeer(id=alert_rule.id)
    body = StoreDataRequest(
        organization_id=alert_rule.organization.id,
        project_id=project.id,
        alert=alert,
        config=anomaly_detection_config,
        timeseries=formatted_data,
    )
    try:
        resp = make_signed_seer_api_request(
            connection_pool=seer_anomaly_detection_connection_pool,
            path=SEER_ANOMALY_DETECTION_STORE_DATA_URL,
            body=json.dumps(body).encode("utf-8"),
        )
    # See SEER_ANOMALY_DETECTION_TIMEOUT in sentry.conf.server.py
    except (TimeoutError, MaxRetryError):
        timeout_text = "Timeout error when hitting Seer store data endpoint"
        logger.warning(
            timeout_text,
            extra={
                "rule_id": alert_rule.id,
                "project_id": project.id,
            },
        )
        base_error_response.reason = timeout_text
        base_error_response.status = status.HTTP_408_REQUEST_TIMEOUT
        return base_error_response

    # TODO warn if there isn't at least 7 days of data
    return resp
+
+
def fetch_historical_data(alert_rule: AlertRule, snuba_query: SnubaQuery) -> SnubaTSResult | None:
    """
    Fetch 28 days of historical data from Snuba to pass to Seer to build the anomaly detection model.

    Returns None when the alert rule has no attached project or when the
    query's dataset label cannot be resolved.
    """
    # TODO: if we can pass the existing timeseries data we have on the front end along here, we can shorten
    # the time period we query and combine the data
    NUM_DAYS = 28
    end = timezone.now()
    start = end - timedelta(days=NUM_DAYS)
    # Bucket size (seconds) mirrors the alert rule's evaluation window.
    granularity = snuba_query.time_window

    dataset_label = snuba_query.dataset
    if dataset_label == "events":
        # DATASET_OPTIONS expects the name 'errors'
        dataset_label = "errors"
    dataset = get_dataset(dataset_label)
    # first() returns None instead of raising DoesNotExist when no project is
    # attached, so the guard below is actually reachable.
    project = alert_rule.projects.first()
    if not project or not dataset:
        return None

    historical_data = dataset.timeseries_query(
        selected_columns=[snuba_query.aggregate],
        query=snuba_query.query,
        params={
            "organization_id": alert_rule.organization.id,
            "project_id": [project.id],
            "granularity": granularity,
            "start": start,
            "end": end,
        },
        rollup=granularity,
        referrer=Referrer.ANOMALY_DETECTION_HISTORICAL_DATA_QUERY.value,
        zerofill_results=True,
    )
    return historical_data

+ 25 - 0
src/sentry/seer/anomaly_detection/types.py

@@ -0,0 +1,25 @@
+from typing import TypedDict
+
+
class AlertInSeer(TypedDict):
    """Reference to a Sentry alert rule as sent to Seer."""

    # Sentry AlertRule id; keys the stored timeseries on the Seer side.
    id: int
+
+
class TimeSeriesPoint(TypedDict):
    """One bucket of historical event data formatted for Seer."""

    # Unix timestamp of the bucket (Snuba's 'time' field).
    timestamp: float
    # Event count for the bucket; 0 when Snuba returned no count.
    value: float
+
+
class AnomalyDetectionConfig(TypedDict):
    """Model configuration for a dynamic (anomaly detection) alert rule."""

    # Alert evaluation window in minutes.
    time_period: int
    # AlertRule.sensitivity value passed through to Seer.
    sensitivity: str
    # Threshold direction translated for Seer: "up", "down", or "both".
    direction: str
    # AlertRule.seasonality value passed through to Seer.
    expected_seasonality: str
+
+
class StoreDataRequest(TypedDict):
    """Payload for Seer's anomaly-detection store-data endpoint."""

    # Organization owning the alert rule.
    organization_id: int
    # Project whose historical data is being stored.
    project_id: int
    # The alert rule the timeseries belongs to.
    alert: AlertInSeer
    # Model configuration for the dynamic alert.
    config: AnomalyDetectionConfig
    # 28 days of historical buckets used to train the model.
    timeseries: list[TimeSeriesPoint]

+ 1 - 0
src/sentry/snuba/referrer.py

@@ -12,6 +12,7 @@ logger = logging.getLogger(__name__)
 class Referrer(Enum):
     ALERTRULESERIALIZER_TEST_QUERY_PRIMARY = "alertruleserializer.test_query.primary"
     ALERTRULESERIALIZER_TEST_QUERY = "alertruleserializer.test_query"
+    ANOMALY_DETECTION_HISTORICAL_DATA_QUERY = "anomaly_detection_historical_data_query"
     API_ALERTS_ALERT_RULE_CHART_METRICS_ENHANCED = "api.alerts.alert-rule-chart.metrics-enhanced"
     API_ALERTS_ALERT_RULE_CHART = "api.alerts.alert-rule-chart"
     API_ALERTS_CHARTCUTERIE = "api.alerts.chartcuterie"

+ 103 - 1
tests/sentry/incidents/endpoints/test_organization_alert_rule_index.py

@@ -1,4 +1,5 @@
 from copy import deepcopy
+from datetime import timedelta
 from functools import cached_property
 from unittest.mock import patch
 
@@ -7,13 +8,19 @@ import responses
 from django.db import router, transaction
 from django.test.utils import override_settings
 from rest_framework import status
+from urllib3.exceptions import MaxRetryError, TimeoutError
+from urllib3.response import HTTPResponse
 
 from sentry import audit_log
 from sentry.api.helpers.constants import ALERT_RULES_COUNT_HEADER, MAX_QUERY_SUBSCRIPTIONS_HEADER
 from sentry.api.serializers import serialize
+from sentry.conf.server import SEER_ANOMALY_DETECTION_STORE_DATA_URL
 from sentry.incidents.models.alert_rule import (
     AlertRule,
+    AlertRuleDetectionType,
     AlertRuleMonitorTypeInt,
+    AlertRuleSeasonality,
+    AlertRuleSensitivity,
     AlertRuleThresholdType,
     AlertRuleTrigger,
     AlertRuleTriggerAction,
@@ -23,6 +30,7 @@ from sentry.integrations.slack.utils.channel import SlackChannelIdData
 from sentry.models.auditlogentry import AuditLogEntry
 from sentry.models.organizationmember import OrganizationMember
 from sentry.models.outbox import outbox_context
+from sentry.seer.anomaly_detection.store_data import seer_anomaly_detection_connection_pool
 from sentry.sentry_metrics import indexer
 from sentry.sentry_metrics.use_case_id_registry import UseCaseID
 from sentry.silo.base import SiloMode
@@ -33,7 +41,8 @@ from sentry.tasks.integrations.slack.find_channel_id_for_alert_rule import (
 )
 from sentry.testutils.abstract import Abstract
 from sentry.testutils.cases import APITestCase
-from sentry.testutils.helpers.datetime import freeze_time
+from sentry.testutils.factories import EventType
+from sentry.testutils.helpers.datetime import before_now, freeze_time, iso_format
 from sentry.testutils.helpers.features import with_feature
 from sentry.testutils.outbox import outbox_runner
 from sentry.testutils.silo import assume_test_silo_mode
@@ -207,6 +216,99 @@ class AlertRuleCreateEndpointTest(AlertRuleIndexBase):
         assert resp.data == serialize(alert_rule, self.user)
         assert alert_rule.description == resp.data.get("description")
 
    @with_feature("organizations:anomaly-detection-alerts")
    @with_feature("organizations:incidents")
    @patch(
        "sentry.seer.anomaly_detection.store_data.seer_anomaly_detection_connection_pool.urlopen"
    )
    def test_anomaly_detection_alert(self, mock_seer_request):
        """
        Creating a dynamic (anomaly detection) alert rule sends historical
        data to Seer exactly once and persists the rule when Seer responds
        with a 200.
        """
        data = {
            **self.alert_rule_dict,
            "detection_type": AlertRuleDetectionType.DYNAMIC,
            "sensitivity": AlertRuleSensitivity.LOW,
            "seasonality": AlertRuleSeasonality.AUTO,
        }
        # Seer accepts the stored data.
        mock_seer_request.return_value = HTTPResponse(status=200)
        day_ago = before_now(days=1).replace(hour=10, minute=0, second=0, microsecond=0)
        # Store two error events inside the historical window so the Snuba
        # query that feeds Seer has data to return.
        with self.options({"issues.group_attributes.send_kafka": True}):
            self.store_event(
                data={
                    "event_id": "a" * 32,
                    "message": "super duper bad",
                    "timestamp": iso_format(day_ago + timedelta(minutes=1)),
                    "fingerprint": ["group1"],
                    "tags": {"sentry:user": self.user.email},
                },
                event_type=EventType.ERROR,
                project_id=self.project.id,
            )
            self.store_event(
                data={
                    "event_id": "b" * 32,
                    "message": "super bad",
                    "timestamp": iso_format(day_ago + timedelta(minutes=2)),
                    "fingerprint": ["group2"],
                    "tags": {"sentry:user": self.user.email},
                },
                event_type=EventType.ERROR,
                project_id=self.project.id,
            )

        with outbox_runner():
            resp = self.get_success_response(
                self.organization.slug,
                status_code=201,
                **data,
            )
        # The rule was created and serialized with its dynamic-alert fields.
        assert "id" in resp.data
        alert_rule = AlertRule.objects.get(id=resp.data["id"])
        assert resp.data == serialize(alert_rule, self.user)
        assert alert_rule.seasonality == resp.data.get("seasonality")
        assert alert_rule.sensitivity == resp.data.get("sensitivity")
        # Historical data was sent to Seer exactly once.
        assert mock_seer_request.call_count == 1
+
    @with_feature("organizations:anomaly-detection-alerts")
    @with_feature("organizations:incidents")
    @patch(
        "sentry.seer.anomaly_detection.store_data.seer_anomaly_detection_connection_pool.urlopen"
    )
    @patch("sentry.seer.anomaly_detection.store_data.logger")
    def test_anomaly_detection_alert_seer_timeout_max_retry(self, mock_logger, mock_seer_request):
        """
        When the Seer store-data call raises TimeoutError or MaxRetryError,
        the endpoint logs a warning, returns a 400 with a timeout message,
        and the dynamic alert rule is not persisted (it is rolled back).
        """
        data = {
            **self.alert_rule_dict,
            "detection_type": AlertRuleDetectionType.DYNAMIC,
            "sensitivity": AlertRuleSensitivity.LOW,
            "seasonality": AlertRuleSeasonality.AUTO,
        }
        # First failure mode: the Seer request times out.
        mock_seer_request.side_effect = TimeoutError
        with outbox_runner():
            resp = self.get_error_response(
                self.organization.slug,
                status_code=400,
                **data,
            )
        # The rule created before the Seer call must have been deleted.
        assert not AlertRule.objects.filter(detection_type=AlertRuleDetectionType.DYNAMIC).exists()
        assert mock_logger.warning.call_count == 1
        assert resp.data["detail"] == "Timeout error when hitting Seer store data endpoint"
        assert mock_seer_request.call_count == 1

        mock_seer_request.reset_mock()
        mock_logger.reset_mock()

        # Second failure mode: urllib3 exhausts its retries.
        mock_seer_request.side_effect = MaxRetryError(
            seer_anomaly_detection_connection_pool, SEER_ANOMALY_DETECTION_STORE_DATA_URL
        )
        with outbox_runner():
            resp = self.get_error_response(
                self.organization.slug,
                status_code=400,
                **data,
            )
        assert not AlertRule.objects.filter(detection_type=AlertRuleDetectionType.DYNAMIC).exists()
        assert mock_logger.warning.call_count == 1
        assert resp.data["detail"] == "Timeout error when hitting Seer store data endpoint"
        assert mock_seer_request.call_count == 1
+
     def test_monitor_type_with_condition(self):
         data = {
             **self.alert_rule_dict,