
feat(snuba): Add SnubaSearchBackend (#7925)

Brett Hoerner, 6 years ago
Parent commit: a7f8f17e1c

+ 1 - 0
.gitignore

@@ -40,3 +40,4 @@ Gemfile.lock
 coverage.xml
 junit.xml
 *.codestyle.xml
+.pytest_cache/

+ 2 - 2
.travis.yml

@@ -128,7 +128,7 @@ matrix:
         - redis-server
         - postgresql
     - python: 2.7
-      env: TEST_SUITE=snuba SNUBA=http://localhost:8000
+      env: TEST_SUITE=snuba SENTRY_TAGSTORE=sentry.tagstore.snuba.SnubaTagStorage SNUBA=http://localhost:8000
       services:
         - docker
         - memcached
@@ -140,7 +140,7 @@ matrix:
         - docker ps -a
   allow_failures:
     - python: 2.7
-      env: TEST_SUITE=snuba SNUBA=http://localhost:8000
+      env: TEST_SUITE=snuba SENTRY_TAGSTORE=sentry.tagstore.snuba.SnubaTagStorage SNUBA=http://localhost:8000
 notifications:
   webhooks:
     urls:

+ 1 - 1
src/sentry/conf/server.py

@@ -957,7 +957,7 @@ SENTRY_TAGSTORE_OPTIONS = (
 )
 
 # Search backend
-SENTRY_SEARCH = 'sentry.search.django.DjangoSearchBackend'
+SENTRY_SEARCH = os.environ.get('SENTRY_SEARCH', 'sentry.search.django.DjangoSearchBackend')
 SENTRY_SEARCH_OPTIONS = {}
 # SENTRY_SEARCH_OPTIONS = {
 #     'urls': ['http://localhost:9200/'],

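Together with the .travis.yml change above, this makes backend selection a per-process environment concern: the snuba test job exports SENTRY_TAGSTORE, and server.py now lets SENTRY_SEARCH be overridden the same way. A minimal sketch of the mechanism, assuming settings are imported after the variable is set (the dotted path is the backend added by this commit):

    import os

    # Selecting the new backend for this process; server.py falls back
    # to the Django implementation when the variable is unset.
    os.environ.setdefault('SENTRY_SEARCH', 'sentry.search.snuba.SnubaSearchBackend')

    SENTRY_SEARCH = os.environ.get(
        'SENTRY_SEARCH', 'sentry.search.django.DjangoSearchBackend')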
+ 2 - 1
src/sentry/event_manager.py

@@ -47,6 +47,7 @@ from sentry.stacktraces import normalize_in_app
 
 HASH_RE = re.compile(r'^[0-9a-f]{32}$')
 DEFAULT_FINGERPRINT_VALUES = frozenset(['{{ default }}', '{{default}}'])
+ALLOWED_FUTURE_DELTA = timedelta(minutes=1)
 
 
 def count_limit(count):
@@ -221,7 +222,7 @@ def process_timestamp(value, current_datetime=None):
     if current_datetime is None:
         current_datetime = datetime.now()
 
-    if value > current_datetime + timedelta(minutes=1):
+    if value > current_datetime + ALLOWED_FUTURE_DELTA:
         raise InvalidTimestamp(EventError.FUTURE_TIMESTAMP)
 
     if value < current_datetime - timedelta(days=30):

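Pulling the one-minute tolerance out into ALLOWED_FUTURE_DELTA lets the new Snuba search backend (below) import the exact bound that event ingestion enforces and reuse it as the default upper end of its search window. A hedged sketch of the check the constant names; the helper function is illustrative, not part of the commit:

    from datetime import datetime

    from sentry.event_manager import ALLOWED_FUTURE_DELTA  # timedelta(minutes=1)

    def exceeds_future_delta(value, current_datetime=None):
        # Illustrative helper: True when a timestamp is further in the
        # future than process_timestamp tolerates.
        if current_datetime is None:
            current_datetime = datetime.now()
        return value > current_datetime + ALLOWED_FUTURE_DELTA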
+ 46 - 24
src/sentry/search/django/backend.py

@@ -206,8 +206,8 @@ def get_latest_release(project, environment):
 class DjangoSearchBackend(SearchBackend):
     def query(self, project, tags=None, environment=None, sort_by='date', limit=100,
               cursor=None, count_hits=False, paginator_options=None, **parameters):
-        from sentry.models import (Environment, Event, Group, GroupEnvironment,
-                                   GroupStatus, GroupSubscription, Release)
+
+        from sentry.models import Group, GroupStatus, GroupSubscription, Release
 
         if paginator_options is None:
             paginator_options = {}
@@ -272,13 +272,26 @@ class DjangoSearchBackend(SearchBackend):
         retention = quotas.get_event_retention(organization=project.organization)
         if retention:
             retention_window_start = timezone.now() - timedelta(days=retention)
-            # TODO: This could be optimized when building querysets to identify
-            # criteria that are logically impossible (e.g. if the upper bound
-            # for last seen is before the retention window starts, no results
-            # exist.)
-            group_queryset = group_queryset.filter(last_seen__gte=retention_window_start)
         else:
             retention_window_start = None
+        # TODO: This could be optimized when building querysets to identify
+        # criteria that are logically impossible (e.g. if the upper bound
+        # for last seen is before the retention window starts, no results
+        # exist.)
+        if retention_window_start:
+            group_queryset = group_queryset.filter(last_seen__gte=retention_window_start)
+
+        # This is a punt because the SnubaSearchBackend (a subclass) shares so much that it
+        # seemed better to handle all the shared initialization and then hand off to the
+        # actual backend.
+        return self._query(project, retention_window_start, group_queryset, tags,
+                           environment, sort_by, limit, cursor, count_hits,
+                           paginator_options, **parameters)
+
+    def _query(self, project, retention_window_start, group_queryset, tags, environment,
+               sort_by, limit, cursor, count_hits, paginator_options, **parameters):
+
+        from sentry.models import (Group, Environment, Event, GroupEnvironment, Release)
 
         if environment is not None:
             if 'environment' in tags:
@@ -293,13 +306,14 @@ class DjangoSearchBackend(SearchBackend):
                 'date_from': ScalarCondition('date_added', 'gt'),
                 'date_to': ScalarCondition('date_added', 'lt'),
             })
+
             if any(key in parameters for key in event_queryset_builder.conditions.keys()):
                 event_queryset = event_queryset_builder.build(
                     tagstore.get_event_tag_qs(
-                        project.id,
-                        environment.id,
-                        'environment',
-                        environment.name,
+                        project_id=project.id,
+                        environment_id=environment.id,
+                        key='environment',
+                        value=environment.name,
                     ),
                     parameters,
                 )
@@ -414,16 +428,17 @@ class DjangoSearchBackend(SearchBackend):
             get_sort_expression, sort_value_to_cursor_value = environment_sort_strategies[sort_by]
 
             group_tag_value_queryset = tagstore.get_group_tag_value_qs(
-                project.id,
-                set(group_queryset.values_list('id', flat=True)),  # TODO: Limit?,
-                environment.id,
-                'environment',
-                environment.name,
+                project_id=project.id,
+                        group_id=set(group_queryset.values_list('id', flat=True)),  # TODO: Limit?
+                environment_id=environment.id,
+                key='environment',
+                value=environment.name,
             )
 
             if retention_window_start is not None:
                 group_tag_value_queryset = group_tag_value_queryset.filter(
-                    last_seen__gte=retention_window_start)
+                    last_seen__gte=retention_window_start
+                )
 
             candidates = dict(
                 QuerySetBuilder({
@@ -451,10 +466,10 @@ class DjangoSearchBackend(SearchBackend):
                 # utilize the retention window start parameter for additional
                 # optimizations.
                 matches = tagstore.get_group_ids_for_search_filter(
-                    project.id,
-                    environment.id,
-                    tags,
-                    candidates.keys(),
+                    project_id=project.id,
+                    environment_id=environment.id,
+                    tags=tags,
+                    candidates=candidates.keys(),
                     limit=len(candidates),
                 )
                 for key in set(candidates) - set(matches or []):
@@ -475,6 +490,7 @@ class DjangoSearchBackend(SearchBackend):
                 'date_from': ScalarCondition('datetime', 'gt'),
                 'date_to': ScalarCondition('datetime', 'lt'),
             })
+
             if any(key in parameters for key in event_queryset_builder.conditions.keys()):
                 group_queryset = group_queryset.filter(
                     id__in=list(
@@ -511,9 +527,15 @@ class DjangoSearchBackend(SearchBackend):
             )
 
             if tags:
-                matches = tagstore.get_group_ids_for_search_filter(project.id, None, tags)
-                if matches:
-                    group_queryset = group_queryset.filter(id__in=matches)
+                group_ids = tagstore.get_group_ids_for_search_filter(
+                    project_id=project.id,
+                    environment_id=None,
+                    tags=tags,
+                    candidates=None,
+                )
+
+                if group_ids:
+                    group_queryset = group_queryset.filter(id__in=group_ids)
                 else:
                     group_queryset = group_queryset.none()
 

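The net effect of this refactor is a template-method split: query() keeps the work every backend needs (queryset construction, the retention clamp) and hands the prepared queryset to an overridable _query(). A simplified sketch of the resulting shape, with hypothetical helper names standing in for the shared setup:

    class DjangoSearchBackend(object):
        def query(self, project, **parameters):
            # Shared setup: base Group queryset, retention window, etc.
            group_queryset = self._build_group_queryset(project)      # hypothetical helper
            retention_window_start = self._retention_start(project)   # hypothetical helper
            return self._query(project, retention_window_start,
                               group_queryset, **parameters)

        def _query(self, project, retention_window_start,
                   group_queryset, **parameters):
            # The Postgres filtering/sorting shown in the hunk above.
            pass

    class SnubaSearchBackend(DjangoSearchBackend):
        def _query(self, project, retention_window_start,
                   group_queryset, **parameters):
            # Same entry contract, but event conditions are evaluated
            # in Snuba instead of Postgres.
            pass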
+ 3 - 0
src/sentry/search/snuba/__init__.py

@@ -0,0 +1,3 @@
+from __future__ import absolute_import, print_function
+
+from .backend import *  # NOQA

+ 367 - 0
src/sentry/search/snuba/backend.py

@@ -0,0 +1,367 @@
+from __future__ import absolute_import
+
+import six
+
+import logging
+import math
+import pytz
+from collections import defaultdict
+from datetime import timedelta, datetime
+
+from django.utils import timezone
+
+from sentry.api.paginator import SequencePaginator
+from sentry.event_manager import ALLOWED_FUTURE_DELTA
+from sentry.models import Release, Group, GroupEnvironment, GroupHash
+from sentry.search.django import backend as ds
+from sentry.utils import snuba
+from sentry.utils.dates import to_timestamp
+
+
+logger = logging.getLogger('sentry.search.snuba')
+
+
+# https://github.com/getsentry/sentry/blob/804c85100d0003cfdda91701911f21ed5f66f67c/src/sentry/event_manager.py#L241-L271
+priority_expr = 'toUInt32(log(times_seen) * 600) + toUInt32(last_seen)'
+
+
+datetime_format = '%Y-%m-%dT%H:%M:%S+00:00'
+
+
+# TODO: Would be nice if this was handled in the Snuba abstraction, but that
+# would require knowledge of which fields are datetimes
+def snuba_str_to_datetime(d):
+    if not isinstance(d, datetime):
+        d = datetime.strptime(d, datetime_format)
+
+    if not d.tzinfo:
+        d = d.replace(tzinfo=pytz.utc)
+
+    return d
+
+
+def calculate_priority_cursor(data):
+    times_seen = sum(data['times_seen'])
+    last_seen = max(int(to_timestamp(snuba_str_to_datetime(d)) * 1000) for d in data['last_seen'])
+    return ((math.log(times_seen) * 600) + last_seen)
+
+
+def _datetime_cursor_calculator(field, fn):
+    def calculate(data):
+        datetime = fn(snuba_str_to_datetime(d) for d in data[field])
+        return int(to_timestamp(datetime) * 1000)
+
+    return calculate
+
+
+sort_strategies = {
+    # sort_by -> Tuple[
+    #   String: expression to generate sort value (of type T, used below),
+    #   Function[T] -> int: function for converting a group's data to a cursor value),
+    # ]
+    'priority': (
+        '-priority', calculate_priority_cursor,
+    ),
+    'date': (
+        '-last_seen', _datetime_cursor_calculator('last_seen', max),
+    ),
+    'new': (
+        '-first_seen', _datetime_cursor_calculator('first_seen', min),
+    ),
+    'freq': (
+        '-times_seen', lambda data: sum(data['times_seen']),
+    ),
+}
+
+
+class SnubaConditionBuilder(object):
+    """\
+    Constructs a Snuba conditions list from a ``parameters`` mapping.
+
+    ``Condition`` objects are registered by their parameter name and used to
+    construct the Snuba condition list if they are present in the ``parameters``
+    mapping.
+    """
+
+    def __init__(self, conditions):
+        self.conditions = conditions
+
+    def build(self, parameters):
+        result = []
+        for name, condition in self.conditions.items():
+            if name in parameters:
+                result.append(condition.apply(name, parameters))
+        return result
+
+
+class Condition(object):
+    """\
+    Adds a single condition to a Snuba conditions list. Used with
+    ``SnubaConditionBuilder``.
+    """
+
+    def apply(self, name, parameters):
+        raise NotImplementedError
+
+
+class CallbackCondition(Condition):
+    def __init__(self, callback):
+        self.callback = callback
+
+    def apply(self, name, parameters):
+        return self.callback(parameters[name])
+
+
+class ScalarCondition(Condition):
+    """\
+    Adds a scalar filter (less than or greater than are supported) to a Snuba
+    condition list. Whether or not the filter is inclusive is defined by the
+    '{parameter_name}_inclusive' parameter.
+    """
+
+    def __init__(self, field, operator, default_inclusivity=True):
+        assert operator in ['<', '>']
+        self.field = field
+        self.operator = operator
+        self.default_inclusivity = default_inclusivity
+
+    def apply(self, name, parameters):
+        inclusive = parameters.get(
+            '{}_inclusive'.format(name),
+            self.default_inclusivity,
+        )
+
+        arg = parameters[name]
+        if isinstance(arg, datetime):
+            arg = int(to_timestamp(arg))
+
+        return (
+            self.field,
+            self.operator + ('=' if inclusive else ''),
+            arg
+        )
+
+
+class SnubaSearchBackend(ds.DjangoSearchBackend):
+    def _query(self, project, retention_window_start, group_queryset, tags, environment,
+               sort_by, limit, cursor, count_hits, paginator_options, **parameters):
+
+        # TODO: Product decision: we currently search Group.message to handle
+        # the `query` parameter, because that's what we've always done. We could
+        # do that search against every event in Snuba instead, but results may
+        # differ.
+
+        now = timezone.now()
+        end = parameters.get('date_to') or (now + ALLOWED_FUTURE_DELTA)
+        # TODO: Presumably we want to search back to the project's full retention,
+        #       which may be longer than 90 days in the past, but apparently
+        #       `retention_window_start` can be None?
+        start = max(
+            filter(None, [
+                retention_window_start,
+                parameters.get('date_from'),
+                now - timedelta(days=90)
+            ])
+        )
+        assert start < end
+
+        # TODO: It's possible `first_release` could be handled by Snuba.
+        if environment is not None:
+            group_queryset = ds.QuerySetBuilder({
+                'first_release': ds.CallbackCondition(
+                    lambda queryset, version: queryset.extra(
+                        where=[
+                            '{} = {}'.format(
+                                ds.get_sql_column(GroupEnvironment, 'first_release_id'),
+                                ds.get_sql_column(Release, 'id'),
+                            ),
+                            '{} = %s'.format(
+                                ds.get_sql_column(Release, 'organization'),
+                            ),
+                            '{} = %s'.format(
+                                ds.get_sql_column(Release, 'version'),
+                            ),
+                        ],
+                        params=[project.organization_id, version],
+                        tables=[Release._meta.db_table],
+                    ),
+                ),
+            }).build(
+                group_queryset.extra(
+                    where=[
+                        '{} = {}'.format(
+                            ds.get_sql_column(Group, 'id'),
+                            ds.get_sql_column(GroupEnvironment, 'group_id'),
+                        ),
+                        '{} = %s'.format(
+                            ds.get_sql_column(GroupEnvironment, 'environment_id'),
+                        ),
+                    ],
+                    params=[environment.id],
+                    tables=[GroupEnvironment._meta.db_table],
+                ),
+                parameters,
+            )
+        else:
+            group_queryset = ds.QuerySetBuilder({
+                'first_release': ds.CallbackCondition(
+                    lambda queryset, version: queryset.filter(
+                        first_release__organization_id=project.organization_id,
+                        first_release__version=version,
+                    ),
+                ),
+            }).build(
+                group_queryset,
+                parameters,
+            )
+
+        # TODO: If the query didn't include anything to significantly filter
+        # down the number of groups at this point ('first_release', 'query',
+        # 'status', 'bookmarked_by', 'assigned_to', 'unassigned',
+        # 'subscribed_by', 'active_at_from', or 'active_at_to') then this
+        # queryset might return a *huge* number of groups. In this case, we
+        # probably *don't* want to pass candidates down to Snuba, and rather we
+        # want Snuba to do all the filtering/sorting it can and *then* apply
+        # this queryset to the results from Snuba.
+        #
+        # However, if this did filter down the number of groups significantly,
+        # then passing in candidates is, of course, valuable.
+        #
+        # Should we decide which way to handle it based on the number of
+        # group_ids, the number of hashes? Or should we just always start the
+        # query with Snuba? Something else?
+        candidate_group_ids = list(group_queryset.values_list('id', flat=True))
+
+        sort_expression, calculate_cursor_for_group = sort_strategies[sort_by]
+
+        group_data = do_search(
+            project_id=project.id,
+            environment_id=environment and environment.id,
+            tags=tags,
+            start=start,
+            end=end,
+            sort=sort_expression,
+            candidates=candidate_group_ids,
+            **parameters
+        )
+
+        group_to_score = {}
+        for group_id, data in group_data.items():
+            group_to_score[group_id] = calculate_cursor_for_group(data)
+
+        paginator_results = SequencePaginator(
+            [(score, id) for (id, score) in group_to_score.items()],
+            reverse=True,
+            **paginator_options
+        ).get_result(limit, cursor, count_hits=count_hits)
+
+        groups = Group.objects.in_bulk(paginator_results.results)
+        paginator_results.results = [groups[k] for k in paginator_results.results if k in groups]
+
+        return paginator_results
+
+
+def do_search(project_id, environment_id, tags, start, end,
+              sort, candidates=None, limit=1000, **parameters):
+    from sentry.search.base import ANY
+
+    filters = {
+        'project_id': [project_id],
+    }
+
+    if environment_id is not None:
+        filters['environment'] = [environment_id]
+
+    if candidates is not None:
+        hashes = list(
+            GroupHash.objects.filter(
+                group_id__in=candidates
+            ).values_list(
+                'hash', flat=True
+            ).distinct()
+        )
+
+        if not hashes:
+            return {}
+
+        filters['primary_hash'] = hashes
+
+    having = SnubaConditionBuilder({
+        'age_from': ScalarCondition('first_seen', '>'),
+        'age_to': ScalarCondition('first_seen', '<'),
+        'last_seen_from': ScalarCondition('last_seen', '>'),
+        'last_seen_to': ScalarCondition('last_seen', '<'),
+        'times_seen': CallbackCondition(
+            lambda times_seen: ('times_seen', '=', times_seen),
+        ),
+        'times_seen_lower': ScalarCondition('times_seen', '>'),
+        'times_seen_upper': ScalarCondition('times_seen', '<'),
+    }).build(parameters)
+
+    conditions = []
+    for tag, val in six.iteritems(tags):
+        col = 'tags[{}]'.format(tag)
+        if val == ANY:
+            conditions.append((col, '!=', ''))
+        else:
+            conditions.append((col, '=', val))
+
+    aggregations = [
+        ['count()', '', 'times_seen'],
+        ['min', 'timestamp', 'first_seen'],
+        ['max', 'timestamp', 'last_seen'],
+        [priority_expr, '', 'priority']
+    ]
+
+    # {hash -> {times_seen -> int,
+    #           first_seen -> date_str,
+    #           last_seen -> date_str,
+    #           priority -> int},
+    #  ...}
+    snuba_results = snuba.query(
+        start=start,
+        end=end,
+        groupby=['primary_hash'],
+        conditions=conditions,
+        having=having,
+        filter_keys=filters,
+        aggregations=aggregations,
+        orderby=sort,
+        limit=limit,
+    )
+
+    # {hash -> group_id, ...}
+    hash_to_group = dict(
+        GroupHash.objects.filter(
+            project_id=project_id,
+            hash__in=snuba_results.keys()
+        ).values_list(
+            'hash', 'group_id'
+        )
+    )
+
+    # {group_id -> {field1: [...all values from field1 for all hashes...],
+    #               field2: [...all values from field2 for all hashes...]
+    #               ...}
+    #  ...}
+    group_data = {}
+    for hash, obj in snuba_results.items():
+        if hash in hash_to_group:
+            group_id = hash_to_group[hash]
+
+            if group_id not in group_data:
+                group_data[group_id] = defaultdict(list)
+
+            dest = group_data[group_id]
+            for k, v in obj.items():
+                dest[k].append(v)
+        else:
+            logger.warning(
+                'search.hash_not_found',
+                extra={
+                    'project_id': project_id,
+                    'hash': hash,
+                },
+            )
+
+    return group_data

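A hedged usage sketch for the condition machinery defined above: the builder emits a condition only for parameters that are actually present, and ScalarCondition honours an optional '<name>_inclusive' flag when deciding whether to append '=' to the operator. The parameter values here are invented for illustration:

    import pytz

    from datetime import datetime

    from sentry.search.snuba.backend import SnubaConditionBuilder, ScalarCondition

    builder = SnubaConditionBuilder({
        'age_from': ScalarCondition('first_seen', '>'),
        'age_to': ScalarCondition('first_seen', '<'),
    })

    having = builder.build({
        'age_from': datetime(2018, 4, 1, tzinfo=pytz.utc),
        'age_from_inclusive': False,  # strict '>', no '='
    })
    # having == [('first_seen', '>', 1522540800)]; 'age_to' is absent
    # from the parameters, so no condition is emitted for it.

The 'priority' sort round-trips the same way: priority_expr asks ClickHouse for toUInt32(log(times_seen) * 600) + toUInt32(last_seen) per hash, and calculate_priority_cursor recomputes a matching cursor value in Python from the merged per-group data.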
+ 3 - 2
src/sentry/tagstore/legacy/backend.py

@@ -573,6 +573,7 @@ class LegacyTagStorage(TagStorage):
 
     def get_group_ids_for_search_filter(
             self, project_id, environment_id, tags, candidates=None, limit=1000):
+
         from sentry.search.base import ANY
        # Django doesn't support union, so we limit results and try to find
        # reasonable matches
@@ -582,7 +583,7 @@ class LegacyTagStorage(TagStorage):
         tag_lookups = sorted(six.iteritems(tags), key=lambda (k, v): v == ANY)
 
         # get initial matches to start the filter
-        matches = candidates
+        matches = candidates or []
 
         # for each remaining tag, find matches contained in our
         # existing set, pruning it down each iteration
@@ -609,7 +610,7 @@ class LegacyTagStorage(TagStorage):
             matches = list(base_qs.values_list('group_id', flat=True)[:limit])
 
             if not matches:
-                return None
+                return []
 
         return matches
 

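Normalizing the None returns to empty lists gives get_group_ids_for_search_filter a single return type, which is what lets the Django backend above switch to a plain truthiness check. A small sketch of the caller contract, mirroring the Django backend hunk above, with an invented tag value:

    group_ids = tagstore.get_group_ids_for_search_filter(
        project_id=project.id,
        environment_id=None,
        tags={'browser': 'Chrome'},  # invented example tag
        candidates=None,
    )

    # group_ids is now always a list: empty means "no matches", so no
    # separate None check is needed before narrowing the queryset.
    if group_ids:
        group_queryset = group_queryset.filter(id__in=group_ids)
    else:
        group_queryset = group_queryset.none()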
+ 7 - 24
src/sentry/tagstore/snuba/backend.py

@@ -24,6 +24,7 @@ from sentry.tagstore.exceptions import (
 )
 from sentry.utils import snuba
 
+
 SEEN_COLUMN = 'timestamp'
 
 
@@ -390,27 +391,9 @@ class SnubaTagStorage(TagStorage):
         result = snuba.query(start, end, ['issue'], None, filters, aggregations)
         return defaultdict(int, result.items())
 
-    # Search
-    def get_group_ids_for_search_filter(self, project_id, environment_id, tags):
-        from sentry.search.base import ANY
-
-        start, end = self.get_time_range()
-
-        filters = {
-            'environment': [environment_id],
-            'project_id': [project_id],
-        }
-
-        conditions = []
-        for tag, val in six.iteritems(tags):
-            col = 'tags[{}]'.format(tag)
-            if val == ANY:
-                conditions.append((col, 'IS NOT NULL', None))
-            else:
-                conditions.append((col, '=', val))
-
-        issues = snuba.query(start, end, ['issue'], conditions, filters)
-        return issues.keys()
+    def get_group_ids_for_search_filter(
+            self, project_id, environment_id, tags, candidates=None, limit=1000):
+        raise NotImplementedError
 
     # Everything from here down is basically no-ops
     def create_tag_key(self, project_id, environment_id, key, **kwargs):
@@ -501,10 +484,10 @@ class SnubaTagStorage(TagStorage):
         pass
 
     def get_tag_value_qs(self, project_id, environment_id, key, query=None):
-        return None
+        raise NotImplementedError
 
     def get_group_tag_value_qs(self, project_id, group_id, environment_id, key, value=None):
-        return None
+        raise NotImplementedError
 
     def get_event_tag_qs(self, project_id, environment_id, key, value):
-        return None
+        raise NotImplementedError

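Swapping the silent return None stubs for NotImplementedError makes a misrouted call fail at its source rather than downstream. A hedged before/after illustration; the call and its arguments are hypothetical:

    # Before: the stub returned None, so the failure surfaced later as
    # an AttributeError far from the cause.
    qs = tagstore.get_event_tag_qs(project_id=1, environment_id=1,
                                   key='environment', value='production')
    qs.filter(group_id=123)  # AttributeError: 'NoneType' object has no attribute 'filter'

    # After: the call itself raises NotImplementedError, pointing
    # straight at the unsupported SnubaTagStorage method.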
+ 3 - 2
src/sentry/tagstore/v2/backend.py

@@ -824,6 +824,7 @@ class V2TagStorage(TagStorage):
 
     def get_group_ids_for_search_filter(
             self, project_id, environment_id, tags, candidates=None, limit=1000):
+
         from sentry.search.base import ANY
        # Django doesn't support union, so we limit results and try to find
        # reasonable matches
@@ -833,7 +834,7 @@ class V2TagStorage(TagStorage):
         tag_lookups = sorted(six.iteritems(tags), key=lambda (k, v): v == ANY)
 
         # get initial matches to start the filter
-        matches = candidates
+        matches = candidates or []
 
         # for each remaining tag, find matches contained in our
         # existing set, pruning it down each iteration
@@ -865,7 +866,7 @@ class V2TagStorage(TagStorage):
             matches = list(base_qs.values_list('group_id', flat=True)[:limit])
 
             if not matches:
-                return None
+                return []
 
         return matches
 

Some files were not shown because too many files changed in this diff