Browse Source

ref(kvstore): Add Redis backend, cache backend compatibility wrapper (#26014)

ted kaemming 3 years ago
parent
commit
9d0869ef04

+ 14 - 2
src/sentry/cache/base.py

@@ -1,8 +1,20 @@
 from threading import local
+from typing import Any
 
 from django.conf import settings
 
 
+def wrap_key(prefix: str, version: Any, key: Any) -> str:
+    return f"{prefix}:{version}:{key}"
+
+
+def unwrap_key(prefix: str, version: Any, value: str) -> str:
+    header = f"{prefix}:{version}"
+    if value[: len(header)] != header:
+        raise ValueError("invalid key header")
+    return value[len(header) + 1 :]
+
+
 class BaseCache(local):
     prefix = "c"
 
@@ -11,8 +23,8 @@ class BaseCache(local):
         if prefix is not None:
             self.prefix = prefix
 
-    def make_key(self, key, version=None):
-        return f"{self.prefix}:{version or self.version}:{key}"
+    def make_key(self, key, version=None) -> str:
+        return wrap_key(self.prefix, version or self.version, key)
 
     def set(self, key, value, timeout, version=None, raw=False):
         raise NotImplementedError

+ 59 - 3
src/sentry/utils/kvstore/cache.py

@@ -1,8 +1,10 @@
 from datetime import timedelta
-from typing import Any, Optional
+from typing import Any, Iterator, Optional, Sequence, Tuple
 
-from sentry.cache.base import BaseCache
-from sentry.utils.kvstore.abstract import KVStorage
+from django.conf import settings
+
+from sentry.cache.base import BaseCache, unwrap_key, wrap_key
+from sentry.utils.kvstore.abstract import KVStorage, V
 
 
 class CacheKVStorage(KVStorage[Any, Any]):
@@ -49,3 +51,57 @@ class CacheKVStorage(KVStorage[Any, Any]):
         # running this against a real deployment (it could be helpful to
         # destroy a development environment, though.)
         pass
+
+
+class CacheKeyWrapper(KVStorage[str, V]):
+    """
+    This class implements a compatibility layer for interacting with storages
+    that have existing data written with cache key prefixes.
+    """
+
+    # XXX: ``keys`` must be ``str`` to avoid type mismatches when returning
+    # unwrapped values (e.g. from ``get_many``), even though the write path
+    # would accept ``Any`` type.
+
+    def __init__(
+        self,
+        storage: KVStorage[str, V],
+        prefix: str = BaseCache.prefix,
+        version: Optional[Any] = None,
+    ):
+        if version is None:
+            version = settings.CACHE_VERSION
+
+        self.storage = storage
+        self.prefix = prefix
+        self.version = version
+
+    def get(self, key: str) -> Optional[V]:
+        return self.storage.get(wrap_key(self.prefix, self.version, key))
+
+    def get_many(self, keys: Sequence[str]) -> Iterator[Tuple[str, V]]:
+        results = self.storage.get_many([wrap_key(self.prefix, self.version, key) for key in keys])
+        for key, value in results:
+            yield unwrap_key(self.prefix, self.version, key), value
+
+    def set(self, key: str, value: V, ttl: Optional[timedelta] = None) -> None:
+        return self.storage.set(
+            wrap_key(self.prefix, self.version, key),
+            value,
+            ttl,
+        )
+
+    def delete(self, key: str) -> None:
+        self.storage.delete(wrap_key(self.prefix, self.version, key))
+
+    def delete_many(self, keys: Sequence[str]) -> None:
+        return self.storage.delete_many([wrap_key(self.prefix, self.version, key) for key in keys])
+
+    def bootstrap(self) -> None:
+        self.storage.bootstrap()
+
+    def destroy(self) -> None:
+        # ``destroy`` is not implemented since the cache key prefix implies this
+        # is a shared keyspace, and suggests that this may cause collateral
+        # damage to other storage instances
+        raise NotImplementedError

+ 31 - 0
src/sentry/utils/kvstore/redis.py

@@ -0,0 +1,31 @@
+from datetime import timedelta
+from typing import Optional
+
+from redis import Redis
+
+from sentry.utils.kvstore.abstract import KVStorage
+
+
+class RedisKVStorage(KVStorage[str, bytes]):
+    """
+    This class provides a key/value store backed by Redis (either a single node
+    or cluster.)
+    """
+
+    def __init__(self, client: "Redis[bytes]") -> None:
+        self.client = client
+
+    def get(self, key: str) -> Optional[bytes]:
+        return self.client.get(key.encode("utf8"))
+
+    def set(self, key: str, value: bytes, ttl: Optional[timedelta] = None) -> None:
+        self.client.set(key.encode("utf8"), value, ex=ttl)
+
+    def delete(self, key: str) -> None:
+        self.client.delete(key.encode("utf8"))
+
+    def bootstrap(self) -> None:
+        pass  # nothing to do
+
+    def destroy(self) -> None:
+        self.client.flushdb()

+ 26 - 1
tests/sentry/utils/kvstore/test_common.py

@@ -1,5 +1,6 @@
 import itertools
 from dataclasses import dataclass
+from datetime import timedelta
 from typing import Iterator, Tuple
 
 import pytest
@@ -18,7 +19,7 @@ class Properties:
         return zip(self.keys, self.values)
 
 
-@pytest.fixture(params=["bigtable", "cache/default", "memory"])
+@pytest.fixture(params=["bigtable", "cache/default", "memory", "memory+cachewrapper", "redis"])
 def properties(request) -> Properties:
     if request.param == "bigtable":
         from tests.sentry.utils.kvstore.test_bigtable import create_store, get_credentials
@@ -53,6 +54,25 @@ def properties(request) -> Properties:
             keys=itertools.count(),
             values=itertools.count(),
         )
+    elif request.param == "redis":
+        from redis import Redis
+
+        from sentry.utils.kvstore.redis import RedisKVStorage
+
+        return Properties(
+            RedisKVStorage(Redis(db=6)),
+            keys=(f"kvstore/{i}" for i in itertools.count()),
+            values=(f"{i}".encode("utf8") for i in itertools.count()),
+        )
+    elif request.param == "memory+cachewrapper":
+        from sentry.utils.kvstore.cache import CacheKeyWrapper
+        from sentry.utils.kvstore.memory import MemoryKVStorage
+
+        return Properties(
+            CacheKeyWrapper(MemoryKVStorage()),
+            keys=map(str, itertools.count()),
+            values=itertools.count(),
+        )
     else:
         raise ValueError("unknown kvstore label")
 
@@ -70,6 +90,11 @@ def test_single_key_operations(properties: Properties) -> None:
     store.set(key, new_value)
     assert store.get(key) == new_value
 
+    # Test overwriting a key with a new TTL.
+    new_value = next(properties.values)
+    store.set(key, new_value, ttl=timedelta(seconds=30))
+    assert store.get(key) == new_value
+
     # Test deleting an existing key.
     store.delete(key)
     assert store.get(key) is None

+ 35 - 0
tests/sentry/utils/kvstore/test_compat.py

@@ -0,0 +1,35 @@
+from redis import Redis
+
+from sentry.cache.redis import CommonRedisCache
+from sentry.utils.codecs import BytesCodec, JSONCodec
+from sentry.utils.kvstore.cache import CacheKeyWrapper, CacheKVStorage
+from sentry.utils.kvstore.encoding import KVStorageCodecWrapper
+from sentry.utils.kvstore.redis import RedisKVStorage
+
+
+def test_redis_cache_compat() -> None:
+    redis = Redis(db=6)
+    version = 5
+    prefix = "test"
+
+    cache_backend = CacheKVStorage(CommonRedisCache(redis, version=version, prefix=prefix))
+    redis_backend = KVStorageCodecWrapper(
+        CacheKeyWrapper(RedisKVStorage(redis), version=version, prefix=prefix),
+        JSONCodec() | BytesCodec(),
+    )
+
+    key = "key"
+
+    value = [1, 2, 3]
+    cache_backend.set(key, value)
+    assert cache_backend.get(key) == value
+    assert redis_backend.get(key) == value
+
+    value = [4, 5, 6]
+    redis_backend.set("key", value)
+    assert cache_backend.get(key) == value
+    assert redis_backend.get(key) == value
+
+    cache_backend.delete("key")
+    assert cache_backend.get("key") is None
+    assert redis_backend.get("key") is None