
feat(backpressure): Add support for redis rb.Cluster client (#53051)

Add support for `rb.Cluster` configurations, which are needed in
production. Single instances will no longer use the simple client but
will be treated as a one-node `rb.Cluster`.
Michal Kuffa, 1 year ago · commit 2d468f46f1
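For context, a minimal sketch of the `rb` fan-out pattern the new code relies on (host and port below are placeholders, not production config): a single Redis instance is expressed as a one-node `rb.Cluster`, and `cluster.all()` fans a command out to every host, returning a promise keyed by the local host id.

```python
# Minimal sketch, not production config: one host entry makes a "single
# instance" look like a cluster with local host id 0.
import rb

single_instance = rb.Cluster(hosts={0: {"host": "127.0.0.1", "port": 6379}})

# Fan INFO out to all (here: one) hosts; rb hands back a promise whose
# value maps the local host id to that host's reply.
with single_instance.all() as client:
    promise = client.info()

print(promise.value)  # {0: {... INFO fields ...}}
```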

+ 11 - 9
src/sentry/processing/backpressure/memory.py

@@ -1,8 +1,8 @@
 from dataclasses import dataclass
 from typing import Any, Generator, Mapping, Union
 
+import rb
 import requests
-from redis import Redis
 from rediscluster import RedisCluster
 
 
@@ -35,11 +35,9 @@ def query_rabbitmq_memory_usage(host: str) -> ServiceMemory:
 
 # Based on configuration, this could be:
 # - a `rediscluster` Cluster (actually `RetryingRedisCluster`)
-# - a straight `Redis` client (actually `FailoverRedis`)
+# - a `rb.Cluster` (client side routing cluster client)
 # - or any class configured via `client_class`.
-# It could in theory also be a `rb` (aka redis blaster) Cluster, but we
-# intentionally do not support these.
-Cluster = Union[RedisCluster, Redis]
+Cluster = Union[RedisCluster, rb.Cluster]
 
 
 def get_memory_usage(info: Mapping[str, Any]) -> ServiceMemory:
@@ -57,8 +55,12 @@ def iter_cluster_memory_usage(cluster: Cluster) -> Generator[ServiceMemory, None
     """
     if isinstance(cluster, RedisCluster):
         # `RedisCluster` returns these as a dictionary, with the node-id as key
-        for info in cluster.info().values():
-            yield get_memory_usage(info)
+        cluster_info = cluster.info()
     else:
-        # otherwise, lets just hope that `info()` does the right thing
-        yield get_memory_usage(cluster.info())
+        # `rb.Cluster` returns these as a promise of a dictionary, with the _local_ node-id as key
+        with cluster.all() as client:
+            promise = client.info()
+        cluster_info = promise.value
+
+    for info in cluster_info.values():
+        yield get_memory_usage(info)
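A hedged usage sketch of the updated helper (connection details are placeholders): whichever client type is passed in, `iter_cluster_memory_usage` now yields one `ServiceMemory` per node.

```python
# Sketch only: a local one-node rb.Cluster stands in for a real deployment.
import rb

from sentry.processing.backpressure.memory import iter_cluster_memory_usage

cluster = rb.Cluster(hosts={0: {"host": "127.0.0.1", "port": 6379}})
for node_memory in iter_cluster_memory_usage(cluster):
    print(node_memory)  # one ServiceMemory entry per node
```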

+ 4 - 1
src/sentry/processing/backpressure/monitor.py

@@ -53,7 +53,10 @@ def load_service_definitions() -> Dict[str, Service]:
     services: Dict[str, Service] = {}
     for name, definition in settings.SENTRY_PROCESSING_SERVICES.items():
         if cluster_id := definition.get("redis"):
-            cluster = redis.redis_clusters.get(cluster_id)
+            _is_cluster, cluster, _config = redis.get_dynamic_cluster_from_options(
+                setting=f"SENTRY_PROCESSING_SERVICES[{name}]",
+                config={"cluster": cluster_id},
+            )
             services[name] = Redis(cluster)
 
         elif rabbitmq_urls := definition.get("rabbitmq"):
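Roughly why the loader switches to `get_dynamic_cluster_from_options`: the helper returns an `(is_redis_cluster, cluster, options)` tuple, so the backing client is an `rb.Cluster` for plain Redis configs and a `rediscluster` cluster otherwise. A small sketch of consuming that tuple (the cluster id "default" is taken from the tests below):

```python
# Sketch only: mirrors the branching that memory.py performs internally.
from sentry.utils import redis

is_redis_cluster, cluster, _options = redis.get_dynamic_cluster_from_options(
    setting="SENTRY_PROCESSING_SERVICES[redis]",
    config={"cluster": "default"},
)

if is_redis_cluster:
    info = cluster.info()  # rediscluster: dict keyed by node-id
else:
    with cluster.all() as client:  # rb.Cluster: fan out, then resolve the promise
        promise = client.info()
    info = promise.value
```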

+ 3 - 1
tests/sentry/processing/backpressure/test_monitoring.py

@@ -30,7 +30,9 @@ def test_loading_definitions() -> None:
 
 
 def test_check_redis_health() -> None:
-    cluster = redis.redis_clusters.get("default")
+    _, cluster, _ = redis.get_dynamic_cluster_from_options(
+        setting="test", config={"cluster": "default"}
+    )
     services = {"redis": Redis(cluster)}
 
     with override_options(

+ 9 - 3
tests/sentry/processing/backpressure/test_redis.py

@@ -1,11 +1,17 @@
+from django.test.utils import override_settings
+
 from sentry.processing.backpressure.memory import iter_cluster_memory_usage
-from sentry.utils import redis
+from sentry.processing.backpressure.monitor import Redis, load_service_definitions
 
 
 def test_returns_some_usage() -> None:
-    cluster = redis.redis_clusters.get("default")
+    with override_settings(SENTRY_PROCESSING_SERVICES={"redis": {"redis": "default"}}):
+        services = load_service_definitions()
+
+    redis_service = services["redis"]
+    assert isinstance(redis_service, Redis)
 
-    usage = [usage for usage in iter_cluster_memory_usage(cluster)]
+    usage = list(iter_cluster_memory_usage(redis_service.cluster))
     assert len(usage) > 0
     memory = usage[0]
     assert memory.used > 0