Browse Source

ref(metrics-extraction): Use bulk cache instead of per-query cache (#68259)

### Summary
We can't make individual cache calls for most of the on-demand metrics
extraction query resolution since calling cache isn't free and they'd be
replacing resolve queries in the 10ms range. We do, however, need to shorten
the outliers, so this adds a bulk cache that only gets fetched/set
once per metrics extraction run, and is saved for ~1.5hr before updating
entirely.
Kev 11 months ago
parent
commit
b1f806a5cd

+ 36 - 4
src/sentry/relay/config/metric_extraction.py

@@ -14,6 +14,7 @@ from sentry_relay.processing import validate_sampling_condition
 from sentry import features, options
 from sentry.api.endpoints.project_transaction_threshold import DEFAULT_THRESHOLD
 from sentry.api.utils import get_date_range_from_params
+from sentry.features.rollout import in_random_rollout
 from sentry.incidents.models.alert_rule import AlertRule, AlertRuleStatus
 from sentry.models.dashboard_widget import (
     ON_DEMAND_ENABLED_KEY,
@@ -203,6 +204,20 @@ def _get_alert_metric_specs(
     return specs
 
 
+def _bulk_cache_query_key(project: Project) -> str:
+    return f"on-demand.bulk-query-cache.{project.organization.id}"
+
+
+def _get_bulk_cached_query(project: Project) -> dict[str, Any]:
+    query_bulk_cache_key = _bulk_cache_query_key(project)
+    return cache.get(query_bulk_cache_key, None)
+
+
+def _set_bulk_cached_query(project: Project, query_cache: dict[str, Any]) -> None:
+    query_bulk_cache_key = _bulk_cache_query_key(project)
+    cache.set(query_bulk_cache_key, query_cache, timeout=5400)
+
+
 @metrics.wraps("on_demand_metrics._get_widget_metric_specs")
 def _get_widget_metric_specs(
     project: Project, enabled_features: set[str], prefilling: bool
@@ -230,6 +245,10 @@ def _get_widget_metric_specs(
         "on_demand_metrics.widgets_to_process", amount=len(widget_queries), sample_rate=1.0
     )
 
+    organization_bulk_query_cache = _get_bulk_cached_query(project)
+    save_organization_bulk_cache = organization_bulk_query_cache is None
+    organization_bulk_query_cache = organization_bulk_query_cache or {}
+
     ignored_widget_ids: dict[int, bool] = {}
     specs_for_widget: dict[int, list[HashedMetricSpec]] = defaultdict(list)
     widget_query_for_spec_hash: dict[str, DashboardWidgetQuery] = {}
@@ -239,7 +258,9 @@ def _get_widget_metric_specs(
 
     with metrics.timer("on_demand_metrics.widget_spec_convert"):
         for widget_query in widget_queries:
-            widget_specs = convert_widget_query_to_metric(project, widget_query, prefilling)
+            widget_specs = convert_widget_query_to_metric(
+                project, widget_query, prefilling, organization_bulk_query_cache
+            )
 
             if not widget_specs:
                 # Skip checking any widget queries that don't have specs,
@@ -285,6 +306,9 @@ def _get_widget_metric_specs(
 
     _update_state_with_spec_limit(trimmed_specs, widget_query_for_spec_hash)
     metrics.incr("on_demand_metrics.widget_query_specs", amount=len(specs))
+    if in_random_rollout("on_demand_metrics.cache_should_use_on_demand"):
+        if save_organization_bulk_cache:
+            _set_bulk_cached_query(project, organization_bulk_query_cache)
     return specs
 
 
@@ -410,7 +434,10 @@ def _convert_snuba_query_to_metrics(
 
 
 def convert_widget_query_to_metric(
-    project: Project, widget_query: DashboardWidgetQuery, prefilling: bool
+    project: Project,
+    widget_query: DashboardWidgetQuery,
+    prefilling: bool,
+    organization_bulk_query_cache: dict[str, Any] | None = None,
 ) -> list[HashedMetricSpec]:
     """
     Converts a passed metrics widget query to one or more MetricSpecs.
@@ -426,7 +453,7 @@ def convert_widget_query_to_metric(
 
     for aggregate in aggregates:
         metrics_specs += _generate_metric_specs(
-            aggregate, widget_query, project, prefilling, groupbys
+            aggregate, widget_query, project, prefilling, groupbys, organization_bulk_query_cache
         )
 
     return metrics_specs
@@ -438,6 +465,7 @@ def _generate_metric_specs(
     project: Project,
     prefilling: bool,
     groupbys: Sequence[str] | None = None,
+    organization_bulk_query_cache: dict[str, Any] | None = None,
 ) -> list[HashedMetricSpec]:
     metrics_specs = []
     metrics.incr("on_demand_metrics.before_widget_spec_generation")
@@ -452,6 +480,7 @@ def _generate_metric_specs(
         prefilling,
         groupbys=groupbys,
         spec_type=MetricSpecType.DYNAMIC_QUERY,
+        organization_bulk_query_cache=organization_bulk_query_cache,
     ):
         for spec in results:
             _log_on_demand_metric_spec(
@@ -708,6 +737,7 @@ def _convert_aggregate_and_query_to_metrics(
     prefilling: bool,
     spec_type: MetricSpecType = MetricSpecType.SIMPLE_QUERY,
     groupbys: Sequence[str] | None = None,
+    organization_bulk_query_cache: dict[str, Any] | None = None,
 ) -> Sequence[HashedMetricSpec] | None:
     """
     Converts an aggregate and a query to a metric spec with its hash value.
@@ -719,7 +749,9 @@ def _convert_aggregate_and_query_to_metrics(
     # We can avoid injection of the environment in the query, since it's supported by standard, thus it won't change
     # the supported state of a query, since if it's standard, and we added environment it will still be standard
     # and if it's on demand, it will always be on demand irrespectively of what we add.
-    if not should_use_on_demand_metrics(dataset, aggregate, query, groupbys, prefilling):
+    if not should_use_on_demand_metrics(
+        dataset, aggregate, query, groupbys, prefilling, organization_bulk_query_cache
+    ):
         return None
 
     metric_specs_and_hashes = []

+ 7 - 5
src/sentry/snuba/metrics/extraction.py

@@ -47,7 +47,6 @@ from sentry.snuba.dataset import Dataset
 from sentry.snuba.metrics.naming_layer.mri import ParsedMRI, parse_mri
 from sentry.snuba.metrics.utils import MetricOperationType
 from sentry.utils import metrics
-from sentry.utils.cache import cache
 from sentry.utils.hashlib import md5_text
 from sentry.utils.snuba import is_measurement, is_span_op_breakdown, resolve_column
 
@@ -671,15 +670,18 @@ def should_use_on_demand_metrics(
     query: str | None,
     groupbys: Sequence[str] | None = None,
     prefilling: bool = False,
+    organization_bulk_query_cache: dict[str, Any] | None = None,
 ) -> bool:
     if in_random_rollout("on_demand_metrics.cache_should_use_on_demand"):
+        if organization_bulk_query_cache is None:
+            organization_bulk_query_cache = {}
 
         dataset_str = dataset.value if isinstance(dataset, Enum) else str(dataset or "")
         groupbys_str = ",".join(sorted(groupbys)) if groupbys else ""
-        cache_key = md5_text(
+        local_cache_key = md5_text(
             f"{dataset_str}-{aggregate}-{query or ''}-{groupbys_str}-prefilling={prefilling}"
         ).hexdigest()
-        cached_result = cache.get(cache_key)
+        cached_result = organization_bulk_query_cache.get(local_cache_key, None)
         if cached_result:
             metrics.incr("on_demand_metrics.should_use_on_demand_metrics.cache_hit")
             return cached_result
@@ -687,7 +689,7 @@ def should_use_on_demand_metrics(
             logger.info(
                 "should_use_on_demand_metrics.cache_miss",
                 extra={
-                    "cache_key": cache_key,
+                    "cache_key": local_cache_key,
                 },
             )
             result = _should_use_on_demand_metrics(
@@ -698,7 +700,7 @@ def should_use_on_demand_metrics(
                 prefilling=prefilling,
             )
             metrics.incr("on_demand_metrics.should_use_on_demand_metrics.cache_miss")
-            cache.set(cache_key, result, timeout=5400)
+            organization_bulk_query_cache[local_cache_key] = result
             return result
 
     return _should_use_on_demand_metrics(

+ 24 - 0
tests/sentry/relay/config/test_metric_extraction.py

@@ -12,6 +12,7 @@ from sentry.models.project import Project
 from sentry.models.transaction_threshold import ProjectTransactionThreshold, TransactionMetric
 from sentry.relay.config.experimental import TimeChecker
 from sentry.relay.config.metric_extraction import (
+    _set_bulk_cached_query,
     get_current_widget_specs,
     get_metric_extraction_config,
 )
@@ -762,6 +763,29 @@ def test_get_metric_extraction_config_alerts_and_widgets_off(default_project: Pr
         ]
 
 
+@django_db_all
+def test_get_metric_extraction_config_uses_cache_for_widgets(default_project: Project) -> None:
+    # the bulk query cache should be set once on the first extraction run
+    # and reused (not re-saved) on subsequent runs
+    original_set_bulk_cached_query = _set_bulk_cached_query
+
+    with (
+        Feature({ON_DEMAND_METRICS: True, ON_DEMAND_METRICS_WIDGETS: True}),
+        override_options({"on_demand_metrics.cache_should_use_on_demand": 1.0}),
+        mock.patch(
+            "sentry.relay.config.metric_extraction._set_bulk_cached_query"
+        ) as mock_set_cache_spy,
+    ):
+        mock_set_cache_spy.side_effect = original_set_bulk_cached_query
+        create_widget(["count()"], "transaction.duration:>=1000", default_project)
+
+        get_metric_extraction_config(TimeChecker(timedelta(seconds=0)), default_project)
+
+        assert mock_set_cache_spy.call_count == 1
+
+        get_metric_extraction_config(TimeChecker(timedelta(seconds=0)), default_project)
+        assert mock_set_cache_spy.call_count == 1
+
+
 @django_db_all
 def test_get_metric_extraction_config_alerts_and_widgets(default_project: Project) -> None:
     # deduplication should work across alerts and widgets