
fix(dynamic-sampling): Add query timeout logging for more functions (#52268)

Riccardo Busetti 1 year ago
commit a54d21244a
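All of the functions touched here page through Snuba results inside a loop bounded by a wall-clock budget and rely on Python's while/else: the else branch runs only when the loop condition becomes false (the time budget is exhausted), not when the loop exits via break. Below is a minimal sketch of the pattern, assuming MAX_SECONDS as the budget and with run_query and handle_chunk as hypothetical stand-ins for the real Snuba helpers:

    import time

    MAX_SECONDS = 60  # illustrative budget; the real tasks import theirs from constants

    def paginate_with_timeout():
        start_time = time.time()
        offset = 0
        while (time.time() - start_time) < MAX_SECONDS:
            chunk, more_results = run_query(offset)  # hypothetical query helper
            handle_chunk(chunk)                      # hypothetical result handler
            offset += len(chunk)
            if not more_results:
                break
        else:
            # Reached only when the time budget ran out before the data was exhausted.
            log_query_timeout(
                query="paginate_with_timeout", offset=offset, timeout_seconds=MAX_SECONDS
            )

The commit adds that else branch where it was missing and passes the effective budget (timeout_seconds) to log_query_timeout everywhere it is called.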

+ 3 - 1
src/sentry/dynamic_sampling/tasks/boost_low_volume_projects.py

@@ -193,7 +193,9 @@ def fetch_projects_with_total_root_transaction_count_and_rates(
             break
     else:
         log_query_timeout(
-            query="fetch_projects_with_total_root_transaction_count_and_rates", offset=offset
+            query="fetch_projects_with_total_root_transaction_count_and_rates",
+            offset=offset,
+            timeout_seconds=MAX_SECONDS,
         )
 
     return aggregated_projects

+ 13 - 2
src/sentry/dynamic_sampling/tasks/boost_low_volume_transactions.py

@@ -290,6 +290,11 @@ def get_orgs_with_project_counts(
         last_result = last_result[first_idx:]
         if not more_results:
             break
+    else:
+        log_query_timeout(
+            query="get_orgs_with_project_counts", offset=offset, timeout_seconds=MAX_SECONDS
+        )
+
     if len(last_result) > 0:
         yield [org_id for org_id, _ in last_result]
 
@@ -369,7 +374,9 @@ def fetch_project_transaction_totals(org_ids: List[int]) -> Iterator[ProjectTran
             }
 
     else:
-        log_query_timeout(query="fetch_project_transaction_totals", offset=offset)
+        log_query_timeout(
+            query="fetch_project_transaction_totals", offset=offset, timeout_seconds=MAX_SECONDS
+        )
 
     return None
 
@@ -495,7 +502,11 @@ def fetch_transactions_with_total_volumes(
                 }
             break
     else:
-        log_query_timeout(query="fetch_transactions_with_total_volumes", offset=offset)
+        log_query_timeout(
+            query="fetch_transactions_with_total_volumes",
+            offset=offset,
+            timeout_seconds=MAX_SECONDS,
+        )
 
     return None
 

+ 10 - 1
src/sentry/dynamic_sampling/tasks/common.py

@@ -108,6 +108,11 @@ def get_active_orgs_with_projects_counts(
         last_result = last_result[first_idx:]
         if not more_results:
             break
+    else:
+        log_query_timeout(
+            query="get_active_orgs_with_projects_counts", offset=offset, timeout_seconds=MAX_SECONDS
+        )
+
     if len(last_result) > 0:
         yield [org_id for org_id, _ in last_result]
 
@@ -175,7 +180,11 @@ def fetch_orgs_with_total_root_transactions_count(
         if not more_results:
             break
     else:
-        log_query_timeout(query="fetch_orgs_with_total_root_transactions_count", offset=offset)
+        log_query_timeout(
+            query="fetch_orgs_with_total_root_transactions_count",
+            offset=offset,
+            timeout_seconds=MAX_SECONDS,
+        )
 
     return aggregated_projects
 

+ 11 - 2
src/sentry/dynamic_sampling/tasks/logging.py

@@ -1,6 +1,8 @@
 import logging
 from typing import Any, Callable, Dict, Optional
 
+from sentry.utils import metrics
+
 logger = logging.getLogger(__name__)
 
 
@@ -37,8 +39,15 @@ def log_sample_rate_source(
     )
 
 
-def log_query_timeout(query: str, offset: int) -> None:
-    logger.error("dynamic_sampling.query_timeout", extra={"query": query, "offset": offset})
+def log_query_timeout(query: str, offset: int, timeout_seconds: int) -> None:
+    logger.error(
+        "dynamic_sampling.query_timeout",
+        extra={"query": query, "offset": offset, "timeout_seconds": timeout_seconds},
+    )
+
+    # We also want to collect a metric, in order to measure how many retries we are having. It may help us to spot
+    # possible problems on the Snuba end that affect query performance.
+    metrics.incr("dynamic_sampling.query_timeout", tags={"query": query})
 
 
 def log_recalibrate_org_error(org_id: int, error: str) -> None:
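For reference, a sketch of what a call site produces after this change (the names and values below are illustrative, not from the real tasks): the error record now carries the configured budget alongside the query name and the offset reached, and the counter is tagged by query so timeouts can be aggregated per function.

    # Illustrative call site; the values are made up.
    log_query_timeout(query="my_query", offset=42, timeout_seconds=60)
    # Logs an error record "dynamic_sampling.query_timeout" with
    #   extra={"query": "my_query", "offset": 42, "timeout_seconds": 60}
    # and increments the counter metric "dynamic_sampling.query_timeout"
    # tagged with {"query": "my_query"}.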

+ 10 - 2
src/sentry/dynamic_sampling/tasks/recalibrate_orgs.py

@@ -19,7 +19,6 @@ from snuba_sdk import (
 from sentry.dynamic_sampling.tasks.common import get_adjusted_base_rate_from_cache_or_compute
 from sentry.dynamic_sampling.tasks.constants import (
     MAX_REBALANCE_FACTOR,
-    MAX_SECONDS,
     MIN_REBALANCE_FACTOR,
     RECALIBRATE_ORGS_QUERY_INTERVAL,
 )
@@ -31,6 +30,7 @@ from sentry.dynamic_sampling.tasks.helpers.recalibrate_orgs import (
 )
 from sentry.dynamic_sampling.tasks.logging import (
     log_action_if,
+    log_query_timeout,
     log_recalibrate_org_error,
     log_recalibrate_org_state,
     log_sample_rate_source,
@@ -43,6 +43,10 @@ from sentry.snuba.referrer import Referrer
 from sentry.tasks.base import instrumented_task
 from sentry.utils.snuba import raw_snql_query
 
+# Since we are using a granularity of 60 (minute granularity), we want to have a higher time upper limit for executing
+# multiple queries on Snuba.
+RECALIBRATE_ORGS_MAX_SECONDS = 600
+
 
 class RecalibrationError(Exception):
     def __init__(self, org_id, message):
@@ -166,7 +170,7 @@ def get_active_orgs(
     metric_id = indexer.resolve_shared_org(str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value))
     offset = 0
 
-    while (time.time() - start_time) < MAX_SECONDS:
+    while (time.time() - start_time) < RECALIBRATE_ORGS_MAX_SECONDS:
         query = (
             Query(
                 match=Entity(EntityKey.GenericOrgMetricsCounters.value),
@@ -212,6 +216,10 @@ def get_active_orgs(
 
         if not more_results:
             return
+    else:
+        log_query_timeout(
+            query="get_active_orgs", offset=offset, timeout_seconds=RECALIBRATE_ORGS_MAX_SECONDS
+        )
 
 
 def fetch_org_volumes(
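get_active_orgs is a generator and leaves the loop with return rather than break, but the while/else behaviour is the same: return ends the generator, so the else body still fires only when the larger RECALIBRATE_ORGS_MAX_SECONDS budget runs out. A minimal sketch of that shape, assuming import time and RECALIBRATE_ORGS_MAX_SECONDS = 600 as above, with run_query as a hypothetical stand-in for the Snuba query:

    def get_active_orgs_sketch():
        start_time = time.time()
        offset = 0
        while (time.time() - start_time) < RECALIBRATE_ORGS_MAX_SECONDS:
            org_ids, more_results = run_query(offset)  # hypothetical Snuba helper
            yield org_ids
            offset += len(org_ids)
            if not more_results:
                return  # normal completion: the else below is skipped
        else:
            log_query_timeout(
                query="get_active_orgs", offset=offset, timeout_seconds=RECALIBRATE_ORGS_MAX_SECONDS
            )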

+ 5 - 1
src/sentry/dynamic_sampling/tasks/sliding_window.py

@@ -208,6 +208,10 @@ def fetch_projects_with_total_root_transactions_count(
         if not more_results:
             break
     else:
-        log_query_timeout(query="fetch_projects_with_total_root_transactions_count", offset=offset)
+        log_query_timeout(
+            query="fetch_projects_with_total_root_transactions_count",
+            offset=offset,
+            timeout_seconds=MAX_SECONDS,
+        )
 
     return aggregated_projects