|
@@ -1,5 +1,5 @@
|
|
import logging
|
|
import logging
|
|
-from datetime import datetime
|
|
|
|
|
|
+from datetime import UTC, datetime, timedelta
|
|
from typing import Any
|
|
from typing import Any
|
|
|
|
|
|
import pytest
|
|
import pytest
|
|
@@ -10,7 +10,7 @@ from sentry.snuba import errors, metrics_performance
|
|
from sentry.snuba.dataset import Dataset
|
|
from sentry.snuba.dataset import Dataset
|
|
from sentry.snuba.models import SnubaQuery
|
|
from sentry.snuba.models import SnubaQuery
|
|
from sentry.testutils.cases import BaseMetricsTestCase, PerformanceIssueTestCase
|
|
from sentry.testutils.cases import BaseMetricsTestCase, PerformanceIssueTestCase
|
|
-from sentry.testutils.helpers.datetime import iso_format
|
|
|
|
|
|
+from sentry.testutils.helpers.datetime import before_now, freeze_time, iso_format
|
|
from sentry.testutils.performance_issues.event_generators import get_event
|
|
from sentry.testutils.performance_issues.event_generators import get_event
|
|
from sentry.utils.snuba import SnubaTSResult
|
|
from sentry.utils.snuba import SnubaTSResult
|
|
from tests.sentry.incidents.endpoints.test_organization_alert_rule_index import AlertRuleBase
|
|
from tests.sentry.incidents.endpoints.test_organization_alert_rule_index import AlertRuleBase
|
|
@@ -30,15 +30,18 @@ def make_event(**kwargs: Any) -> dict[str, Any]:
|
|
return result
|
|
return result
|
|
|
|
|
|
|
|
|
|
|
|
+@freeze_time(before_now(days=2).replace(hour=0, minute=0, second=0, microsecond=0))
|
|
class AnomalyDetectionStoreDataTest(AlertRuleBase, BaseMetricsTestCase, PerformanceIssueTestCase):
|
|
class AnomalyDetectionStoreDataTest(AlertRuleBase, BaseMetricsTestCase, PerformanceIssueTestCase):
|
|
def setUp(self):
|
|
def setUp(self):
|
|
super().setUp()
|
|
super().setUp()
|
|
- self.time_1 = "2024-08-29T00:00:00Z"
|
|
|
|
- self.time_1_dt = datetime(2024, 8, 29, 0, 0)
|
|
|
|
|
|
+
|
|
|
|
+ self.now = datetime.now(UTC)
|
|
|
|
+ self.time_1_dt = self.now - timedelta(days=2)
|
|
|
|
+ self.time_1 = self.time_1_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
self.time_1_ts = self.time_1_dt.timestamp()
|
|
self.time_1_ts = self.time_1_dt.timestamp()
|
|
|
|
|
|
- self.time_2 = "2024-08-29T02:00:00Z"
|
|
|
|
- self.time_2_dt = datetime(2024, 8, 29, 2, 0)
|
|
|
|
|
|
+ self.time_2_dt = self.now - timedelta(days=3)
|
|
|
|
+ self.time_2 = self.time_2_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
self.time_2_ts = self.time_2_dt.timestamp()
|
|
self.time_2_ts = self.time_2_dt.timestamp()
|
|
|
|
|
|
self.received = self.time_1_ts
|
|
self.received = self.time_1_ts
|
|
@@ -84,9 +87,6 @@ class AnomalyDetectionStoreDataTest(AlertRuleBase, BaseMetricsTestCase, Performa
|
|
)
|
|
)
|
|
assert result == expected_return_value
|
|
assert result == expected_return_value
|
|
|
|
|
|
- @pytest.mark.skip(
|
|
|
|
- reason="This test is flaking, skipping for now - this feature isn't released."
|
|
|
|
- )
|
|
|
|
def test_anomaly_detection_fetch_historical_data(self):
|
|
def test_anomaly_detection_fetch_historical_data(self):
|
|
alert_rule = self.create_alert_rule(organization=self.organization, projects=[self.project])
|
|
alert_rule = self.create_alert_rule(organization=self.organization, projects=[self.project])
|
|
snuba_query = SnubaQuery.objects.get(id=alert_rule.snuba_query_id)
|
|
snuba_query = SnubaQuery.objects.get(id=alert_rule.snuba_query_id)
|
|
@@ -119,9 +119,6 @@ class AnomalyDetectionStoreDataTest(AlertRuleBase, BaseMetricsTestCase, Performa
|
|
assert {"time": int(self.time_1_ts), "count": 1} in result.data.get("data")
|
|
assert {"time": int(self.time_1_ts), "count": 1} in result.data.get("data")
|
|
assert {"time": int(self.time_2_ts), "count": 1} in result.data.get("data")
|
|
assert {"time": int(self.time_2_ts), "count": 1} in result.data.get("data")
|
|
|
|
|
|
- @pytest.mark.skip(
|
|
|
|
- reason="This test is flaking, skipping for now - this feature isn't released."
|
|
|
|
- )
|
|
|
|
def test_anomaly_detection_fetch_historical_data_is_unresolved_query(self):
|
|
def test_anomaly_detection_fetch_historical_data_is_unresolved_query(self):
|
|
alert_rule = self.create_alert_rule(organization=self.organization, projects=[self.project])
|
|
alert_rule = self.create_alert_rule(organization=self.organization, projects=[self.project])
|
|
snuba_query = SnubaQuery.objects.get(id=alert_rule.snuba_query_id)
|
|
snuba_query = SnubaQuery.objects.get(id=alert_rule.snuba_query_id)
|
|
@@ -156,9 +153,6 @@ class AnomalyDetectionStoreDataTest(AlertRuleBase, BaseMetricsTestCase, Performa
|
|
assert {"time": int(self.time_1_ts), "count": 1} in result.data.get("data")
|
|
assert {"time": int(self.time_1_ts), "count": 1} in result.data.get("data")
|
|
assert {"time": int(self.time_2_ts), "count": 1} in result.data.get("data")
|
|
assert {"time": int(self.time_2_ts), "count": 1} in result.data.get("data")
|
|
|
|
|
|
- @pytest.mark.skip(
|
|
|
|
- reason="This test is flaking, skipping for now - this feature isn't released."
|
|
|
|
- )
|
|
|
|
def test_anomaly_detection_fetch_historical_data_performance_alert(self):
|
|
def test_anomaly_detection_fetch_historical_data_performance_alert(self):
|
|
alert_rule = self.create_alert_rule(
|
|
alert_rule = self.create_alert_rule(
|
|
organization=self.organization, projects=[self.project], dataset=Dataset.Transactions
|
|
organization=self.organization, projects=[self.project], dataset=Dataset.Transactions
|
|
@@ -197,9 +191,6 @@ class AnomalyDetectionStoreDataTest(AlertRuleBase, BaseMetricsTestCase, Performa
|
|
result = format_historical_data(data, ["count()"], metrics_performance, self.organization)
|
|
result = format_historical_data(data, ["count()"], metrics_performance, self.organization)
|
|
assert result == expected_return_value
|
|
assert result == expected_return_value
|
|
|
|
|
|
- @pytest.mark.skip(
|
|
|
|
- reason="This test is flaking, skipping for now - this feature isn't released."
|
|
|
|
- )
|
|
|
|
def test_anomaly_detection_fetch_historical_data_crash_rate_alert(self):
|
|
def test_anomaly_detection_fetch_historical_data_crash_rate_alert(self):
|
|
self.store_session(
|
|
self.store_session(
|
|
self.build_session(
|
|
self.build_session(
|