|
@@ -14,8 +14,9 @@ for package in ('nydus', 'redis'):
|
|
|
try:
|
|
|
__import__(package, {}, {}, [], -1)
|
|
|
except ImportError:
|
|
|
- raise ImproperlyConfigured('Missing %r package, which is required for Redis buffers' %
|
|
|
- (package,))
|
|
|
+ raise ImproperlyConfigured(
|
|
|
+ 'Missing %r package, which is required for Redis buffers' % (
|
|
|
+ package,))
|
|
|
|
|
|
from django.db import models
|
|
|
from django.utils.encoding import smart_str
|
|
@@ -68,6 +69,13 @@ class RedisBuffer(Buffer):
|
|
|
for k, v in sorted(filters.iteritems())))).hexdigest(),
|
|
|
)
|
|
|
|
|
|
+ def _make_lock_key(self, model, filters):
|
|
|
+ return '%s:lock:%s' % (
|
|
|
+ model._meta,
|
|
|
+ md5(smart_str('&'.join('%s=%s' % (k, self._coerce_val(v))
|
|
|
+ for k, v in sorted(filters.iteritems())))).hexdigest(),
|
|
|
+ )
|
|
|
+
|
|
|
def incr(self, model, columns, filters, extra=None):
|
|
|
with self.conn.map() as conn:
|
|
|
for column, amount in columns.iteritems():
|
|
@@ -84,6 +92,12 @@ class RedisBuffer(Buffer):
|
|
|
super(RedisBuffer, self).incr(model, columns, filters, extra)
|
|
|
|
|
|
def process(self, model, columns, filters, extra=None):
|
|
|
+ lock_key = self._make_lock_key(model, filters)
|
|
|
+ # prevent a stampede due to the way we use celery etas + duplicate
|
|
|
+ # tasks
|
|
|
+ if not self.conn.setnx(lock_key, '1'):
|
|
|
+ self.conn.expire(lock_key, self.delay)
|
|
|
+
|
|
|
results = {}
|
|
|
with self.conn.map() as conn:
|
|
|
for column, amount in columns.iteritems():
|