feat: Basic implementation of reprocessing, cloning events (#20252)

If a user fails to provide debug symbols, their native stacktraces are worthless. Often they only realize this once the event has already been saved.

Ideally they would be able to just "refresh" an event: it would go through stacktrace symbolication again and be updated in place, moving to a different group if the stacktrace turns out significantly different. Unfortunately Snuba does not support that, so we resorted to a more barebones implementation: when the user hits the reprocess button, the event is duplicated, assigned a new event ID and timestamp, and that copy is inserted into the event processing pipeline again. That's basically what this PR does:

    Additional code in preprocess_event to back up the event payload before processing (but after data scrubbing)
    Reprocessing endpoints that trigger a celery task to reprocess an entire issue or a single event
    In either case the payload backup is loaded, the attachments are loaded from filestore into the attachments_cache, and the event goes through the exact same pipeline again (roughly the flow sketched below).
    ...while sitting on the reprocessing queue, so that backlogs don't interfere with "live" event ingestion (note: that queue already exists for the "old reprocessing" feature, which basically prevents the event from saving if the stacktrace is bad and gives the user a button to "resume the pipeline/restart processing")
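
For orientation, the per-event flow is roughly the sketch below. The helpers load_backup, load_attachments and resubmit, as well as the queue name, are hypothetical stand-ins for the ingestion pieces this commit wires together, not functions from this diff:

# Conceptual sketch only. load_backup, load_attachments and resubmit are
# hypothetical stand-ins, stubbed here so the flow reads top to bottom.
import time
import uuid


def load_backup(project_id, event_id):
    # Stand-in: would read the payload copy saved before processing.
    return {"event_id": event_id, "project": project_id}


def load_attachments(project_id, event_id, new_event_id):
    # Stand-in: would copy attachments from filestore into the attachments_cache.
    pass


def resubmit(data, start_time, queue):
    # Stand-in: would kick off preprocess_event on the given celery queue.
    pass


def reprocess_single_event(project_id, event_id):
    data = load_backup(project_id, event_id)   # backup written before processing
    if data is None:
        return                                 # nothing was backed up

    data["event_id"] = uuid.uuid4().hex        # the clone gets a fresh event ID
    start_time = time.time()                   # ...and a bumped timestamp

    load_attachments(project_id, event_id, new_event_id=data["event_id"])
    resubmit(data, start_time=start_time, queue="reprocessing")
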
Markus Unterwaditzer · 4 years ago · parent · commit 288d63d889

+ 1 - 1
.travis.yml

@@ -244,7 +244,7 @@ matrix:
       before_install:
         - *base_install
         - *start_snuba
-        - docker run -d --network host --name symbolicator us.gcr.io/sentryio/symbolicator:latest run
+        - docker run -d -v $PWD/config/symbolicator/:/etc/symbolicator --network host --name symbolicator us.gcr.io/sentryio/symbolicator:latest run -c /etc/symbolicator/config.yml
         - docker ps -a
 
     - <<: *postgres_default

+ 11 - 0
config/symbolicator/config.yml

@@ -0,0 +1,11 @@
+bind: "0.0.0.0:3021"
+logging:
+  level: "debug"
+  format: "pretty"
+  enable_backtraces: true
+
+# explicitly disable caches as it's not something we want in tests. in
+# development it may be less ideal. perhaps we should do the same thing as we
+# do with relay one day (one container per test/session), although that will be
+# slow
+cache_dir: null

+ 33 - 0
src/sentry/api/endpoints/event_reprocessing.py

@@ -0,0 +1,33 @@
+from __future__ import absolute_import
+
+import time
+
+from sentry import features
+from sentry.api.bases.project import ProjectEndpoint
+from sentry.tasks.reprocessing2 import reprocess_event
+
+
+class EventReprocessingEndpoint(ProjectEndpoint):
+    def post(self, request, project, event_id):
+        """
+        Reprocess a single event
+        ````````````````````````
+
+        Triggers reprocessing for a single event. Currently this means
+        duplicating the event with a new event ID and bumped timestamps.
+
+        :pparam string organization_slug: the slug of the organization the
+                                          issues belong to.
+        :pparam string project_slug: the slug of the project the event
+                                     belongs to.
+        :pparam string event_id: the id of the event.
+        :auth: required
+        """
+        if not features.has("projects:reprocessing-v2", project, actor=request.user):
+            return self.respond(status=404)
+
+        start_time = time.time()
+
+        reprocess_event.delay(project_id=project.id, event_id=event_id, start_time=start_time)
+
+        return self.respond(status=200)
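
For reference, a hypothetical client-side call against the new route (the full URL pattern is registered in api/urls.py below; the host, slugs and token are placeholders, and bearer-token auth is assumed):

import requests

SENTRY_URL = "https://sentry.example.com"  # placeholder
TOKEN = "<auth token>"                      # placeholder

resp = requests.post(
    "{}/api/0/projects/{}/{}/events/{}/reprocessing/".format(
        SENTRY_URL, "my-org", "my-project", "a" * 32
    ),
    headers={"Authorization": "Bearer {}".format(TOKEN)},
)
assert resp.status_code == 200  # 404 if projects:reprocessing-v2 is not enabled
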

+ 39 - 0
src/sentry/api/endpoints/group_reprocessing.py

@@ -0,0 +1,39 @@
+from __future__ import absolute_import
+
+
+from sentry import features
+from sentry.api.base import DocSection
+from sentry.api.bases import GroupEndpoint
+from sentry.models import Group
+from sentry.utils.apidocs import scenario, attach_scenarios
+from sentry.tasks.reprocessing2 import reprocess_group
+
+
+@scenario("ReprocessGroup")
+def reprocess_group_scenario(runner):
+    group = Group.objects.filter(project=runner.default_project).first()
+    runner.request(method="POST", path="/issues/%s/reprocessing/" % group.id)
+
+
+class GroupReprocessingEndpoint(GroupEndpoint):
+    doc_section = DocSection.EVENTS
+
+    @attach_scenarios([reprocess_group_scenario])
+    def post(self, request, group):
+        """
+        Reprocess a group
+        `````````````````
+
+        This endpoint triggers reprocessing for all events in a group.
+        Currently this means duplicating the events with new event IDs and
+        bumped timestamps.
+
+        :pparam string issue_id: the ID of the issue to retrieve.
+        :auth: required
+        """
+
+        if not features.has("projects:reprocessing-v2", group.project, actor=request.user):
+            return self.respond(status=404)
+
+        reprocess_group.delay(project_id=group.project_id, group_id=group.id)
+        return self.respond(status=200)
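
The analogous hypothetical call for a whole issue (path as registered in GROUP_URLS below; host, issue ID and token are placeholders):

import requests

resp = requests.post(
    "{}/api/0/issues/{}/reprocessing/".format("https://sentry.example.com", 12345),
    headers={"Authorization": "Bearer <auth token>"},
)
assert resp.status_code == 200  # 404 unless projects:reprocessing-v2 is enabled
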

+ 8 - 0
src/sentry/api/urls.py

@@ -29,6 +29,7 @@ from .endpoints.debug_files import (
 from .endpoints.event_apple_crash_report import EventAppleCrashReportEndpoint
 from .endpoints.event_attachment_details import EventAttachmentDetailsEndpoint
 from .endpoints.event_attachments import EventAttachmentsEndpoint
+from .endpoints.event_reprocessing import EventReprocessingEndpoint
 from .endpoints.event_file_committers import EventFileCommittersEndpoint
 from .endpoints.event_grouping_info import EventGroupingInfoEndpoint
 from .endpoints.event_owners import EventOwnersEndpoint
@@ -41,6 +42,7 @@ from .endpoints.group_events_oldest import GroupEventsOldestEndpoint
 from .endpoints.group_external_issue_details import GroupExternalIssueDetailsEndpoint
 from .endpoints.group_external_issues import GroupExternalIssuesEndpoint
 from .endpoints.group_hashes import GroupHashesEndpoint
+from .endpoints.group_reprocessing import GroupReprocessingEndpoint
 from .endpoints.group_integration_details import GroupIntegrationDetailsEndpoint
 from .endpoints.group_integrations import GroupIntegrationsEndpoint
 from .endpoints.group_notes import GroupNotesEndpoint
@@ -341,6 +343,7 @@ GROUP_URLS = [
         GroupNotesDetailsEndpoint.as_view(),
     ),
     url(r"^(?P<issue_id>[^\/]+)/hashes/$", GroupHashesEndpoint.as_view()),
+    url(r"^(?P<issue_id>[^\/]+)/reprocessing/$", GroupReprocessingEndpoint.as_view()),
     url(r"^(?P<issue_id>[^\/]+)/stats/$", GroupStatsEndpoint.as_view()),
     url(r"^(?P<issue_id>[^\/]+)/tags/$", GroupTagsEndpoint.as_view()),
     url(r"^(?P<issue_id>[^\/]+)/tags/(?P<key>[^/]+)/$", GroupTagKeyDetailsEndpoint.as_view()),
@@ -1271,6 +1274,11 @@ urlpatterns = [
                     EventAttachmentDetailsEndpoint.as_view(),
                     name="sentry-api-0-event-attachment-details",
                 ),
+                url(
+                    r"^(?P<organization_slug>[^\/]+)/(?P<project_slug>[^\/]+)/events/(?P<event_id>[\w-]+)/reprocessing/$",
+                    EventReprocessingEndpoint.as_view(),
+                    name="sentry-api-0-event-reprocessing",
+                ),
                 url(
                     r"^(?P<organization_slug>[^\/]+)/(?P<project_slug>[^\/]+)/events/(?P<event_id>[\w-]+)/committers/$",
                     EventFileCommittersEndpoint.as_view(),

+ 11 - 1
src/sentry/conf/server.py

@@ -122,6 +122,10 @@ RELAY_CONFIG_DIR = os.path.normpath(
     os.path.join(PROJECT_ROOT, os.pardir, os.pardir, "config", "relay")
 )
 
+SYMBOLICATOR_CONFIG_DIR = os.path.normpath(
+    os.path.join(PROJECT_ROOT, os.pardir, os.pardir, "config", "symbolicator")
+)
+
 sys.path.insert(0, os.path.normpath(os.path.join(PROJECT_ROOT, os.pardir)))
 
 DATABASES = {
@@ -754,6 +758,7 @@ LOGGING = {
         "sentry_plugins": {"level": "INFO"},
         "sentry.files": {"level": "WARNING"},
         "sentry.minidumps": {"handlers": ["internal"], "propagate": False},
+        "sentry.reprocessing": {"handlers": ["internal"], "propagate": False},
         "sentry.interfaces": {"handlers": ["internal"], "propagate": False},
         # This only needs to go to Sentry for now.
         "sentry.similarity": {"handlers": ["internal"], "propagate": False},
@@ -917,6 +922,8 @@ SENTRY_FEATURES = {
     "projects:plugins": True,
     # Enable functionality for rate-limiting events on projects.
     "projects:rate-limits": True,
+    # Enable version 2 of reprocessing (completely distinct from v1)
+    "projects:reprocessing-v2": False,
     # Enable functionality for sampling of events on projects.
     "projects:sample-events": False,
     # Enable functionality to trigger service hooks upon event ingestion.
@@ -1547,7 +1554,8 @@ SENTRY_DEVSERVICES = {
         "image": "us.gcr.io/sentryio/symbolicator:latest",
         "pull": True,
         "ports": {"3021/tcp": 3021},
-        "command": ["run"],
+        "volumes": {SYMBOLICATOR_CONFIG_DIR: {"bind": "/etc/symbolicator"}},
+        "command": ["run", "--config", "/etc/symbolicator/config.yml"],
         "only_if": lambda settings, options: options.get("symbolicator.enabled"),
     },
     "relay": {
@@ -1963,3 +1971,5 @@ SENTRY_SIMILARITY_GROUPING_CONFIGURATIONS_TO_INDEX = {
 }
 
 SENTRY_USE_UWSGI = True
+
+SENTRY_REPROCESSING_ATTACHMENT_CHUNK_SIZE = 2 ** 20
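
The new SENTRY_REPROCESSING_ATTACHMENT_CHUNK_SIZE of 2 ** 20 bytes is 1 MiB. A minimal sketch of what such a chunk size means for an attachment blob (iter_chunks is illustrative only, not code from this diff):

CHUNK_SIZE = 2 ** 20  # SENTRY_REPROCESSING_ATTACHMENT_CHUNK_SIZE, i.e. 1 MiB


def iter_chunks(data, chunk_size=CHUNK_SIZE):
    # Split an attachment payload into cache-sized pieces.
    for offset in range(0, len(data), chunk_size):
        yield data[offset:offset + chunk_size]


# A 2.5 MiB attachment becomes three chunks: 1 MiB, 1 MiB, 0.5 MiB.
assert [len(c) for c in iter_chunks(b"x" * (5 * 2 ** 19))] == [2 ** 20, 2 ** 20, 2 ** 19]
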

+ 3 - 0
src/sentry/deletions/defaults/group.py

@@ -4,6 +4,7 @@ import os
 from sentry import eventstore, nodestore
 from sentry.eventstore.models import Event
 from sentry.models import EventAttachment, UserReport
+from sentry.reprocessing2 import delete_unprocessed_events
 
 from ..base import BaseDeletionTask, BaseRelation, ModelDeletionTask, ModelRelation
 
@@ -52,6 +53,8 @@ class EventDataDeletionTask(BaseDeletionTask):
         node_ids = [Event.generate_node_id(self.project_id, event.event_id) for event in events]
         nodestore.delete_multi(node_ids)
 
+        delete_unprocessed_events(events)
+
         # Remove EventAttachment and UserReport
         event_ids = [event.event_id for event in events]
         EventAttachment.objects.filter(event_id__in=event_ids, project_id=self.project_id).delete()

+ 2 - 0
src/sentry/event_manager.py

@@ -69,6 +69,7 @@ from sentry.utils.safe import safe_execute, trim, get_path, setdefault_path
 from sentry.stacktraces.processing import normalize_stacktraces_for_grouping
 from sentry.culprit import generate_culprit
 from sentry.utils.compat import map
+from sentry.reprocessing2 import save_unprocessed_event
 
 logger = logging.getLogger("sentry.events")
 
@@ -434,6 +435,7 @@ class EventManager(object):
             job["event_metrics"][key] = old_bytes + attachment.size
 
         _nodestore_save_many(jobs)
+        save_unprocessed_event(project, event_id=job["event"].event_id)
 
         if job["release"]:
             if job["is_new"]:

+ 15 - 8
src/sentry/eventstore/processing/base.py

@@ -4,6 +4,10 @@ from sentry.utils.cache import cache_key_for_event
 DEFAULT_TIMEOUT = 60 * 60 * 24
 
 
+def _get_unprocessed_key(key):
+    return key + ":u"
+
+
 class BaseEventProcessingStore(object):
     """
     Store for event blobs during processing
@@ -21,19 +25,22 @@ class BaseEventProcessingStore(object):
         self.inner = inner
         self.timeout = timeout
 
-    def _key_for_event(self, event):
-        return cache_key_for_event(event)
-
-    def store(self, event):
-        key = self._key_for_event(event)
+    def store(self, event, unprocessed=False):
+        key = cache_key_for_event(event)
+        if unprocessed:
+            key = _get_unprocessed_key(key)
         self.inner.set(key, event, self.timeout)
         return key
 
-    def get(self, key):
+    def get(self, key, unprocessed=False):
+        if unprocessed:
+            key = _get_unprocessed_key(key)
         return self.inner.get(key)
 
     def delete_by_key(self, key):
-        return self.inner.delete(key)
+        self.inner.delete(key)
+        self.inner.delete(_get_unprocessed_key(key))
 
     def delete(self, event):
-        return self.inner.delete(self._key_for_event(event))
+        key = cache_key_for_event(event)
+        self.delete_by_key(key)
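
The payload backup lives next to the processed payload under the same cache key plus a ":u" suffix. A minimal usage sketch, assuming event_processing_store (as exposed by sentry.eventstore.processing) is the configured instance of this store:

# Minimal usage sketch; event_processing_store is assumed to be the configured
# BaseEventProcessingStore instance used by the ingestion pipeline.
from sentry.eventstore.processing import event_processing_store

event_data = {"event_id": "a" * 32, "project": 1}

key = event_processing_store.store(event_data)              # writes "<key>"
event_processing_store.store(event_data, unprocessed=True)  # writes "<key>:u"

backup = event_processing_store.get(key, unprocessed=True)  # reads back "<key>:u"
event_processing_store.delete_by_key(key)                   # removes both copies
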

+ 1 - 0
src/sentry/features/__init__.py

@@ -121,6 +121,7 @@ default_manager.add("projects:similarity-view", ProjectFeature)  # NOQA
 default_manager.add("projects:similarity-indexing", ProjectFeature)  # NOQA
 default_manager.add("projects:similarity-view-v2", ProjectFeature)  # NOQA
 default_manager.add("projects:similarity-indexing-v2", ProjectFeature)  # NOQA
+default_manager.add("projects:reprocessing-v2", ProjectFeature)  # NOQA
 
 # Project plugin features
 default_manager.add("projects:plugins", ProjectPluginFeature)  # NOQA

Some files were not shown because too many files changed in this diff