ref(unmerge): Refactor unmerge task to use Snuba (#14776)

Fetch events to be unmerged from Snuba instead of Postgres. We also stop updating group_id on Postgres events, since we no longer rely on it.
Lyn Nagara, 5 years ago (commit e683bc2784)
2 changed files with 207 additions and 229 deletions:
  1. src/sentry/tasks/unmerge.py (+45, -18)
  2. tests/snuba/tasks/test_unmerge.py (+162, -211)
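
In outline, the event fetch in unmerge moves from the Postgres Event model to the eventstore abstraction. The sketch below is condensed from the hunks that follow (the cursor conditions are omitted here; the batching recursion around the fetch is unchanged):

    # Before: recency approximated by Postgres primary key order.
    queryset = Event.objects.filter(project_id=project_id, group_id=source_id).order_by("-id")
    events = list(queryset[:batch_size])

    # After: explicit (timestamp, event_id) ordering served by Snuba.
    events = eventstore.get_events(
        filter_keys={"project_id": [project_id], "issue": [source_id]},
        orderby=["-timestamp", "-event_id"],
        limit=batch_size,
        referrer="unmerge",
    )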

src/sentry/tasks/unmerge.py (+45, -18)

@@ -4,8 +4,9 @@ import logging
 from collections import defaultdict, OrderedDict
 
 from django.db import transaction
+from django.conf import settings
 
-from sentry import eventstream, tagstore
+from sentry import eventstore, eventstream, tagstore
 from sentry.app import tsdb
 from sentry.constants import DEFAULT_LOGGER_NAME, LOG_LEVELS_MAP
 from sentry.event_manager import generate_culprit
@@ -214,16 +215,21 @@ def migrate_events(
         destination = Group.objects.get(id=destination_id)
         destination.update(**get_group_backfill_attributes(caches, destination, events))
 
-    event_id_set = set(event.id for event in events)
-
-    Event.objects.filter(project_id=project.id, id__in=event_id_set).update(group_id=destination_id)
+    event_id_set = set(event.event_id for event in events)
 
     for event in events:
         event.group = destination
 
-    tagstore.update_group_for_events(
-        project_id=project.id, event_ids=event_id_set, destination_id=destination_id
-    )
+    if settings.SENTRY_TAGSTORE == "sentry.tagstore.legacy.LegacyTagStorage":
+        postgres_id_set = set(
+            Event.objects.filter(project_id=project.id, event_id__in=event_id_set).values_list(
+                "id", flat=True
+            )
+        )
+
+        tagstore.update_group_for_events(
+            project_id=project.id, event_ids=postgres_id_set, destination_id=destination_id
+        )
 
     event_event_id_set = set(event.event_id for event in events)
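
The new guard exists because the legacy tag storage keys its rows on the integer Postgres Event primary key, while events fetched from Snuba carry only the 32-character hex event_id. A short sketch of the distinction the hunk above handles (hypothetical values):

    # Snuba events expose the hex identifier only:
    event.event_id  # e.g. "00000001000010008080808080808080"

    # The legacy tagstore wants integer primary keys, so the task translates
    # hex ids back to PKs before updating tag rows:
    postgres_id_set = set(
        Event.objects.filter(
            project_id=project.id, event_id__in=event_id_set
        ).values_list("id", flat=True)
    )  # e.g. {101, 102, 103}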
 
@@ -495,7 +501,7 @@ def unmerge(
     destination_id,
     fingerprints,
     actor_id,
-    cursor=None,
+    last_event=None,
     batch_size=500,
     source_fields_reset=False,
     eventstream_state=None,
@@ -510,7 +516,7 @@ def unmerge(
     # On the first iteration of this loop, we clear out all of the
     # denormalizations from the source group so that we can have a clean slate
     # for the new, repaired data.
-    if cursor is None:
+    if last_event is None:
         fingerprints = lock_hashes(project_id, source_id, fingerprints)
         truncate_denormalizations(source)
 
@@ -518,20 +524,41 @@ def unmerge(
 
     project = caches["Project"](project_id)
 
-    # We fetch the events in descending order by their primary key to get the
-    # best approximation of the most recently received events.
-    queryset = Event.objects.filter(project_id=project_id, group_id=source_id).order_by("-id")
-
-    if cursor is not None:
-        queryset = queryset.filter(id__lt=cursor)
+    # We process events in descending (timestamp, event_id) order. We need to
+    # include event_id as well as timestamp in the ordering criteria since:
+    #
+    # - Event timestamps are rounded to the second, so multiple events are
+    # likely to share the same timestamp.
+    #
+    # - When sorting by timestamp alone, Snuba may not give us a deterministic
+    # order for events with the same timestamp.
+    #
+    # - We need to ensure that we do not skip any events between batches. If we
+    # only filtered by timestamp < last_event.timestamp, it would be possible
+    # to miss an event with the same timestamp as the last item in the
+    # previous batch.
+
+    conditions = []
+    if last_event is not None:
+        conditions.extend(
+            [
+                ["timestamp", "<=", last_event.timestamp],
+                [["timestamp", "<", last_event.timestamp], ["event_id", "<", last_event.event_id]],
+            ]
+        )
 
-    events = list(queryset[:batch_size])
+    events = eventstore.get_events(
+        filter_keys={"project_id": [project_id], "issue": [source.id]},
+        conditions=conditions,
+        limit=batch_size,
+        referrer="unmerge",
+        orderby=["-timestamp", "-event_id"],
+    )
 
     # If there are no more events to process, we're done with the migration.
     if not events:
         tagstore.update_group_tag_key_values_seen(project_id, [source_id, destination_id])
         unlock_hashes(project_id, fingerprints)
-
         logger.warning("Unmerge complete (eventstream state: %s)", eventstream_state)
         if eventstream_state:
             eventstream.end_unmerge(eventstream_state)
@@ -574,7 +601,7 @@ def unmerge(
         destination_id,
         fingerprints,
         actor_id,
-        cursor=events[-1].id,
+        last_event=events[-1],
         batch_size=batch_size,
         source_fields_reset=source_fields_reset,
         eventstream_state=eventstream_state,
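
The two conditions added above implement keyset (cursor) pagination: together they express the tuple comparison (timestamp, event_id) < (last_event.timestamp, last_event.event_id), with the nested list acting as an OR group in Snuba's legacy condition syntax. A minimal standalone sketch of the same predicate and batch loop (plain Python over hypothetical data, not the Snuba client):

    from datetime import datetime

    # Hypothetical events as (event_id, timestamp) pairs, already in
    # descending (timestamp, event_id) order; two share a timestamp, as
    # Snuba rounds timestamps to the second.
    events = [
        ("c" * 32, datetime(2019, 9, 1, 12, 0, 2)),
        ("b" * 32, datetime(2019, 9, 1, 12, 0, 2)),
        ("a" * 32, datetime(2019, 9, 1, 12, 0, 1)),
    ]

    def after_cursor(event, last):
        event_id, timestamp = event
        last_id, last_timestamp = last
        # Same shape as the Snuba conditions in the hunk above:
        # timestamp <= last AND (timestamp < last OR event_id < last_id)
        return timestamp <= last_timestamp and (
            timestamp < last_timestamp or event_id < last_id
        )

    # Batches of size 1 neither skip nor repeat events, even across equal
    # timestamps; filtering on timestamp alone would drop ("b" * 32, ...),
    # which is exactly what the comment in the hunk warns about.
    seen, last = [], None
    while True:
        batch = [e for e in events if last is None or after_cursor(e, last)][:1]
        if not batch:
            break
        seen.extend(batch)
        last = batch[-1]

    assert seen == events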

tests/sentry/tasks/test_unmerge.py → tests/snuba/tasks/test_unmerge.py (+162, -211)

@@ -7,25 +7,13 @@ import logging
 import uuid
 from collections import OrderedDict
 from datetime import datetime, timedelta
-
 import pytz
-from django.utils import timezone
-from mock import patch, Mock
 
-from sentry import tagstore
-from sentry.tagstore.models import GroupTagValue
+from mock import patch
+
+from sentry import eventstream
 from sentry.app import tsdb
-from sentry.models import (
-    Activity,
-    Environment,
-    EnvironmentProject,
-    Event,
-    Group,
-    GroupHash,
-    GroupRelease,
-    Release,
-    UserReport,
-)
+from sentry.models import Environment, Event, Group, GroupHash, GroupRelease, Release, UserReport
 from sentry.similarity import features, _make_index_backend
 from sentry.tasks.unmerge import (
     get_caches,
@@ -35,9 +23,12 @@ from sentry.tasks.unmerge import (
     get_group_creation_attributes,
     unmerge,
 )
-from sentry.testutils import TestCase
+from sentry.testutils import SnubaTestCase, TestCase
 from sentry.utils.dates import to_timestamp
 from sentry.utils import redis
+from sentry.testutils.helpers.datetime import before_now, iso_format
+from sentry.tasks.merge import merge_groups
+from sentry.tagstore.snuba.backend import SnubaTagStorage
 
 from six.moves import xrange
 
@@ -60,7 +51,7 @@ def test_get_fingerprint():
 
 
 @patch("sentry.similarity.features.index", new=index)
-class UnmergeTestCase(TestCase):
+class UnmergeTestCase(TestCase, SnubaTestCase):
     def test_get_group_creation_attributes(self):
         now = datetime(2017, 5, 3, 6, 6, 6, tzinfo=pytz.utc)
         events = [
@@ -161,89 +152,53 @@ class UnmergeTestCase(TestCase):
             "first_release": None,
         }
 
-    @patch("sentry.tasks.unmerge.eventstream")
-    def test_unmerge(self, mock_eventstream):
-        eventstream_state = object()
-        mock_eventstream.start_unmerge = Mock(return_value=eventstream_state)
-
-        def shift(i):
-            return timedelta(seconds=1 << i)
+    def test_unmerge(self):
+        tagstore = SnubaTagStorage()  # Snuba is not the default tag storage for tests yet
+        now = before_now(seconds=20).replace(microsecond=0, tzinfo=pytz.utc)
 
-        now = timezone.now() - shift(16)
+        def time_from_now(offset=0):
+            return now + timedelta(seconds=offset)
 
         project = self.create_project()
-        source = self.create_group(project)
 
         sequence = itertools.count(0)
         tag_values = itertools.cycle(["red", "green", "blue"])
         user_values = itertools.cycle([{"id": 1}, {"id": 2}])
 
-        for environment in ("production", ""):
-            EnvironmentProject.objects.create(
-                environment=Environment.objects.create(
-                    organization_id=project.organization_id, name=environment
-                ),
-                project=project,
-            )
-
-        def create_message_event(template, parameters, environment, release):
+        def create_message_event(template, parameters, environment, release, fingerprint="group1"):
             i = next(sequence)
 
             event_id = uuid.UUID(fields=(i, 0x0, 0x1000, 0x80, 0x80, 0x808080808080)).hex
 
             tags = [["color", next(tag_values)]]
 
-            if environment:
-                tags.append(["environment", environment])
-
             if release:
                 tags.append(["sentry:release", release])
 
-            event = Event.objects.create(
-                project_id=project.id,
-                group_id=source.id,
-                event_id=event_id,
-                message="%s" % (id,),
-                datetime=now + shift(i),
+            event = self.store_event(
                 data={
-                    "environment": environment,
+                    "event_id": event_id,
+                    "message": template % parameters,
                     "type": "default",
-                    "metadata": {"title": template % parameters},
-                    "logentry": {
-                        "message": template,
-                        "params": parameters,
-                        "formatted": template % parameters,
-                    },
                     "user": next(user_values),
                     "tags": tags,
+                    "fingerprint": [fingerprint],
+                    "timestamp": iso_format(now + timedelta(seconds=i)),
+                    "environment": environment,
+                    "release": release,
                 },
+                project_id=project.id,
             )
 
-            with self.tasks():
-                Group.objects.add_tags(
-                    source,
-                    Environment.objects.get(
-                        organization_id=project.organization_id, name=environment
-                    ),
-                    tags=event.tags,
-                )
-
             UserReport.objects.create(
                 project_id=project.id,
-                group_id=source.id,
+                group_id=event.group.id,
                 event_id=event_id,
                 name="Log Hat",
                 email="ceo@corptron.com",
                 comments="Quack",
             )
 
-            if release:
-                Release.get_or_create(
-                    project=project,
-                    version=event.get_tag("sentry:release"),
-                    date_added=event.datetime,
-                )
-
             features.record([event])
 
             return event
@@ -260,185 +215,143 @@ class UnmergeTestCase(TestCase):
 
         for event in (
             create_message_event(
-                "This is message #%s!", i, environment="production", release="version"
+                "This is message #%s!",
+                i,
+                environment="production",
+                release="version2",
+                fingerprint="group2",
             )
             for i in xrange(10, 16)
         ):
             events.setdefault(get_fingerprint(event), []).append(event)
 
-        event = create_message_event("This is message #%s!", 17, environment="", release=None)
+        event = create_message_event(
+            "This is message #%s!",
+            17,
+            environment="staging",
+            release="version3",
+            fingerprint="group3",
+        )
+
         events.setdefault(get_fingerprint(event), []).append(event)
 
-        assert len(events) == 2
-        assert sum(map(len, events.values())) == 17
+        merge_source, source, destination = list(Group.objects.all())
 
-        # XXX: This is super contrived considering that it doesn't actually go
-        # through the event pipeline, but them's the breaks, eh?
-        for fingerprint in events.keys():
-            GroupHash.objects.create(project=project, group=source, hash=fingerprint)
+        assert len(events) == 3
+        assert sum(map(len, events.values())) == 17
 
         production_environment = Environment.objects.get(
             organization_id=project.organization_id, name="production"
         )
 
-        assert set(
-            [
-                (gtk.key, gtk.values_seen)
-                for gtk in tagstore.get_group_tag_keys(
-                    source.project_id, source.id, [production_environment.id]
-                )
-            ]
-        ) == set([(u"color", 3), (u"environment", 1), (u"sentry:release", 1)])
+        with self.tasks():
+            eventstream_state = eventstream.start_merge(project.id, [merge_source.id], source.id)
+            merge_groups.delay([merge_source.id], source.id)
+            eventstream.end_merge(eventstream_state)
 
         assert set(
             [
-                (gtv.key, gtv.value, gtv.times_seen)
-                for gtv in GroupTagValue.objects.filter(
-                    project_id=source.project_id, group_id=source.id
+                (gtv.value, gtv.times_seen)
+                for gtv in tagstore.get_group_tag_values(
+                    project.id, source.id, production_environment.id, "color"
                 )
             ]
-        ) == set(
-            [
-                (u"color", u"red", 6),
-                (u"color", u"green", 6),
-                (u"color", u"blue", 5),
-                (u"environment", u"production", 16),
-                (u"sentry:release", u"version", 16),
-            ]
-        )
+        ) == set([("red", 6), ("green", 5), ("blue", 5)])
 
-        assert features.compare(source) == [
-            (
-                source.id,
-                {
-                    "exception:message:character-shingles": None,
-                    "exception:stacktrace:application-chunks": None,
-                    "exception:stacktrace:pairs": None,
-                    "message:message:character-shingles": 1.0,
-                },
-            )
-        ]
+        similar_items = features.compare(source)
+        assert len(similar_items) == 2
+        assert similar_items[0][0] == source.id
+        assert similar_items[0][1]["message:message:character-shingles"] == 1.0
+        assert similar_items[1][0] == destination.id
+        assert similar_items[1][1]["message:message:character-shingles"] < 1.0
 
         with self.tasks():
+            eventstream_state = eventstream.start_unmerge(
+                project.id, [events.keys()[0]], source.id, destination.id
+            )
             unmerge.delay(
-                source.project_id, source.id, None, [events.keys()[1]], None, batch_size=5
+                project.id, source.id, destination.id, [events.keys()[0]], None, batch_size=5
             )
+            eventstream.end_unmerge(eventstream_state)
 
-        assert list(
-            Group.objects.filter(id=source.id).values_list("times_seen", "first_seen", "last_seen")
-        ) == [(10, now + shift(0), now + shift(9))]
-
-        source_activity = Activity.objects.get(group_id=source.id, type=Activity.UNMERGE_SOURCE)
-
-        destination = Group.objects.get(id=source_activity.data["destination_id"])
-
-        mock_eventstream.start_unmerge.assert_called_once_with(
-            source.project_id, [events.keys()[1]], source.id, destination.id
+        assert (
+            list(
+                Group.objects.filter(id=merge_source.id).values_list(
+                    "times_seen", "first_seen", "last_seen"
+                )
+            )
+            == []
         )
 
-        mock_eventstream.end_unmerge.assert_called_once_with(eventstream_state)
+        assert list(
+            Group.objects.filter(id=source.id).values_list("times_seen", "first_seen", "last_seen")
+        ) == [(6, time_from_now(10), time_from_now(15))]
 
         assert list(
             Group.objects.filter(id=destination.id).values_list(
                 "times_seen", "first_seen", "last_seen"
             )
-        ) == [(7, now + shift(10), now + shift(16))]
-
-        assert source_activity.data == {
-            "destination_id": destination.id,
-            "fingerprints": [events.keys()[1]],
-        }
+        ) == [(11, time_from_now(0), time_from_now(16))]
 
         assert source.id != destination.id
         assert source.project == destination.project
 
-        assert Activity.objects.get(
-            group_id=destination.id, type=Activity.UNMERGE_DESTINATION
-        ).data == {"source_id": source.id, "fingerprints": [events.keys()[1]]}
-
-        source_event_event_ids = map(lambda event: event.event_id, events.values()[0])
+        destination_event_ids = map(lambda event: event.event_id, events.values()[1])
 
         assert set(
             UserReport.objects.filter(group_id=source.id).values_list("event_id", flat=True)
-        ) == set(source_event_event_ids)
+        ) == set(destination_event_ids)
 
         assert set(
             GroupHash.objects.filter(group_id=source.id).values_list("hash", flat=True)
-        ) == set([events.keys()[0]])
+        ) == set([events.keys()[0], events.keys()[1]])
 
         assert set(
             GroupRelease.objects.filter(group_id=source.id).values_list(
                 "environment", "first_seen", "last_seen"
             )
-        ) == set([(u"production", now + shift(0), now + shift(9))])
+        ) == set([(u"production", time_from_now(10), time_from_now(15))])
 
         assert set(
             [
-                (gtk.key, gtk.values_seen)
-                for gtk in tagstore.get_group_tag_keys(
-                    source.project_id, source.id, [production_environment.id]
+                (gtv.value, gtv.times_seen)
+                for gtv in tagstore.get_group_tag_values(
+                    project.id, destination.id, production_environment.id, "color"
                 )
             ]
-        ) == set([(u"color", 3), (u"environment", 1), (u"sentry:release", 1)])
+        ) == set([(u"red", 4), (u"green", 3), (u"blue", 3)])
 
-        assert set(
-            [
-                (gtv.key, gtv.value, gtv.times_seen, gtv.first_seen, gtv.last_seen)
-                for gtv in GroupTagValue.objects.filter(
-                    project_id=source.project_id, group_id=source.id
-                )
-            ]
-        ) == set(
-            [
-                (u"color", u"red", 4, now + shift(0), now + shift(9)),
-                (u"color", u"green", 3, now + shift(1), now + shift(7)),
-                (u"color", u"blue", 3, now + shift(2), now + shift(8)),
-                (u"environment", u"production", 10, now + shift(0), now + shift(9)),
-                (u"sentry:release", u"version", 10, now + shift(0), now + shift(9)),
-            ]
+        destination_event_ids = map(
+            lambda event: event.event_id, events.values()[0] + events.values()[2]
         )
 
-        destination_event_event_ids = map(lambda event: event.event_id, events.values()[1])
-
         assert set(
             UserReport.objects.filter(group_id=destination.id).values_list("event_id", flat=True)
-        ) == set(destination_event_event_ids)
+        ) == set(destination_event_ids)
 
         assert set(
             GroupHash.objects.filter(group_id=destination.id).values_list("hash", flat=True)
-        ) == set([events.keys()[1]])
+        ) == set([events.keys()[2]])
 
         assert set(
             GroupRelease.objects.filter(group_id=destination.id).values_list(
                 "environment", "first_seen", "last_seen"
             )
-        ) == set([(u"production", now + shift(10), now + shift(15))])
-
-        assert set(
+        ) == set(
             [
-                (gtk.key, gtk.values_seen)
-                for gtk in tagstore.get_group_tag_keys(
-                    source.project_id, source.id, [production_environment.id]
-                )
+                ("production", time_from_now(0), time_from_now(9)),
+                ("staging", time_from_now(16), time_from_now(16)),
             ]
-        ) == set([(u"color", 3), (u"environment", 1), (u"sentry:release", 1)])
+        )
 
         assert set(
             [
-                (gtv.key, gtv.value, gtv.times_seen, gtv.first_seen, gtv.last_seen)
-                for gtv in GroupTagValue.objects.filter(
-                    project_id=destination.project_id, group_id=destination.id
+                (gtk.value, gtk.times_seen)
+                for gtk in tagstore.get_group_tag_values(
+                    project.id, destination.id, production_environment.id, "color"
                 )
             ]
-        ) == set(
-            [
-                (u"color", u"red", 2, now + shift(12), now + shift(15)),
-                (u"color", u"green", 3, now + shift(10), now + shift(16)),
-                (u"color", u"blue", 2, now + shift(11), now + shift(14)),
-                (u"environment", u"production", 6, now + shift(10), now + shift(15)),
-                (u"sentry:release", u"version", 6, now + shift(10), now + shift(15)),
-            ]
-        )
+        ) == set([("red", 4), ("blue", 3), ("green", 3)])
 
         rollup_duration = 3600
 
@@ -446,7 +359,7 @@ class UnmergeTestCase(TestCase):
             tsdb.models.group,
             [source.id, destination.id],
             now - timedelta(seconds=rollup_duration),
-            now + shift(15),
+            time_from_now(17),
             rollup_duration,
         )
 
@@ -454,7 +367,7 @@ class UnmergeTestCase(TestCase):
             tsdb.models.group,
             [source.id, destination.id],
             now - timedelta(seconds=rollup_duration),
-            now + shift(15),
+            time_from_now(17),
             rollup_duration,
             environment_ids=[production_environment.id],
         )
@@ -481,24 +394,37 @@ class UnmergeTestCase(TestCase):
             for key in set(actual.keys()) - set(expected.keys()):
                 assert actual.get(key, 0) == default
 
-        for series in [time_series, environment_time_series]:
-            assert_series_contains(
-                get_expected_series_values(rollup_duration, events.values()[0]),
-                series[source.id],
-                0,
-            )
+        assert_series_contains(
+            get_expected_series_values(rollup_duration, events.values()[1]),
+            time_series[source.id],
+            0,
+        )
 
-            assert_series_contains(
-                get_expected_series_values(rollup_duration, events.values()[1][:-1]),
-                series[destination.id],
-                0,
-            )
+        assert_series_contains(
+            get_expected_series_values(rollup_duration, events.values()[0] + events.values()[2]),
+            time_series[destination.id],
+            0,
+        )
+
+        assert_series_contains(
+            get_expected_series_values(rollup_duration, events.values()[1]),
+            environment_time_series[source.id],
+            0,
+        )
+
+        assert_series_contains(
+            get_expected_series_values(
+                rollup_duration, events.values()[0][:-1] + events.values()[2]
+            ),
+            environment_time_series[destination.id],
+            0,
+        )
 
         time_series = tsdb.get_distinct_counts_series(
             tsdb.models.users_affected_by_group,
             [source.id, destination.id],
             now - timedelta(seconds=rollup_duration),
-            now + shift(16),
+            time_from_now(17),
             rollup_duration,
         )
 
@@ -506,7 +432,7 @@ class UnmergeTestCase(TestCase):
             tsdb.models.users_affected_by_group,
             [source.id, destination.id],
             now - timedelta(seconds=rollup_duration),
-            now + shift(16),
+            time_from_now(17),
             rollup_duration,
             environment_id=production_environment.id,
         )
@@ -521,7 +447,7 @@ class UnmergeTestCase(TestCase):
                 {
                     timestamp: len(values)
                     for timestamp, values in get_expected_series_values(
-                        rollup_duration, events.values()[0], collect_by_user_tag
+                        rollup_duration, events.values()[1], collect_by_user_tag
                     ).items()
                 },
                 series[source.id],
@@ -531,19 +457,22 @@ class UnmergeTestCase(TestCase):
                 {
                     timestamp: len(values)
                     for timestamp, values in get_expected_series_values(
-                        rollup_duration, events.values()[1], collect_by_user_tag
+                        rollup_duration,
+                        events.values()[0] + events.values()[2],
+                        collect_by_user_tag,
                     ).items()
                 },
                 time_series[destination.id],
             )
 
-        time_series = tsdb.get_most_frequent_series(
-            tsdb.models.frequent_releases_by_group,
-            [source.id, destination.id],
-            now - timedelta(seconds=rollup_duration),
-            now + shift(16),
-            rollup_duration,
-        )
+        def strip_zeroes(data):
+            for group_id, series in data.items():
+                for _, values in series:
+                    for key, val in values.items():
+                        if val == 0:
+                            values.pop(key)
+
+            return data
 
         def collect_by_release(group, aggregate, event):
             aggregate = aggregate if aggregate is not None else {}
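
The strip_zeroes helper added above is needed because the test now drives tsdb.get_frequency_series, which returns a count for every requested key in every bucket, zeros included, whereas the removed get_most_frequent_series reported only observed keys. A standalone illustration of the reshaping (hypothetical data; note the committed helper relies on Python 2's list-returning dict.items() to mutate while iterating, which this sketch avoids):

    data = {1: [(1500000000, {"release:a": 3, "release:b": 0})]}

    def strip_zeroes(data):
        # Drop zero-valued buckets so comparisons only see observed keys.
        for series in data.values():
            for _, values in series:
                for key in list(values):  # copy keys before mutating
                    if values[key] == 0:
                        del values[key]
        return data

    assert strip_zeroes(data) == {1: [(1500000000, {"release:a": 3})]}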
@@ -560,9 +489,23 @@ class UnmergeTestCase(TestCase):
             aggregate[release] = aggregate.get(release, 0) + 1
             return aggregate
 
+        items = {}
+        for i in [source.id, destination.id]:
+            items[i] = list(GroupRelease.objects.filter(group_id=i).values_list("id", flat=True))
+
+        time_series = strip_zeroes(
+            tsdb.get_frequency_series(
+                tsdb.models.frequent_releases_by_group,
+                items,
+                now - timedelta(seconds=rollup_duration),
+                time_from_now(17),
+                rollup_duration,
+            )
+        )
+
         assert_series_contains(
             get_expected_series_values(
-                rollup_duration, events.values()[0], functools.partial(collect_by_release, source)
+                rollup_duration, events.values()[1], functools.partial(collect_by_release, source)
             ),
             time_series[source.id],
             {},
@@ -571,19 +514,25 @@ class UnmergeTestCase(TestCase):
         assert_series_contains(
             get_expected_series_values(
                 rollup_duration,
-                events.values()[1],
+                events.values()[0] + events.values()[2],
                 functools.partial(collect_by_release, destination),
             ),
             time_series[destination.id],
             {},
         )
 
-        time_series = tsdb.get_most_frequent_series(
-            tsdb.models.frequent_environments_by_group,
-            [source.id, destination.id],
-            now - timedelta(seconds=rollup_duration),
-            now + shift(16),
-            rollup_duration,
+        items = {}
+        for i in [source.id, destination.id]:
+            items[i] = list(Environment.objects.all().values_list("id", flat=True))
+
+        time_series = strip_zeroes(
+            tsdb.get_frequency_series(
+                tsdb.models.frequent_environments_by_group,
+                items,
+                now - timedelta(seconds=rollup_duration),
+                time_from_now(17),
+                rollup_duration,
+            )
         )
 
         def collect_by_environment(aggregate, event):
@@ -595,13 +544,15 @@ class UnmergeTestCase(TestCase):
             return aggregate
 
         assert_series_contains(
-            get_expected_series_values(rollup_duration, events.values()[0], collect_by_environment),
+            get_expected_series_values(rollup_duration, events.values()[1], collect_by_environment),
             time_series[source.id],
             {},
         )
 
         assert_series_contains(
-            get_expected_series_values(rollup_duration, events.values()[1], collect_by_environment),
+            get_expected_series_values(
+                rollup_duration, events.values()[0] + events.values()[2], collect_by_environment
+            ),
             time_series[destination.id],
             {},
         )
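
Condensed, the rewritten test drives the real merge and unmerge pipelines against Snuba-backed groups instead of mocking eventstream (names as in the diff above; fingerprint stands for one of events.keys()):

    with self.tasks():
        state = eventstream.start_merge(project.id, [merge_source.id], source.id)
        merge_groups.delay([merge_source.id], source.id)
        eventstream.end_merge(state)

    with self.tasks():
        state = eventstream.start_unmerge(
            project.id, [fingerprint], source.id, destination.id
        )
        unmerge.delay(
            project.id, source.id, destination.id, [fingerprint], None, batch_size=5
        )
        eventstream.end_unmerge(state)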