Browse Source

add queue monitoring option (#71778)

We want to monitor the time a task stays on a queue, waiting to get
executed. This helps us debug tasks that are more time-sensitive. A new
option, `sentry-metrics.monitor-queue-time` (default: False), is added to
enable this monitoring for tasks that do not set `record_timing=True`.
Yash Kamothi 9 months ago
parent
commit
48010596a4

+ 1 - 0
src/sentry/middleware/integrations/tasks.py

@@ -22,6 +22,7 @@ logger = logging.getLogger(__name__)
     silo_mode=SiloMode.CONTROL,
     max_retries=2,
     default_retry_delay=5,
+    record_timing=True,
 )
 def convert_to_async_slack_response(
     region_names: list[str],

+ 7 - 0
src/sentry/options/defaults.py

@@ -2652,3 +2652,10 @@ register(
     default=[],
     flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE,
 )
+
+register(
+    "sentry-metrics.monitor-queue-time",
+    type=Bool,
+    default=False,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)

+ 16 - 1
src/sentry/tasks/base.py

@@ -94,6 +94,21 @@ def instrumented_task(name, stat_suffix=None, silo_mode=None, record_timing=Fals
     def wrapped(func):
         @wraps(func)
         def _wrapped(*args, **kwargs):
+            record_queue_wait_time = record_timing
+
+            # Use a try/except here to contain the blast radius of an unhandled exception from the options lib.
+            # An unhandled exception here could affect every task and prevent them from running.
+            try:
+                from sentry import options
+
+                # Use option to control default behavior of queue time monitoring
+                # Value can be dynamically updated, which is why the evaluation happens during function run-time
+                record_queue_wait_time = record_queue_wait_time or options.get(
+                    "sentry-metrics.monitor-queue-time"
+                )
+            except Exception:
+                pass
+
             # TODO(dcramer): we want to tag a transaction ID, but overriding
             # the base on app.task seems to cause problems w/ Celery internals
             transaction_id = kwargs.pop("__transaction_id", None)
@@ -105,7 +120,7 @@ def instrumented_task(name, stat_suffix=None, silo_mode=None, record_timing=Fals
             else:
                 instance = name
 
-            if start_time and record_timing:
+            if start_time and record_queue_wait_time:
                 curr_time = datetime.now().timestamp()
                 duration = (curr_time - start_time) * 1000
                 metrics.distribution(