feat(store): Capture sizes of stored event and attachment (#16693)

Jan Michael Auer 5 years ago
parent commit 72e31a9e77

2 changed files with 153 additions and 123 deletions:
  1. src/sentry/event_manager.py  (+151, -3)
  2. src/sentry/tasks/store.py  (+2, -120)

src/sentry/event_manager.py  (+151, -3)

@@ -13,7 +13,8 @@ from django.db import connection, IntegrityError, router, transaction
 from django.db.models import Func
 from django.utils.encoding import force_text
 
-from sentry import buffer, eventstore, eventtypes, eventstream, tsdb
+from sentry import buffer, eventstore, eventtypes, eventstream, features, tsdb
+from sentry.attachments import attachment_cache
 from sentry.constants import (
     DEFAULT_STORE_NORMALIZER_ARGS,
     LOG_LEVELS,
@@ -41,12 +42,15 @@ from sentry.coreapi import (
     safely_load_json_string,
 )
 from sentry.interfaces.base import get_interface
+from sentry.lang.native.utils import STORE_CRASH_REPORTS_ALL, convert_crashreport_count
 from sentry.models import (
     Activity,
     Environment,
+    EventAttachment,
     EventDict,
     EventError,
     EventUser,
+    File,
     Group,
     GroupEnvironment,
     GroupHash,
@@ -61,11 +65,13 @@ from sentry.models import (
     ReleaseProjectEnvironment,
     UserReport,
     Organization,
+    CRASH_REPORT_TYPES,
+    get_crashreport_key,
 )
 from sentry.plugins.base import plugins
 from sentry.signals import event_discarded, event_saved, first_event_received
 from sentry.tasks.integrations import kick_off_status_syncs
-from sentry.utils import metrics
+from sentry.utils import json, metrics
 from sentry.utils.canonical import CanonicalKeyDict
 from sentry.utils.data_filters import (
     is_valid_ip,
@@ -82,6 +88,9 @@ logger = logging.getLogger("sentry.events")
 
 SECURITY_REPORT_INTERFACES = ("csp", "hpkp", "expectct", "expectstaple")
 
+# Timeout for cached group crash report counts
+CRASH_REPORT_TIMEOUT = 24 * 3600  # one day
+
 
 def pop_tag(data, key):
     data["tags"] = [kv for kv in data["tags"] if kv is None or kv[0] != key]
@@ -163,6 +172,34 @@ def has_pending_commit_resolution(group):
     )
 
 
+def get_max_crashreports(model):
+    value = model.get_option("sentry:store_crash_reports")
+    return convert_crashreport_count(value)
+
+
+def crashreports_exceeded(current_count, max_count):
+    if max_count == STORE_CRASH_REPORTS_ALL:
+        return False
+    return current_count >= max_count
+
+
+def get_stored_crashreports(cache_key, event, max_crashreports):
+    # There are two common cases: storing crash reports is disabled, or it is
+    # unbounded. In both cases, there is no need to cache values or query the
+    # database.
+    if max_crashreports in (0, STORE_CRASH_REPORTS_ALL):
+        return max_crashreports
+
+    cached_reports = cache.get(cache_key, None)
+    if cached_reports is not None and cached_reports >= max_crashreports:
+        return cached_reports
+
+    # Fall through and query the database if max_crashreports was bumped, to get an accurate count.
+    return EventAttachment.objects.filter(
+        group_id=event.group_id, file__type__in=CRASH_REPORT_TYPES
+    ).count()
+
+
 class HashDiscarded(Exception):
     pass
 
@@ -414,7 +451,98 @@ class EventManager(object):
             "location": event_type.get_location(event_metadata),
         }
 
-    def save(self, project_id, raw=False, assume_normalized=False):
+    def get_attachments(self, cache_key, event):
+        """
+        Computes a list of attachments that should be stored.
+
+        This method checks whether event attachments are available and sends them to
+        the blob store. There is special handling for crash reports which may
+        contain unstripped PII. If the project or organization is configured to
+        limit the amount of crash reports per group, the number of stored crashes is
+        limited.
+
+        :param cache_key: The cache key at which the event payload is stored in the
+                        cache. This is used to retrieve attachments.
+        :param event:     The event model instance.
+        """
+        filtered = []
+
+        if cache_key is None:
+            return filtered
+
+        project = event.project
+        if not features.has("organizations:event-attachments", project.organization, actor=None):
+            return filtered
+
+        attachments = list(attachment_cache.get(cache_key))
+        if not attachments:
+            return filtered
+
+        # The setting is both an organization and project setting. The project
+        # setting strictly overrides the organization setting, unless set to the
+        # default.
+        max_crashreports = get_max_crashreports(project)
+        if not max_crashreports:
+            max_crashreports = get_max_crashreports(project.organization)
+
+        # The number of crash reports is cached per group
+        crashreports_key = get_crashreport_key(event.group_id)
+
+        # Only fetch the number of stored crash reports if there is a crash report
+        # in the list of attachments. Otherwise, we won't need this number.
+        if any(attachment.type in CRASH_REPORT_TYPES for attachment in attachments):
+            cached_reports = get_stored_crashreports(crashreports_key, event, max_crashreports)
+        else:
+            cached_reports = 0
+        stored_reports = cached_reports
+
+        for attachment in attachments:
+            # If the attachment is a crash report (e.g. minidump), we need to honor
+            # the store_crash_reports setting. Otherwise, we assume that the client
+            # has already verified PII and just store the attachment.
+            if attachment.type in CRASH_REPORT_TYPES:
+                if crashreports_exceeded(stored_reports, max_crashreports):
+                    continue
+                stored_reports += 1
+
+            filtered.append(attachment)
+
+        # If the crash report limit was reached, we persist the current maximum
+        # (not the actual number!) into the cache. The next time we load from the
+        # cache, we validate that this number has not changed, or otherwise
+        # re-fetch from the database.
+        if (
+            crashreports_exceeded(stored_reports, max_crashreports)
+            and stored_reports > cached_reports
+        ):
+            cache.set(crashreports_key, max_crashreports, CRASH_REPORT_TIMEOUT)
+
+        return filtered
+
+    def save_attachments(self, attachments, event):
+        """
+        Persists cached event attachments into the file store.
+
+        :param attachments: A filtered list of attachments to save.
+        :param event:       The event model instance.
+        """
+        for attachment in attachments:
+            file = File.objects.create(
+                name=attachment.name,
+                type=attachment.type,
+                headers={"Content-Type": attachment.content_type},
+            )
+            file.putfile(six.BytesIO(attachment.data))
+
+            EventAttachment.objects.create(
+                event_id=event.event_id,
+                project_id=event.project_id,
+                group_id=event.group_id,
+                name=attachment.name,
+                file=file,
+            )
+
+    def save(self, project_id, raw=False, assume_normalized=False, cache_key=None):
         """
         We re-insert events with duplicate IDs into Snuba, which is responsible
         for deduplicating events. Since deduplication in Snuba is on the primary
@@ -668,6 +796,22 @@ class EventManager(object):
                 group=group, environment=environment
             )
 
+        # Ensure the _metrics key exists. This is usually created during
+        # ingestion and prefilled with ingestion sizes.
+        event_metrics = event.data.get("_metrics") or {}
+        event.data["_metrics"] = event_metrics
+
+        # Capture the actual size that goes into node store.
+        event_metrics["bytes.stored.event"] = len(json.dumps(dict(event.data.items())))
+
+        # Load attachments first, but persist them only at the very end, after
+        # posting to the eventstream, to make sure all counters and the
+        # eventstream are incremented.
+        attachments = self.get_attachments(cache_key, event)
+        for attachment in attachments:
+            key = "bytes.stored.%s" % (attachment.type,)
+            event_metrics[key] = (event_metrics.get(key) or 0) + len(attachment.data)
+
         # Write the event to Nodestore
         event.data.save()
 
@@ -721,6 +865,10 @@ class EventManager(object):
             skip_consume=raw,
         )
 
+        # Do this last to ensure signals get emitted even if the connection to
+        # the file store breaks temporarily.
+        self.save_attachments(attachments, event)
+
         metric_tags = {"from_relay": "_relay_processed" in self._data}
 
         metrics.timing("events.latency", received_timestamp - recorded_timestamp, tags=metric_tags)
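
For reference, here is a minimal standalone sketch of the per-group capping behavior that get_attachments() introduces above. The STORE_CRASH_REPORTS_ALL value and the Attachment tuples are stand-ins (the real sentinel lives in sentry.lang.native.utils, and attachments come from the attachment cache); only the control flow mirrors the diff.

# Standalone sketch of the crash-report capping added in get_attachments().
# Assumption: STORE_CRASH_REPORTS_ALL = -1 stands for "store all, no limit".
from collections import namedtuple

STORE_CRASH_REPORTS_ALL = -1
CRASH_REPORT_TYPES = ("event.minidump",)  # illustrative subset

Attachment = namedtuple("Attachment", "name type")


def crashreports_exceeded(current_count, max_count):
    # Mirrors the helper in the diff: an unlimited setting never caps.
    if max_count == STORE_CRASH_REPORTS_ALL:
        return False
    return current_count >= max_count


def filter_attachments(attachments, stored_reports, max_crashreports):
    filtered = []
    for attachment in attachments:
        # Crash reports count against the per-group limit; all other
        # attachment types are always kept.
        if attachment.type in CRASH_REPORT_TYPES:
            if crashreports_exceeded(stored_reports, max_crashreports):
                continue
            stored_reports += 1
        filtered.append(attachment)
    return filtered


attachments = [
    Attachment("a.dmp", "event.minidump"),
    Attachment("b.dmp", "event.minidump"),
    Attachment("log.txt", "event.attachment"),
]
# With a limit of 1 and no reports stored yet, the second minidump is dropped:
print([a.name for a in filter_attachments(attachments, 0, 1)])
# -> ['a.dmp', 'log.txt']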

src/sentry/tasks/store.py  (+2, -120)

@@ -2,10 +2,8 @@ from __future__ import absolute_import
 
 import logging
 from datetime import datetime
-import six
 
 from time import time
-from django.core.cache import cache
 from django.utils import timezone
 
 from sentry_relay.processing import StoreNormalizer
@@ -14,7 +12,6 @@ from sentry import features, reprocessing
 from sentry.constants import DEFAULT_STORE_NORMALIZER_ARGS
 from sentry.attachments import attachment_cache
 from sentry.cache import default_cache
-from sentry.lang.native.utils import STORE_CRASH_REPORTS_ALL, convert_crashreport_count
 from sentry.tasks.base import instrumented_task
 from sentry.utils import metrics
 from sentry.utils.safe import safe_execute
@@ -23,15 +20,7 @@ from sentry.utils.data_filters import FilterStatKeys
 from sentry.utils.canonical import CanonicalKeyDict, CANONICAL_TYPES
 from sentry.utils.dates import to_datetime
 from sentry.utils.sdk import configure_scope
-from sentry.models import (
-    EventAttachment,
-    File,
-    ProjectOption,
-    Activity,
-    Project,
-    CRASH_REPORT_TYPES,
-    get_crashreport_key,
-)
+from sentry.models import ProjectOption, Activity, Project
 
 error_logger = logging.getLogger("sentry.errors.events")
 info_logger = logging.getLogger("sentry.store")
@@ -39,9 +28,6 @@ info_logger = logging.getLogger("sentry.store")
 # Is reprocessing on or off by default?
 REPROCESSING_DEFAULT = False
 
-# Timeout for cached group crash report counts
-CRASH_REPORT_TIMEOUT = 24 * 3600  # one day
-
 
 class RetryProcessing(Exception):
     pass
@@ -429,105 +415,6 @@ def create_failed_event(
     return True
 
 
-def get_max_crashreports(model):
-    value = model.get_option("sentry:store_crash_reports")
-    return convert_crashreport_count(value)
-
-
-def crashreports_exceeded(current_count, max_count):
-    if max_count == STORE_CRASH_REPORTS_ALL:
-        return False
-    return current_count >= max_count
-
-
-def get_stored_crashreports(cache_key, event, max_crashreports):
-    # There are two common cases: Storing crash reports is disabled, or is
-    # unbounded. In both cases, there is no need in caching values or querying
-    # the database.
-    if max_crashreports in (0, STORE_CRASH_REPORTS_ALL):
-        return max_crashreports
-
-    cached_reports = cache.get(cache_key, None)
-    if cached_reports >= max_crashreports:
-        return cached_reports
-
-    # Fall-through if max_crashreports was bumped to get a more accurate number.
-    return EventAttachment.objects.filter(
-        group_id=event.group_id, file__type__in=CRASH_REPORT_TYPES
-    ).count()
-
-
-def save_attachments(cache_key, event):
-    """
-    Persists cached event attachments into the file store.
-
-    This method checks whether event attachments are available and sends them to
-    the blob store. There is special handling for crash reports which may
-    contain unstripped PII. If the project or organization is configured to
-    limit the amount of crash reports per group, the number of stored crashes is
-    limited.
-
-    :param cache_key: The cache key at which the event payload is stored in the
-                      cache. This is used to retrieve attachments.
-    :param event:     The event model instance.
-    """
-    if not features.has("organizations:event-attachments", event.project.organization, actor=None):
-        return
-
-    attachments = list(attachment_cache.get(cache_key))
-    if not attachments:
-        return
-
-    # The setting is both an organization and project setting. The project
-    # setting strictly overrides the organization setting, unless set to the
-    # default.
-    max_crashreports = get_max_crashreports(event.project)
-    if not max_crashreports:
-        max_crashreports = get_max_crashreports(event.project.organization)
-
-    # The number of crash reports is cached per group
-    crashreports_key = get_crashreport_key(event.group_id)
-
-    # Only fetch the number of stored crash reports if there is a crash report
-    # in the list of attachments. Otherwise, we won't require this number.
-    if any(attachment.type in CRASH_REPORT_TYPES for attachment in attachments):
-        cached_reports = get_stored_crashreports(crashreports_key, event, max_crashreports)
-    else:
-        cached_reports = 0
-    stored_reports = cached_reports
-
-    for attachment in attachments:
-        # If the attachment is a crash report (e.g. minidump), we need to honor
-        # the store_crash_reports setting. Otherwise, we assume that the client
-        # has already verified PII and just store the attachment.
-        if attachment.type in CRASH_REPORT_TYPES:
-            if crashreports_exceeded(stored_reports, max_crashreports):
-                continue
-            stored_reports += 1
-
-        file = File.objects.create(
-            name=attachment.name,
-            type=attachment.type,
-            headers={"Content-Type": attachment.content_type},
-        )
-        file.putfile(six.BytesIO(attachment.data))
-
-        EventAttachment.objects.create(
-            event_id=event.event_id,
-            project_id=event.project_id,
-            group_id=event.group_id,
-            name=attachment.name,
-            file=file,
-        )
-
-    # Check if we have exceeded the stored crash reports count. If so, we
-    # persist the current maximum (not the actual number!) into the cache. Next
-    # time when loading from the cache, we will validate that this number has
-    # not changed, or otherwise re-fetch from the database.
-    if crashreports_exceeded(stored_reports, max_crashreports) and stored_reports > cached_reports:
-        cache.set(crashreports_key, max_crashreports, CRASH_REPORT_TIMEOUT)
-
-
 def _do_save_event(
     cache_key=None, data=None, start_time=None, event_id=None, project_id=None, **kwargs
 ):
@@ -588,7 +475,7 @@ def _do_save_event(
     try:
         manager = EventManager(data)
         # event.project.organization is populated after this statement.
-        event = manager.save(project_id, assume_normalized=True)
+        event = manager.save(project_id, assume_normalized=True, cache_key=cache_key)
 
         # This is where we can finally say that we have accepted the event.
         track_outcome(
@@ -628,11 +515,6 @@ def _do_save_event(
             event_id,
         )
 
-    else:
-        if cache_key:
-            # Note that event is now a model, and no longer the data
-            save_attachments(cache_key, event)
-
     finally:
         if cache_key:
             default_cache.delete(cache_key)
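
Taken together: _do_save_event now threads cache_key into EventManager.save, and save records payload sizes under event.data["_metrics"] before writing to the node store. Below is a rough standalone sketch of that bookkeeping. It uses the stdlib json module rather than sentry.utils.json, the payloads are made up, and the attachment type strings are assumptions; only the metric key names come from the diff.

import json

# Made-up stand-ins for event.data and the cached attachments.
event_data = {"event_id": "aaaa", "message": "boom"}
attachments = [("event.minidump", b"\x00" * 1024), ("event.attachment", b"hi")]

# Ensure the _metrics key exists, as save() does.
event_metrics = event_data.get("_metrics") or {}
event_data["_metrics"] = event_metrics

# Capture the size of the serialized event as it goes into the node store.
event_metrics["bytes.stored.event"] = len(json.dumps(event_data))

# Attachment sizes are accumulated per attachment type.
for attachment_type, payload in attachments:
    key = "bytes.stored.%s" % (attachment_type,)
    event_metrics[key] = (event_metrics.get(key) or 0) + len(payload)

print(event_metrics)
# -> {'bytes.stored.event': ..., 'bytes.stored.event.minidump': 1024,
#     'bytes.stored.event.attachment': 2}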