
feat(backpressure): Proof of concept for queue monitoring service (#50315)

Third time's the charm. https://github.com/getsentry/sentry/pull/50296

---------

Co-authored-by: Pierre Massat <pierre.massat@sentry.io>
Co-authored-by: Arpad Borsos <arpad.borsos@sentry.io>
Sebastian Zivota 1 year ago
Parent
Commit
23e09e524d
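
At a high level, this proof of concept wires two pieces together: a background thread (monitor_queues in src/sentry/monitoring/queues.py) that polls Celery queue sizes and records persistently unhealthy queues in Redis, and a consumer-side guard in src/sentry/profiles/consumers/process/factory.py that rejects new messages while its queue is flagged. As a hedged sketch, the guard could be exercised in isolation roughly like this, with is_queue_healthy stubbed out (assumes a configured Sentry development environment; module paths and names are taken from the diff below):

from unittest import mock

from arroyo.processing.strategies.abstract import MessageRejected

from sentry.profiles.consumers.process.factory import process_message

# Simulate an unhealthy "profiles.process" queue and check that the strategy
# function signals backpressure instead of dispatching the Celery task.
with mock.patch(
    "sentry.profiles.consumers.process.factory.is_queue_healthy", return_value=False
):
    try:
        process_message(mock.Mock())  # the payload is never read when the queue is unhealthy
    except MessageRejected:
        print("backpressure signalled; the stream processor retries the message later")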

+ 4 - 0
src/sentry/conf/server.py

@@ -3427,3 +3427,7 @@ MAX_ENVIRONMENTS_PER_MONITOR = 1000
 SENTRY_METRICS_INDEXER_RAISE_VALIDATION_ERRORS = False
 
 SENTRY_FILE_COPY_ROLLOUT_RATE = 0.01
+
+# The Redis cluster to use for monitoring the health of
+# Celery queues.
+SENTRY_QUEUE_MONITORING_REDIS_CLUSTER = "default"
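
For context, the alias added above is resolved to a Redis client in src/sentry/monitoring/queues.py below. A minimal sketch of that lookup, assuming a configured Sentry/Django environment (the key name mirrors the KEY_NAME constant introduced in this PR):

from django.conf import settings
from sentry.utils import redis

# Resolve the configured cluster alias to a client, exactly as the monitoring
# module does at import time.
cluster = redis.redis_clusters.get(settings.SENTRY_QUEUE_MONITORING_REDIS_CLUSTER)

# The monitor only uses set/exists/delete on "unhealthy-queues:<queue>" keys:
cluster.exists("unhealthy-queues:profiles.process")  # 0 while the queue is healthy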

+ 97 - 1
src/sentry/monitoring/queues.py

@@ -1,8 +1,19 @@
+from threading import Thread
+from time import sleep
+from typing import List, Tuple
 from urllib.parse import urlparse
 
+import sentry_sdk
 from django.conf import settings
 from django.utils.functional import cached_property
 
+from sentry import options
+from sentry.utils import redis
+
+QUEUES = ["profiles.process"]
+
+KEY_NAME = "unhealthy-queues"
+
 
 class RedisBackend:
     def __init__(self, broker_url):
@@ -38,7 +49,7 @@ class AmqpBackend:
             host="%s:%d" % (host, port),
             userid=dsn.username,
             password=dsn.password,
-            virtual_host=dsn.path[1:],
+            virtual_host=dsn.path[1:] or "/",
         )
 
     def get_conn(self):
@@ -94,3 +105,88 @@ try:
     backend = get_backend_for_broker(settings.BROKER_URL)
 except KeyError:
     backend = None
+
+
+queue_monitoring_cluster = redis.redis_clusters.get(settings.SENTRY_QUEUE_MONITORING_REDIS_CLUSTER)
+
+
+def _unhealthy_queue_key(queue_name: str) -> str:
+    return f"{KEY_NAME}:{queue_name}"
+
+
+def is_queue_healthy(queue_name: str) -> bool:
+    """Checks whether the given queue is healthy by looking it up in Redis.
+
+    NB: If the queue is not found in Redis, it is assumed to be healthy.
+    This behavior might change in the future.
+    """
+
+    if not options.get("backpressure.monitor_queues.enable"):
+        return True
+    # look up the "unhealthy" marker key in Redis; no key means the queue is healthy
+    try:
+        healthy = not queue_monitoring_cluster.exists(_unhealthy_queue_key(queue_name))
+    except Exception:
+        healthy = False
+    return healthy
+
+
+def _is_healthy(queue_size) -> bool:
+    return queue_size < options.get("backpressure.monitor_queues.unhealthy_threshold")
+
+
+def _update_queue_stats(redis_cluster, queue_health: List[Tuple[str, bool]]) -> None:
+    unhealthy = [queue for (queue, unhealthy) in queue_health if unhealthy]
+    if unhealthy:
+        # Report list of unhealthy queues to sentry
+        with sentry_sdk.push_scope() as scope:
+            scope.set_extra("unhealthy_queues", unhealthy)
+            sentry_sdk.capture_message("RabbitMQ queues are exceeding size threshold")
+
+    with redis_cluster.pipeline(transaction=True) as pipeline:
+        for (queue, unhealthy) in queue_health:
+            if unhealthy:
+                pipeline.set(_unhealthy_queue_key(queue), "1", ex=60)
+            else:
+                pipeline.delete(_unhealthy_queue_key(queue))
+        pipeline.execute()
+
+
+def _run_queue_stats_updater(redis_cluster: str) -> None:
+    # bonus point if we manage to use asyncio and launch all tasks at once
+    # in case we have many queues to check
+    cluster = redis.redis_clusters.get(redis_cluster)
+
+    queue_history = {queue: 0 for queue in QUEUES}
+    while True:
+        if not options.get("backpressure.monitor_queues.enable"):
+            sleep(10)
+            continue
+
+        try:
+            new_sizes = backend.bulk_get_sizes(QUEUES)
+            for (queue, size) in new_sizes:
+                if _is_healthy(size):
+                    queue_history[queue] = 0
+                else:
+                    queue_history[queue] += 1
+        except Exception:
+            # If there was an error getting queue sizes from RabbitMQ, assume
+            # all queues are unhealthy
+            for queue in QUEUES:
+                queue_history[queue] += 1
+
+        strike_threshold = options.get("backpressure.monitor_queues.strike_threshold")
+        queue_health = [(queue, count >= strike_threshold) for (queue, count) in queue_history.items()]
+        _update_queue_stats(cluster, queue_health)
+        sleep(options.get("backpressure.monitor_queues.check_interval"))
+
+
+def monitor_queues():
+    if backend is None:
+        return
+    queue_stats_updater_process = Thread(
+        target=_run_queue_stats_updater,
+        args=(settings.SENTRY_QUEUE_MONITORING_REDIS_CLUSTER,),
+    )
+    queue_stats_updater_process.start()
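
To make the strike logic above concrete, here is a small self-contained sketch (UNHEALTHY_THRESHOLD and STRIKE_THRESHOLD stand in for the registered options): a queue is only flagged after it has been over the size threshold for strike_threshold consecutive checks, and a single healthy reading resets the streak.

UNHEALTHY_THRESHOLD = 1000  # stand-in for backpressure.monitor_queues.unhealthy_threshold
STRIKE_THRESHOLD = 12       # stand-in for backpressure.monitor_queues.strike_threshold

def fold_in_sizes(history, sizes):
    """Update per-queue strike counts with one round of size readings."""
    for queue, size in sizes.items():
        if size < UNHEALTHY_THRESHOLD:
            history[queue] = 0   # a healthy reading resets the streak
        else:
            history[queue] += 1  # an unhealthy reading extends it
    return [(queue, strikes >= STRIKE_THRESHOLD) for queue, strikes in history.items()]

history = {"profiles.process": 0}
for _ in range(12):
    health = fold_in_sizes(history, {"profiles.process": 5000})
print(health)  # [('profiles.process', True)] only after 12 consecutive bad readings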

+ 8 - 0
src/sentry/options/defaults.py

@@ -718,3 +718,11 @@ register("txnames.bump-lifetime-sample-rate", default=0.1)
 register("span_descs.bump-lifetime-sample-rate", default=0.25)
 # Decides whether artifact bundles asynchronous renewal is enabled.
 register("sourcemaps.artifact-bundles.enable-renewal", default=0.0)
+# Enables queue monitoring for backpressure management.
+register("backpressure.monitor_queues.enable", default=False)
+register("backpressure.monitor_queues.unhealthy_threshold", default=1000)
+# How often we check queue health.
+register("backpressure.monitor_queues.check_interval", default=5)
+# How many times in a row a queue must be unhealthy before it is
+# recorded in Redis. 12 * 5sec = unhealthy for 1 minute.
+register("backpressure.monitor_queues.strike_threshold", default=12)

+ 12 - 1
src/sentry/profiles/consumers/process/factory.py

@@ -1,19 +1,30 @@
 from typing import Mapping
 
 from arroyo.backends.kafka.consumer import KafkaPayload
-from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
+from arroyo.processing.strategies.abstract import (
+    MessageRejected,
+    ProcessingStrategy,
+    ProcessingStrategyFactory,
+)
 from arroyo.processing.strategies.commit import CommitOffsets
 from arroyo.processing.strategies.run_task import RunTask
 from arroyo.types import Commit, Message, Partition
 
+from sentry.monitoring.queues import is_queue_healthy, monitor_queues
 from sentry.profiles.task import process_profile_task
 
 
 def process_message(message: Message[KafkaPayload]) -> None:
+    if not is_queue_healthy("profiles.process"):
+        raise MessageRejected()
     process_profile_task.s(payload=message.payload.value).apply_async()
 
 
 class ProcessProfileStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
+    def __init__(self) -> None:
+        super().__init__()
+        monitor_queues()
+
     def create_with_partitions(
         self,
         commit: Commit,