@@ -6,8 +6,9 @@ import re
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
+from hashlib import md5
from io import BytesIO
-from typing import Optional
+from typing import Optional, Sequence, TypedDict
import sentry_sdk
from django.conf import settings
@@ -83,7 +84,7 @@ from sentry.models.grouphistory import GroupHistoryStatus, record_group_history
from sentry.models.integrations.repository_project_path_config import RepositoryProjectPathConfig
from sentry.plugins.base import plugins
from sentry.projectoptions.defaults import BETA_GROUPING_CONFIG, DEFAULT_GROUPING_CONFIG
-from sentry.ratelimits.sliding_windows import RedisSlidingWindowRateLimiter
+from sentry.ratelimits.sliding_windows import Quota, RedisSlidingWindowRateLimiter, RequestedQuota
from sentry.reprocessing2 import is_reprocessed_event, save_unprocessed_event
from sentry.shared_integrations.exceptions import ApiError
from sentry.signals import first_event_received, first_transaction_received, issue_unresolved
@@ -96,7 +97,10 @@ from sentry.utils.cache import cache_key_for_event
from sentry.utils.canonical import CanonicalKeyDict
from sentry.utils.dates import to_datetime, to_timestamp
from sentry.utils.outcomes import Outcome, track_outcome
-from sentry.utils.performance_issues.performance_detection import detect_performance_problems
+from sentry.utils.performance_issues.performance_detection import (
+ PerformanceProblem,
+ detect_performance_problems,
from sentry.utils.safe import get_path, safe_execute, setdefault_path, trim
logger = logging.getLogger("sentry.events")
@@ -106,6 +110,11 @@ SECURITY_REPORT_INTERFACES = ("csp", "hpkp", "expectct", "expectstaple")
# Timeout for cached group crash report counts
CRASH_REPORT_TIMEOUT = 24 * 3600 # one day
+issue_rate_limiter = RedisSlidingWindowRateLimiter(
+PERFORMANCE_ISSUE_QUOTA = Quota(3600, 60, 60)
class GroupInfo:
@@ -943,7 +952,6 @@ def _tsdb_record_all_metrics(jobs):
incrs = []
frequencies = []
records = []
incrs.append((tsdb.models.project, job["project_id"]))
event = job["event"]
release = job["release"]
@@ -995,8 +1003,9 @@ def _nodestore_save_many(jobs):
# Write the event to Nodestore
subkeys = {}
- if job["groups"]:
- event = job["event"]
+ event = job["event"]
+ # We only care about `unprocessed` for error events
+ if event.get_event_type() != "transaction" and job["groups"]:
unprocessed = event_processing_store.get(
cache_key_for_event({"project": event.project_id, "event_id": event.event_id}),
@@ -1935,6 +1944,123 @@ def _detect_performance_problems(jobs, projects):
job["performance_problems"] = detect_performance_problems(job["data"])
+class Performance_Job(TypedDict, total=False):
+ performance_problems: Sequence[PerformanceProblem]
+def _save_aggregate_performance(jobs: Sequence[Performance_Job], projects):
+ 10 # safety check in case we are passed too many. constant will live somewhere else tbd
+ )
+ for job in jobs:
+ job["groups"] = []
+ event = job["event"]
+ project = event.project
+ # General system-wide option
+ rate = options.get("performance.issues.all.problem-creation") or 0
+ # More granular, per-project option
+ per_project_rate = project.get_option("sentry:performance_issue_creation_rate", 0)
+ if rate > random.random() and per_project_rate > random.random():
+ kwargs = _create_kwargs(job)
+ kwargs["culprit"] = job["culprit"]
+ kwargs["data"] = materialize_metadata(
+ event.data,
+ get_event_type(event.data),
+ dict(job["event_metadata"]),
+ )
+ kwargs["data"]["last_received"] = job["received_timestamp"]
+ performance_problems = job["performance_problems"]
+ for problem in performance_problems:
+ problem.fingerprint = md5(problem.fingerprint.encode("utf-8")).hexdigest()
+ performance_problems_by_fingerprint = {p.fingerprint: p for p in performance_problems}
+ all_group_hashes = [problem.fingerprint for problem in performance_problems]
+ group_hashes = all_group_hashes[:MAX_GROUPS]
+ existing_grouphashes = GroupHash.objects.filter(
+ project=project, hash__in=group_hashes
+ ).select_related("group")
+ new_grouphashes = set(group_hashes) - {hash.hash for hash in existing_grouphashes}
+ new_grouphashes_count = len(new_grouphashes)
+ if new_grouphashes:
+ granted_quota = issue_rate_limiter.check_and_use_quotas(
+ [
+ RequestedQuota(
+ f"performance-issues:{project.id}",
+ new_grouphashes_count,
+ )
+ ]
+ )[0]
+ # Log how many groups didn't get created because of rate limiting
+ _dropped_group_hash_count = new_grouphashes_count - granted_quota.granted
+ metrics.incr("performance.performance_issue.dropped", _dropped_group_hash_count)
+ for new_grouphash in list(new_grouphashes)[: granted_quota.granted]:
+ with sentry_sdk.start_span(
+ op="event_manager.create_group_transaction"
+ ) as span, metrics.timer(
+ "event_manager.create_group_transaction"
+ ) as metric_tags, transaction.atomic():
+ span.set_tag("create_group_transaction.outcome", "no_group")
+ metric_tags["create_group_transaction.outcome"] = "no_group"
+ problem = performance_problems_by_fingerprint[new_grouphash]
+ kwargs["type"] = problem.type.value
+ kwargs["data"]["metadata"]["title"] = f"N+1 Query:{problem.desc}"
+ group = _create_group(project, event, **kwargs)
+ GroupHash.objects.create(project=project, hash=new_grouphash, group=group)
+ is_new = True
+ is_regression = False
+ span.set_tag("create_group_transaction.outcome", "new_group")
+ metric_tags["create_group_transaction.outcome"] = "new_group"
+ metrics.incr(
+ "group.created",
+ skip_internal=True,
+ tags={"platform": job["platform"] or "unknown"},
+ )
+ job["groups"].append(
+ GroupInfo(group=group, is_new=is_new, is_regression=is_regression)
+ )
+ if existing_grouphashes:
+ for existing_grouphash in existing_grouphashes:
+ group = existing_grouphash.group
+ is_new = False
+ description = performance_problems_by_fingerprint[existing_grouphash.hash].desc
+ kwargs["data"]["metadata"]["title"] = f"N+1 Query:{description}"
+ is_regression = _process_existing_aggregate(
+ group=group, event=job["event"], data=kwargs, release=job["release"]
+ )
+ job["groups"].append(
+ GroupInfo(group=group, is_new=is_new, is_regression=is_regression)
+ )
+ job["event"].groups = [group_info.group for group_info in job["groups"]]
def save_transaction_events(jobs, projects):
with metrics.timer("event_manager.save_transactions.collect_organization_ids"):
@@ -1954,17 +2080,6 @@ def save_transaction_events(jobs, projects):
except KeyError:
- with metrics.timer("event_manager.save_transactions.prepare_jobs"):
- for job in jobs:
- job["project_id"] = job["data"]["project"]
- job["raw"] = False
- job["group"] = None
- # XXX: Temporary hack so that `groups` is always present
- job["groups"] = []
- job["is_new"] = False
- job["is_regression"] = False
- job["is_new_group_environment"] = False
_pull_out_data(jobs, projects)
_get_or_create_release_many(jobs, projects)
_get_event_user_many(jobs, projects)
@@ -1973,16 +2088,14 @@ def save_transaction_events(jobs, projects):
_calculate_span_grouping(jobs, projects)
_detect_performance_problems(jobs, projects)
+ _save_aggregate_performance(jobs, projects)
_get_or_create_environment_many(jobs, projects)
+ _get_or_create_group_environment_many(jobs, projects)
_get_or_create_release_associated_models(jobs, projects)
+ _get_or_create_group_release_many(jobs, projects)
return jobs
-issue_rate_limiter = RedisSlidingWindowRateLimiter(