from collections import defaultdict from dataclasses import dataclass from datetime import datetime from operator import itemgetter from typing import Any, Optional, Union from urllib.parse import urlparse from django.contrib.postgres.search import SearchVector from django.db import connection, transaction from django.db.models import ( Exists, F, OuterRef, Q, QuerySet, Value, ) from django.db.models.functions import Coalesce, Greatest from django.db.utils import IntegrityError from ninja import Schema from user_agents import parse from apps.alerts.models import Notification from apps.difs.models import DebugInformationFile from apps.difs.tasks import event_difs_resolve_stacktrace from apps.environments.models import Environment, EnvironmentProject from apps.issue_events.constants import EventStatus, LogLevel from apps.issue_events.models import ( Issue, IssueEvent, IssueEventType, IssueHash, TagKey, TagValue, ) from apps.projects.models import Project from apps.releases.models import Release from sentry.culprit import generate_culprit from sentry.eventtypes.error import ErrorEvent from sentry.utils.strings import truncatechars from ..shared.schema.contexts import ( BrowserContext, ContextsSchema, DeviceContext, OSContext, ) from .javascript_event_processor import JavascriptEventProcessor from .model_functions import PipeConcat from .schema import ErrorIssueEventSchema, IngestIssueEvent, InterchangeIssueEvent from .utils import generate_hash, transform_parameterized_message @dataclass class ProcessingEvent: event: InterchangeIssueEvent issue_hash: str title: str transaction: str metadata: dict[str, Any] event_data: dict[str, Any] event_tags: dict[str, str] level: Optional[LogLevel] = None issue_id: Optional[int] = None issue_created = False release_id: Optional[int] = None @dataclass class IssueUpdate: last_seen: datetime search_vector: str added_count: int = 1 def get_search_vector(event: ProcessingEvent) -> str: return f"{event.title} {event.transaction}" Replacable = Union[str, dict, list] def replace(data: Replacable, match: str, repl: str) -> Replacable: """A recursive replace function""" if isinstance(data, dict): return {k: replace(v, match, repl) for k, v in data.items()} elif isinstance(data, list): return [replace(i, match, repl) for i in data] elif isinstance(data, str): return data.replace(match, repl) return data def sanitize_bad_postgres_chars(data: str): """ Remove values which are not supported by the postgres string data types """ known_bads = ["\x00"] for known_bad in known_bads: data = data.replace(known_bad, " ") return data def sanitize_bad_postgres_json(data: Replacable) -> Replacable: """ Remove values which are not supported by the postgres JSONB data type """ known_bads = ["\u0000"] for known_bad in known_bads: data = replace(data, known_bad, " ") return data def update_issues(processing_events: list[ProcessingEvent]): """ Update any existing issues based on new statistics """ issues_to_update: dict[int, IssueUpdate] = {} for processing_event in processing_events: if processing_event.issue_created: break issue_id = processing_event.issue_id if issue_id in issues_to_update: issues_to_update[issue_id].added_count += 1 issues_to_update[ issue_id ].search_vector += f" {get_search_vector(processing_event)}" if issues_to_update[issue_id].last_seen < processing_event.event.received: issues_to_update[issue_id].last_seen = processing_event.event.received elif issue_id: issues_to_update[issue_id] = IssueUpdate( last_seen=processing_event.event.received, search_vector=get_search_vector(processing_event), ) for issue_id, value in issues_to_update.items(): Issue.objects.filter(id=issue_id).update( count=F("count") + value.added_count, search_vector=PipeConcat( F("search_vector"), SearchVector(Value(value.search_vector)) ), last_seen=Greatest(F("last_seen"), value.last_seen), ) def devalue(obj: Union[Schema, list]) -> Optional[Union[dict, list]]: """ Convert Schema like {"values": []} into list or dict without unnecessary 'values' """ if isinstance(obj, Schema) and hasattr(obj, "values"): return obj.dict(mode="json", exclude_none=True, exclude_defaults=True)["values"] elif isinstance(obj, list): return [ x.dict(mode="json", exclude_none=True, exclude_defaults=True) for x in obj ] return None def generate_contexts(event: IngestIssueEvent) -> ContextsSchema: """ Add additional contexts if they aren't already set """ contexts = event.contexts if event.contexts else ContextsSchema(root={}) if request := event.request: if isinstance(request.headers, list): if ua_string := next( (x[1] for x in request.headers if x[0] == "User-Agent"), None ): user_agent = parse(ua_string) if "browser" not in contexts.root: contexts.root["browser"] = BrowserContext( name=user_agent.browser.family, version=user_agent.browser.version_string, ) if "os" not in contexts.root: contexts.root["os"] = OSContext( name=user_agent.os.family, version=user_agent.os.version_string ) if "device" not in contexts.root: device = user_agent.device contexts.root["device"] = DeviceContext( family=device.family, model=device.model, brand=device.brand, ) return contexts def generate_tags(event: IngestIssueEvent) -> dict[str, str]: """Generate key-value tags based on context and other event data""" tags: dict[str, Optional[str]] = event.tags if isinstance(event.tags, dict) else {} if contexts := event.contexts: if browser := contexts.root.get("browser"): if isinstance(browser, BrowserContext): tags["browser.name"] = browser.name tags["browser"] = f"{browser.name} {browser.version}" if os := contexts.root.get("os"): if isinstance(os, OSContext): tags["os.name"] = os.name if device := contexts.root.get("device"): if isinstance(device, DeviceContext) and device.model: tags["device"] = device.model if user := event.user: if user.id: tags["user.id"] = user.id if user.email: tags["user.email"] = user.email if user.username: tags["user.username"] = user.username if environment := event.environment: tags["environment"] = environment if release := event.release: tags["release"] = release if server_name := event.server_name: tags["server_name"] = server_name # Exclude None values return {key: value for key, value in tags.items() if value} def check_set_issue_id( processing_events: list[ProcessingEvent], project_id: int, issue_hash: str, issue_id: int, ): """ It's common to receive two duplicate events at the same time, where the issue has never been seen before. This is an optimization that checks if there is a known project/hash. If so, we can infer the issue_id. """ for event in processing_events: if ( event.issue_id is None and event.event.project_id == project_id and event.issue_hash == issue_hash ): event.issue_id = issue_id def create_environments( environment_set: set[tuple[str, int, int]], projects_with_data: QuerySet ): """ Create newly seen environments. Functions determines which, if any, environments are present in event data but not the database. Optimized to do a much work in python and reduce queries. """ environments_to_create = [ Environment(name=name, organization_id=organization_id) for name, project_id, organization_id in environment_set if not next( ( x for x in projects_with_data if x["environment_name"] == name and x["id"] == project_id ), None, ) ] if environments_to_create: Environment.objects.bulk_create(environments_to_create, ignore_conflicts=True) query = Q() for environment in environments_to_create: query |= Q( name=environment.name, organization_id=environment.organization_id ) environments = Environment.objects.filter(query) environment_projects: list = [] for environment in environments: project_id = next( project_id for (name, project_id, organization_id) in environment_set if environment.name == name and environment.organization_id == organization_id ) environment_projects.append( EnvironmentProject(project_id=project_id, environment=environment) ) EnvironmentProject.objects.bulk_create( environment_projects, ignore_conflicts=True ) def get_and_create_releases( release_set: set[tuple[str, int, int]], projects_with_data: QuerySet ) -> list[tuple[str, int, int]]: """ Create newly seen releases. functions determines which, if any, releases are present in event data but not the database. Optimized to do a much work in python and reduce queries. Return list of tuples: Release version, project_id, release_id """ releases_to_create = [ Release(version=release_name, organization_id=organization_id) for release_name, project_id, organization_id in release_set if not next( ( x for x in projects_with_data if x["release_name"] == release_name and x["id"] == project_id ), None, ) ] releases: Union[list, QuerySet] = [] if releases_to_create: # Create database records for any release that doesn't exist Release.objects.bulk_create(releases_to_create, ignore_conflicts=True) query = Q() for release in releases_to_create: query |= Q(version=release.version, organization_id=release.organization_id) releases = Release.objects.filter(query) ReleaseProject = Release.projects.through release_projects = [ ReleaseProject( release=release, project_id=next( project_id for (version, project_id, organization_id) in release_set if release.version == version and release.organization_id == organization_id ), ) for release in releases ] ReleaseProject.objects.bulk_create(release_projects, ignore_conflicts=True) return [ ( version, project_id, next( ( project["release_id"] for project in projects_with_data if project["release_name"] == version and project["id"] == project_id ), next( ( release.id for release in releases if release.version == version and release.organization_id == organization_id ), 0, ), ), ) for version, project_id, organization_id in release_set ] def process_issue_events(ingest_events: list[InterchangeIssueEvent]): """ Accepts a list of events to ingest. Events should be: - Few enough to save in a single DB call - Permission is already checked, these events are to write to the DB - Some invalid events are tolerated (ignored), including duplicate event id When there is an error in this function, care should be taken as to when to log, error, or ignore. If the SDK sends "weird" data, we want to log that. It's better to save a minimal event than to ignore it. """ # Fetch any needed releases, environments, and whether there is a dif file association # Get unique release/environment for each project_id release_set = { (event.payload.release, event.project_id, event.organization_id) for event in ingest_events if event.payload.release } environment_set = { (event.payload.environment[:255], event.project_id, event.organization_id) for event in ingest_events if event.payload.environment } project_set = {project_id for _, project_id, _ in release_set}.union( {project_id for _, project_id, _ in environment_set} ) release_version_set = {version for version, _, _ in release_set} environment_name_set = {name for name, _, _ in environment_set} projects_with_data = ( Project.objects.filter(id__in=project_set) .annotate( has_difs=Exists( DebugInformationFile.objects.filter(project_id=OuterRef("pk")) ), release_id=Coalesce("releases__id", Value(None)), release_name=Coalesce("releases__version", Value(None)), environment_id=Coalesce("environment__id", Value(None)), environment_name=Coalesce("environment__name", Value(None)), ) .filter(release_name__in=release_version_set.union({None})) .filter(environment_name__in=environment_name_set.union({None})) .values( "id", "has_difs", "release_id", "release_name", "environment_id", "environment_name", ) ) releases = get_and_create_releases(release_set, projects_with_data) create_environments(environment_set, projects_with_data) # Collected/calculated event data while processing processing_events: list[ProcessingEvent] = [] # Collect Q objects for bulk issue hash lookup q_objects = Q() for ingest_event in ingest_events: event_data: dict[str, Any] = {} event = ingest_event.payload event.contexts = generate_contexts(event) event_tags = generate_tags(event) title = "" culprit = "" metadata: dict[str, Any] = {} release_id = next( ( release_id for version, project_id, release_id in releases if version == event_tags.get("release") and ingest_event.project_id == project_id ), None, ) if event.platform in ("javascript", "node") and release_id: JavascriptEventProcessor(release_id, event).transform() elif ( isinstance(event, ErrorIssueEventSchema) and event.exception and next( ( project["has_difs"] for project in projects_with_data if project["id"] == ingest_event.project_id ), False, ) ): event_difs_resolve_stacktrace(event, ingest_event.project_id) if event.type in [IssueEventType.ERROR, IssueEventType.DEFAULT]: sentry_event = ErrorEvent() metadata = sentry_event.get_metadata(event.dict()) if event.type == IssueEventType.ERROR and metadata: full_title = sentry_event.get_title(metadata) else: message = event.message if event.message else event.logentry full_title = ( transform_parameterized_message(message) if message else "" ) culprit = ( event.transaction if event.transaction else generate_culprit(event.dict()) ) title = truncatechars(full_title) culprit = sentry_event.get_location(event.dict()) elif event.type == IssueEventType.CSP: humanized_directive = event.csp.effective_directive.replace("-src", "") uri = urlparse(event.csp.blocked_uri).netloc full_title = title = f"Blocked '{humanized_directive}' from '{uri}'" culprit = event.csp.effective_directive event_data["csp"] = event.csp.dict() issue_hash = generate_hash(title, culprit, event.type, event.fingerprint) if metadata: event_data["metadata"] = metadata if platform := event.platform: event_data["platform"] = platform if modules := event.modules: event_data["modules"] = modules if sdk := event.sdk: event_data["sdk"] = sdk.dict(exclude_none=True) if request := event.request: event_data["request"] = request.dict(exclude_none=True) if environment := event.environment: event_data["environment"] = environment # Message is str # Logentry is {"params": etc} Message format if logentry := event.logentry: event_data["logentry"] = logentry.dict(exclude_none=True) elif message := event.message: if isinstance(message, str): event_data["logentry"] = {"formatted": message} else: event_data["logentry"] = message.dict(exclude_none=True) if message := event.message: event_data["message"] = ( message if isinstance(message, str) else message.formatted ) # When blank, the API will default to the title anyway elif title != full_title: # If the title is truncated, store the full title event_data["message"] = full_title if breadcrumbs := event.breadcrumbs: event_data["breadcrumbs"] = devalue(breadcrumbs) if exception := event.exception: event_data["exception"] = devalue(exception) if extra := event.extra: event_data["extra"] = extra if user := event.user: event_data["user"] = user.dict(exclude_none=True) if contexts := event.contexts: event_data["contexts"] = contexts.dict(exclude_none=True) processing_events.append( ProcessingEvent( event=ingest_event, issue_hash=issue_hash, title=title, level=LogLevel.from_string(event.level) if event.level else None, transaction=culprit, metadata=metadata, event_data=event_data, event_tags=event_tags, release_id=release_id, ) ) q_objects |= Q(project_id=ingest_event.project_id, value=issue_hash) hash_queryset = IssueHash.objects.filter(q_objects).values( "value", "project_id", "issue_id", "issue__status" ) issue_events: list[IssueEvent] = [] issues_to_reopen = [] for processing_event in processing_events: event_type = processing_event.event.payload.type project_id = processing_event.event.project_id issue_defaults = { "type": event_type, "title": sanitize_bad_postgres_chars(processing_event.title), "metadata": sanitize_bad_postgres_json(processing_event.metadata), "first_seen": processing_event.event.received, "last_seen": processing_event.event.received, } if level := processing_event.level: issue_defaults["level"] = level for hash_obj in hash_queryset: if ( hash_obj["value"].hex == processing_event.issue_hash and hash_obj["project_id"] == project_id ): processing_event.issue_id = hash_obj["issue_id"] if hash_obj["issue__status"] == EventStatus.RESOLVED: issues_to_reopen.append(hash_obj["issue_id"]) break if not processing_event.issue_id: try: with transaction.atomic(): issue = Issue.objects.create( project_id=project_id, search_vector=SearchVector(Value(issue_defaults["title"])), **issue_defaults, ) new_issue_hash = IssueHash.objects.create( issue=issue, value=processing_event.issue_hash, project_id=project_id, ) check_set_issue_id( processing_events, issue.project_id, new_issue_hash.value, issue.id, ) processing_event.issue_id = issue.id processing_event.issue_created = True except IntegrityError: processing_event.issue_id = IssueHash.objects.get( project_id=project_id, value=processing_event.issue_hash ).issue_id issue_events.append( IssueEvent( id=processing_event.event.event_id, issue_id=processing_event.issue_id, type=event_type, level=processing_event.level if processing_event.level else LogLevel.ERROR, timestamp=processing_event.event.payload.timestamp, received=processing_event.event.received, title=processing_event.title, transaction=processing_event.transaction, data=sanitize_bad_postgres_json(processing_event.event_data), tags=processing_event.event_tags, release_id=processing_event.release_id, ) ) update_issues(processing_events) if issues_to_reopen: Issue.objects.filter(id__in=issues_to_reopen).update( status=EventStatus.UNRESOLVED ) Notification.objects.filter(issues__in=issues_to_reopen).delete() # ignore_conflicts because we could have an invalid duplicate event_id, received IssueEvent.objects.bulk_create(issue_events, ignore_conflicts=True) # Group events by time and project for event count statistics data_stats: defaultdict[datetime, defaultdict[int, int]] = defaultdict( lambda: defaultdict(int) ) for processing_event in processing_events: hour_received = processing_event.event.received.replace( minute=0, second=0, microsecond=0 ) data_stats[hour_received][processing_event.event.project_id] += 1 update_tags(processing_events) update_statistics(data_stats) def update_statistics( project_event_stats: defaultdict[datetime, defaultdict[int, int]], ): # Flatten data for a sql param friendly format and sort to mitigate deadlocks data = sorted( [ [year, key, value] for year, inner_dict in project_event_stats.items() for key, value in inner_dict.items() ], key=itemgetter(0, 1), ) # Django ORM cannot support F functions in a bulk_update # psycopg3 does not support execute_values # https://github.com/psycopg/psycopg/issues/114 with connection.cursor() as cursor: args_str = ",".join(cursor.mogrify("(%s,%s,%s)", x) for x in data) sql = ( "INSERT INTO projects_issueeventprojecthourlystatistic (date, project_id, count)\n" f"VALUES {args_str}\n" "ON CONFLICT (project_id, date)\n" "DO UPDATE SET count = projects_issueeventprojecthourlystatistic.count + EXCLUDED.count;" ) cursor.execute(sql) TagStats = defaultdict[ datetime, defaultdict[int, defaultdict[int, defaultdict[int, int]]], ] def update_tags(processing_events: list[ProcessingEvent]): keys = sorted({key for d in processing_events for key in d.event_tags.keys()}) values = sorted( {value for d in processing_events for value in d.event_tags.values()} ) TagKey.objects.bulk_create([TagKey(key=key) for key in keys], ignore_conflicts=True) TagValue.objects.bulk_create( [TagValue(value=value) for value in values], ignore_conflicts=True ) # Postgres cannot return ids with ignore_conflicts tag_keys = { tag["key"]: tag["id"] for tag in TagKey.objects.filter(key__in=keys).values() } tag_values = { tag["value"]: tag["id"] for tag in TagValue.objects.filter(value__in=values).values() } tag_stats: TagStats = defaultdict( lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int))) ) for processing_event in processing_events: if processing_event.issue_id is None: continue # Group by day. More granular allows for a better search # Less granular yields better tag filter performance minute_received = processing_event.event.received.replace( hour=0, minute=0, second=0, microsecond=0 ) for key, value in processing_event.event_tags.items(): key_id = tag_keys[key] value_id = tag_values[value] tag_stats[minute_received][processing_event.issue_id][key_id][value_id] += 1 if not tag_stats: return # Sort to mitigate deadlocks data = sorted( [ [date, issue_id, key_id, value_id, count] for date, d1 in tag_stats.items() for issue_id, d2 in d1.items() for key_id, d3 in d2.items() for value_id, count in d3.items() ], key=itemgetter(0, 1, 2, 3), ) with connection.cursor() as cursor: args_str = ",".join(cursor.mogrify("(%s,%s,%s,%s,%s)", x) for x in data) sql = ( "INSERT INTO issue_events_issuetag (date, issue_id, tag_key_id, tag_value_id, count)\n" f"VALUES {args_str}\n" "ON CONFLICT (issue_id, date, tag_key_id, tag_value_id)\n" "DO UPDATE SET count = issue_events_issuetag.count + EXCLUDED.count;" ) cursor.execute(sql)