12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722 |
- from collections import defaultdict
- from dataclasses import dataclass
- from datetime import datetime
- from operator import itemgetter
- from typing import Any, Optional, Union
- from urllib.parse import urlparse
- from django.contrib.postgres.search import SearchVector
- from django.db import connection, transaction
- from django.db.models import (
- Exists,
- F,
- OuterRef,
- Q,
- QuerySet,
- Value,
- )
- from django.db.models.functions import Coalesce, Greatest
- from django.db.utils import IntegrityError
- from ninja import Schema
- from user_agents import parse
- from apps.alerts.models import Notification
- from apps.difs.models import DebugInformationFile
- from apps.difs.tasks import event_difs_resolve_stacktrace
- from apps.environments.models import Environment, EnvironmentProject
- from apps.issue_events.constants import EventStatus, LogLevel
- from apps.issue_events.models import (
- Issue,
- IssueEvent,
- IssueEventType,
- IssueHash,
- TagKey,
- TagValue,
- )
- from apps.projects.models import Project
- from apps.releases.models import Release
- from sentry.culprit import generate_culprit
- from sentry.eventtypes.error import ErrorEvent
- from sentry.utils.strings import truncatechars
- from ..shared.schema.contexts import (
- BrowserContext,
- ContextsSchema,
- DeviceContext,
- OSContext,
- )
- from .javascript_event_processor import JavascriptEventProcessor
- from .model_functions import PipeConcat
- from .schema import ErrorIssueEventSchema, IngestIssueEvent, InterchangeIssueEvent
- from .utils import generate_hash, transform_parameterized_message
@dataclass
class ProcessingEvent:
    """
    Values collected for a single ingest event while a batch is processed.
    """

    # The interchange event this record was built from
    event: InterchangeIssueEvent
    # Hash used to look up (or create) the matching IssueHash row
    issue_hash: str
    # Title stored on the issue and on the issue event
    title: str
    # Filled with the computed culprit by process_issue_events
    transaction: str
    # Stored as the issue "metadata" when a new issue is created
    metadata: dict[str, Any]
    # Saved as the issue event "data" payload
    event_data: dict[str, Any]
    # Tag name -> tag value pairs, saved on the event and counted by update_tags
    event_tags: dict[str, str]
    level: Optional[LogLevel] = None
    # Unknown until the hash lookup (or issue creation) resolves it
    issue_id: Optional[int] = None
    # NOTE: deliberately left unannotated here, so this is a plain class
    # attribute rather than a dataclass field (not accepted by __init__,
    # not part of __repr__/__eq__). It is set to True on the instance
    # whose event created a new issue.
    issue_created = False
    release_id: Optional[int] = None
@dataclass
class IssueUpdate:
    """
    Pending changes to apply to one existing issue, accumulated over a batch.
    """

    # Most recent "received" time among the events seen so far
    last_seen: datetime
    # Space separated text to append to the issue search vector
    search_vector: str
    # Number of events to add to the issue count
    added_count: int = 1
def get_search_vector(event: ProcessingEvent) -> str:
    """
    Return the searchable text for an event: its title and transaction,
    separated by a single space.
    """
    title, culprit = event.title, event.transaction
    return f"{title} {culprit}"
Replacable = Union[str, dict, list]


def replace(data: Replacable, match: str, repl: str) -> Replacable:
    """
    Recursively substitute `match` with `repl` in every string found in data.

    Dict values and list items are walked recursively (dict keys are left
    untouched). Anything that is not a str, dict or list is returned as is.
    """
    if isinstance(data, str):
        return data.replace(match, repl)
    if isinstance(data, list):
        return [replace(item, match, repl) for item in data]
    if isinstance(data, dict):
        return {key: replace(value, match, repl) for key, value in data.items()}
    return data
def sanitize_bad_postgres_chars(data: str):
    """
    Replace characters the postgres string data types do not support
    (currently only the NUL character) with a space.
    """
    for unsupported in ("\x00",):
        data = data.replace(unsupported, " ")
    return data
def sanitize_bad_postgres_json(data: Replacable) -> Replacable:
    """
    Replace values the postgres JSONB data type does not support
    (currently only \\u0000) with a space, recursing through dicts and lists.
    """
    for unsupported in ("\u0000",):
        data = replace(data, unsupported, " ")
    return data
def update_issues(processing_events: list[ProcessingEvent]):
    """
    Update any existing issues based on new statistics

    Events are grouped by issue id so each issue is updated once with the
    number of added events, the newest "received" time and the text to append
    to its search vector. Events that created their issue in this batch, and
    events without an issue id, are not counted.
    """
    issues_to_update: dict[int, IssueUpdate] = {}
    for processing_event in processing_events:
        # Skip only the event that created the issue. Stopping the loop here
        # would drop every later event in the batch, including events that
        # belong to other, already existing issues.
        if processing_event.issue_created:
            continue
        issue_id = processing_event.issue_id
        if not issue_id:
            continue
        received = processing_event.event.received
        pending = issues_to_update.get(issue_id)
        if pending is None:
            issues_to_update[issue_id] = IssueUpdate(
                last_seen=received,
                search_vector=get_search_vector(processing_event),
            )
            continue
        pending.added_count += 1
        pending.search_vector += f" {get_search_vector(processing_event)}"
        if pending.last_seen < received:
            pending.last_seen = received

    for issue_id, value in issues_to_update.items():
        Issue.objects.filter(id=issue_id).update(
            count=F("count") + value.added_count,
            search_vector=PipeConcat(
                F("search_vector"), SearchVector(Value(value.search_vector))
            ),
            # Never move last_seen backwards if the stored value is newer
            last_seen=Greatest(F("last_seen"), value.last_seen),
        )
def devalue(obj: Union[Schema, list]) -> Optional[Union[dict, list]]:
    """
    Unwrap a Schema shaped like {"values": [...]} into its bare "values" content.

    A list is converted item by item into a list of dicts. Anything else
    yields None.
    """
    dump_options = {"mode": "json", "exclude_none": True, "exclude_defaults": True}
    if isinstance(obj, list):
        return [item.dict(**dump_options) for item in obj]
    if isinstance(obj, Schema) and hasattr(obj, "values"):
        return obj.dict(**dump_options)["values"]
    return None
def generate_contexts(event: IngestIssueEvent) -> ContextsSchema:
    """
    Add additional contexts if they aren't already set

    When the request headers are a list of (name, value) pairs and include a
    User-Agent header, the user agent string is parsed and used to fill in the
    "browser", "os" and "device" contexts that the event does not already
    provide. Returns the (possibly newly created) contexts object.
    """
    contexts = event.contexts if event.contexts else ContextsSchema(root={})
    request = event.request
    if not request or not isinstance(request.headers, list):
        return contexts

    # HTTP header names are case-insensitive, and some clients send
    # "user-agent" in lowercase, so compare without regard to case.
    ua_string = next(
        (
            header[1]
            for header in request.headers
            if isinstance(header[0], str) and header[0].lower() == "user-agent"
        ),
        None,
    )
    if not ua_string:
        return contexts

    user_agent = parse(ua_string)
    if "browser" not in contexts.root:
        contexts.root["browser"] = BrowserContext(
            name=user_agent.browser.family,
            version=user_agent.browser.version_string,
        )
    if "os" not in contexts.root:
        contexts.root["os"] = OSContext(
            name=user_agent.os.family, version=user_agent.os.version_string
        )
    if "device" not in contexts.root:
        device = user_agent.device
        contexts.root["device"] = DeviceContext(
            family=device.family,
            model=device.model,
            brand=device.brand,
        )
    return contexts
def generate_tags(event: IngestIssueEvent) -> dict[str, str]:
    """
    Generate key-value tags based on context and other event data

    Starts from the tags sent with the event (when they are a dict; that dict
    is updated in place) and adds tags derived from the browser, os and device
    contexts, the user, the environment, the release and the server name.
    Tags with an empty or None value are left out of the result.
    """
    tags: dict[str, Optional[str]] = event.tags if isinstance(event.tags, dict) else {}
    if contexts := event.contexts:
        if browser := contexts.root.get("browser"):
            if isinstance(browser, BrowserContext):
                tags["browser.name"] = browser.name
                # Join only the parts that are present so a missing name or
                # version never ends up as the literal text "None" in the tag.
                browser_label = " ".join(
                    str(part) for part in (browser.name, browser.version) if part
                )
                if browser_label:
                    tags["browser"] = browser_label
        if os := contexts.root.get("os"):
            if isinstance(os, OSContext):
                tags["os.name"] = os.name
        if device := contexts.root.get("device"):
            if isinstance(device, DeviceContext) and device.model:
                tags["device"] = device.model

    if user := event.user:
        if user.id:
            tags["user.id"] = user.id
        if user.email:
            tags["user.email"] = user.email
        if user.username:
            tags["user.username"] = user.username

    if environment := event.environment:
        tags["environment"] = environment
    if release := event.release:
        tags["release"] = release
    if server_name := event.server_name:
        tags["server_name"] = server_name

    # Exclude None (and any other empty) values
    return {key: value for key, value in tags.items() if value}
def check_set_issue_id(
    processing_events: list[ProcessingEvent],
    project_id: int,
    issue_hash: str,
    issue_id: int,
):
    """
    Assign issue_id to every event in the batch that has no issue yet and
    shares the given project and hash.

    Two duplicate events for a never seen before issue often arrive together.
    Once the issue exists for one of them, the others with the same
    project/hash can reuse its id without another lookup.
    """
    for candidate in processing_events:
        if candidate.issue_id is not None:
            continue
        if not (
            candidate.event.project_id == project_id
            and candidate.issue_hash == issue_hash
        ):
            continue
        candidate.issue_id = issue_id
def create_environments(
    environment_set: set[tuple[str, int, int]], projects_with_data: QuerySet
):
    """
    Create newly seen environments.

    Determines which, if any, environments are present in event data but not
    yet associated with their project in the database, creates the missing
    Environment rows and links them to those projects. Optimized to do as
    much work in python as possible and reduce queries.

    environment_set holds (environment name, project_id, organization_id)
    tuples. projects_with_data is iterated as dict rows with at least the
    "id" and "environment_name" keys.
    """
    # Every (name, project, organization) whose project lacks the environment
    missing = [
        (name, project_id, organization_id)
        for name, project_id, organization_id in environment_set
        if not any(
            row["environment_name"] == name and row["id"] == project_id
            for row in projects_with_data
        )
    ]
    if not missing:
        return

    # One Environment per (name, organization), even when several projects
    # of the same organization report the same environment name
    new_environments = sorted(
        {(name, organization_id) for name, _, organization_id in missing}
    )
    Environment.objects.bulk_create(
        [
            Environment(name=name, organization_id=organization_id)
            for name, organization_id in new_environments
        ],
        ignore_conflicts=True,
    )
    query = Q()
    for name, organization_id in new_environments:
        query |= Q(name=name, organization_id=organization_id)
    # Re-read from the database so rows that already existed are included
    environments = Environment.objects.filter(query)

    # Link each environment to every project that was missing it. Picking a
    # single arbitrary project per (name, organization) could select a project
    # that is already linked and leave the one that needs the link untouched.
    environment_projects = [
        EnvironmentProject(project_id=project_id, environment=environment)
        for environment in environments
        for name, project_id, organization_id in missing
        if environment.name == name
        and environment.organization_id == organization_id
    ]
    EnvironmentProject.objects.bulk_create(
        environment_projects, ignore_conflicts=True
    )
def get_and_create_releases(
    release_set: set[tuple[str, int, int]], projects_with_data: QuerySet
) -> list[tuple[str, int, int]]:
    """
    Create newly seen releases.

    Determines which, if any, releases are present in event data but not yet
    associated with their project in the database, creates the missing Release
    rows and links them to those projects. Optimized to do as much work in
    python as possible and reduce queries.

    release_set holds (release version, project_id, organization_id) tuples.
    projects_with_data is iterated as dict rows with at least the "id",
    "release_id" and "release_name" keys.

    Return list of tuples: Release version, project_id, release_id
    (release_id is 0 when no matching release could be found).
    """
    # Every (version, project, organization) whose project lacks the release
    missing = [
        (version, project_id, organization_id)
        for version, project_id, organization_id in release_set
        if not any(
            row["release_name"] == version and row["id"] == project_id
            for row in projects_with_data
        )
    ]

    releases: Union[list, QuerySet] = []
    if missing:
        # One Release per (version, organization), even when several projects
        # of the same organization report the same version
        new_releases = sorted(
            {(version, organization_id) for version, _, organization_id in missing}
        )
        # Create database records for any release that doesn't exist
        Release.objects.bulk_create(
            [
                Release(version=version, organization_id=organization_id)
                for version, organization_id in new_releases
            ],
            ignore_conflicts=True,
        )
        query = Q()
        for version, organization_id in new_releases:
            query |= Q(version=version, organization_id=organization_id)
        # Re-read from the database so rows that already existed are included
        releases = Release.objects.filter(query)

        # Link each release to every project that was missing it. Picking a
        # single arbitrary project per (version, organization) could select a
        # project that is already linked and skip the one that needs the link.
        ReleaseProject = Release.projects.through
        release_projects = [
            ReleaseProject(release=release, project_id=project_id)
            for release in releases
            for version, project_id, organization_id in missing
            if release.version == version
            and release.organization_id == organization_id
        ]
        ReleaseProject.objects.bulk_create(release_projects, ignore_conflicts=True)

    return [
        (
            version,
            project_id,
            next(
                (
                    # Prefer the id of a release already linked to the project
                    project["release_id"]
                    for project in projects_with_data
                    if project["release_name"] == version
                    and project["id"] == project_id
                ),
                next(
                    (
                        # Otherwise use the release fetched/created above
                        release.id
                        for release in releases
                        if release.version == version
                        and release.organization_id == organization_id
                    ),
                    0,
                ),
            ),
        )
        for version, project_id, organization_id in release_set
    ]
def process_issue_events(ingest_events: list[InterchangeIssueEvent]):
    """
    Accepts a list of events to ingest. Events should be:
    - Few enough to save in a single DB call
    - Permission is already checked, these events are to write to the DB
    - Some invalid events are tolerated (ignored), including duplicate event id

    When there is an error in this function, care should be taken as to when to log,
    error, or ignore. If the SDK sends "weird" data, we want to log that.
    It's better to save a minimal event than to ignore it.

    Steps: make sure releases and environments exist, compute title, culprit,
    hash and stored data for each event, match each event to an existing issue
    (or create one), then save the events and update issue, tag and hourly
    project statistics.
    """
    if not ingest_events:
        # Nothing to store; avoids running the statistics SQL without rows
        return

    # Fetch any needed releases, environments, and whether there is a dif file association
    # Get unique release/environment for each project_id
    release_set = {
        (event.payload.release, event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.release
    }
    environment_set = {
        (event.payload.environment[:255], event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.environment
    }
    project_set = {project_id for _, project_id, _ in release_set}.union(
        {project_id for _, project_id, _ in environment_set}
    )
    release_version_set = {version for version, _, _ in release_set}
    environment_name_set = {name for name, _, _ in environment_set}

    projects_with_data = (
        Project.objects.filter(id__in=project_set)
        .annotate(
            has_difs=Exists(
                DebugInformationFile.objects.filter(project_id=OuterRef("pk"))
            ),
            release_id=Coalesce("releases__id", Value(None)),
            release_name=Coalesce("releases__version", Value(None)),
            environment_id=Coalesce("environment__id", Value(None)),
            environment_name=Coalesce("environment__name", Value(None)),
        )
        .filter(release_name__in=release_version_set.union({None}))
        .filter(environment_name__in=environment_name_set.union({None}))
        .values(
            "id",
            "has_difs",
            "release_id",
            "release_name",
            "environment_id",
            "environment_name",
        )
    )
    releases = get_and_create_releases(release_set, projects_with_data)
    create_environments(environment_set, projects_with_data)

    # Collected/calculated event data while processing
    processing_events: list[ProcessingEvent] = []
    # Collect Q objects for bulk issue hash lookup
    q_objects = Q()
    for ingest_event in ingest_events:
        event_data: dict[str, Any] = {}
        event = ingest_event.payload
        event.contexts = generate_contexts(event)
        event_tags = generate_tags(event)
        title = ""
        # Reset per event: otherwise an event type handled by neither branch
        # below would hit an unbound name (or reuse the previous event's title)
        full_title = ""
        culprit = ""
        metadata: dict[str, Any] = {}

        release_id = next(
            (
                release_id
                for version, project_id, release_id in releases
                if version == event_tags.get("release")
                and ingest_event.project_id == project_id
            ),
            None,
        )
        if event.platform in ("javascript", "node") and release_id:
            JavascriptEventProcessor(release_id, event).transform()
        elif (
            isinstance(event, ErrorIssueEventSchema)
            and event.exception
            and next(
                (
                    project["has_difs"]
                    for project in projects_with_data
                    if project["id"] == ingest_event.project_id
                ),
                False,
            )
        ):
            event_difs_resolve_stacktrace(event, ingest_event.project_id)

        if event.type in [IssueEventType.ERROR, IssueEventType.DEFAULT]:
            sentry_event = ErrorEvent()
            metadata = sentry_event.get_metadata(event.dict())
            if event.type == IssueEventType.ERROR and metadata:
                full_title = sentry_event.get_title(metadata)
            else:
                message = event.message if event.message else event.logentry
                full_title = (
                    transform_parameterized_message(message)
                    if message
                    else "<untitled>"
                )
                # NOTE(review): this value is always replaced by the
                # get_location() result below - confirm which one is intended
                culprit = (
                    event.transaction
                    if event.transaction
                    else generate_culprit(event.dict())
                )
            title = truncatechars(full_title)
            culprit = sentry_event.get_location(event.dict())
        elif event.type == IssueEventType.CSP:
            humanized_directive = event.csp.effective_directive.replace("-src", "")
            uri = urlparse(event.csp.blocked_uri).netloc
            full_title = title = f"Blocked '{humanized_directive}' from '{uri}'"
            culprit = event.csp.effective_directive
            event_data["csp"] = event.csp.dict()
        # The hash is computed from the raw (unsanitized) title and culprit
        issue_hash = generate_hash(title, culprit, event.type, event.fingerprint)

        if metadata:
            event_data["metadata"] = metadata
        if platform := event.platform:
            event_data["platform"] = platform
        if modules := event.modules:
            event_data["modules"] = modules
        if sdk := event.sdk:
            event_data["sdk"] = sdk.dict(exclude_none=True)
        if request := event.request:
            event_data["request"] = request.dict(exclude_none=True)
        if environment := event.environment:
            event_data["environment"] = environment

        # Message is str
        # Logentry is {"params": etc} Message format
        if logentry := event.logentry:
            event_data["logentry"] = logentry.dict(exclude_none=True)
        elif message := event.message:
            if isinstance(message, str):
                event_data["logentry"] = {"formatted": message}
            else:
                event_data["logentry"] = message.dict(exclude_none=True)
        if message := event.message:
            event_data["message"] = (
                message if isinstance(message, str) else message.formatted
            )
        # When blank, the API will default to the title anyway
        elif title != full_title:
            # If the title is truncated, store the full title
            event_data["message"] = full_title

        if breadcrumbs := event.breadcrumbs:
            event_data["breadcrumbs"] = devalue(breadcrumbs)
        if exception := event.exception:
            event_data["exception"] = devalue(exception)
        if extra := event.extra:
            event_data["extra"] = extra
        if user := event.user:
            event_data["user"] = user.dict(exclude_none=True)
        if contexts := event.contexts:
            event_data["contexts"] = contexts.dict(exclude_none=True)

        # Title and transaction are written to text columns and to the search
        # vector; strip characters postgres rejects so a single bad event
        # cannot fail the whole batch.
        safe_title = sanitize_bad_postgres_chars(title)
        safe_transaction = (
            sanitize_bad_postgres_chars(culprit)
            if isinstance(culprit, str)
            else culprit
        )
        processing_events.append(
            ProcessingEvent(
                event=ingest_event,
                issue_hash=issue_hash,
                title=safe_title,
                level=LogLevel.from_string(event.level) if event.level else None,
                transaction=safe_transaction,
                metadata=metadata,
                event_data=event_data,
                event_tags=event_tags,
                release_id=release_id,
            )
        )
        q_objects |= Q(project_id=ingest_event.project_id, value=issue_hash)

    hash_queryset = IssueHash.objects.filter(q_objects).values(
        "value", "project_id", "issue_id", "issue__status"
    )
    issue_events: list[IssueEvent] = []
    issues_to_reopen = []
    for processing_event in processing_events:
        event_type = processing_event.event.payload.type
        project_id = processing_event.event.project_id
        issue_defaults = {
            "type": event_type,
            "title": sanitize_bad_postgres_chars(processing_event.title),
            "metadata": sanitize_bad_postgres_json(processing_event.metadata),
            "first_seen": processing_event.event.received,
            "last_seen": processing_event.event.received,
        }
        if level := processing_event.level:
            issue_defaults["level"] = level

        # Look for an already known hash for this project
        for hash_obj in hash_queryset:
            if (
                hash_obj["value"].hex == processing_event.issue_hash
                and hash_obj["project_id"] == project_id
            ):
                processing_event.issue_id = hash_obj["issue_id"]
                if hash_obj["issue__status"] == EventStatus.RESOLVED:
                    issues_to_reopen.append(hash_obj["issue_id"])
                break

        if not processing_event.issue_id:
            try:
                with transaction.atomic():
                    issue = Issue.objects.create(
                        project_id=project_id,
                        search_vector=SearchVector(Value(issue_defaults["title"])),
                        **issue_defaults,
                    )
                    new_issue_hash = IssueHash.objects.create(
                        issue=issue,
                        value=processing_event.issue_hash,
                        project_id=project_id,
                    )
                    # Later events in this batch with the same hash reuse the issue
                    check_set_issue_id(
                        processing_events,
                        issue.project_id,
                        new_issue_hash.value,
                        issue.id,
                    )
                processing_event.issue_id = issue.id
                processing_event.issue_created = True
            except IntegrityError:
                # Creation hit a database constraint; fall back to the issue
                # of the hash that is already stored for this project
                processing_event.issue_id = IssueHash.objects.get(
                    project_id=project_id, value=processing_event.issue_hash
                ).issue_id

        issue_events.append(
            IssueEvent(
                id=processing_event.event.event_id,
                issue_id=processing_event.issue_id,
                type=event_type,
                level=processing_event.level
                if processing_event.level
                else LogLevel.ERROR,
                timestamp=processing_event.event.payload.timestamp,
                received=processing_event.event.received,
                title=processing_event.title,
                transaction=processing_event.transaction,
                data=sanitize_bad_postgres_json(processing_event.event_data),
                tags=processing_event.event_tags,
                release_id=processing_event.release_id,
            )
        )

    update_issues(processing_events)

    if issues_to_reopen:
        Issue.objects.filter(id__in=issues_to_reopen).update(
            status=EventStatus.UNRESOLVED
        )
        Notification.objects.filter(issues__in=issues_to_reopen).delete()

    # ignore_conflicts because we could have an invalid duplicate event_id, received
    IssueEvent.objects.bulk_create(issue_events, ignore_conflicts=True)

    # Group events by time and project for event count statistics
    data_stats: defaultdict[datetime, defaultdict[int, int]] = defaultdict(
        lambda: defaultdict(int)
    )
    for processing_event in processing_events:
        hour_received = processing_event.event.received.replace(
            minute=0, second=0, microsecond=0
        )
        data_stats[hour_received][processing_event.event.project_id] += 1
    update_tags(processing_events)
    update_statistics(data_stats)
def update_statistics(
    project_event_stats: defaultdict[datetime, defaultdict[int, int]],
):
    """
    Add event counts to the hourly per-project statistics table.

    project_event_stats maps a date (the caller passes the hour an event was
    received) to a mapping of project_id -> number of events. Each
    (date, project) pair is inserted, or added to the existing count when a
    row already exists. Does nothing when there are no counts to record.
    """
    # Flatten data for a sql param friendly format and sort to mitigate deadlocks
    data = sorted(
        [
            [date, project_id, count]
            for date, project_counts in project_event_stats.items()
            for project_id, count in project_counts.items()
        ],
        key=itemgetter(0, 1),
    )
    if not data:
        # "VALUES" followed by no rows is not valid SQL
        return

    # Django ORM cannot support F functions in a bulk_update
    # psycopg3 does not support execute_values
    # https://github.com/psycopg/psycopg/issues/114
    with connection.cursor() as cursor:
        args_str = ",".join(cursor.mogrify("(%s,%s,%s)", x) for x in data)
        sql = (
            "INSERT INTO projects_issueeventprojecthourlystatistic (date, project_id, count)\n"
            f"VALUES {args_str}\n"
            "ON CONFLICT (project_id, date)\n"
            "DO UPDATE SET count = projects_issueeventprojecthourlystatistic.count + EXCLUDED.count;"
        )
        cursor.execute(sql)
# date -> issue id -> tag key id -> tag value id -> count
TagStats = defaultdict[
    datetime,
    defaultdict[int, defaultdict[int, defaultdict[int, int]]],
]


def update_tags(processing_events: list[ProcessingEvent]):
    """
    Store the tag keys/values seen in a batch and add to the per-issue tag counts.

    Every distinct tag key and tag value is inserted (existing rows are left
    alone), then counts are grouped by day, issue, key and value and written
    to the issue tag table, adding to the count of rows that already exist.
    Events without an issue id are not counted.
    """
    unique_keys: set = set()
    unique_values: set = set()
    for processing_event in processing_events:
        unique_keys.update(processing_event.event_tags)
        unique_values.update(processing_event.event_tags.values())
    keys = sorted(unique_keys)
    values = sorted(unique_values)

    TagKey.objects.bulk_create([TagKey(key=key) for key in keys], ignore_conflicts=True)
    TagValue.objects.bulk_create(
        [TagValue(value=value) for value in values], ignore_conflicts=True
    )
    # Postgres cannot return ids with ignore_conflicts
    key_ids = {
        row["key"]: row["id"] for row in TagKey.objects.filter(key__in=keys).values()
    }
    value_ids = {
        row["value"]: row["id"]
        for row in TagValue.objects.filter(value__in=values).values()
    }

    tag_stats: TagStats = defaultdict(
        lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
    )
    for processing_event in processing_events:
        issue_id = processing_event.issue_id
        if issue_id is None:
            continue
        # Group by day. More granular allows for a better search
        # Less granular yields better tag filter performance
        day_received = processing_event.event.received.replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        for key, value in processing_event.event_tags.items():
            key_id = key_ids[key]
            value_id = value_ids[value]
            # Index only here so events without tags create no empty entries
            tag_stats[day_received][issue_id][key_id][value_id] += 1

    if not tag_stats:
        return

    # Sort to mitigate deadlocks
    rows = [
        [date, issue_id, key_id, value_id, count]
        for date, by_issue in tag_stats.items()
        for issue_id, by_key in by_issue.items()
        for key_id, by_value in by_key.items()
        for value_id, count in by_value.items()
    ]
    rows.sort(key=itemgetter(0, 1, 2, 3))

    with connection.cursor() as cursor:
        args_str = ",".join(cursor.mogrify("(%s,%s,%s,%s,%s)", row) for row in rows)
        sql = (
            "INSERT INTO issue_events_issuetag (date, issue_id, tag_key_id, tag_value_id, count)\n"
            f"VALUES {args_str}\n"
            "ON CONFLICT (issue_id, date, tag_key_id, tag_value_id)\n"
            "DO UPDATE SET count = issue_events_issuetag.count + EXCLUDED.count;"
        )
        cursor.execute(sql)
|