fix(reprocessing): Migrate event attachments from "remaining" events. (#28025)

Markus Unterwaditzer 3 years ago
parent
commit
2e01b2de7f

+ 9 - 1
src/sentry/reprocessing2.py

@@ -98,7 +98,8 @@ logger = logging.getLogger("sentry.reprocessing")
 _REDIS_SYNC_TTL = 3600 * 24
 
 
-# Note: Event attachments and group reports are migrated in save_event.
+# There are only a few rows of each group-related model per group, so they
+# are migrated all at once.
 GROUP_MODELS_TO_MIGRATE = DIRECT_GROUP_RELATED_MODELS + (models.Activity,)
 
 # If we were to move groupinbox to the new, empty group, inbox would show the
@@ -106,6 +107,13 @@ GROUP_MODELS_TO_MIGRATE = DIRECT_GROUP_RELATED_MODELS + (models.Activity,)
 # care of assigning GroupInbox like normally.
 GROUP_MODELS_TO_MIGRATE = tuple(x for x in GROUP_MODELS_TO_MIGRATE if x != models.GroupInbox)
 
+# Event attachments and user reports are per-event. This means that:
+#
+# 1. they are migrated as part of the processing pipeline (post-process/save-event)
+# 2. there are a lot of them per group. For remaining events we need to
+#    chunk up those queries to keep them from getting too slow
+EVENT_MODELS_TO_MIGRATE = (models.EventAttachment, models.UserReport)
+
 
 class CannotReprocess(Exception):
     pass
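
The second point in the new comment is the motivating constraint. As a minimal
sketch of the chunking idea, assuming a hypothetical _chunks helper and an
arbitrary batch size of 500 (the real task appears to receive event_ids already
batched, via celery_run_batch_query, before these queries run):

    # Sketch only: migrate per-event rows in bounded batches so that no single
    # UPDATE has to match an unbounded list of event IDs. The _chunks helper
    # and the batch size are illustrative assumptions, not Sentry's actual API.
    def _chunks(seq, size):
        for i in range(0, len(seq), size):
            yield seq[i : i + size]

    def migrate_event_models(project_id, event_ids, new_group_id):
        for cls in EVENT_MODELS_TO_MIGRATE:
            for batch in _chunks(list(event_ids), 500):
                cls.objects.filter(project_id=project_id, event_id__in=batch).update(
                    group_id=new_group_id
                )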

+ 9 - 5
src/sentry/tasks/reprocessing2.py

@@ -4,7 +4,7 @@ import time
 import sentry_sdk
 from django.db import transaction
 
-from sentry import eventstore, eventstream, models, nodestore
+from sentry import eventstore, eventstream, nodestore
 from sentry.eventstore.models import Event
 from sentry.tasks.base import instrumented_task, retry
 from sentry.utils.query import celery_run_batch_query
@@ -158,14 +158,13 @@ def handle_remaining_events(
 
     from sentry import buffer
     from sentry.models.group import Group
+    from sentry.reprocessing2 import EVENT_MODELS_TO_MIGRATE
 
     assert remaining_events in ("delete", "keep")
 
     if remaining_events == "delete":
-        models.EventAttachment.objects.filter(
-            project_id=project_id, event_id__in=event_ids
-        ).delete()
-        models.UserReport.objects.filter(project_id=project_id, event_id__in=event_ids).delete()
+        for cls in EVENT_MODELS_TO_MIGRATE:
+            cls.objects.filter(project_id=project_id, event_id__in=event_ids).delete()
 
         # Remove from nodestore
         node_ids = [Event.generate_node_id(project_id, event_id) for event_id in event_ids]
@@ -176,6 +175,11 @@ def handle_remaining_events(
             project_id, event_ids, from_timestamp=from_timestamp, to_timestamp=to_timestamp
         )
     elif remaining_events == "keep":
+        for cls in EVENT_MODELS_TO_MIGRATE:
+            cls.objects.filter(project_id=project_id, event_id__in=event_ids).update(
+                group_id=new_group_id
+            )
+
         eventstream.replace_group_unsafe(
             project_id,
             event_ids,
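
The net effect of this hunk: the duplicated EventAttachment/UserReport deletes
collapse into one loop over EVENT_MODELS_TO_MIGRATE, and the previously missing
"keep" branch now re-points those rows at the new group. A condensed sketch of
the resulting control flow, with the nodestore, eventstream, and Group buffer
bookkeeping from the full function omitted:

    # Condensed sketch of the two remaining_events modes after this change.
    # Only the ORM calls shown in the diff are included.
    def apply_remaining_events(remaining_events, project_id, event_ids, new_group_id):
        assert remaining_events in ("delete", "keep")
        for cls in EVENT_MODELS_TO_MIGRATE:
            qs = cls.objects.filter(project_id=project_id, event_id__in=event_ids)
            if remaining_events == "delete":
                qs.delete()  # rows die with their deleted events
            else:
                qs.update(group_id=new_group_id)  # rows follow events to the new group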

+ 32 - 15
tests/sentry/tasks/test_reprocessing2.py

@@ -26,6 +26,27 @@ from sentry.testutils.helpers.datetime import before_now, iso_format
 from sentry.utils.cache import cache_key_for_event
 
 
+def _create_event_attachment(evt, type):
+    file = File.objects.create(name="foo", type=type)
+    file.putfile(BytesIO(b"hello world"))
+    EventAttachment.objects.create(
+        event_id=evt.event_id,
+        group_id=evt.group_id,
+        project_id=evt.project_id,
+        file_id=file.id,
+        type=file.type,
+        name="foo",
+    )
+
+
+def _create_user_report(evt):
+    UserReport.objects.create(
+        project_id=evt.project_id,
+        event_id=evt.event_id,
+        name="User",
+    )
+
+
 @pytest.fixture(autouse=True)
 def reprocessing_feature(monkeypatch):
     monkeypatch.setattr("sentry.tasks.reprocessing2.GROUP_REPROCESSING_CHUNK_SIZE", 1)
@@ -237,6 +258,9 @@ def test_max_events(
         event_id: eventstore.get_event_by_id(default_project.id, event_id) for event_id in event_ids
     }
 
+    for evt in old_events.values():
+        _create_user_report(evt)
+
     (group_id,) = {e.group_id for e in old_events.values()}
 
     with burst_task_runner() as burst:
@@ -257,6 +281,12 @@ def test_max_events(
             elif remaining_events == "keep":
                 assert event.group_id != group_id
                 assert dict(event.data) == dict(old_events[event_id].data)
+                assert (
+                    UserReport.objects.get(
+                        project_id=default_project.id, event_id=event_id
+                    ).group_id
+                    != group_id
+                )
             else:
                 raise ValueError(remaining_events)
         else:
@@ -314,22 +344,9 @@ def test_attachments_and_userfeedback(
 
     for evt in (event, event_to_delete):
         for type in ("event.attachment", "event.minidump"):
-            file = File.objects.create(name="foo", type=type)
-            file.putfile(BytesIO(b"hello world"))
-            EventAttachment.objects.create(
-                event_id=evt.event_id,
-                group_id=evt.group_id,
-                project_id=default_project.id,
-                file_id=file.id,
-                type=file.type,
-                name="foo",
-            )
+            _create_event_attachment(evt, type)
 
-        UserReport.objects.create(
-            project_id=default_project.id,
-            event_id=evt.event_id,
-            name="User",
-        )
+        _create_user_report(evt)
 
     with burst_task_runner() as burst:
         reprocess_group(default_project.id, event.group_id, max_events=1)
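
The hunk ends inside the rewritten test; after the burst runner drains the
queued tasks, assertions mirror the ones added to test_max_events above. A
hypothetical sketch of that kind of post-reprocessing check (old_group_id is a
stand-in name, not the test's actual local):

    # Hypothetical post-reprocessing assertions: every surviving per-event row
    # should have been re-pointed away from the original group.
    for report in UserReport.objects.filter(project_id=default_project.id):
        assert report.group_id != old_group_id

    for attachment in EventAttachment.objects.filter(project_id=default_project.id):
        assert attachment.group_id != old_group_id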