|
@@ -5,6 +5,7 @@ import logging
|
|
|
import random
|
|
|
import time
|
|
|
from contextlib import contextmanager
|
|
|
+from pkg_resources import resource_string
|
|
|
|
|
|
from django.conf import settings
|
|
|
from redis.client import Script
|
|
@@ -71,85 +72,13 @@ def make_record_key(timeline_key, record):
|
|
|
return '{0}:{1}:{2}'.format(timeline_key, TIMELINE_RECORD_PATH_COMPONENT, record)
|
|
|
|
|
|
|
|
|
-# Ensures a timeline is scheduled to be digested, adjusting the schedule time
|
|
|
-# if necessary.
|
|
|
-# KEYS: {WAITING, READY, LAST_PROCESSED_TIMESTAMP}
|
|
|
-# ARGV: {
|
|
|
-# TIMELINE, -- timeline key
|
|
|
-# TIMESTAMP, --
|
|
|
-# INCREMENT, -- amount of time (in seconds) that an event addition delays scheduling
|
|
|
-# MAXIMUM -- maximum amount of time (in seconds) between a timeline being
|
|
|
-# -- digested, and the same timeline being scheduled for the next
|
|
|
-# -- digestion
|
|
|
-# }
|
|
|
-ENSURE_TIMELINE_SCHEDULED_SCRIPT = """\
|
|
|
-local WAITING = KEYS[1] or error("incorrect number of keys provided")
|
|
|
-local READY = KEYS[2] or error("incorrect number of keys provided")
|
|
|
-local LAST_PROCESSED_TIMESTAMP = KEYS[3] or error("incorrect number of keys provided")
|
|
|
-
|
|
|
-local TIMELINE = ARGV[1] or error("incorrect number of arguments provided")
|
|
|
-local TIMESTAMP = ARGV[2] or error("incorrect number of arguments provided")
|
|
|
-local INCREMENT = ARGV[3] or error("incorrect number of arguments provided")
|
|
|
-local MAXIMUM = ARGV[4] or error("incorrect number of arguments provided")
|
|
|
-
|
|
|
--- If the timeline is already in the "ready" set, this is a noop.
|
|
|
-if tonumber(redis.call('ZSCORE', READY, TIMELINE)) ~= nil then
|
|
|
- return false
|
|
|
-end
|
|
|
-
|
|
|
--- Otherwise, check to see if the timeline is in the "waiting" set.
|
|
|
-local score = tonumber(redis.call('ZSCORE', WAITING, TIMELINE))
|
|
|
-if score ~= nil then
|
|
|
- -- If the timeline is already in the "waiting" set, increase the delay by
|
|
|
- -- min(current schedule + increment value, maximum delay after last processing time).
|
|
|
- local last = tonumber(redis.call('GET', LAST_PROCESSED_TIMESTAMP))
|
|
|
- local update = nil;
|
|
|
- if last == nil then
|
|
|
- -- If the last processed timestamp is missing for some reason (possibly
|
|
|
- -- evicted), be conservative and allow the timeline to be scheduled
|
|
|
- -- with either the current schedule time or provided timestamp,
|
|
|
- -- whichever is smaller.
|
|
|
- update = math.min(score, TIMESTAMP)
|
|
|
- else
|
|
|
- update = math.min(
|
|
|
- score + tonumber(INCREMENT),
|
|
|
- last + tonumber(MAXIMUM)
|
|
|
- )
|
|
|
- end
|
|
|
-
|
|
|
- if update ~= score then
|
|
|
- redis.call('ZADD', WAITING, update, TIMELINE)
|
|
|
- end
|
|
|
- return false
|
|
|
-end
|
|
|
-
|
|
|
--- If the timeline isn't already in either set, add it to the "ready" set with
|
|
|
--- the provided timestamp. This allows for immediate scheduling, bypassing the
|
|
|
--- imposed delay of the "waiting" state.
|
|
|
-redis.call('ZADD', READY, TIMESTAMP, TIMELINE)
|
|
|
-return true
|
|
|
-"""
|
|
|
-
|
|
|
-
|
|
|
-# Trims a timeline to a maximum number of records.
|
|
|
-# Returns the number of keys that were deleted.
|
|
|
-# KEYS: {TIMELINE}
|
|
|
-# ARGV: {LIMIT, PREFIX}
|
|
|
-TRUNCATE_TIMELINE_SCRIPT = """\
|
|
|
-local keys = redis.call('ZREVRANGE', KEYS[1], ARGV[1], -1)
|
|
|
-local prefix = ARGV[2] or KEYS[1]
|
|
|
-for i, record in pairs(keys) do
|
|
|
- redis.call('DEL', prefix .. ':{TIMELINE_RECORD_PATH_COMPONENT}:' .. record)
|
|
|
- redis.call('ZREM', KEYS[1], record)
|
|
|
-end
|
|
|
-return table.getn(keys)
|
|
|
-""".format(TIMELINE_RECORD_PATH_COMPONENT=TIMELINE_RECORD_PATH_COMPONENT)
|
|
|
-
|
|
|
-
|
|
|
-# XXX: Passing `None` as the first argument is a dirty hack to allow us to use
|
|
|
-# this more easily with the cluster
|
|
|
-ensure_timeline_scheduled = Script(None, ENSURE_TIMELINE_SCHEDULED_SCRIPT)
|
|
|
-truncate_timeline = Script(None, TRUNCATE_TIMELINE_SCRIPT)
|
|
|
+def load_script(name):
|
|
|
+ path = '/'.join(__name__.split('.')[1:-1] + [name])
|
|
|
+ return Script(None, resource_string('sentry', path))
|
|
|
+
|
|
|
+
|
|
|
+ensure_timeline_scheduled = load_script('ensure_timeline_scheduled.lua')
|
|
|
+truncate_timeline = load_script('truncate_timeline.lua')
|
|
|
|
|
|
|
|
|
class RedisBackend(Backend):
|
|
@@ -276,7 +205,11 @@ class RedisBackend(Backend):
|
|
|
|
|
|
should_truncate = random.random() < self.truncation_chance
|
|
|
if should_truncate:
|
|
|
- truncate_timeline((timeline_key,), (self.capacity,), pipeline)
|
|
|
+ truncate_timeline(
|
|
|
+ (timeline_key,),
|
|
|
+ (self.capacity, timeline_key),
|
|
|
+ pipeline,
|
|
|
+ )
|
|
|
|
|
|
results = pipeline.execute()
|
|
|
if should_truncate:
|
|
@@ -569,7 +502,7 @@ class RedisBackend(Backend):
|
|
|
connection = self.cluster.get_local_client_for_key(timeline_key)
|
|
|
with Lock(timeline_key, nowait=True, timeout=30), \
|
|
|
connection.pipeline() as pipeline:
|
|
|
- truncate_timeline((timeline_key,), (0,), pipeline)
|
|
|
+ truncate_timeline((timeline_key,), (0, timeline_key), pipeline)
|
|
|
truncate_timeline((make_digest_key(timeline_key),), (0, timeline_key), pipeline)
|
|
|
pipeline.delete(make_last_processed_timestamp_key(timeline_key))
|
|
|
pipeline.zrem(make_schedule_key(self.namespace, SCHEDULE_STATE_READY), key)
|