@@ -1,12 +1,10 @@
 import binascii
 import itertools
 import logging
-import random
 import uuid
 from collections import defaultdict, namedtuple
 from collections.abc import Callable, Iterable, Mapping, Sequence
 from datetime import datetime
-from functools import reduce
 from hashlib import md5
 from typing import Any, ContextManager, Generic, TypeVar
@@ -17,12 +15,7 @@ from redis.client import Script

 from sentry.tsdb.base import BaseTSDB, IncrMultiOptions, TSDBItem, TSDBKey, TSDBModel
 from sentry.utils.dates import to_datetime
-from sentry.utils.redis import (
-    check_cluster_versions,
-    get_cluster_from_options,
-    is_instance_rb_cluster,
-    load_redis_script,
-)
+from sentry.utils.redis import check_cluster_versions, get_cluster_from_options, load_redis_script
 from sentry.utils.versioning import Version

 logger = logging.getLogger(__name__)
@@ -563,95 +556,6 @@ class RedisTSDB(BaseTSDB):

         return {key: value.value for key, value in responses.items()}

-    def get_distinct_counts_union(
-        self,
-        model: TSDBModel,
-        keys: list[int] | None,
-        start: datetime,
-        end: datetime | None = None,
-        rollup: int | None = None,
-        environment_id: int | None = None,
-        tenant_ids: dict[str, str | int] | None = None,
-    ) -> int:
-        self.validate_arguments([model], [environment_id])
-
-        if not keys:
-            return 0
-
-        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
-
-        temporary_id = uuid.uuid1().hex
-
-        def make_temporary_key(key: str | int) -> str:
-            return f"{self.prefix}{temporary_id}:{key}"
-
-        def expand_key(key: int) -> list[int | str]:
-            """
-            Return a list containing all keys for each interval in the series for a key.
-            """
-            return [
-                self.make_key(model, rollup, timestamp, key, environment_id) for timestamp in series
-            ]
-
-        cluster, _ = self.get_cluster(environment_id)
-        if is_instance_rb_cluster(cluster, False):
-            router = cluster.get_router()
-        else:
-            raise AssertionError("unreachable")
-
-        def map_key_to_host(hosts: dict[int, set[int]], key: int) -> dict[int, set[int]]:
-            """
-            Identify the host where a key is located and add it to the host map.
-            """
-            hosts[router.get_host_for_key(key)].add(key)
-            return hosts
-
-        def get_partition_aggregate(value: tuple[int, set[int]]) -> tuple[int, int]:
-            """
-            Fetch the HyperLogLog value (in its raw byte representation) that
-            results from merging all HyperLogLogs at the provided keys.
-            """
-            (host, _keys) = value
-            destination = make_temporary_key(f"p:{host}")
-            client = cluster.get_local_client(host)
-            with client.pipeline(transaction=False) as pipeline:
-                pipeline.execute_command(
-                    "PFMERGE",
-                    destination,
-                    *itertools.chain.from_iterable(expand_key(key) for key in _keys),
-                )
-                pipeline.get(destination)
-                pipeline.delete(destination)
-                return host, pipeline.execute()[1]
-
-        def merge_aggregates(values: list[tuple[int, int]]) -> int:
-            """
-            Calculate the cardinality of the provided HyperLogLog values.
-            """
-            destination = make_temporary_key("a")  # all values will be merged into this key
-            aggregates = {make_temporary_key(f"a:{host}"): value for host, value in values}
-
-            # Choose a random host to execute the reduction on. (We use a host
-            # here that we've already accessed as part of this process -- this
-            # way, we constrain the choices to only hosts that we know are
-            # running.)
-            client = cluster.get_local_client(random.choice(values)[0])
-            with client.pipeline(transaction=False) as pipeline:
-                pipeline.mset(aggregates)
-                pipeline.execute_command("PFMERGE", destination, *aggregates.keys())
-                pipeline.execute_command("PFCOUNT", destination)
-                pipeline.delete(destination, *aggregates.keys())
-                return pipeline.execute()[2]
-
-        # TODO: This could be optimized to skip the intermediate step for the
-        # host that has the largest number of keys if the final merge and count
-        # is performed on that host. If that host contains *all* keys, the
-        # final reduction could be performed as a single PFCOUNT, skipping the
-        # MSET and PFMERGE operations entirely.
-
-        reduced: dict[int, set[int]] = reduce(map_key_to_host, set(keys), defaultdict(set))
-        return merge_aggregates([get_partition_aggregate(x) for x in reduced.items()])
-
     def merge_distinct_counts(
         self,
         model: TSDBModel,
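Note: `get_distinct_counts_union` was the sole user of `random`, `functools.reduce`, and `is_instance_rb_cluster`, which is why those imports disappear in the hunks above. Its core trick was HyperLogLog merging: PFMERGE the per-interval HLLs on each host, pull the merged register bytes back with GET, MSET them onto a single host, and finish with one PFMERGE/PFCOUNT there. A minimal single-node sketch of that core with plain redis-py (the `redis.Redis()` connection and the key names are illustrative, not part of this codebase):

```python
import redis

client = redis.Redis()  # illustrative connection, not this codebase's cluster

# One HyperLogLog per rollup interval, mirroring the TSDB key layout.
client.pfadd("ts:hll:100:1", "user-a", "user-b")
client.pfadd("ts:hll:200:1", "user-b", "user-c")

def distinct_count_union(keys: list[str]) -> int:
    destination = "tmp:hll:union"  # scratch key, deleted in the same pipeline
    with client.pipeline(transaction=False) as pipeline:
        pipeline.pfmerge(destination, *keys)  # lossless merge of the HLLs
        pipeline.pfcount(destination)         # cardinality of the union
        pipeline.delete(destination)
        return pipeline.execute()[1]          # index 1 = the PFCOUNT reply

print(distinct_count_union(["ts:hll:100:1", "ts:hll:200:1"]))  # -> 3
```

Because PFMERGE is lossless, the removed code could compute per-host partial merges first and combine them later on a single host without changing the final estimate.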
@@ -828,90 +732,6 @@ class RedisTSDB(BaseTSDB):
             if durable:
                 raise

-    def get_most_frequent(
-        self,
-        model: TSDBModel,
-        keys: Sequence[TSDBKey],
-        start: datetime,
-        end: datetime | None = None,
-        rollup: int | None = None,
-        limit: int | None = None,
-        environment_id: int | None = None,
-        tenant_ids: dict[str, int | str] | None = None,
-    ) -> dict[TSDBKey, list[tuple[str, float]]]:
-        self.validate_arguments([model], [environment_id])
-
-        if not self.enable_frequency_sketches:
-            raise NotImplementedError("Frequency sketches are disabled.")
-
-        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
-
-        arguments = ["RANKED"] + list(self.DEFAULT_SKETCH_PARAMETERS)
-        if limit is not None:
-            arguments.append(int(limit))
-
-        commands = {}
-        for key in keys:
-            ks = []
-            for timestamp in series:
-                ks.extend(
-                    self.make_frequency_table_keys(model, rollup, timestamp, key, environment_id)
-                )
-            commands[key] = [(CountMinScript, ks, arguments)]
-
-        results = {}
-        cluster, _ = self.get_cluster(environment_id)
-        for _key, responses in cluster.execute_commands(commands).items():
-            results[_key] = [
-                (member.decode("utf-8"), float(score)) for member, score in responses[0].value
-            ]
-
-        return results
-
-    def get_most_frequent_series(
-        self,
-        model: TSDBModel,
-        keys: Iterable[str],
-        start: datetime,
-        end: datetime | None = None,
-        rollup: int | None = None,
-        limit: int | None = None,
-        environment_id: int | None = None,
-        tenant_ids: dict[str, int | str] | None = None,
-    ) -> dict[str, list[tuple[int, dict[str, float]]]]:
-        self.validate_arguments([model], [environment_id])
-
-        if not self.enable_frequency_sketches:
-            raise NotImplementedError("Frequency sketches are disabled.")
-
-        rollup, series = self.get_optimal_rollup_series(start, end, rollup)
-
-        arguments = ["RANKED"] + list(self.DEFAULT_SKETCH_PARAMETERS)
-        if limit is not None:
-            arguments.append(int(limit))
-
-        commands: dict[str, list[tuple[Script, list[str], list[str | int]]]] = {}
-        for key in keys:
-            commands[key] = [
-                (
-                    CountMinScript,
-                    self.make_frequency_table_keys(model, rollup, timestamp, key, environment_id),
-                    arguments,
-                )
-                for timestamp in series
-            ]
-
-        def unpack_response(response: rb.Promise) -> dict[str, float]:
-            return {item.decode("utf-8"): float(score) for item, score in response.value}
-
-        results: dict[str, list[tuple[int, dict[str, float]]]] = {}
-        cluster, _ = self.get_cluster(environment_id)
-        for key, responses in cluster.execute_commands(commands).items():
-            zipped_series = zip(series, (unpack_response(response) for response in responses))
-            results[key] = list(zipped_series)
-
-        return results
-
     def get_frequency_series(
         self,
         model: TSDBModel,
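Note: both getters removed in this hunk delegated the counting to `CountMinScript`, a server-side Lua script maintaining count-min sketches plus a ranked index; they differed only in response shape (`get_most_frequent` merged every interval into one ranked list per key, while `get_most_frequent_series` issued one script call per interval and zipped the replies back against `series`). As an illustration of the estimator only, not of that script's storage layout, here is a toy in-memory count-min sketch with a ranked query:

```python
import hashlib

class CountMinSketch:
    """Toy count-min sketch: depth x width counters plus a member index."""

    def __init__(self, depth: int = 5, width: int = 64) -> None:
        self.depth, self.width = depth, width
        self.rows = [[0] * width for _ in range(depth)]
        self.members: set[str] = set()  # side index used for ranked queries

    def _buckets(self, member: str):
        # One independent hash (and therefore one bucket) per row.
        for row in range(self.depth):
            digest = hashlib.md5(f"{row}:{member}".encode()).hexdigest()
            yield row, int(digest, 16) % self.width

    def increment(self, member: str, delta: int = 1) -> None:
        self.members.add(member)
        for row, bucket in self._buckets(member):
            self.rows[row][bucket] += delta

    def estimate(self, member: str) -> int:
        # The row minimum bounds the overcount caused by hash collisions.
        return min(self.rows[row][bucket] for row, bucket in self._buckets(member))

    def most_frequent(self, limit: int) -> list[tuple[str, float]]:
        # Same shape as the removed get_most_frequent: ranked (member, score).
        ranked = sorted(self.members, key=self.estimate, reverse=True)
        return [(member, float(self.estimate(member))) for member in ranked[:limit]]

sketch = CountMinSketch()
for member in ["a"] * 3 + ["b"] * 2 + ["c"]:
    sketch.increment(member)
print(sketch.most_frequent(limit=2))  # -> [('a', 3.0), ('b', 2.0)]
```

A production sketch keeps a bounded top-k index rather than the full `members` set above; the `DEFAULT_SKETCH_PARAMETERS` the removed code passed to the script played that sizing role.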
@@ -961,33 +781,6 @@ class RedisTSDB(BaseTSDB):

         return results

-    def get_frequency_totals(
-        self,
-        model: TSDBModel,
-        items: Mapping[TSDBKey, Sequence[TSDBItem]],
-        start: datetime,
-        end: datetime | None = None,
-        rollup: int | None = None,
-        environment_id: int | None = None,
-        tenant_ids: dict[str, str | int] | None = None,
-    ) -> dict[TSDBKey, dict[TSDBItem, float]]:
-        self.validate_arguments([model], [environment_id])
-
-        if not self.enable_frequency_sketches:
-            raise NotImplementedError("Frequency sketches are disabled.")
-
-        responses: dict[TSDBKey, dict[TSDBItem, float]] = {}
-        frequency_series = self.get_frequency_series(
-            model, items, start, end, rollup, environment_id
-        )
-        for _key, series in frequency_series.items():
-            response = responses[_key] = defaultdict(float)
-            for timestamp, results in series:
-                for member, value in results.items():
-                    response[member] = response.get(member, 0) + value
-
-        return responses
-
     def merge_frequencies(
         self,
         model: TSDBModel,
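
Note: `get_frequency_totals` was a pure client-side reduction over `get_frequency_series`; removing it touches no Redis state. Incidentally, its `response.get(member, 0) + value` was redundant on a `defaultdict(float)`, where a plain `+=` behaves identically. A standalone sketch of the same aggregation (the series data here is made up):

```python
from collections import defaultdict

# Hypothetical output of get_frequency_series for a single TSDB key:
# a list of (timestamp, {member: score}) pairs.
frequency_series = {
    "key": [
        (1700000000, {"error:foo": 4.0, "error:bar": 1.0}),
        (1700003600, {"error:bar": 2.0}),
    ],
}

totals: dict[str, dict[str, float]] = {}
for key, series in frequency_series.items():
    response = totals[key] = defaultdict(float)
    for _timestamp, scores in series:
        for member, value in scores.items():
            response[member] += value  # sum each member across timestamps

print(dict(totals["key"]))  # -> {'error:foo': 4.0, 'error:bar': 3.0}
```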