Browse Source

feat(statistical-detectors): Batch functions query (#54701)

Querying one project at a time scales linearly with the number of
projects and will not be able to handle the necessary volume. This
change batches up to 100 projects per query to minimize the number of
queries sent to ClickHouse.
Tony Xiao 1 year ago
parent
commit
fcb4f4f2c7

+ 3 - 3
src/sentry/statistical_detectors/detector.py

@@ -75,7 +75,7 @@ class TrendState:
 class TrendPayload:
     group: str | int
     count: float
-    p95: float
+    value: float
     timestamp: datetime
 
 
@@ -166,12 +166,12 @@ def detect_trend(state: TrendState, payload: TrendPayload) -> Tuple[TrendType, T
     # This EMA uses a shorter period to follow the timeseries more closely.
     ema_short = ExponentialMovingAverage(smoothing=2, period=20)
     ema_short.set(state.short_ma, state.count)
-    ema_short.update(payload.p95)
+    ema_short.update(payload.value)
 
 # This EMA uses a longer period to follow the overall trend of the timeseries.
     ema_long = ExponentialMovingAverage(smoothing=2, period=40)
     ema_long.set(state.long_ma, state.count)
-    ema_long.update(payload.p95)
+    ema_long.update(payload.value)
 
     # The heuristic isn't stable initially, so ensure we have a minimum
     # number of data points before looking for a regression.

+ 25 - 16
src/sentry/tasks/statistical_detectors.py

@@ -1,4 +1,5 @@
 import logging
+from collections import defaultdict
 from datetime import datetime, timedelta
 from typing import Any, Dict, List
 
@@ -11,10 +12,13 @@ from sentry.snuba import functions
 from sentry.snuba.referrer import Referrer
 from sentry.statistical_detectors.detector import TrendPayload
 from sentry.tasks.base import instrumented_task
+from sentry.utils.iterators import chunked
 
 logger = logging.getLogger("sentry.tasks.statistical_detectors")
 
 
+FUNCTIONS_PER_PROJECT = 100
+
 ITERATOR_CHUNK = 1_000
 
 
@@ -86,9 +90,12 @@ def detect_function_trends(project_ids: List[int], start: datetime, **kwargs) ->
     if not options.get("statistical_detectors.enable"):
         return
 
-    for project in Project.objects.filter(id__in=project_ids):
+    projects_per_query = options.get("statistical_detectors.query.batch_size")
+    assert projects_per_query > 0
+
+    for projects in chunked(Project.objects.filter(id__in=project_ids), projects_per_query):
         try:
-            query_functions(project, start)
+            query_functions(projects, start)
         except Exception as e:
             sentry_sdk.capture_exception(e)
 
@@ -97,14 +104,15 @@ def query_transactions(project_id: int) -> None:
     pass
 
 
-def query_functions(project: Project, start: datetime) -> List[TrendPayload]:
-    params = _get_function_query_params(project, start)
+def query_functions(projects: List[Project], start: datetime) -> Dict[int, List[TrendPayload]]:
+    params = _get_function_query_params(projects, start)
 
     # TODOs:
     # - format and return this for further processing
     # - handle any errors
-    results = functions.query(
+    query_results = functions.query(
         selected_columns=[
+            "project.id",
             "timestamp",
             "fingerprint",
             "count()",
@@ -112,26 +120,28 @@ def query_functions(project: Project, start: datetime) -> List[TrendPayload]:
         ],
         query="is_application:1",
         params=params,
-        orderby=["-count()"],
-        limit=100,
+        orderby=["project.id", "-count()"],
+        limit=FUNCTIONS_PER_PROJECT * len(projects),
         referrer=Referrer.API_PROFILING_FUNCTIONS_STATISTICAL_DETECTOR.value,
         auto_aggregations=True,
         use_aggregate_conditions=True,
         transform_alias_to_input_format=True,
     )
 
-    return [
-        TrendPayload(
+    function_results = defaultdict(list)
+    for row in query_results["data"]:
+        payload = TrendPayload(
             group=row["fingerprint"],
             count=row["count()"],
-            p95=row["p95()"],
+            value=row["p95()"],
             timestamp=datetime.fromisoformat(row["timestamp"]),
         )
-        for row in results["data"]
-    ]
+        function_results[row["project.id"]].append(payload)
+
+    return function_results
 
 
-def _get_function_query_params(project: Project, start: datetime) -> Dict[str, Any]:
+def _get_function_query_params(projects: List[Project], start: datetime) -> Dict[str, Any]:
     # The functions dataset only supports 1 hour granularity.
     # So we always look back at the last full hour that just elapsed.
     # And since the timestamps are truncated to the start of the hour
@@ -142,7 +152,6 @@ def _get_function_query_params(project: Project, start: datetime) -> Dict[str, A
     return {
         "start": start,
         "end": start + timedelta(minutes=1),
-        "project_id": [project.id],
-        "project_objects": [project],
-        "organization_id": project.organization_id,
+        "project_id": [project.id for project in projects],
+        "project_objects": projects,
     }

+ 1 - 1
src/sentry/testutils/cases.py

@@ -1925,7 +1925,7 @@ class ProfilesSnubaTestCase(
             hasher.update(b"")
         hasher.update(b":")
         hasher.update(function["function"].encode())
-        return int(hasher.hexdigest()[:16], 16)
+        return int(hasher.hexdigest()[:8], 16)
 
 
 @pytest.mark.snuba

+ 4 - 3
tests/sentry/statistical_detectors/test_detector.py

@@ -68,7 +68,7 @@ def test_trend_state(data, expected):
 
 
 @pytest.mark.parametrize(
-    "initial,p95s,regressed_indices,improved_indices",
+    "initial,values,regressed_indices,improved_indices",
     [
         pytest.param(
             TrendState(None, 0, 0, 0),
@@ -100,7 +100,7 @@ def test_trend_state(data, expected):
         ),
     ],
 )
-def test_run_trend_detection(initial, p95s, regressed_indices, improved_indices):
+def test_run_trend_detection(initial, values, regressed_indices, improved_indices):
     states = [initial]
     all_regressed = []
     all_improved = []
@@ -108,7 +108,8 @@ def test_run_trend_detection(initial, p95s, regressed_indices, improved_indices)
     now = datetime.now()
 
     payloads = [
-        TrendPayload(0, i + 1, p95, now + timedelta(hours=i + 1)) for i, p95 in enumerate(p95s)
+        TrendPayload(0, i + 1, value, now + timedelta(hours=i + 1))
+        for i, value in enumerate(values)
     ]
 
     for payload in payloads:

+ 53 - 1
tests/sentry/tasks/test_statistical_detectors.py

@@ -1,18 +1,23 @@
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
 from unittest import mock
 
 import pytest
 from freezegun import freeze_time
 
+from sentry.statistical_detectors.detector import TrendPayload
 from sentry.tasks.statistical_detectors import (
     detect_function_trends,
     detect_transaction_trends,
+    query_functions,
     run_detection,
 )
+from sentry.testutils.cases import ProfilesSnubaTestCase
 from sentry.testutils.factories import Factories
 from sentry.testutils.helpers import override_options
+from sentry.testutils.helpers.datetime import before_now
 from sentry.testutils.helpers.task_runner import TaskRunner
 from sentry.testutils.pytest.fixtures import django_db_all
+from sentry.testutils.silo import region_silo_test
 
 
 @pytest.fixture
@@ -168,3 +173,50 @@ def test_detect_function_trends_query_timerange(functions_query, timestamp, proj
     params = functions_query.mock_calls[0].kwargs["params"]
     assert params["start"] == datetime(2023, 8, 1, 11, 0, tzinfo=timezone.utc)
     assert params["end"] == datetime(2023, 8, 1, 11, 1, tzinfo=timezone.utc)
+
+
+@region_silo_test(stable=True)
+class FunctionsQueryTest(ProfilesSnubaTestCase):
+    def setUp(self):
+        super().setUp()
+
+        self.now = before_now(minutes=10)
+        self.hour_ago = (self.now - timedelta(hours=1)).replace(
+            minute=0, second=0, microsecond=0, tzinfo=timezone.utc
+        )
+
+    def test_functions_query(self):
+        self.store_functions(
+            [
+                {
+                    "self_times_ns": [100 for _ in range(100)],
+                    "package": "foo",
+                    "function": "bar",
+                    # only in app functions should
+                    # appear in the results
+                    "in_app": True,
+                },
+                {
+                    "self_times_ns": [200 for _ in range(100)],
+                    "package": "baz",
+                    "function": "quz",
+                    # non in app functions should not
+                    # appear in the results
+                    "in_app": False,
+                },
+            ],
+            project=self.project,
+            timestamp=self.hour_ago,
+        )
+
+        results = query_functions([self.project], self.now)
+        assert results == {
+            self.project.id: [
+                TrendPayload(
+                    group=self.function_fingerprint({"package": "foo", "function": "bar"}),
+                    count=100,
+                    value=pytest.approx(100),  # type: ignore[arg-type]
+                    timestamp=self.hour_ago,
+                )
+            ],
+        }