|
@@ -1,11 +1,10 @@
|
|
|
from __future__ import annotations
|
|
|
|
|
|
-import functools
|
|
|
import importlib.resources
|
|
|
import logging
|
|
|
from copy import deepcopy
|
|
|
from threading import Lock
|
|
|
-from typing import Any, Generic, TypeGuard, TypeVar, overload
|
|
|
+from typing import Any, TypeGuard
|
|
|
|
|
|
import rb
|
|
|
from django.utils.functional import SimpleLazyObject
|
|
@@ -52,21 +51,17 @@ def _shared_pool(**opts: Any) -> ConnectionPool:
|
|
|
return pool
|
|
|
|
|
|
|
|
|
-_make_rb_cluster = functools.partial(rb.Cluster, pool_cls=_shared_pool)
|
|
|
+class RBClusterManager:
|
|
|
+ def __init__(self, options_manager: OptionsManager) -> None:
|
|
|
+ self._clusters: dict[str, rb.Cluster] = {}
|
|
|
+ self._options_manager = options_manager
|
|
|
|
|
|
-
|
|
|
-class _RBCluster:
|
|
|
- def supports(self, config: dict[str, Any]) -> bool:
|
|
|
- return not config.get("is_redis_cluster", False)
|
|
|
-
|
|
|
- def factory(
|
|
|
+ def _factory(
|
|
|
self,
|
|
|
- decode_responses: bool | None = None,
|
|
|
+ *,
|
|
|
hosts: list[dict[int, Any]] | dict[int, Any] | None = None,
|
|
|
**config: Any,
|
|
|
) -> rb.Cluster:
|
|
|
- if not decode_responses:
|
|
|
- raise NotImplementedError("decode_responses=False mode is not implemented for `rb`")
|
|
|
if not hosts:
|
|
|
hosts = []
|
|
|
# rb expects a dict of { host, port } dicts where the key is the host
|
|
@@ -78,10 +73,26 @@ class _RBCluster:
|
|
|
pool_options = {**_REDIS_DEFAULT_CLIENT_ARGS, **pool_options}
|
|
|
config["pool_options"] = pool_options
|
|
|
|
|
|
- return _make_rb_cluster(**config)
|
|
|
+ return rb.Cluster(**config, pool_cls=_shared_pool)
|
|
|
|
|
|
- def __str__(self) -> str:
|
|
|
- return "Redis Blaster Cluster"
|
|
|
+ def get(self, key: str, *, decode_responses: bool = True) -> rb.Cluster:
|
|
|
+ if not decode_responses:
|
|
|
+ raise NotImplementedError("rb does not support decode_responses")
|
|
|
+
|
|
|
+ try:
|
|
|
+ return self._clusters[key]
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+
|
|
|
+ cfg = self._options_manager.get("redis.clusters", {}).get(key)
|
|
|
+ if cfg is None:
|
|
|
+ raise KeyError(f"Invalid cluster name: {key}")
|
|
|
+
|
|
|
+ if cfg.get("is_redis_cluster", False):
|
|
|
+ raise KeyError("Invalid cluster type, expected rb cluster")
|
|
|
+
|
|
|
+ ret = self._clusters[key] = self._factory(**cfg)
|
|
|
+ return ret
|
|
|
|
|
|
|
|
|
class _RedisCluster:
|
|
@@ -101,7 +112,7 @@ class _RedisCluster:
|
|
|
hosts: list[dict[Any, Any]] | dict[Any, Any] | None = None,
|
|
|
client_args: dict[str, Any] | None = None,
|
|
|
**config: Any,
|
|
|
- ) -> SimpleLazyObject:
|
|
|
+ ) -> RetryingRedisCluster | FailoverRedis:
|
|
|
# StrictRedisCluster expects a list of { host, port } dicts. Coerce the
|
|
|
# configuration into the correct format if necessary.
|
|
|
if not hosts:
|
|
@@ -142,72 +153,57 @@ class _RedisCluster:
|
|
|
host["decode_responses"] = decode_responses
|
|
|
return FailoverRedis(**host, **client_args)
|
|
|
|
|
|
- return SimpleLazyObject(cluster_factory)
|
|
|
+ # losing some type safety: SimpleLazyObject acts like the underlying type
|
|
|
+ return SimpleLazyObject(cluster_factory) # type: ignore[return-value]
|
|
|
|
|
|
def __str__(self) -> str:
|
|
|
return "Redis Cluster"
|
|
|
|
|
|
|
|
|
-TCluster = TypeVar("TCluster", rb.Cluster, RedisCluster | StrictRedis)
|
|
|
-
|
|
|
-
|
|
|
-class ClusterManager(Generic[TCluster]):
|
|
|
- @overload
|
|
|
- def __init__(self: ClusterManager[rb.Cluster], options_manager: OptionsManager) -> None:
|
|
|
- ...
|
|
|
-
|
|
|
- @overload
|
|
|
- def __init__(
|
|
|
- self: ClusterManager[RedisCluster | StrictRedis],
|
|
|
- options_manager: OptionsManager,
|
|
|
- cluster_type: type[Any],
|
|
|
- ) -> None:
|
|
|
- ...
|
|
|
-
|
|
|
- def __init__(self, options_manager: OptionsManager, cluster_type: type[Any] = _RBCluster):
|
|
|
- self.__clusters: dict[tuple[str, bool], TCluster] = {}
|
|
|
+class RedisClusterManager:
|
|
|
+ def __init__(self, options_manager: OptionsManager) -> None:
|
|
|
+ self.__clusters: dict[tuple[str, bool], RedisCluster | StrictRedis] = {}
|
|
|
self.__options_manager = options_manager
|
|
|
- self.__cluster_type = cluster_type()
|
|
|
+ self.__cluster_type = _RedisCluster()
|
|
|
|
|
|
- def get(self, key: str, *, decode_responses: bool = True) -> TCluster:
|
|
|
+ def get(self, key: str, *, decode_responses: bool = True) -> RedisCluster | StrictRedis:
|
|
|
cache_key = (key, decode_responses)
|
|
|
try:
|
|
|
return self.__clusters[cache_key]
|
|
|
except KeyError:
|
|
|
- # Do not access attributes of the `cluster` object to prevent
|
|
|
- # setup/init of lazy objects. The _RedisCluster type will try to
|
|
|
- # connect to the cluster during initialization.
|
|
|
-
|
|
|
- # TODO: This would probably be safer with a lock, but I'm not sure
|
|
|
- # that it's necessary.
|
|
|
- configuration = self.__options_manager.get("redis.clusters", {}).get(key)
|
|
|
- if configuration is None:
|
|
|
- raise KeyError(f"Invalid cluster name: {key}")
|
|
|
-
|
|
|
- if not self.__cluster_type.supports(configuration):
|
|
|
- raise KeyError(f"Invalid cluster type, expected: {self.__cluster_type}")
|
|
|
-
|
|
|
- ret = self.__clusters[cache_key] = self.__cluster_type.factory(
|
|
|
- **configuration,
|
|
|
- decode_responses=decode_responses,
|
|
|
- )
|
|
|
- return ret
|
|
|
+ pass
|
|
|
+
|
|
|
+ # Do not access attributes of the `cluster` object to prevent
|
|
|
+ # setup/init of lazy objects. The _RedisCluster type will try to
|
|
|
+ # connect to the cluster during initialization.
|
|
|
+
|
|
|
+ # TODO: This would probably be safer with a lock, but I'm not sure
|
|
|
+ # that it's necessary.
|
|
|
+ cfg = self.__options_manager.get("redis.clusters", {}).get(key)
|
|
|
+ if cfg is None:
|
|
|
+ raise KeyError(f"Invalid cluster name: {key}")
|
|
|
+
|
|
|
+ if not self.__cluster_type.supports(cfg):
|
|
|
+ raise KeyError(f"Invalid cluster type, expected: {self.__cluster_type}")
|
|
|
+
|
|
|
+ ret = self.__clusters[cache_key] = self.__cluster_type.factory(
|
|
|
+ **cfg, decode_responses=decode_responses
|
|
|
+ )
|
|
|
+ return ret
|
|
|
|
|
|
|
|
|
# TODO(epurkhiser): When migration of all rb cluster to true redis clusters has
|
|
|
# completed, remove the rb ``clusters`` module variable and rename
|
|
|
# redis_clusters to clusters.
|
|
|
-clusters: ClusterManager[rb.Cluster] = ClusterManager(options.default_manager)
|
|
|
-redis_clusters: ClusterManager[RedisCluster | StrictRedis] = ClusterManager(
|
|
|
- options.default_manager, _RedisCluster
|
|
|
-)
|
|
|
+clusters = RBClusterManager(options.default_manager)
|
|
|
+redis_clusters = RedisClusterManager(options.default_manager)
|
|
|
|
|
|
|
|
|
def get_cluster_from_options(
|
|
|
setting: str,
|
|
|
options: dict[str, Any],
|
|
|
- cluster_manager: ClusterManager = clusters,
|
|
|
-) -> tuple[rb.Cluster | RedisCluster | StrictRedis, dict[str, Any]]:
|
|
|
+ cluster_manager: RBClusterManager = clusters,
|
|
|
+) -> tuple[rb.Cluster, dict[str, Any]]:
|
|
|
cluster_option_name = "cluster"
|
|
|
default_cluster_name = "default"
|
|
|
cluster_constructor_option_names = frozenset(("hosts",))
|