
feat: Basic killswitches for ingest pipeline (#25847)

Markus Unterwaditzer, commit c5ee7b89ef

+ 1 - 0
mypy.ini

@@ -22,6 +22,7 @@ files = src/sentry/api/bases/external_actor.py,
         src/sentry/snuba/query_subscription_consumer.py,
         src/sentry/utils/avatar.py,
         src/sentry/utils/codecs.py,
+        src/sentry/killswitches.py,
         src/sentry/utils/kvstore,
         src/sentry/utils/snql.py
 

+ 8 - 1
src/sentry/event_manager.py

@@ -35,6 +35,7 @@ from sentry.grouping.api import (
     load_grouping_config,
 )
 from sentry.ingest.inbound_filters import FilterStatKeys
+from sentry.killswitches import killswitch_matches_context
 from sentry.lang.native.utils import STORE_CRASH_REPORTS_ALL, convert_crashreport_count
 from sentry.models import (
     CRASH_REPORT_TYPES,
@@ -981,7 +982,13 @@ def _save_aggregate(event, flat_hashes, hierarchical_hashes, release, **kwargs):
 
     if existing_group_id is None:
 
-        if project.id in (options.get("store.load-shed-group-creation-projects") or ()):
+        if killswitch_matches_context(
+            "store.load-shed-group-creation-projects",
+            {
+                "project_id": project.id,
+                "platform": event.platform,
+            },
+        ):
             raise HashDiscarded("Load shedding group creation")
 
         with sentry_sdk.start_span(
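
The old check was a plain membership test against a list of project IDs; the new call generalizes it to field-based conditions. A minimal sketch of the difference, using the internal `_value_matches` helper added in `src/sentry/killswitches.py` below so the option store is not needed (all concrete values are made up):

```python
from sentry.killswitches import _value_matches

# A legacy option value (a plain list of project IDs) still sheds
# everything from those projects.
legacy_value = [42]
assert _value_matches(legacy_value, {"project_id": 42, "platform": "python"})

# A dict condition can narrow shedding to a single platform.
narrow_value = [{"project_id": "42", "platform": "cocoa"}]
assert _value_matches(narrow_value, {"project_id": 42, "platform": "cocoa"})
assert not _value_matches(narrow_value, {"project_id": 42, "platform": "python"})
```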

+ 22 - 2
src/sentry/ingest/ingest_consumer.py

@@ -7,12 +7,13 @@ import sentry_sdk
 from django.conf import settings
 from django.core.cache import cache
 
-from sentry import eventstore, features, options
+from sentry import eventstore, features
 from sentry.attachments import CachedAttachment, attachment_cache
 from sentry.event_manager import save_attachment
 from sentry.eventstore.processing import event_processing_store
 from sentry.ingest.types import ConsumerType
 from sentry.ingest.userreport import Conflict, save_userreport
+from sentry.killswitches import killswitch_matches_context
 from sentry.models import Project
 from sentry.signals import event_accepted
 from sentry.tasks.store import preprocess_event
@@ -135,7 +136,14 @@ def _do_process_event(message, projects):
         )
         return  # message already processed do not reprocess
 
-    if project_id in (options.get("store.load-shed-pipeline-projects") or ()):
+    if killswitch_matches_context(
+        "store.load-shed-pipeline-projects",
+        {
+            "project_id": project_id,
+            "event_id": event_id,
+            "has_attachments": bool(attachments),
+        },
+    ):
        # This killswitch is for worst-case scenarios and should not put
        # additional load on our logging infrastructure, so drop silently
         return
@@ -159,6 +167,18 @@ def _do_process_event(message, projects):
             tags={"event_type": data.get("type") or "null"},
         )
 
+    if killswitch_matches_context(
+        "store.load-shed-parsed-pipeline-projects",
+        {
+            "organization_id": project.organization_id,
+            "project_id": project.id,
+            "event_type": data.get("type") or "null",
+            "has_attachments": bool(attachments),
+            "event_id": event_id,
+        },
+    ):
+        return
+
     cache_key = event_processing_store.store(data)
 
     if attachments:
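
The consumer now sheds at two points: once before the payload is parsed, where only coarse fields (project ID, event ID, attachment flag) are known, and once after parsing, where organization ID and event type become available. A rough sketch of how option values behave at each point, again using the internal `_value_matches` helper from the new module (the concrete values are invented):

```python
from sentry.killswitches import _value_matches

# Pre-parse killswitch: conditions can only use coarse fields.
pre_parse_value = [{"project_id": "42"}]
assert _value_matches(
    pre_parse_value,
    {"project_id": 42, "event_id": "aaaa", "has_attachments": False},
)

# Post-parse killswitch: richer fields allow narrower conditions,
# e.g. shedding only transactions from one project.
post_parse_value = [{"project_id": "42", "event_type": "transaction"}]
assert not _value_matches(
    post_parse_value,
    {
        "organization_id": 1,
        "project_id": 42,
        "event_type": "error",
        "has_attachments": False,
        "event_id": "aaaa",
    },
)
```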

+ 133 - 0
src/sentry/killswitches.py

@@ -0,0 +1,133 @@
+"""
+Very simple "user partitioning" system used to shed load quickly from the
+ingestion pipeline if things go wrong. Allows for conditions based on project
+ID, event type and organization ID.
+
+This is similar to the feature-flagging systems we already have, but with fewer
+features and better performance.
+"""
+
+import copy
+from collections import namedtuple
+from typing import Any, Dict, List, Union
+
+from sentry import options
+from sentry.utils import metrics
+
+Condition = Dict[str, str]
+KillswitchConfig = List[Condition]
+LegacyKillswitchConfig = Union[KillswitchConfig, List[int]]
+Context = Dict[str, Any]
+
+KillswitchInfo = namedtuple("KillswitchInfo", ["description", "fields"])
+
+ALL_KILLSWITCH_OPTIONS = {
+    "store.load-shed-group-creation-projects": KillswitchInfo(
+        description="Drop event in save_event before entering transaction to create group",
+        fields=("project_id", "platform"),
+    ),
+    "store.load-shed-pipeline-projects": KillswitchInfo(
+        description="Drop event in ingest consumer. Available fields are severely restricted because nothing is parsed yet.",
+        fields=("project_id", "event_id", "has_attachments"),
+    ),
+    "store.load-shed-parsed-pipeline-projects": KillswitchInfo(
+        description="Drop events in ingest consumer after parsing them. Available fields are more but a bunch of stuff can go wrong before that.",
+        fields=("organization_id", "project_id", "event_type", "has_attachments", "event_id"),
+    ),
+    "store.load-shed-process-event-projects": KillswitchInfo(
+        description="Drop events in process_event.",
+        fields=("project_id", "event_id", "platform"),
+    ),
+    "store.load-shed-symbolicate-event-projects": KillswitchInfo(
+        description="Drop events in symbolicate_event.",
+        fields=("project_id", "event_id", "platform"),
+    ),
+}
+
+
+def validate_user_input(killswitch_name: str, option_value: Any) -> KillswitchConfig:
+    for condition in option_value:
+        valid_options = set(ALL_KILLSWITCH_OPTIONS[killswitch_name].fields)
+        used_options = set(condition)
+        if valid_options - used_options:
+            raise ValueError(f"Missing fields: {valid_options - used_options}")
+
+        if used_options - valid_options:
+            raise ValueError(f"Unknown fields: {used_options - valid_options}")
+
+    return normalize_value(option_value)
+
+
+def normalize_value(option_value: Any) -> KillswitchConfig:
+    rv = []
+    for condition in option_value:
+        if not condition:
+            continue
+        elif isinstance(condition, int):
+            rv.append({"project_id": str(condition)})
+        elif isinstance(condition, dict):
+            condition = {k: str(v) for k, v in condition.items() if v is not None}
+            if condition:
+                rv.append(condition)
+
+    return rv
+
+
+def killswitch_matches_context(option_key: str, context: Context) -> bool:
+    assert option_key in ALL_KILLSWITCH_OPTIONS
+    assert set(ALL_KILLSWITCH_OPTIONS[option_key].fields) == set(context)
+    option_value = options.get(option_key)
+    rv = _value_matches(option_value, context)
+    metrics.incr(
+        "sentry.killswitches.run",
+        tags={"option_key": option_key, "decision": "matched" if rv else "passed"},
+    )
+
+    return rv
+
+
+def _value_matches(raw_option_value: LegacyKillswitchConfig, context: Context) -> bool:
+    option_value = normalize_value(raw_option_value)
+
+    for condition in option_value:
+        for field, matching_value in condition.items():
+            value = context.get(field)
+            if value is None:
+                break
+
+            if str(value) != matching_value:
+                break
+        else:
+            return True
+
+    return False
+
+
+def print_conditions(raw_option_value: LegacyKillswitchConfig) -> str:
+    option_value = normalize_value(raw_option_value)
+
+    if not option_value:
+        return "<disabled entirely>"
+
+    return "DROP DATA WHERE\n  " + " OR\n  ".join(
+        "("
+        + " AND ".join(f"{field} = {matching_value}" for field, matching_value in condition.items())
+        + ")"
+        for condition in option_value
+    )
+
+
+def add_condition(
+    raw_option_value: LegacyKillswitchConfig, condition: Condition
+) -> KillswitchConfig:
+    option_value = copy.deepcopy(normalize_value(raw_option_value))
+    option_value.append(condition)
+    return normalize_value(option_value)
+
+
+def remove_condition(
+    raw_option_value: LegacyKillswitchConfig, condition: Condition
+) -> KillswitchConfig:
+    option_value = copy.deepcopy(normalize_value(raw_option_value))
+    option_value = [m for m in option_value if m != condition]
+    return normalize_value(option_value)
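
To summarize the semantics of this module: normalize_value upgrades legacy plain-ID lists to condition dicts and drops None (wildcard) fields, while matching is an OR over conditions and an AND over the fields within a condition. A small self-contained illustration, using the internal `_value_matches` helper directly so no option store is needed (all values are made up):

```python
from sentry.killswitches import _value_matches, normalize_value, print_conditions

# Legacy values were plain lists of project IDs; they become
# conditions on project_id only.
assert normalize_value([42, 43]) == [{"project_id": "42"}, {"project_id": "43"}]

# None acts as a wildcard: the key is dropped, so any value matches.
assert normalize_value([{"project_id": 42, "event_type": None}]) == [{"project_id": "42"}]

# OR over conditions, AND over fields within a condition.
conditions = [{"project_id": "42", "event_type": "transaction"}, {"project_id": "43"}]
assert _value_matches(conditions, {"project_id": 42, "event_type": "transaction"})
assert _value_matches(conditions, {"project_id": 43, "event_type": "error"})
assert not _value_matches(conditions, {"project_id": 42, "event_type": "error"})

print(print_conditions(conditions))
# DROP DATA WHERE
#   (project_id = 42 AND event_type = transaction) OR
#   (project_id = 43)
```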

+ 13 - 4
src/sentry/options/defaults.py

@@ -7,7 +7,7 @@ from sentry.options import (
     FLAG_REQUIRED,
     register,
 )
-from sentry.utils.types import Bool, Dict, Int, Sequence, String
+from sentry.utils.types import Any, Bool, Dict, Int, Sequence, String
 
 # Cache
 # register('cache.backend', flags=FLAG_NOSTORE)
@@ -311,10 +311,10 @@ register("store.race-free-group-creation-force-disable", default=False)
 
 
 # Killswitch for dropping events if they were to create groups
-register("store.load-shed-group-creation-projects", type=Sequence, default=[])
+register("store.load-shed-group-creation-projects", type=Any, default=[])
 
-# Killswitch for dropping events in ingest consumer or really anywhere
-register("store.load-shed-pipeline-projects", type=Sequence, default=[])
+# Killswitch for dropping events in ingest consumer
+register("store.load-shed-pipeline-projects", type=Any, default=[])
 
 # Switch for more performant project counter incr
 register("store.projectcounter-modern-upsert-sample-rate", default=0.0)
@@ -327,3 +327,12 @@ register("store.background-grouping-sample-rate", default=0.0)
 
 # True if background grouping should run before secondary and primary grouping
 register("store.background-grouping-before", default=False)
+
+# Killswitch for dropping events in ingest consumer (after parsing them)
+register("store.load-shed-parsed-pipeline-projects", type=Any, default=[])
+
+# Killswitch for dropping events in process_event
+register("store.load-shed-process-event-projects", type=Any, default=[])
+
+# Killswitch for dropping events in symbolicate_event
+register("store.load-shed-symbolicate-event-projects", type=Any, default=[])

+ 1 - 0
src/sentry/runner/__init__.py

@@ -62,6 +62,7 @@ list(
             "sentry.runner.commands.files.files",
             "sentry.runner.commands.help.help",
             "sentry.runner.commands.init.init",
+            "sentry.runner.commands.killswitches.killswitches",
             "sentry.runner.commands.migrations.migrations",
             "sentry.runner.commands.plugins.plugins",
             "sentry.runner.commands.queues.queues",

+ 98 - 0
src/sentry/runner/commands/killswitches.py

@@ -0,0 +1,98 @@
+import click
+import yaml
+
+from sentry.runner.decorators import configuration
+
+
+@click.group()
+def killswitches():
+    "Manage killswitches for ingestion pipeline."
+
+
+def _safe_modify(killswitch_name, modify_func):
+    from sentry import killswitches, options
+
+    option_value = options.get(killswitch_name)
+    new_option_value = modify_func(option_value)
+
+    if option_value == new_option_value:
+        click.echo("No changes!")
+        raise click.Abort()
+
+    click.echo("Before:")
+    click.echo(killswitches.print_conditions(option_value))
+    click.echo("After:")
+    click.echo(killswitches.print_conditions(new_option_value))
+
+    click.confirm("Should the changes be applied?", default=False, show_default=True, abort=True)
+    options.set(killswitch_name, new_option_value)
+
+
+@killswitches.command()
+@click.argument("killswitch", required=True)
+@configuration
+def edit(killswitch):
+    """
+    Edit killswitch conditions all at once using $EDITOR.
+
+    For a list of killswitches to edit, use `sentry killswitches list`.
+
+    For example:
+
+        sentry killswitches edit store.load-shed-pipeline-projects
+    """
+
+    from sentry import killswitches
+
+    def edit(option_value):
+        edit_text = (
+            "# Example, drops transaction events from project 42 and everything from project 43:\n"
+            "#\n"
+            "# - project_id: 42\n"
+            "#   event_type: transaction\n"
+            "# - project_id: 43\n"
+            "#\n"
+            "# After saving and exiting, your killswitch conditions will be printed in faux-SQL\n"
+            "# for you to confirm. The above conditions' preview would be:\n"
+            "#\n"
+            "# DROP DATA WHERE\n"
+            "#   (project_id = 42 AND event_type = transaction) OR\n"
+            "#   (project_id = 43)\n"
+            "#\n"
+            f"# {killswitch}: {killswitches.ALL_KILLSWITCH_OPTIONS[killswitch].description}\n"
+            f"# Required fields:\n"
+        )
+
+        for field in killswitches.ALL_KILLSWITCH_OPTIONS[killswitch].fields:
+            edit_text += f"#  - {field}\n"
+
+        edit_text += "# Wildcards can be specified such as `project_id: ~`"
+
+        if option_value:
+            edit_text += "\n"
+            edit_text += yaml.dump(option_value)
+
+        edited_text = click.edit(edit_text)
+        if edited_text is None:
+            return option_value
+
+        return killswitches.validate_user_input(killswitch, yaml.safe_load(edited_text))
+
+    _safe_modify(killswitch, edit)
+
+
+@killswitches.command()
+@configuration
+def list():
+    """
+    List all killswitches and whether they are enabled (and how).
+    """
+
+    from sentry import killswitches, options
+
+    for name, info in killswitches.ALL_KILLSWITCH_OPTIONS.items():
+        click.echo()
+        click.echo(f"{name}")
+        click.echo(f"  # {info.description}")
+        conditions = killswitches.print_conditions(options.get(name))
+        click.echo(f"{conditions}")

+ 21 - 0
src/sentry/tasks/store.py

@@ -12,6 +12,7 @@ from sentry.attachments import attachment_cache
 from sentry.constants import DEFAULT_STORE_NORMALIZER_ARGS
 from sentry.datascrubbing import scrub_data
 from sentry.eventstore.processing import event_processing_store
+from sentry.killswitches import killswitch_matches_context
 from sentry.models import Activity, Organization, Project, ProjectOption
 from sentry.stacktraces.processing import process_stacktraces, should_process_for_stacktraces
 from sentry.tasks.base import instrumented_task
@@ -208,6 +209,16 @@ def _do_symbolicate_event(cache_key, start_time, event_id, symbolicate_task, dat
 
     event_id = data["event_id"]
 
+    if killswitch_matches_context(
+        "store.load-shed-symbolicate-event-projects",
+        {
+            "project_id": project_id,
+            "event_id": event_id,
+            "platform": data.get("platform") or "null",
+        },
+    ):
+        return
+
     symbolication_function = get_symbolication_function(data)
 
     has_changed = False
@@ -396,6 +407,16 @@ def _do_process_event(
 
     event_id = data["event_id"]
 
+    if killswitch_matches_context(
+        "store.load-shed-process-event-projects",
+        {
+            "project_id": project_id,
+            "event_id": event_id,
+            "platform": data.get("platform") or "null",
+        },
+    ):
+        return
+
     with sentry_sdk.start_span(op="tasks.store.process_event.get_project_from_cache"):
         project = Project.objects.get_from_cache(id=project_id)
 

+ 2 - 2
src/sentry/testutils/cases.py

@@ -681,9 +681,9 @@ class CliTestCase(TestCase):
 
     default_args = []
 
-    def invoke(self, *args):
+    def invoke(self, *args, **kwargs):
         args += tuple(self.default_args)
-        return self.runner.invoke(self.command, args, obj={})
+        return self.runner.invoke(self.command, args, obj={}, **kwargs)
 
 
 @pytest.mark.usefixtures("browser")

+ 54 - 0
tests/sentry/runner/commands/test_killswitches.py

@@ -0,0 +1,54 @@
+from sentry.killswitches import KillswitchInfo
+from sentry.runner.commands.killswitches import killswitches
+from sentry.testutils import CliTestCase
+from sentry.utils.compat import mock
+
+OPTION = "store.load-shed-group-creation-projects"
+
+
+class KillswitchesTest(CliTestCase):
+    command = killswitches
+
+    @mock.patch(
+        "sentry.killswitches.ALL_KILLSWITCH_OPTIONS",
+        {OPTION: KillswitchInfo(description="hey", fields=("project_id", "event_type"))},
+    )
+    @mock.patch("click.edit")
+    def test_basic(self, mock_edit):
+        assert self.invoke("list").output == (
+            "\n" "store.load-shed-group-creation-projects\n" "  # hey\n" "<disabled entirely>\n"
+        )
+
+        mock_edit.return_value = "- project_id: 42\n" "  event_type: transaction\n"
+
+        rv = self.invoke("edit", OPTION, input="y\n")
+        assert rv.exit_code == 0
+        assert self.invoke("list").output == (
+            "\n"
+            "store.load-shed-group-creation-projects\n"
+            "  # hey\n"
+            "DROP DATA WHERE\n"
+            "  (project_id = 42 AND event_type = transaction)\n"
+        )
+
+        mock_edit.return_value = (
+            "- project_id: 42\n"
+            "  event_type: transaction\n"
+            "- project_id: 43\n"
+            "  event_type: ~\n"
+        )
+
+        rv = self.invoke(
+            "edit",
+            OPTION,
+            input="y\n",
+        )
+        assert rv.exit_code == 0
+        assert self.invoke("list").output == (
+            "\n"
+            "store.load-shed-group-creation-projects\n"
+            "  # hey\n"
+            "DROP DATA WHERE\n"
+            "  (project_id = 42 AND event_type = transaction) OR\n"
+            "  (project_id = 43)\n"
+        )

Some files were not shown because too many files changed in this diff