Browse Source

Add frequency table (Count-Min Sketch), without usage.

Ted Kaemming 9 years ago

+ 1 - 1

@@ -130,7 +130,7 @@ install_requires = [
-    'rb>=1.3.0,<2.0.0',
+    'rb>=1.4.0,<2.0.0',
 postgres_requires = [

+ 569 - 0

@@ -0,0 +1,569 @@
+Count-Min Sketch
+This provides a Redis-based implementation of the Count-Min Sketch, a
+probabilistic data structure that allows counting observations of items from
+high cardinality input stream in sublinear space, with the tradeoff of
+potentially overcounting lower-frequency items due to hash collisions.
+This implementation extends the conventional Count-Min algorithm, adding an
+index that allows querying for the top N items that have been observed in the
+stream. The index also serves as the primary storage, reducing storage
+requirements and improving accuracy, auntil it's capacity is exceeded, at which
+point the index data is used to initialize the estimation matrix. Once the
+index capacity as been exceeded and the estimation matrix has been initialized,
+the index of most frequent items is maintained using the estimates from the
+The public API consists of three main methods:
+- INCR: used to record observations of items,
+- ESTIMATE: used to query the number of times a specific item has been seen,
+- RANKED: used to query the top N items that have been recorded in a sketch.
+The named command to use is the first item passed as ``ARGV``. For commands
+that mutate data (`INCR`), the command is followed by the accuracy and storage
+parameters to use when initializing a new sketch:
+- DEPTH: number of rows for the estimation matrix,
+- WIDTH: number of columns for the estimation matrix,
+- CAPACITY: maximum size of the index (to disable indexing entirely, set to 0.)
+(Configuration parameters are not required for readonly commands such as
+The ``KEYS`` provided to each command are the three keys used for sketch storage:
+- configuration key (bytes, serialized MessagePack data)
+- index key (sorted set)
+- estimation matrix key (hash of frequencies (floats), keyed by struct packed matrix coordinates)
+Multiple sketches can be provided to each command by providing another set of keys, e.g.
+    EVALSHA $SHA 6 1:config 1:index 1:estimates 2:config 2:index 2:estimates [...]
+(Whether a command returns a single result that encompasses all sketches, or a
+sequence of results that correspond to each sketch is dependent on the command
+being called.)
+To add two items, "foo" with a score of 1, and "bar" with a score of 2 to two
+sketches with depth 5, width 64 and index capacity of 50:
+    EVALSHA $SHA 6 1:c 1:i 1:e 2:c 2:i 2:e INCR 5 64 50 1 foo 2 bar
+To query the top 5 items from the first sketch:
+    EVALSHA $SHA 3 1:c 1:i 1:e RANKED 5
+--[[ Helpers ]]--
+local function filter(f, t)
+    local result = {}
+    for i, value in ipairs(t) do
+        if f(value) then
+            table.insert(result, value)
+        end
+    end
+    return result
+local function map(f, t)
+    local result = {}
+    for i, value in ipairs(t) do
+        result[i] = f(value)
+    end
+    return result
+local function head(t)
+    return (
+        function (head, ...)
+            return head, {...}
+        end
+    )(unpack(t))
+local function reduce(f, t, initializer)
+    if initializer == nil then
+        initializer, t = head(t)
+    end
+    local result = initializer
+    for _, value in pairs(t) do
+        result = f(result, value)
+    end
+    return result
+local function sum(series)
+    return reduce(
+        function (x, y)
+            return x + y
+        end,
+        series,
+        0
+    )
+local function zip(items)
+    local length = reduce(
+        math.min,
+        map(
+            function (t)
+                return #t
+            end,
+            items
+        )
+    )
+    local results = {}
+    for i = 1, length do
+        local value = {}
+        for j = 1, #items do
+            value[j] = items[j][i]
+        end
+        results[i] = value
+    end
+    return results
+    MurmurHash3
+    This implementation of MurmurHash3 is the 32-bit variation, based on the
+    example implementations here:
+local function mmh3(key, seed)
+    local c1 = 0xcc9e2d51
+    local c2 = 0x1b873593
+    local r1 = 15
+    local r2 = 13
+    local m = 5
+    local n = 0xe6546b64
+    local function multiply(x, y)
+        -- This is required to emulate uint32 overflow correctly -- otherwise,
+        -- higher order bits are simply truncated and discarded.
+        return (, 0xffff) * y) + bit.lshift(, 16) * y,  0xffff), 16)
+    end
+    local hash = bit.tobit(seed)
+    local remainder = #key % 4
+    for i = 1, #key - remainder, 4 do
+        local k = struct.unpack('<I4', key, i)
+        k = multiply(k, c1)
+        k = bit.rol(k, r1)
+        k = multiply(k, c2)
+        hash = bit.bxor(hash, k)
+        hash = bit.rol(hash, r2)
+        hash = multiply(hash, m) + n
+    end
+    if remainder ~= 0 then
+        local k1 = struct.unpack('<I' .. remainder, key, #key - remainder + 1)
+        k1 = multiply(k1, c1)
+        k1 = bit.rol(k1, r1)
+        k1 = multiply(k1, c2)
+        hash = bit.bxor(hash, k1)
+    end
+    hash = bit.bxor(hash, #key)
+    hash = bit.bxor(hash, bit.rshift(hash, 16))
+    hash = multiply(hash, 0x85ebca6b)
+    hash = bit.bxor(hash, bit.rshift(hash, 13))
+    hash = multiply(hash, 0xc2b2ae35)
+    hash = bit.bxor(hash, bit.rshift(hash, 16))
+    return hash
+--[[ Configuration ]]--
+local Configuration = {}
+function Configuration:new(key, readonly, defaults)
+    self.__index = self
+    return setmetatable({
+        key = key,
+        readonly = readonly,
+        defaults = defaults,
+        data = nil,
+        loaded = false
+    }, self)
+function Configuration:exists()
+    self:load()
+    return ~= nil
+function Configuration:load()
+    if not self.loaded then
+        local raw ='GET', self.key)
+        if raw == false then
+   = self.defaults
+            if not self.readonly then
+      'SET', self.key, cmsgpack.pack(
+            end
+        else
+   = cmsgpack.unpack(raw)
+        end
+        self.loaded = true
+    end
+function Configuration:get(key)
+    self:load()
+    return[key]
+--[[ Sketch ]]--
+local Sketch = {}
+function Sketch:new(configuration, index, estimates)
+    self.__index = self
+    return setmetatable({
+        configuration = configuration,
+        index = index,
+        estimates = estimates
+    }, self)
+function Sketch:coordinates(value)
+    local coordinates = {}
+    local width = self.configuration:get('width')
+    for d = 1, self.configuration:get('depth') do
+        local w = (mmh3(value, d) % width) + 1  -- This Kool-Aid is delicious!
+        table.insert(coordinates, {d, w})
+    end
+    return coordinates
+function Sketch:observations(coordinates)
+    return tonumber('HGET', self.estimates, struct.pack('>HH', unpack(coordinates)))) or 0
+function Sketch:estimate(value)
+    if self.configuration:exists() then
+        local score = tonumber('ZSCORE', self.index, value))
+        if score ~= nil then
+            return score
+        end
+        return reduce(
+            math.min,
+            map(
+                function (c)
+                    return self:observations(c)
+                end,
+                self:coordinates(value)
+            )
+        )
+    else
+        return 0
+    end
+function Sketch:increment(items)
+    assert(not self.configuration.readonly)
+    local results = {}
+    local usage ='ZCARD', self.index)
+    if self.configuration:get('index') > usage then
+        -- Add all of the items to the index. (Note that this can cause the
+        -- index to temporarily grow to the size of the capacity - 1 + number
+        -- of items being updated in the worst case.)
+        local added = 0
+        for i, item in pairs(items) do
+            local value, delta = unpack(item)
+            local score = tonumber('ZINCRBY', self.index, delta, value))
+            if score == delta then
+                added = added + 1
+            end
+        end
+        -- If the number of items added pushes the index to capacity, we need
+        -- to initialize the sketch matrix with all of the current members of
+        -- the index.
+        if added + usage >= self.configuration:get('index') then
+            -- TODO: Use this data to generate the response value.
+            local members ='ZRANGE', self.index, 0, -1, 'WITHSCORES')
+            for i = 1, #members, 2 do
+                local value = members[i]
+                local score = members[i + 1]
+                local coordinates = self:coordinates(value)
+                local estimates = map(
+                    function (c)
+                        return self:observations(c)
+                    end,
+                    coordinates
+                )
+                for i, item in pairs(zip({coordinates, estimates})) do
+                    local c, estimate = unpack(item)
+                    local update = math.max(score, estimate)
+                    if estimate ~= update then
+              'HSET', self.estimates, struct.pack('>HH', unpack(c)), update)
+                    end
+                end
+            end
+            -- Remove extra items from the index.
+  'ZREMRANGEBYRANK', self.index, 0, -self.configuration:get('index') - 1)
+        end
+    else
+        -- Fetch the estimates for each item and update them.
+        for i, item in pairs(items) do
+            local value, delta = unpack(item)
+            local coordinates = self:coordinates(value)
+            local estimates = map(
+                function (c)
+                    return self:observations(c)
+                end,
+                coordinates
+            )
+            -- This uses the score from the index (if it's available) instead
+            -- of the index to avoid data rot as much as possible.
+            local score = (tonumber('ZSCORE', self.index, item)) or reduce(math.min, estimates)) + delta
+            for i, item in pairs(zip({coordinates, estimates})) do
+                local c, estimate = unpack(item)
+                local update = math.max(score, estimate)
+                if estimate ~= update then
+          'HSET', self.estimates, struct.pack('>HH', unpack(c)), update)
+                end
+            end
+            results[i] = score
+        end
+        if self.configuration:get('index') > 0 then
+            local added = 0
+            local minimum = tonumber('ZRANGE', self.index, 0, 0, 'WITHSCORES')[2])
+            for i, item in pairs(items) do
+                local score = results[i]
+                -- TODO: This should also probably lexicographically sort items for consistent behavior.
+                if score > minimum then
+                    local value = unpack(item)
+                    added = added +'ZADD', self.index, score, value)
+                end
+            end
+            if added > 0 then
+                -- Remove extra items from the index.
+      'ZREMRANGEBYRANK', self.index, 0, -self.configuration:get('index') - 1)
+            end
+        end
+    end
+    return results
+--[[ Redis API ]]--
+local Command = {}
+function Command:new(fn, readonly)
+    if readonly == nil then
+        readonly = false
+    end
+    return function (keys, arguments)
+        local defaults = nil
+        if not readonly then
+            defaults, arguments = (
+                function (depth, width, index, ...)
+                    return {
+                        -- TODO: Actually validate these.
+                        depth=tonumber(depth),
+                        width=tonumber(width),
+                        index=tonumber(index)
+                    }, {...}
+                end
+            )(unpack(arguments))
+        end
+        local sketches = {}
+        for i = 1, #keys, 3 do
+            table.insert(sketches, Sketch:new(
+                Configuration:new(keys[i], readonly, defaults),
+                keys[i + 1],
+                keys[i + 2]
+            ))
+        end
+        return fn(sketches, arguments)
+    end
+local Router = {}
+function Router:new(commands)
+    return function (keys, arguments)
+        local name, arguments = head(arguments)
+        return commands[name:upper()](keys, arguments)
+    end
+return Router:new({
+    --[[
+    Increment the number of observations for each item in all sketches.
+    ]]--
+    INCR = Command:new(
+        function (sketches, arguments)
+            local items = {}
+            for i = 1, #arguments, 2 do
+                -- The increment value needs to be positive, since we're using the conservative
+                -- update strategy proposed by Estan and Varghese:
+                --
+                local delta = tonumber(arguments[i])
+                assert(delta > 0, 'The increment value must be positive and nonzero.')
+                local value = arguments[i + 1]
+                table.insert(items, {value, delta})
+            end
+            return map(
+                function (sketch)
+                    return sketch:increment(items)
+                end,
+                sketches
+            )
+        end,
+        false
+    ),
+    --[[
+    Estimate the number of observations for each item in all sketches,
+    returning a sequence containing scores for items in the order that they
+    were provided for each sketch.
+    ]]--
+    ESTIMATE = Command:new(
+        function (sketches, values)
+            return map(
+                function (sketch)
+                    return map(
+                        function (value)
+                            return string.format(
+                                '%s',
+                                sketch:estimate(value)
+                            )
+                        end,
+                        values
+                    )
+                end,
+                sketches
+            )
+        end,
+        true
+    ),
+    --[[
+    Find the most frequently observed items across all sketches, returning a
+    seqeunce of item, score pairs.
+    ]]--
+    RANKED = Command:new(
+        function (sketches, arguments)
+            local limit = unpack(arguments)
+            -- We only care about sketches that actually exist.
+            sketches = filter(
+                function (sketch)
+                    return sketch.configuration:exists()
+                end,
+                sketches
+            )
+            if #sketches == 0 then
+                return {}
+            end
+            -- TODO: There are probably a bunch of performance optimizations that could be made here.
+            -- If no limit is provided, use an implicit limit of the smallest index.
+            if limit == nil then
+                limit = reduce(
+                    math.min,
+                    map(
+                        function (sketch)
+                            return sketch.configuration:get('index')
+                        end,
+                        sketches
+                    )
+                )
+            end
+            if #sketches == 1 then
+                local results = {}
+                -- Note that the ZREVRANGE bounds are *inclusive*, so the limit
+                -- needs to be reduced by one to act as a typical slice bound.
+                local members ='ZREVRANGE', sketches[1].index, 0, limit - 1, 'WITHSCORES')
+                for i=1, #members, 2 do
+                    table.insert(
+                        results,
+                        {
+                            members[i],
+                            string.format('%s', members[i + 1])
+                        }
+                    )
+                end
+                return results
+            else
+                -- As the first pass, we need to find all of the items to look
+                -- up in all sketches.
+                local items = {}
+                for _, sketch in pairs(sketches) do
+                    local members ='ZRANGE', sketch.index, 0, -1)
+                    for _, member in pairs(members) do
+                        items[member] = true
+                    end
+                end
+                local results = {}
+                for value in pairs(items) do
+                    table.insert(
+                        results,
+                        {
+                            value,
+                            sum(
+                                map(
+                                    function (sketch)
+                                        return sketch:estimate(value)
+                                    end,
+                                    sketches
+                                )
+                            ),
+                        }
+                    )
+                end
+                local function comparator(x, y)
+                    if x[2] == y[2] then
+                        return x[1] < y[1]  -- lexicographically by key ascending
+                    else
+                        return x[2] > y[2]  -- score descending
+                    end
+                end
+                table.sort(results, comparator)
+                -- Trim the results to the limit.
+                local trimmed = {}
+                for i = 1, math.min(limit, #results) do
+                    local item, score = unpack(results[i])
+                    trimmed[i] = {
+                        item,
+                        string.format('%s', score)
+                    }
+                end
+                return trimmed
+            end
+        end,
+        true
+    )

+ 62 - 2

@@ -13,7 +13,6 @@ from enum import Enum
 from sentry.utils.dates import to_timestamp
@@ -50,10 +49,22 @@ class TSDBModel(Enum):
     # distinct count of users that have been affected by an event in a group
     users_affected_by_group = 300
     # distinct count of users that have been affected by an event in a project
     users_affected_by_project = 301
+    # number of events sent to server for an organization (key is always 0)
+    frequent_organization_received_by_system = 400
+    # number of events rejected by server for an organization (key is always 0)
+    frequent_organization_rejected_by_system = 401
+    # number of events blacklisted by server for an organization (key is always 0)
+    frequent_organization_blacklisted_by_system = 402
+    # number of events seen for a project, by organization
+    frequent_projects_by_organization = 403
+    # number of issues seen for a project, by project
+    frequent_issues_by_project = 404
+    # number of issues seen for a tag value, by issue:tag
+    frequent_values_by_issue_tag = 405
 class BaseTSDB(object):
     models = TSDBModel
@@ -221,3 +232,52 @@ class BaseTSDB(object):
         Count distinct items during a time range.
         raise NotImplementedError
+    def record_frequency_multi(self, requests, timestamp=None):
+        """
+        Record items in a frequency table.
+        Metrics to increment should be passed as sequence pairs, using this
+        structure: ``(model, {key: {item: score, ...}, ...})``
+        """
+        raise NotImplementedError
+    def get_most_frequent(self, model, keys, start, end=None, rollup=None, limit=None):
+        """
+        Retrieve the most frequently seen items in a frequency table.
+        Results are returned as a mapping, where the key is the key requested
+        and the value is a list of ``(member, score)`` tuples, ordered by the
+        highest (most frequent) to lowest (least frequent) score. The maximum
+        number of items returned is ``index capacity * rollup intervals`` if no
+        ``limit`` is provided.
+        """
+        raise NotImplementedError
+    def get_frequency_series(self, model, items, start, end=None, rollup=None):
+        """
+        Retrieve the frequency of known items in a table over time.
+        The items requested should be passed as a mapping, where the key is the
+        metric key, and the value is a sequence of members to retrieve scores
+        for.
+        Results are returned as a mapping, where the key is the key requested
+        and the value is a list of ``(timestamp, {item: score, ...})`` pairs
+        over the series.
+        """
+        raise NotImplementedError
+    def get_frequency_totals(self, model, items, start, end=None, rollup=None):
+        """
+        Retrieve the total frequency of known items in a table over time.
+        The items requested should be passed as a mapping, where the key is the
+        metric key, and the value is a sequence of members to retrieve scores
+        for.
+        Results are returned as a mapping, where the key is the key requested
+        and the value is a mapping of ``{item: score, ...}`` containing the
+        total score of items over the interval.
+        """
+        raise NotImplementedError

+ 26 - 0

@@ -28,3 +28,29 @@ class DummyTSDB(BaseTSDB):
     def get_distinct_counts_totals(self, model, keys, start, end=None, rollup=None):
         return {k: 0 for k in keys}
+    def record_frequency_multi(self, requests, timestamp=None):
+        pass
+    def get_most_frequent(self, model, keys, start, end=None, rollup=None, limit=None):
+        return {key: [] for key in keys}
+    def get_frequency_series(self, model, items, start, end=None, rollup=None):
+        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
+        results = {}
+        for key, members in items.items():
+            result = results[key] = []
+            for timestamp in series:
+                result.append((
+                    timestamp,
+                    {k: 0.0 for k in members},
+                ))
+        return results
+    def get_frequency_totals(self, model, items, start, end=None, rollup=None):
+        results = {}
+        for key, members in items.items():
+            results[key] = {member: 0.0 for member in members}
+        return results

+ 71 - 2

@@ -7,8 +7,9 @@ sentry.tsdb.inmemory
 from __future__ import absolute_import
-from collections import defaultdict
+from collections import Counter, defaultdict
 from datetime import timedelta
 from django.utils import timezone
 from sentry.tsdb.base import BaseTSDB
@@ -99,4 +100,72 @@ class InMemoryTSDB(BaseTSDB): = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
         # self.sets[model][key][rollup] = set of elements
-        self.sets = defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
+        self.sets = defaultdict(
+            lambda: defaultdict(
+                lambda: defaultdict(
+                    set,
+                ),
+            ),
+        )
+        # self.frequencies[model][key][rollup] = Counter()
+        self.frequencies = defaultdict(
+            lambda: defaultdict(
+                lambda: defaultdict(
+                    Counter,
+                )
+            ),
+        )
+    def record_frequency_multi(self, requests, timestamp=None):
+        if timestamp is None:
+            timestamp =
+        for model, request in requests:
+            for key, items in request.items():
+                items = {k: float(v) for k, v in items.items()}
+                source = self.frequencies[model][key]
+                for rollup, _ in self.rollups:
+                    source[self.normalize_to_rollup(timestamp, rollup)].update(items)
+    def get_most_frequent(self, model, keys, start, end=None, rollup=None, limit=None):
+        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
+        results = {}
+        for key in keys:
+            result = results[key] = Counter()
+            source = self.frequencies[model][key]
+            for timestamp in series:
+                result.update(source[self.normalize_ts_to_rollup(timestamp, rollup)])
+        for key, counter in results.items():
+            results[key] = counter.most_common(limit)
+        return results
+    def get_frequency_series(self, model, items, start, end=None, rollup=None):
+        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
+        results = {}
+        for key, members in items.items():
+            result = results[key] = []
+            source = self.frequencies[model][key]
+            for timestamp in series:
+                scores = source[self.normalize_ts_to_rollup(timestamp, rollup)]
+                result.append((
+                    timestamp,
+                    {k: scores.get(k, 0.0) for k in members},
+                ))
+        return results
+    def get_frequency_totals(self, model, items, start, end=None, rollup=None):
+        results = {}
+        for key, series in self.get_frequency_series(model, items, start, end, rollup).iteritems():
+            result = results[key] = {}
+            for timestamp, scores in series:
+                for member, score in scores.items():
+                    result[member] = result.get(member, 0.0) + score
+        return results

+ 161 - 24

@@ -8,35 +8,44 @@ sentry.tsdb.redis
 from __future__ import absolute_import
 import logging
-import six
+import operator
 from binascii import crc32
-from collections import defaultdict
+from collections import defaultdict, namedtuple
 from datetime import timedelta
+from hashlib import md5
+import six
 from django.conf import settings
 from django.utils import timezone
-from hashlib import md5
+from pkg_resources import resource_string
+from redis.client import Script
 from sentry.tsdb.base import BaseTSDB
 from sentry.utils.dates import to_timestamp
-from sentry.utils.redis import (
-    check_cluster_versions,
-    make_rb_cluster,
+from sentry.utils.redis import check_cluster_versions, make_rb_cluster
 from sentry.utils.versioning import Version
 logger = logging.getLogger(__name__)
+SketchParameters = namedtuple('SketchParameters', 'depth width capacity')
+CountMinScript = Script(
+    None,
+    resource_string('sentry', 'scripts/tsdb/cmsketch.lua'),
 class RedisTSDB(BaseTSDB):
     A time series storage backend for Redis.
-    The time series API supports two data types:
+    The time series API supports three data types:
         * simple counters
         * distinct counters (number of unique elements seen)
+        * frequency tables (a set of items ranked by most frequently observed)
     The backend also supports virtual nodes (``vnodes``) which controls shard
     distribution. This value should be set to the anticipated maximum number of
@@ -65,7 +74,25 @@ class RedisTSDB(BaseTSDB):
+    Frequency tables are modeled using two data structures:
+        * top-N index: a sorted set containing the most frequently observed items,
+        * estimation matrix: a hash table containing counters, used in a Count-Min sketch
+    Member scores are 100% accurate until the index is filled (and no memory is
+    used for the estimation matrix until this point), after which the data
+    structure switches to a probabilistic implementation and accuracy begins to
+    degrade for less frequently observed items, but remains accurate for more
+    frequently observed items.
+    Frequency tables are especially useful when paired with a (non-distinct)
+    counter of the total number of observations so that scores of items of the
+    frequency table can be displayed as percentages of the whole data set.
+    (Additional documentation and the bulk of the logic for implementing the
+    frequency table API can be found in the ``cmsketch.lua`` script.)
+    DEFAULT_SKETCH_PARAMETERS = SketchParameters(3, 128, 50)
     def __init__(self, hosts=None, prefix='ts:', vnodes=64, **kwargs):
         # inherit default options from REDIS_OPTIONS
         defaults = settings.SENTRY_REDIS_OPTIONS
@@ -86,7 +113,22 @@ class RedisTSDB(BaseTSDB):
-    def make_key(self, model, epoch, model_key):
+    def make_key(self, model, rollup, timestamp, key):
+        """
+        Make a key that is used for distinct counter and frequency table
+        values.
+        """
+        return '{prefix}{model}:{epoch}:{key}'.format(
+            prefix=self.prefix,
+            model=model.value,
+            epoch=self.normalize_ts_to_rollup(timestamp, rollup),
+            key=self.get_model_key(key),
+        )
+    def make_counter_key(self, model, epoch, model_key):
+        """
+        Make a key that is used for counter values.
+        """
         if isinstance(model_key, six.integer_types):
             vnode = model_key % self.vnodes
@@ -114,7 +156,7 @@ class RedisTSDB(BaseTSDB):
         >>> incr_multi([(TimeSeriesModel.project, 1), (, 5)])
-        make_key = self.make_key
+        make_key = self.make_counter_key
         normalize_to_rollup = self.normalize_to_rollup
         if timestamp is None:
             timestamp =
@@ -144,7 +186,7 @@ class RedisTSDB(BaseTSDB):
         normalize_to_epoch = self.normalize_to_epoch
         normalize_to_rollup = self.normalize_to_rollup
-        make_key = self.make_key
+        make_key = self.make_counter_key
         if rollup is None:
             rollup = self.get_optimal_rollup(start, end)
@@ -172,14 +214,6 @@ class RedisTSDB(BaseTSDB):
             results_by_key[key] = sorted(points.items())
         return dict(results_by_key)
-    def make_distinct_counter_key(self, model, rollup, timestamp, key):
-        return '{prefix}{model}:{epoch}:{key}'.format(
-            prefix=self.prefix,
-            model=model.value,
-            epoch=self.normalize_ts_to_rollup(timestamp, rollup),
-            key=self.get_model_key(key),
-        )
     def record(self, model, key, values, timestamp=None):
         self.record_multi(((model, key, values),), timestamp)
@@ -196,7 +230,7 @@ class RedisTSDB(BaseTSDB):
             for model, key, values in items:
                 c = client.target_key(key)
                 for rollup, max_values in self.rollups:
-                    k = self.make_distinct_counter_key(
+                    k = self.make_key(
@@ -227,7 +261,7 @@ class RedisTSDB(BaseTSDB):
-                            self.make_distinct_counter_key(
+                            self.make_key(
@@ -255,8 +289,111 @@ class RedisTSDB(BaseTSDB):
                 # directly here instead.
                 ks = []
                 for timestamp in series:
-                    ks.append(self.make_distinct_counter_key(model, rollup, timestamp, key))
+                    ks.append(self.make_key(model, rollup, timestamp, key))
                 responses[key] = client.target_key(key).execute_command('PFCOUNT', *ks)
         return {key: value.value for key, value in responses.iteritems()}
+    def make_frequency_table_keys(self, model, rollup, timestamp, key):
+        prefix = self.make_key(model, rollup, timestamp, key)
+        return map(
+            operator.methodcaller('format', prefix),
+            ('{}:c', '{}:i', '{}:e'),
+        )
+    def record_frequency_multi(self, requests, timestamp=None):
+        if timestamp is None:
+            timestamp =
+        ts = int(to_timestamp(timestamp))  # ``timestamp`` is not actually a timestamp :(
+        commands = {}
+        for model, request in requests:
+            for key, items in request.iteritems():
+                keys = []
+                expirations = {}
+                # Figure out all of the keys we need to be incrementing, as
+                # well as their expiration policies.
+                for rollup, max_values in self.rollups:
+                    chunk = self.make_frequency_table_keys(model, rollup, ts, key)
+                    keys.extend(chunk)
+                    expiry = self.calculate_expiry(rollup, max_values, timestamp)
+                    for k in chunk:
+                        expirations[k] = expiry
+                arguments = ['INCR'] + list(self.DEFAULT_SKETCH_PARAMETERS)
+                for member, score in items.items():
+                    arguments.extend((score, member))
+                # Since we're essentially merging dictionaries, we need to
+                # append this to any value that already exists at the key.
+                cmds = commands.setdefault(key, [])
+                cmds.append((CountMinScript, keys, arguments))
+                for k, t in expirations.items():
+                    cmds.append(('EXPIREAT', k, t))
+        self.cluster.execute_commands(commands)
+    def get_most_frequent(self, model, keys, start, end=None, rollup=None, limit=None):
+        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
+        commands = {}
+        arguments = ['RANKED']
+        if limit is not None:
+            arguments.append(int(limit))
+        for key in keys:
+            ks = []
+            for timestamp in series:
+                ks.extend(self.make_frequency_table_keys(model, rollup, timestamp, key))
+            commands[key] = [(CountMinScript, ks, arguments)]
+        results = {}
+        for key, responses in self.cluster.execute_commands(commands).items():
+            results[key] = [(member, float(score)) for member, score in responses[0].value]
+        return results
+    def get_frequency_series(self, model, items, start, end=None, rollup=None):
+        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
+        # Freeze ordering of the members (we'll need these later.)
+        for key, members in items.items():
+            items[key] = tuple(members)
+        commands = {}
+        for key, members in items.items():
+            ks = []
+            for timestamp in series:
+                ks.extend(self.make_frequency_table_keys(model, rollup, timestamp, key))
+            commands[key] = [(CountMinScript, ks, ('ESTIMATE',) + members)]
+        results = {}
+        for key, responses in self.cluster.execute_commands(commands).items():
+            members = items[key]
+            chunk = results[key] = []
+            for timestamp, scores in zip(series, responses[0].value):
+                chunk.append((timestamp, dict(zip(members, map(float, scores)))))
+        return results
+    def get_frequency_totals(self, model, items, start, end=None, rollup=None):
+        responses = {}
+        for key, series in self.get_frequency_series(model, items, start, end, rollup).iteritems():
+            response = responses[key] = {}
+            for timestamp, results in series:
+                for member, value in results.items():
+                    response[member] = response.get(member, 0.0) + value
+        return responses

+ 136 - 3

@@ -28,11 +28,11 @@ class RedisTSDBTest(TestCase):
         with self.db.cluster.all() as client:
-    def test_make_key(self):
-        result = self.db.make_key(TSDBModel.project, 1368889980, 1)
+    def test_make_counter_key(self):
+        result = self.db.make_counter_key(TSDBModel.project, 1368889980, 1)
         assert result == 'ts:1:1368889980:1'
-        result = self.db.make_key(TSDBModel.project, 1368889980, 'foo')
+        result = self.db.make_counter_key(TSDBModel.project, 1368889980, 'foo')
         assert result == 'ts:1:1368889980:33'
     def test_get_model_key(self):
@@ -150,3 +150,136 @@ class RedisTSDBTest(TestCase):
             1: 3,
             2: 2,
+    def test_frequency_tables(self):
+        now = datetime.utcnow().replace(tzinfo=pytz.UTC)
+        model = TSDBModel.frequent_projects_by_organization
+        self.db.record_frequency_multi(
+            (
+                (model, {
+                    'organization:1': {
+                        "project:1": 1,
+                        "project:2": 2,
+                        "project:3": 3,
+                    },
+                }),
+            ),
+            now
+        )
+        self.db.record_frequency_multi(
+            (
+                (model, {
+                    'organization:1': {
+                        "project:1": 1,
+                        "project:2": 2,
+                        "project:3": 3,
+                        "project:4": 4,
+                    },
+                    "organization:2": {
+                        "project:5": 1.5,
+                    },
+                }),
+            ),
+            now - timedelta(hours=1),
+        )
+        assert self.db.get_most_frequent(
+            model,
+            ('organization:1', 'organization:2'),
+            now,
+        ) == {
+            'organization:1': [
+                ('project:3', 3.0),
+                ('project:2', 2.0),
+                ('project:1', 1.0),
+            ],
+            'organization:2': [],
+        }
+        assert self.db.get_most_frequent(
+            model,
+            ('organization:1', 'organization:2'),
+            now,
+            limit=1,
+        ) == {
+            'organization:1': [
+                ('project:3', 3.0),
+            ],
+            'organization:2': [],
+        }
+        assert self.db.get_most_frequent(
+            model,
+            ('organization:1', 'organization:2'),
+            now - timedelta(hours=1),
+            now,
+        ) == {
+            'organization:1': [
+                ('project:3', 3.0 + 3.0),
+                ('project:2', 2.0 + 2.0),
+                ('project:4', 4.0),
+                ('project:1', 1.0 + 1.0),
+            ],
+            'organization:2': [
+                ('project:5', 1.5),
+            ],
+        }
+        rollup = 3600
+        timestamp = int(to_timestamp(now) // rollup) * rollup
+        assert self.db.get_frequency_series(
+            model,
+            {
+                'organization:1': ("project:1", "project:2", "project:3", "project:4"),
+                'organization:2': ("project:5",),
+            },
+            now - timedelta(hours=1),
+            now,
+            rollup=rollup,
+        ) == {
+            'organization:1': [
+                (timestamp - rollup, {
+                    "project:1": 1.0,
+                    "project:2": 2.0,
+                    "project:3": 3.0,
+                    "project:4": 4.0,
+                }),
+                (timestamp, {
+                    "project:1": 1.0,
+                    "project:2": 2.0,
+                    "project:3": 3.0,
+                    "project:4": 0.0,
+                }),
+            ],
+            'organization:2': [
+                (timestamp - rollup, {
+                    "project:5": 1.5,
+                }),
+                (timestamp, {
+                    "project:5": 0.0,
+                }),
+            ],
+        }
+        assert self.db.get_frequency_totals(
+            model,
+            {
+                'organization:1': ("project:1", "project:2", "project:3", "project:4", "project:5"),
+                'organization:2': ("project:1",),
+            },
+            now - timedelta(hours=1),
+            now,
+        ) == {
+            'organization:1': {
+                "project:1": 1.0 + 1.0,
+                "project:2": 2.0 + 2.0,
+                "project:3": 3.0 + 3.0,
+                "project:4": 4.0,
+                "project:5": 0.0,
+            },
+            'organization:2': {
+                "project:1": 0.0,
+            },
+        }