
ref(save_event): Consolidate save_aggregate (#25481)

Markus Unterwaditzer · 3 years ago · commit 4645a05b5a

+ 74 - 140
src/sentry/event_manager.py

@@ -395,16 +395,9 @@ class EventManager:
             with sentry_sdk.start_span(op="event_manager.save.get_attachments"):
                 attachments = get_attachments(cache_key, job)
 
-        save_aggregate_fn = (
-            _save_aggregate2
-            if not options.get("store.race-free-group-creation-force-disable")
-            and features.has("projects:race-free-group-creation", project)
-            else _save_aggregate
-        )
-
         try:
             with sentry_sdk.start_span(op="event_manager.save.save_aggregate_fn"):
-                job["group"], job["is_new"], job["is_regression"] = save_aggregate_fn(
+                job["group"], job["is_new"], job["is_regression"] = _save_aggregate(
                     event=job["event"],
                     flat_hashes=flat_hashes,
                     hierarchical_hashes=hierarchical_hashes,
@@ -925,29 +918,29 @@ def get_culprit(data):
     )
 
 
-def _find_group_id(all_hashes):
-    for h in all_hashes:
-        if h.group_id is not None:
-            return h.group_id
-        if h.group_tombstone_id is not None:
-            raise HashDiscarded("Matches group tombstone %s" % h.group_tombstone_id)
-
-    return None
-
-
-def _save_aggregate2(event, flat_hashes, hierarchical_hashes, release, **kwargs):
-    """
-    A rewrite of _save_aggregate that is supposed to eliminate races using DB transactions.
-    """
-
-    # TODO(markus): Port over hierarchical grouping changes from _save_aggregate
-
+def _save_aggregate(event, flat_hashes, hierarchical_hashes, release, **kwargs):
     project = event.project
 
-    all_hashes = [
+    flat_grouphashes = [
         GroupHash.objects.get_or_create(project=project, hash=hash)[0] for hash in flat_hashes
     ]
-    existing_group_id = _find_group_id(all_hashes)
+
+    # The root_hierarchical_hash is the least specific hash within the tree, so
+    # typically hierarchical_hashes[0], unless a hash `n` has been split in
+    # which case `root_hierarchical_hash = hierarchical_hashes[n + 1]`. Choosing
+    # this for select_for_update mostly provides sufficient synchronization
+    # when groups are created and also relieves contention by locking a more
+    # specific hash than `hierarchical_hashes[0]`.
+    existing_group_id, root_hierarchical_hash = _find_existing_group_id(
+        project, flat_grouphashes, hierarchical_hashes
+    )
+
+    if root_hierarchical_hash is not None:
+        root_hierarchical_grouphash = GroupHash.objects.get_or_create(
+            project=project, hash=root_hierarchical_hash
+        )[0]
+    else:
+        root_hierarchical_grouphash = None
 
     if existing_group_id is None:
 
@@ -962,11 +955,24 @@ def _save_aggregate2(event, flat_hashes, hierarchical_hashes, release, **kwargs)
             span.set_tag("create_group_transaction.outcome", "no_group")
             metric_tags["create_group_transaction.outcome"] = "no_group"
 
-            all_hashes = list(
-                GroupHash.objects.filter(id__in=[h.id for h in all_hashes]).select_for_update()
+            all_hash_ids = [h.id for h in flat_grouphashes]
+            if root_hierarchical_grouphash is not None:
+                all_hash_ids.append(root_hierarchical_grouphash.id)
+
+            all_hashes = list(GroupHash.objects.filter(id__in=all_hash_ids).select_for_update())
+
+            flat_grouphashes = [gh for gh in all_hashes if gh.hash in flat_hashes]
+
+            existing_group_id, root_hierarchical_hash = _find_existing_group_id(
+                project, flat_grouphashes, hierarchical_hashes
             )
 
-            existing_group_id = _find_group_id(all_hashes)
+            if root_hierarchical_hash is not None:
+                root_hierarchical_grouphash = GroupHash.objects.get_or_create(
+                    project=project, hash=root_hierarchical_hash
+                )[0]
+            else:
+                root_hierarchical_grouphash = None
 
             if existing_group_id is None:
 
@@ -995,10 +1001,14 @@ def _save_aggregate2(event, flat_hashes, hierarchical_hashes, release, **kwargs)
                     **kwargs,
                 )
 
-                # invariant: existing_group_id is None, therefore all hashes
-                # have group_id=None, therefore none of them can be locked in
-                # migration either
-                GroupHash.objects.filter(id__in=[h.id for h in all_hashes]).update(group=group)
+                if root_hierarchical_grouphash is not None:
+                    new_hashes = [root_hierarchical_grouphash]
+                else:
+                    new_hashes = list(flat_grouphashes)
+
+                GroupHash.objects.filter(id__in=[h.id for h in new_hashes]).exclude(
+                    state=GroupHash.State.LOCKED_IN_MIGRATION
+                ).update(group=group)
 
                 is_new = True
                 is_regression = False
@@ -1017,7 +1027,11 @@ def _save_aggregate2(event, flat_hashes, hierarchical_hashes, release, **kwargs)
     group = Group.objects.get(id=existing_group_id)
 
     is_new = False
-    new_hashes = [h for h in all_hashes if h.group_id is None]
+
+    if root_hierarchical_hash is not None:
+        new_hashes = []
+    else:
+        new_hashes = [h for h in flat_grouphashes if h.group_id is None]
 
     if new_hashes:
         # There may still be secondary hashes that we did not use to find an
@@ -1061,6 +1075,9 @@ def _find_existing_group_id(
     hierarchical_hashes,
 ):
     all_grouphashes = []
+    root_hierarchical_hash = None
+
+    found_split = False
 
     if hierarchical_hashes:
         hierarchical_grouphashes = {
@@ -1070,16 +1087,32 @@ def _find_existing_group_id(
 
         for hash in reversed(hierarchical_hashes):
             group_hash = hierarchical_grouphashes.get(hash)
-            if group_hash is None:
-                continue
 
-            all_grouphashes.append(group_hash)
+            if group_hash is not None and group_hash.state == GroupHash.State.SPLIT:
+                found_split = True
+                break
 
-    all_grouphashes.extend(flat_grouphashes)
+            root_hierarchical_hash = hash
+
+            if group_hash is not None:
+                all_grouphashes.append(group_hash)
+
+        if root_hierarchical_hash is None:
+            # All hashes were split (should not be reachable from UI), so
+            # we group by most specific hash.
+            root_hierarchical_hash = hierarchical_hashes[-1]
+
+    if not found_split:
+        # In case of a split we want to avoid accidentally finding the split-up
+        # group again via flat hashes, which are very likely associated with
+        # whichever group is attached to the split hash. This distinction will
+        # become irrelevant once we start moving existing events into child
+        # groups and delete the parent group.
+        all_grouphashes.extend(flat_grouphashes)
 
     for group_hash in all_grouphashes:
         if group_hash.group_id is not None:
-            return group_hash.group_id
+            return group_hash.group_id, root_hierarchical_hash
 
         # When refactoring for hierarchical grouping, we noticed that a
         # tombstone may get ignored entirely if there is another hash *before*
@@ -1092,106 +1125,7 @@ def _find_existing_group_id(
         if group_hash.group_tombstone_id is not None:
             raise HashDiscarded("Matches group tombstone %s" % group_hash.group_tombstone_id)
 
-
-def _save_aggregate(event, flat_hashes, hierarchical_hashes, release, **kwargs):
-    project = event.project
-
-    # attempt to find a matching hash
-    flat_grouphashes = [
-        GroupHash.objects.get_or_create(project=project, hash=hash)[0] for hash in flat_hashes
-    ]
-
-    if hierarchical_hashes:
-        root_hierarchical_hash = GroupHash.objects.get_or_create(
-            project=project, hash=hierarchical_hashes[0]
-        )[0]
-    else:
-        root_hierarchical_hash = None
-
-    existing_group_id = _find_existing_group_id(project, flat_grouphashes, hierarchical_hashes)
-
-    # XXX(dcramer): this has the opportunity to create duplicate groups
-    # it should be resolved by the hash merging function later but this
-    # should be better tested/reviewed
-    if existing_group_id is None:
-        # it's possible the release was deleted between
-        # when we queried for the release and now, so
-        # make sure it still exists
-        first_release = kwargs.pop("first_release", None)
-
-        if project.id in (options.get("store.load-shed-group-creation-projects") or ()):
-            raise HashDiscarded("Load shedding group creation")
-
-        try:
-            short_id = project.next_short_id()
-        except OperationalError:
-            metrics.incr(
-                "next_short_id.timeout",
-                tags={"platform": event.platform or "unknown"},
-            )
-            raise HashDiscarded("Timeout when getting next_short_id")
-
-        with transaction.atomic():
-            group, group_is_new = (
-                Group.objects.create(
-                    project=project,
-                    short_id=short_id,
-                    first_release_id=Release.objects.filter(id=first_release.id)
-                    .values_list("id", flat=True)
-                    .first()
-                    if first_release
-                    else None,
-                    **kwargs,
-                ),
-                True,
-            )
-
-        metrics.incr(
-            "group.created", skip_internal=True, tags={"platform": event.platform or "unknown"}
-        )
-
-    else:
-        group = Group.objects.get(id=existing_group_id)
-
-        group_is_new = False
-
-    group._project_cache = project
-
-    if root_hierarchical_hash is None or root_hierarchical_hash.group_id == existing_group_id:
-        to_update = list(flat_grouphashes)
-        if group_is_new and root_hierarchical_hash is not None:
-            to_update.append(root_hierarchical_hash)
-        new_hashes = [h for h in to_update if h.group_id is None]
-    else:
-        new_hashes = []
-
-    # If all hashes are brand new we treat this event as new
-    is_new = False
-    if new_hashes:
-        # XXX: There is a race condition here wherein another process could
-        # create a new group that is associated with one of the new hashes,
-        # add some event(s) to it, and then subsequently have the hash
-        # "stolen" by this process. This then "orphans" those events from
-        # their "siblings" in the group we've created here. We don't have a
-        # way to fix this, since we can't update the group on those hashes
-        # without filtering on `group_id` (which we can't do due to query
-        # planner weirdness.) For more context, see 84c6f75a and d0e22787,
-        # as well as GH-5085.
-        GroupHash.objects.filter(id__in=[h.id for h in new_hashes]).exclude(
-            state=GroupHash.State.LOCKED_IN_MIGRATION
-        ).update(group=group)
-
-        if group_is_new and len(new_hashes) == len(to_update):
-            is_new = True
-
-    if not is_new:
-        is_regression = _process_existing_aggregate(
-            group=group, event=event, data=kwargs, release=release
-        )
-    else:
-        is_regression = False
-
-    return group, is_new, is_regression
+    return None, root_hierarchical_hash
 
 
 def _handle_regression(group, event, release):
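
Taken together, the consolidated `_save_aggregate` keeps the race-free shape of the deleted `_save_aggregate2`: an optimistic lookup first, then a second lookup under row locks before creating anything. Below is a minimal sketch of that double-checked locking pattern; `find_group` and `create_group` are hypothetical stand-ins for `_find_existing_group_id` and the `Group.objects.create` branch, not the real helpers.

    from django.db import transaction

    from sentry.models import GroupHash  # as used throughout this diff

    def save_aggregate_sketch(grouphashes, find_group, create_group):
        # Pass 1: optimistic, lock-free lookup.
        existing_id = find_group(grouphashes)
        if existing_id is not None:
            return existing_id, False  # (group_id, is_new)

        with transaction.atomic():
            # Lock the candidate hash rows; concurrent writers serialize here.
            locked = list(
                GroupHash.objects.filter(
                    id__in=[gh.id for gh in grouphashes]
                ).select_for_update()
            )

            # Pass 2: another worker may have attached a group between our
            # first lookup and acquiring the locks.
            existing_id = find_group(locked)
            if existing_id is not None:
                return existing_id, False

            group = create_group()
            GroupHash.objects.filter(id__in=[gh.id for gh in locked]).exclude(
                state=GroupHash.State.LOCKED_IN_MIGRATION
            ).update(group=group)
            return group.id, True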

+ 5 - 0
src/sentry/models/grouphash.py

@@ -11,9 +11,14 @@ class GroupHash(Model):
         UNLOCKED = None
         LOCKED_IN_MIGRATION = 1
 
+        # This hierarchical grouphash should be ignored/skipped for finding the group.
+        SPLIT = 2
+
     project = FlexibleForeignKey("sentry.Project", null=True)
     hash = models.CharField(max_length=32)
     group = FlexibleForeignKey("sentry.Group", null=True)
+
+    # not-null => the event should be discarded
     group_tombstone_id = BoundedPositiveIntegerField(db_index=True, null=True)
     state = BoundedPositiveIntegerField(
         choices=[(State.LOCKED_IN_MIGRATION, _("Locked (Migration in Progress)"))], null=True
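
The SPLIT state is the hook the new traversal logic keys on: `_find_existing_group_id` stops walking the hierarchy at a split hash and skips flat hashes entirely, so events regroup under a more specific child hash. This commit only adds the state; one plausible shape for the split operation itself (hypothetical, not part of this diff) would be:

    # Hypothetical sketch: mark a hierarchical grouphash as split so that
    # subsequent events group under more specific child hashes instead of
    # the existing parent group. Not part of this commit.
    def split_grouphash(project, hash):
        gh = GroupHash.objects.get(project=project, hash=hash)
        gh.state = GroupHash.State.SPLIT
        gh.save(update_fields=["state"])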

+ 0 - 8
tests/sentry/event_manager/test_event_manager.py

@@ -40,7 +40,6 @@ from sentry.models import (
     UserReport,
 )
 from sentry.testutils import TestCase, assert_mock_called_once_with_partial
-from sentry.testutils.helpers import Feature
 from sentry.utils.cache import cache_key_for_event
 from sentry.utils.compat import mock
 from sentry.utils.outcomes import Outcome
@@ -1580,10 +1579,3 @@ class ReleaseIssueTest(TestCase):
             last_seen=self.timestamp + 100,
             first_seen=self.timestamp + 100,
         )
-
-
-class RaceFreeEventManagerTest(EventManagerTest):
-    @pytest.fixture(autouse=True)
-    def _save_aggregate_parameterized(self):
-        with Feature({"projects:race-free-group-creation": True}):
-            yield

+ 19 - 17
tests/sentry/event_manager/test_hierarchical_hashes.py

@@ -31,6 +31,17 @@ def fast_save(default_project):
     return inner
 
 
+def _group_hashes(group_id):
+    return {gh.hash for gh in GroupHash.objects.filter(group_id=group_id)}
+
+
+def _assoc_hash(group, hash):
+    gh = GroupHash.objects.get_or_create(project=group.project, hash=hash)[0]
+    assert gh.group is None or gh.group.id != group.id
+    gh.group = group
+    gh.save()
+
+
 @pytest.mark.django_db
 def test_move_all_events(default_project, fast_save):
     group, is_new, is_regression = fast_save("f")
@@ -43,7 +54,10 @@ def test_move_all_events(default_project, fast_save):
     assert not is_regression
     assert new_group.id == group.id
 
-    assert {g.hash for g in GroupHash.objects.filter(group=group)} == {"a" * 32, "b" * 32, "c" * 32}
+    _assoc_hash(group, "a" * 32)
+    _assoc_hash(group, "b" * 32)
+
+    assert _group_hashes(group.id) == {"a" * 32, "b" * 32, "c" * 32}
 
     # simulate split operation where all events of group are moved into a more specific hash
     GroupHash.objects.filter(group=group).delete()
@@ -66,14 +80,7 @@ def test_move_all_events(default_project, fast_save):
     assert not is_regression
     assert new_group.id != group.id
 
-    assert {g.hash for g in GroupHash.objects.filter(group=new_group)} == {
-        # Since this is the "root group" again (primary hash is c), it's fine
-        # to associate flat hashes w it
-        "a" * 32,
-        "b" * 32,
-        # one hierarchical hash associated
-        "c" * 32,
-    }
+    assert _group_hashes(new_group.id) == {"c" * 32}
 
 
 @pytest.mark.django_db
@@ -87,7 +94,7 @@ def test_partial_move(default_project, fast_save):
     assert not is_regression
     assert new_group.id == group.id
 
-    assert {g.hash for g in GroupHash.objects.filter(group=group)} == {"a" * 32, "b" * 32, "c" * 32}
+    assert _group_hashes(group.id) == {"c" * 32}
 
     # simulate split operation where event "f" of group is moved into a more specific hash
     group2 = Group.objects.create(project=default_project)
@@ -98,7 +105,7 @@ def test_partial_move(default_project, fast_save):
     assert not is_regression
     assert new_group.id == group2.id
 
-    assert {g.hash for g in GroupHash.objects.filter(group=new_group)} == {
+    assert _group_hashes(new_group.id) == {
         # one hierarchical hash associated
         # no flat hashes associated when sorting into split group!
         "f"
@@ -110,12 +117,7 @@ def test_partial_move(default_project, fast_save):
     assert not is_regression
     assert new_group.id == group.id
 
-    assert {g.hash for g in GroupHash.objects.filter(group=new_group)} == {
-        # Since this is the "root group" again (primary hash is c), it's fine
-        # to associate flat hashes w it
-        "a" * 32,
-        "b" * 32,
-        # one hierarchical hash associated
+    assert _group_hashes(new_group.id) == {
         "c" * 32,
     }
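
These assertions follow from the traversal rule in `_find_existing_group_id`: walk `hierarchical_hashes` from most to least specific, stop at the first SPLIT hash, and the last hash visited becomes `root_hierarchical_hash`. A pure-Python sketch of just that rule, where `split` stands in for "a GroupHash row exists with state == SPLIT":

    def root_hash(hierarchical_hashes, split):
        root = None
        # Most specific hash is last, so walk the list in reverse.
        for h in reversed(hierarchical_hashes):
            if h in split:
                break
            root = h
        # If every hash was split, fall back to the most specific one.
        return root if root is not None else hierarchical_hashes[-1]

    assert root_hash(["h0", "h1", "h2"], split=set()) == "h0"
    assert root_hash(["h0", "h1", "h2"], split={"h0"}) == "h1"
    assert root_hash(["h0", "h1", "h2"], split={"h0", "h1", "h2"}) == "h2"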
 

+ 9 - 19
tests/sentry/event_manager/test_save_aggregate.py

@@ -4,17 +4,17 @@ from threading import Thread
 
 import pytest
 
-from sentry.event_manager import _save_aggregate, _save_aggregate2
+from sentry.event_manager import _save_aggregate
 from sentry.eventstore.models import Event
 
 
 @pytest.mark.django_db(transaction=True)
 @pytest.mark.parametrize(
-    "save_aggregate_version",
+    "is_race_free",
     [
-        # New group creation code, which is supposed to not have races
-        "new_save_aggregate",
-        # New group creation code with removed transaction isolation, which is then
+        # regular group creation code, which is supposed to not have races
+        True,
+        # group creation code with removed transaction isolation, which is then
         # supposed to create multiple groups. This variant exists such that we can
         # ensure the test would find race conditions in principle, and does not
         # just always pass because of low parallelism. In a sense this variant
@@ -23,18 +23,13 @@ from sentry.eventstore.models import Event
         # If this variant fails, CONCURRENCY needs to be increased or e.g. thread
         # barriers need to be used to ensure data races. This does not seem to be
         # necessary so far.
-        "new_broken_save_aggregate",
-        # Old group creation code which is "supposed to" have races
-        "old_save_aggregate",
+        False,
     ],
 )
-def test_group_creation_race(monkeypatch, default_project, save_aggregate_version):
+def test_group_creation_race(monkeypatch, default_project, is_race_free):
     CONCURRENCY = 2
 
-    new_variant = save_aggregate_version in ("new_save_aggregate", "new_broken_save_aggregate")
-    is_race_free = save_aggregate_version == "new_save_aggregate"
-
-    if save_aggregate_version == "new_broken_save_aggregate":
+    if not is_race_free:
 
         class FakeTransactionModule:
             @staticmethod
@@ -49,11 +44,6 @@ def test_group_creation_race(monkeypatch, default_project, save_aggregate_versio
         # select_for_update cannot be used outside of transactions
         monkeypatch.setattr("django.db.models.QuerySet.select_for_update", lambda self: self)
 
-    if new_variant:
-        save_aggregate = _save_aggregate2
-    else:
-        save_aggregate = _save_aggregate
-
     return_values = []
 
     def save_event():
@@ -65,7 +55,7 @@ def test_group_creation_race(monkeypatch, default_project, save_aggregate_versio
         )
 
         return_values.append(
-            save_aggregate(
+            _save_aggregate(
                 evt,
                 flat_hashes=["a" * 32, "b" * 32],
                 hierarchical_hashes=[],
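
The diff cuts off the body of FakeTransactionModule, but its role follows from the comments above: the `is_race_free=False` variant strips transaction isolation so the test demonstrably can produce duplicate groups, proving the race check is not vacuous. Presumably it is a no-op replacement for `django.db.transaction`, roughly:

    from contextlib import contextmanager

    # Presumed shape of the truncated FakeTransactionModule: a no-op
    # atomic() so _save_aggregate runs without transaction isolation.
    class FakeTransactionModule:
        @staticmethod
        @contextmanager
        def atomic():
            yield

    # The truncated hunk then swaps it in with something like:
    # monkeypatch.setattr("sentry.event_manager.transaction", FakeTransactionModule)

With CONCURRENCY threads calling `_save_aggregate` on identical hashes, the race-free variant must yield a single group id across all collected return values, while the broken variant is expected to create several.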