
feat: Split native and non-native processing (#17833)

Anton Ovchinnikov · 5 years ago · commit 0abe0284e2

+ 4 - 0
src/sentry/conf/server.py

@@ -556,6 +556,10 @@ CELERY_QUEUES = [
     Queue(
         "events.reprocessing.preprocess_event", routing_key="events.reprocessing.preprocess_event"
     ),
+    Queue("events.symbolicate_event", routing_key="events.symbolicate_event"),
+    Queue(
+        "events.reprocessing.symbolicate_event", routing_key="events.reprocessing.symbolicate_event"
+    ),
     Queue("events.process_event", routing_key="events.process_event"),
     Queue("events.reprocessing.process_event", routing_key="events.reprocessing.process_event"),
     Queue("events.reprocess_events", routing_key="events.reprocess_events"),

+ 17 - 2
src/sentry/lang/native/processing.py

@@ -6,13 +6,14 @@ import six
 
 from sentry.event_manager import validate_and_set_timestamp
 from sentry.lang.native.error import write_error, SymbolicationFailed
-from sentry.lang.native.minidump import MINIDUMP_ATTACHMENT_TYPE
+from sentry.lang.native.minidump import MINIDUMP_ATTACHMENT_TYPE, is_minidump_event
 from sentry.lang.native.symbolicator import Symbolicator
-from sentry.lang.native.unreal import APPLECRASHREPORT_ATTACHMENT_TYPE
+from sentry.lang.native.unreal import APPLECRASHREPORT_ATTACHMENT_TYPE, is_applecrashreport_event
 from sentry.lang.native.utils import (
     get_sdk_from_event,
     native_images_from_data,
     is_native_platform,
+    is_native_event,
     image_name,
     signal_from_data,
     get_event_attachment,
@@ -24,6 +25,7 @@ from sentry.stacktraces.functions import trim_function_name
 from sentry.stacktraces.processing import find_stacktraces_in_data
 from sentry.utils.compat import zip
 
+
 logger = logging.getLogger(__name__)
 
 
@@ -352,3 +354,16 @@ def process_payload(data):
         sinfo.stacktrace["frames"] = new_frames
 
     return data
+
+
+def get_symbolication_function(data):
+    if is_minidump_event(data):
+        return process_minidump
+    elif is_applecrashreport_event(data):
+        return process_applecrashreport
+    elif is_native_event(data):
+        return process_payload
+
+
+def should_process_with_symbolicator(data):
+    return bool(get_symbolication_function(data))
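
`get_symbolication_function` is a first-match dispatcher, and `should_process_with_symbolicator` is just its truthiness. A self-contained sketch of the shape of that dispatch; the predicate and handler bodies below are simplified stand-ins, not Sentry's real event checks:

```python
# Stand-in predicates; Sentry's real checks inspect the event payload
# (exception mechanism, attachments, platform), not a "kind" field.
def is_minidump_event(data):
    return data.get("kind") == "minidump"

def is_applecrashreport_event(data):
    return data.get("kind") == "applecrashreport"

def is_native_event(data):
    return data.get("platform") == "native"

def get_symbolication_function(data):
    # First matching handler wins; None means "not a native event", which
    # is what makes should_process_with_symbolicator a cheap boolean gate.
    if is_minidump_event(data):
        return lambda d: dict(d, via="minidump")
    elif is_applecrashreport_event(data):
        return lambda d: dict(d, via="applecrashreport")
    elif is_native_event(data):
        return lambda d: dict(d, via="payload")

def should_process_with_symbolicator(data):
    return bool(get_symbolication_function(data))

assert should_process_with_symbolicator({"platform": "native"})
assert not should_process_with_symbolicator({"platform": "python"})
```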

+ 250 - 20
src/sentry/tasks/store.py

@@ -65,8 +65,28 @@ def should_process(data):
     return False
 
 
-def submit_process(project, from_reprocessing, cache_key, event_id, start_time, data):
+def submit_process(
+    project,
+    from_reprocessing,
+    cache_key,
+    event_id,
+    start_time,
+    data,
+    data_has_changed=None,
+    new_process_behavior=None,
+):
     task = process_event_from_reprocessing if from_reprocessing else process_event
+    task.delay(
+        cache_key=cache_key,
+        start_time=start_time,
+        event_id=event_id,
+        data_has_changed=data_has_changed,
+        new_process_behavior=new_process_behavior,
+    )
+
+
+def submit_symbolicate(project, from_reprocessing, cache_key, event_id, start_time, data):
+    task = symbolicate_event_from_reprocessing if from_reprocessing else symbolicate_event
     task.delay(cache_key=cache_key, start_time=start_time, event_id=event_id)
 
 
@@ -84,6 +104,8 @@ def submit_save_event(project, cache_key, event_id, start_time, data):
 
 
 def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, project):
+    from sentry.lang.native.processing import should_process_with_symbolicator
+
     if cache_key and data is None:
         data = default_cache.get(cache_key)
 
@@ -104,9 +126,30 @@ def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, pr
     else:
         assert project.id == project_id, (project.id, project_id)
 
+    from_reprocessing = process_task is process_event_from_reprocessing
+
+    new_process_behavior = bool(options.get("sentry:preprocess-use-new-behavior", False))
+    metrics.incr(
+        "tasks.store.preprocess_event.new_process_behavior", tags={"value": new_process_behavior}
+    )
+
+    if new_process_behavior and should_process_with_symbolicator(data):
+        submit_symbolicate(
+            project, from_reprocessing, cache_key, event_id, start_time, original_data
+        )
+        return
+
     if should_process(data):
-        from_reprocessing = process_task is process_event_from_reprocessing
-        submit_process(project, from_reprocessing, cache_key, event_id, start_time, original_data)
+        submit_process(
+            project,
+            from_reprocessing,
+            cache_key,
+            event_id,
+            start_time,
+            original_data,
+            data_has_changed=False,
+            new_process_behavior=new_process_behavior,
+        )
         return
 
     submit_save_event(project, cache_key, event_id, start_time, original_data)
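
The hunk above turns `_do_preprocess_event` into a three-way router: symbolicate first (when the new behavior is enabled and the event is native), otherwise process, otherwise save. A runnable sketch of that control flow, with hypothetical stand-ins for the `submit_*` helpers and predicates:

```python
# Every callable here is a stand-in; only the control flow mirrors the diff.
def route_event(data, new_process_behavior, should_symbolicate, should_process,
                submit_symbolicate, submit_process, submit_save):
    if new_process_behavior and should_symbolicate(data):
        submit_symbolicate(data)  # native events: symbolicator pipeline first
        return
    if should_process(data):
        submit_process(data)      # plugin / stacktrace processing
        return
    submit_save(data)             # nothing to do; persist directly

route_event(
    {"platform": "native"},
    new_process_behavior=True,
    should_symbolicate=lambda d: d.get("platform") == "native",
    should_process=lambda d: True,
    submit_symbolicate=lambda d: print("-> symbolicate"),
    submit_process=lambda d: print("-> process"),
    submit_save=lambda d: print("-> save"),
)  # prints "-> symbolicate"
```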
@@ -145,11 +188,159 @@ def preprocess_event_from_reprocessing(
         data=data,
         start_time=start_time,
         event_id=event_id,
-        process_task=process_event,
+        process_task=process_event_from_reprocessing,
         project=project,
     )
 
 
+def _do_symbolicate_event(cache_key, start_time, event_id, symbolicate_task, data=None):
+    from sentry.lang.native.processing import get_symbolication_function
+
+    if data is None:
+        data = default_cache.get(cache_key)
+
+    if data is None:
+        metrics.incr(
+            "events.failed", tags={"reason": "cache", "stage": "symbolicate"}, skip_internal=False
+        )
+        error_logger.error("symbolicate.failed.empty", extra={"cache_key": cache_key})
+        return
+
+    data = CanonicalKeyDict(data)
+
+    project_id = data["project"]
+    event_id = data["event_id"]
+
+    project = Project.objects.get_from_cache(id=project_id)
+
+    symbolication_function = get_symbolication_function(data)
+
+    has_changed = False
+
+    from_reprocessing = symbolicate_task is symbolicate_event_from_reprocessing
+
+    try:
+        symbolicated_data = safe_execute(
+            symbolication_function, data, _passthrough_errors=(RetrySymbolication,)
+        )
+        if symbolicated_data:
+            data = symbolicated_data
+            has_changed = True
+
+    except RetrySymbolication as e:
+        error_logger.warn("retry symbolication")
+
+        if start_time and (time() - start_time) > settings.SYMBOLICATOR_PROCESS_EVENT_WARN_TIMEOUT:
+            error_logger.warning(
+                "process.slow", extra={"project_id": project_id, "event_id": event_id}
+            )
+
+        if start_time and (time() - start_time) > settings.SYMBOLICATOR_PROCESS_EVENT_HARD_TIMEOUT:
+            # Do not drop event but actually continue with rest of pipeline
+            # (persisting unsymbolicated event)
+            error_logger.exception(
+                "process.failed.infinite_retry",
+                extra={"project_id": project_id, "event_id": event_id},
+            )
+        else:
+            # Requeue the task in the "sleep" queue
+            retry_symbolicate_event.apply_async(
+                args=(),
+                kwargs={
+                    "symbolicate_task_name": symbolicate_task.__name__,
+                    "task_kwargs": {
+                        "cache_key": cache_key,
+                        "event_id": event_id,
+                        "start_time": start_time,
+                    },
+                },
+                countdown=e.retry_after,
+            )
+            return
+
+    # We cannot persist canonical types in the cache, so we need to
+    # downgrade this.
+    if isinstance(data, CANONICAL_TYPES):
+        data = dict(data.items())
+
+    if has_changed:
+        default_cache.set(cache_key, data, 3600)
+
+    submit_process(
+        project,
+        from_reprocessing,
+        cache_key,
+        event_id,
+        start_time,
+        data,
+        has_changed,
+        new_process_behavior=True,
+    )
+
+
+@instrumented_task(
+    name="sentry.tasks.store.symbolicate_event",
+    queue="events.symbolicate_event",
+    time_limit=65,
+    soft_time_limit=60,
+)
+def symbolicate_event(cache_key, start_time=None, event_id=None, **kwargs):
+    """
+    Handles event symbolication using the external symbolicator service.
+
+    :param string cache_key: the cache key for the event data
+    :param int start_time: the timestamp when the event was ingested
+    :param string event_id: the event identifier
+    """
+    return _do_symbolicate_event(
+        cache_key=cache_key,
+        start_time=start_time,
+        event_id=event_id,
+        symbolicate_task=symbolicate_event,
+    )
+
+
+@instrumented_task(
+    name="sentry.tasks.store.symbolicate_event_from_reprocessing",
+    queue="events.reprocessing.symbolicate_event",
+    time_limit=65,
+    soft_time_limit=60,
+)
+def symbolicate_event_from_reprocessing(cache_key, start_time=None, event_id=None, **kwargs):
+    return _do_symbolicate_event(
+        cache_key=cache_key,
+        start_time=start_time,
+        event_id=event_id,
+        symbolicate_task=symbolicate_event_from_reprocessing,
+    )
+
+
+@instrumented_task(
+    name="sentry.tasks.store.retry_symbolicate_event",
+    queue="sleep",
+    time_limit=(60 * 5) + 5,
+    soft_time_limit=60 * 5,
+)
+def retry_symbolicate_event(symbolicate_task_name, task_kwargs, **kwargs):
+    """
+    The only purpose of this task is to be enqueued with some ETA set. This is
+    essentially an implementation of ETAs on top of Celery's existing ETAs, but
+    with the intent of having separate workers wait for those ETAs.
+    """
+    tasks = {
+        "symbolicate_event": symbolicate_event,
+        "symbolicate_event_from_reprocessing": symbolicate_event_from_reprocessing,
+    }
+
+    symbolicate_task = tasks.get(symbolicate_task_name)
+    if not symbolicate_task:
+        raise ValueError(
+            "Invalid argument for symbolicate_task_name: %s" % (symbolicate_task_name,)
+        )
+
+    symbolicate_task.delay(**task_kwargs)
+
+
 @instrumented_task(
     name="sentry.tasks.store.retry_process_event",
     queue="sleep",
@@ -174,7 +365,15 @@ def retry_process_event(process_task_name, task_kwargs, **kwargs):
     process_task.delay(**task_kwargs)
 
 
-def _do_process_event(cache_key, start_time, event_id, process_task, data=None):
+def _do_process_event(
+    cache_key,
+    start_time,
+    event_id,
+    process_task,
+    data=None,
+    data_has_changed=None,
+    new_process_behavior=None,
+):
     from sentry.plugins.base import plugins
 
     if data is None:
@@ -197,20 +396,28 @@ def _do_process_event(cache_key, start_time, event_id, process_task, data=None):
     with configure_scope() as scope:
         scope.set_tag("project", project_id)
 
-    has_changed = False
+    has_changed = bool(data_has_changed)
+    new_process_behavior = bool(new_process_behavior)
+
+    metrics.incr(
+        "tasks.store.process_event.new_process_behavior", tags={"value": new_process_behavior}
+    )
 
     # Fetch the reprocessing revision
     reprocessing_rev = reprocessing.get_reprocessing_revision(project_id)
 
     try:
-        # Event enhancers.  These run before anything else.
-        for plugin in plugins.all(version=2):
-            enhancers = safe_execute(plugin.get_event_enhancers, data=data)
-            for enhancer in enhancers or ():
-                enhanced = safe_execute(enhancer, data, _passthrough_errors=(RetrySymbolication,))
-                if enhanced:
-                    data = enhanced
-                    has_changed = True
+        if not new_process_behavior:
+            # Event enhancers.  These run before anything else.
+            for plugin in plugins.all(version=2):
+                enhancers = safe_execute(plugin.get_event_enhancers, data=data)
+                for enhancer in enhancers or ():
+                    enhanced = safe_execute(
+                        enhancer, data, _passthrough_errors=(RetrySymbolication,)
+                    )
+                    if enhanced:
+                        data = enhanced
+                        has_changed = True
 
         # Stacktrace based event processors.
         new_data = process_stacktraces(data)
@@ -325,9 +532,7 @@ def _do_process_event(cache_key, start_time, event_id, process_task, data=None):
             # If `create_failed_event` indicates that we need to retry, we
             # invoke ourselves again.  This happens when the reprocessing
             # revision changed while we were processing.
-            from_reprocessing = process_task is process_event_from_reprocessing
-            submit_process(project, from_reprocessing, cache_key, event_id, start_time, data)
-            process_task.delay(cache_key, start_time=start_time, event_id=event_id)
+            _do_preprocess_event(cache_key, data, start_time, event_id, process_task, project)
             return
 
         default_cache.set(cache_key, data, 3600)
@@ -341,9 +546,31 @@ def _do_process_event(cache_key, start_time, event_id, process_task, data=None):
     time_limit=65,
     soft_time_limit=60,
 )
-def process_event(cache_key, start_time=None, event_id=None, **kwargs):
+def process_event(
+    cache_key,
+    start_time=None,
+    event_id=None,
+    data_has_changed=None,
+    new_process_behavior=None,
+    **kwargs
+):
+    """
+    Handles event processing (for those events that need it)
+
+    This excludes symbolication via symbolicator service (see symbolicate_event).
+
+    :param string cache_key: the cache key for the event data
+    :param int start_time: the timestamp when the event was ingested
+    :param string event_id: the event identifier
+    :param boolean data_has_changed: set to True if the event data was changed in previous tasks
+    """
     return _do_process_event(
-        cache_key=cache_key, start_time=start_time, event_id=event_id, process_task=process_event
+        cache_key=cache_key,
+        start_time=start_time,
+        event_id=event_id,
+        process_task=process_event,
+        data_has_changed=data_has_changed,
+        new_process_behavior=new_process_behavior,
     )
 
 
@@ -353,12 +580,15 @@ def process_event(cache_key, start_time=None, event_id=None, **kwargs):
     time_limit=65,
     soft_time_limit=60,
 )
-def process_event_from_reprocessing(cache_key, start_time=None, event_id=None, **kwargs):
+def process_event_from_reprocessing(
+    cache_key, start_time=None, event_id=None, data_has_changed=None, **kwargs
+):
     return _do_process_event(
         cache_key=cache_key,
         start_time=start_time,
         event_id=event_id,
         process_task=process_event_from_reprocessing,
+        data_has_changed=data_has_changed,
     )
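
Taken together, the store.py changes split the pipeline into preprocess → symbolicate (native only) → process → save, with `default_cache` as the handoff between stages: each task receives only a `cache_key`, loads the event, and writes it back with a one-hour TTL whenever the payload changed. A self-contained sketch of that contract, with a plain dict standing in for the real cache:

```python
# Stand-in cache; the real code uses default_cache.set(key, data, 3600) and
# counts a miss as events.failed with reason=cache.
CACHE = {}

def run_stage(cache_key, transform):
    data = CACHE.get(cache_key)
    if data is None:
        return None  # cache miss: the real task logs and aborts here
    new_data = transform(data)
    if new_data is not None:
        CACHE[cache_key] = new_data  # persist for the next stage
        return new_data
    return data

CACHE["e:1"] = {"platform": "native"}
run_stage("e:1", lambda d: dict(d, symbolicated=True))
assert CACHE["e:1"] == {"platform": "native", "symbolicated": True}
```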
 
 

+ 104 - 3
tests/sentry/tasks/test_store.py

@@ -3,12 +3,13 @@ from __future__ import absolute_import
 import pytest
 
 from sentry.utils.compat import mock
+from sentry import options
 from time import time
 
 from sentry import quotas
 from sentry.event_manager import EventManager, HashDiscarded
 from sentry.plugins.base.v2 import Plugin2
-from sentry.tasks.store import preprocess_event, process_event, save_event
+from sentry.tasks.store import preprocess_event, process_event, save_event, symbolicate_event
 from sentry.testutils.helpers.features import Feature
 
 EVENT_ID = "cc3e6c2bb6b6498097f336d1e6979f4b"
@@ -63,6 +64,18 @@ def mock_process_event():
         yield m
 
 
+@pytest.fixture
+def mock_symbolicate_event():
+    with mock.patch("sentry.tasks.store.symbolicate_event") as m:
+        yield m
+
+
+@pytest.fixture
+def mock_get_symbolication_function():
+    with mock.patch("sentry.lang.native.processing.get_symbolication_function") as m:
+        yield m
+
+
 @pytest.fixture
 def mock_default_cache():
     with mock.patch("sentry.tasks.store.default_cache") as m:
@@ -77,7 +90,7 @@ def mock_refund():
 
 @pytest.mark.django_db
 def test_move_to_process_event(
-    default_project, mock_process_event, mock_save_event, register_plugin
+    default_project, mock_process_event, mock_save_event, mock_symbolicate_event, register_plugin
 ):
     register_plugin(BasicPreprocessorPlugin)
     data = {
@@ -90,12 +103,99 @@ def test_move_to_process_event(
 
     preprocess_event(data=data)
 
+    assert mock_symbolicate_event.delay.call_count == 0
+    assert mock_process_event.delay.call_count == 1
+    assert mock_save_event.delay.call_count == 0
+
+
+@pytest.mark.django_db
+def test_move_to_symbolicate_event(
+    default_project, mock_process_event, mock_save_event, mock_symbolicate_event, register_plugin
+):
+    register_plugin(BasicPreprocessorPlugin)
+    data = {
+        "project": default_project.id,
+        "platform": "native",
+        "logentry": {"formatted": "test"},
+        "event_id": EVENT_ID,
+        "extra": {"foo": "bar"},
+    }
+
+    options.set("sentry:preprocess-use-new-behavior", True)
+    preprocess_event(data=data)
+
+    assert mock_symbolicate_event.delay.call_count == 1
+    assert mock_process_event.delay.call_count == 0
+    assert mock_save_event.delay.call_count == 0
+
+
+@pytest.mark.django_db
+def test_move_to_symbolicate_event_old(
+    default_project, mock_process_event, mock_save_event, mock_symbolicate_event, register_plugin
+):
+    # Temporarily test old behavior
+    register_plugin(BasicPreprocessorPlugin)
+    data = {
+        "project": default_project.id,
+        "platform": "native",
+        "logentry": {"formatted": "test"},
+        "event_id": EVENT_ID,
+        "extra": {"foo": "bar"},
+    }
+
+    options.set("sentry:preprocess-use-new-behavior", False)
+    preprocess_event(data=data)
+
+    assert mock_symbolicate_event.delay.call_count == 0
     assert mock_process_event.delay.call_count == 1
     assert mock_save_event.delay.call_count == 0
 
 
 @pytest.mark.django_db
-def test_move_to_save_event(default_project, mock_process_event, mock_save_event, register_plugin):
+def test_symbolicate_event_call_process(
+    default_project,
+    mock_default_cache,
+    mock_process_event,
+    mock_save_event,
+    mock_get_symbolication_function,
+    register_plugin,
+):
+    register_plugin(BasicPreprocessorPlugin)
+    data = {
+        "project": default_project.id,
+        "platform": "native",
+        "event_id": EVENT_ID,
+        "extra": {"foo": "bar"},
+    }
+    mock_default_cache.get.return_value = data
+
+    symbolicated_data = {"type": "error"}
+
+    mock_get_symbolication_function.return_value = lambda _: symbolicated_data
+
+    symbolicate_event(cache_key="e:1", start_time=1)
+
+    # The event mutated, so make sure we save it back
+    (_, (key, event, duration), _), = mock_default_cache.set.mock_calls
+
+    assert key == "e:1"
+    assert event == symbolicated_data
+    assert duration == 3600
+
+    assert mock_save_event.delay.call_count == 0
+    mock_process_event.delay.assert_called_once_with(
+        cache_key="e:1",
+        start_time=1,
+        event_id=EVENT_ID,
+        data_has_changed=True,
+        new_process_behavior=True,
+    )
+
+
+@pytest.mark.django_db
+def test_move_to_save_event(
+    default_project, mock_process_event, mock_save_event, mock_symbolicate_event, register_plugin
+):
     register_plugin(BasicPreprocessorPlugin)
     data = {
         "project": default_project.id,
@@ -107,6 +207,7 @@ def test_move_to_save_event(default_project, mock_process_event, mock_save_event
 
     preprocess_event(data=data)
 
+    assert mock_symbolicate_event.delay.call_count == 0
     assert mock_process_event.delay.call_count == 0
     assert mock_save_event.delay.call_count == 1
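
One note on `test_symbolicate_event_call_process`: the trailing-comma unpacking of `mock_calls` doubles as an assertion that `set` was called exactly once. An equivalent, arguably more direct spelling of the same cache-write check, using the names from that test:

```python
# Equivalent to unpacking mock_calls and comparing key/event/duration.
mock_default_cache.set.assert_called_once_with("e:1", symbolicated_data, 3600)
```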