|
@@ -96,19 +96,19 @@ class RedisBuffer(Buffer):
|
|
|
|
|
|
try:
|
|
|
keycount = 0
|
|
|
- for host_id in six.iterkeys(self.cluster.hosts):
|
|
|
- conn = self.cluster.get_local_client(host_id)
|
|
|
- keys = conn.zrange(self.pending_key, 0, -1)
|
|
|
- if not keys:
|
|
|
- continue
|
|
|
- for key in keys:
|
|
|
- keycount += 1
|
|
|
- process_incr.apply_async(kwargs={
|
|
|
- 'key': key,
|
|
|
- })
|
|
|
- pipe = conn.pipeline()
|
|
|
- pipe.zrem(self.pending_key, *keys)
|
|
|
- pipe.execute()
|
|
|
+ with self.cluster.all() as conn:
|
|
|
+ results = conn.zrange(self.pending_key, 0, -1)
|
|
|
+
|
|
|
+ with self.cluster.all() as conn:
|
|
|
+ for host_id, keys in six.iteritems(results.value):
|
|
|
+ if not keys:
|
|
|
+ continue
|
|
|
+ keycount += len(keys)
|
|
|
+ for key in keys:
|
|
|
+ process_incr.apply_async(kwargs={
|
|
|
+ 'key': key,
|
|
|
+ })
|
|
|
+ conn.target([host_id]).zrem(self.pending_key, *keys)
|
|
|
metrics.timing('buffer.pending-size', keycount)
|
|
|
finally:
|
|
|
client.delete(lock_key)
|