|
@@ -6,8 +6,10 @@ import threading
|
|
|
from datetime import date, datetime, timezone
|
|
|
from time import time
|
|
|
|
|
|
+import rb
|
|
|
from django.db import models
|
|
|
from django.utils.encoding import force_bytes, force_str
|
|
|
+from rediscluster import RedisCluster
|
|
|
|
|
|
from sentry.buffer.base import Buffer
|
|
|
from sentry.tasks.process_buffer import process_incr, process_pending
|
|
@@ -15,7 +17,12 @@ from sentry.utils import json, metrics
|
|
|
from sentry.utils.compat import crc32
|
|
|
from sentry.utils.hashlib import md5_text
|
|
|
from sentry.utils.imports import import_string
|
|
|
-from sentry.utils.redis import get_dynamic_cluster_from_options, validate_dynamic_cluster
|
|
|
+from sentry.utils.redis import (
|
|
|
+ get_dynamic_cluster_from_options,
|
|
|
+ is_instance_rb_cluster,
|
|
|
+ is_instance_redis_cluster,
|
|
|
+ validate_dynamic_cluster,
|
|
|
+)
|
|
|
|
|
|
_local_buffers = None
|
|
|
_local_buffers_lock = threading.Lock()
|
|
@@ -82,11 +89,13 @@ class RedisBuffer(Buffer):
|
|
|
assert self.pending_partitions > 0
|
|
|
assert self.incr_batch_size > 0
|
|
|
|
|
|
- def get_routing_client(self):
|
|
|
- if self.is_redis_cluster:
|
|
|
+ def get_routing_client(self) -> RedisCluster | rb.RoutingClient:
|
|
|
+ if is_instance_redis_cluster(self.cluster, self.is_redis_cluster):
|
|
|
return self.cluster
|
|
|
- else:
|
|
|
+ elif is_instance_rb_cluster(self.cluster, self.is_redis_cluster):
|
|
|
return self.cluster.get_routing_client()
|
|
|
+ else:
|
|
|
+ raise AssertionError("unreachable")
|
|
|
|
|
|
def validate(self):
|
|
|
validate_dynamic_cluster(self.is_redis_cluster, self.cluster)
|
|
@@ -182,11 +191,13 @@ class RedisBuffer(Buffer):
|
|
|
Fetches buffered values for a model/filter. Passed columns must be integer columns.
|
|
|
"""
|
|
|
key = self._make_key(model, filters)
|
|
|
- if self.is_redis_cluster:
|
|
|
+ if is_instance_redis_cluster(self.cluster, self.is_redis_cluster):
|
|
|
pipe = self.cluster.pipeline(transaction=False)
|
|
|
- else:
|
|
|
+ elif is_instance_rb_cluster(self.cluster, self.is_redis_cluster):
|
|
|
conn = self.cluster.get_local_client_for_key(key)
|
|
|
pipe = conn.pipeline()
|
|
|
+ else:
|
|
|
+ raise AssertionError("unreachable")
|
|
|
|
|
|
for col in columns:
|
|
|
pipe.hget(key, f"i+{col}")
|
|
@@ -211,16 +222,18 @@ class RedisBuffer(Buffer):
|
|
|
pending_key = self._make_pending_key_from_key(key)
|
|
|
# We can't use conn.map() due to wanting to support multiple pending
|
|
|
# keys (one per Redis partition)
|
|
|
- if self.is_redis_cluster:
|
|
|
+ if is_instance_redis_cluster(self.cluster, self.is_redis_cluster):
|
|
|
conn = self.cluster
|
|
|
- else:
|
|
|
+ elif is_instance_rb_cluster(self.cluster, self.is_redis_cluster):
|
|
|
conn = self.cluster.get_local_client_for_key(key)
|
|
|
+ else:
|
|
|
+ raise AssertionError("unreachable")
|
|
|
|
|
|
pipe = conn.pipeline()
|
|
|
pipe.hsetnx(key, "m", f"{model.__module__}.{model.__name__}")
|
|
|
_validate_json_roundtrip(filters, model)
|
|
|
|
|
|
- if self.is_redis_cluster:
|
|
|
+ if is_instance_redis_cluster(self.cluster, self.is_redis_cluster):
|
|
|
pipe.hsetnx(key, "f", json.dumps(self._dump_values(filters)))
|
|
|
else:
|
|
|
pipe.hsetnx(key, "f", pickle.dumps(filters))
|
|
@@ -234,7 +247,7 @@ class RedisBuffer(Buffer):
|
|
|
# e.g. "update score if last_seen or times_seen is changed"
|
|
|
_validate_json_roundtrip(extra, model)
|
|
|
for column, value in extra.items():
|
|
|
- if self.is_redis_cluster:
|
|
|
+ if is_instance_redis_cluster(self.cluster, self.is_redis_cluster):
|
|
|
pipe.hset(key, "e+" + column, json.dumps(self._dump_value(value)))
|
|
|
else:
|
|
|
pipe.hset(key, "e+" + column, pickle.dumps(value))
|
|
@@ -263,10 +276,7 @@ class RedisBuffer(Buffer):
|
|
|
# super fast and is fine to do redundantly.
|
|
|
|
|
|
pending_key = self._make_pending_key(partition)
|
|
|
- if self.is_redis_cluster:
|
|
|
- client = self.cluster
|
|
|
- else:
|
|
|
- client = self.cluster.get_routing_client()
|
|
|
+ client = self.get_routing_client()
|
|
|
lock_key = self._make_lock_key(pending_key)
|
|
|
# prevent a stampede due to celerybeat + periodic task
|
|
|
if not client.set(lock_key, "1", nx=True, ex=60):
|
|
@@ -276,7 +286,7 @@ class RedisBuffer(Buffer):
|
|
|
|
|
|
try:
|
|
|
keycount = 0
|
|
|
- if self.is_redis_cluster:
|
|
|
+ if is_instance_redis_cluster(self.cluster, self.is_redis_cluster):
|
|
|
keys = self.cluster.zrange(pending_key, 0, -1)
|
|
|
keycount += len(keys)
|
|
|
|
|
@@ -286,22 +296,24 @@ class RedisBuffer(Buffer):
|
|
|
process_incr.apply_async(kwargs={"batch_keys": pending_buffer.flush()})
|
|
|
|
|
|
self.cluster.zrem(pending_key, *keys)
|
|
|
- else:
|
|
|
+ elif is_instance_rb_cluster(self.cluster, self.is_redis_cluster):
|
|
|
with self.cluster.all() as conn:
|
|
|
results = conn.zrange(pending_key, 0, -1)
|
|
|
|
|
|
with self.cluster.all() as conn:
|
|
|
- for host_id, keys in results.value.items():
|
|
|
- if not keys:
|
|
|
+ for host_id, keysb in results.value.items():
|
|
|
+ if not keysb:
|
|
|
continue
|
|
|
- keycount += len(keys)
|
|
|
- for key in keys:
|
|
|
- pending_buffer.append(key.decode("utf-8"))
|
|
|
+ keycount += len(keysb)
|
|
|
+ for keyb in keysb:
|
|
|
+ pending_buffer.append(keyb.decode("utf-8"))
|
|
|
if pending_buffer.full():
|
|
|
process_incr.apply_async(
|
|
|
kwargs={"batch_keys": pending_buffer.flush()}
|
|
|
)
|
|
|
- conn.target([host_id]).zrem(pending_key, *keys)
|
|
|
+ conn.target([host_id]).zrem(pending_key, *keysb)
|
|
|
+ else:
|
|
|
+ raise AssertionError("unreachable")
|
|
|
|
|
|
# queue up remainder of pending keys
|
|
|
if not pending_buffer.empty():
|
|
@@ -325,11 +337,7 @@ class RedisBuffer(Buffer):
|
|
|
return super().process(model, columns, filters, extra, signal_only)
|
|
|
|
|
|
def _process_single_incr(self, key):
|
|
|
- if self.is_redis_cluster:
|
|
|
- client = self.cluster
|
|
|
- else:
|
|
|
- client = self.cluster.get_routing_client()
|
|
|
-
|
|
|
+ client = self.get_routing_client()
|
|
|
lock_key = self._make_lock_key(key)
|
|
|
# prevent a stampede due to the way we use celery etas + duplicate
|
|
|
# tasks
|
|
@@ -341,12 +349,13 @@ class RedisBuffer(Buffer):
|
|
|
pending_key = self._make_pending_key_from_key(key)
|
|
|
|
|
|
try:
|
|
|
- if self.is_redis_cluster:
|
|
|
+ if is_instance_redis_cluster(self.cluster, self.is_redis_cluster):
|
|
|
pipe = self.cluster.pipeline(transaction=False)
|
|
|
- else:
|
|
|
+ elif is_instance_rb_cluster(self.cluster, self.is_redis_cluster):
|
|
|
conn = self.cluster.get_local_client_for_key(key)
|
|
|
pipe = conn.pipeline()
|
|
|
-
|
|
|
+ else:
|
|
|
+ raise AssertionError("unreachable")
|
|
|
pipe.hgetall(key)
|
|
|
pipe.zrem(pending_key, key)
|
|
|
pipe.delete(key)
|