123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- from dataclasses import dataclass
- from typing import Any, Optional, Union
- from urllib.parse import urlparse
- from django.db import transaction
- from django.db.models import Q
- from django.db.utils import IntegrityError
- from ninja import Schema
- from alerts.models import Notification
- from apps.issue_events.constants import EventStatus
- from apps.issue_events.models import Issue, IssueEvent, IssueEventType, IssueHash
- # from apps.issue_events.schema import CSPIssueEventDataSchema, IssueEventDataSchema
- from sentry.culprit import generate_culprit
- from sentry.eventtypes.error import ErrorEvent
- from .schema import (
- InterchangeIssueEvent,
- )
- from .utils import generate_hash, transform_parameterized_message
@dataclass
class ProcessingEvent:
    """Per-event state collected and filled in by ``process_issue_events``."""

    # The ingested event being processed.
    event: InterchangeIssueEvent
    # Grouping hash from generate_hash(); compared against IssueHash.value (hex).
    issue_hash: str
    title: str
    # Holds the event's culprit; it is written to IssueEvent.transaction.
    transaction: str
    metadata: dict[str, Any]
    # Payload written to IssueEvent.data.
    event_data: dict[str, Any]
    event_tags: dict[str, str]
    # Set once the event has been matched to, or has created, an Issue.
    issue_id: Optional[int] = None
    # True when processing this event created a new Issue. Annotated so it is
    # a real dataclass field (in __init__/__repr__/__eq__) rather than a plain
    # class attribute that @dataclass ignores.
    issue_created: bool = False
def devalue(obj: Union[Schema, list]) -> Optional[Union[dict, list]]:
    """
    Flatten a ``{"values": [...]}`` wrapper into its payload.

    - A Schema that has a ``values`` attribute is dumped (JSON mode, dropping
      None and default-valued fields) and its ``values`` entry is returned.
    - A list is returned as a list of dumped items.
    - Anything else yields None.

    If ``values`` itself is dropped from the dump (because it is None or equal
    to its default), None is returned instead of raising KeyError.
    """
    dump_options = {"mode": "json", "exclude_none": True, "exclude_defaults": True}
    if isinstance(obj, Schema) and hasattr(obj, "values"):
        # .get(): exclude_none/exclude_defaults may have removed the key.
        return obj.dict(**dump_options).get("values")
    if isinstance(obj, list):
        return [item.dict(**dump_options) for item in obj]
    return None
def _build_processing_event(ingest_event: InterchangeIssueEvent) -> ProcessingEvent:
    """Derive title, culprit, grouping hash and stored data for one event."""
    event = ingest_event.payload
    event_data: dict[str, Any] = {}
    title = ""
    culprit = ""
    metadata: dict[str, Any] = {}

    if event.type in [IssueEventType.ERROR, IssueEventType.DEFAULT]:
        sentry_event = ErrorEvent()
        metadata = sentry_event.get_metadata(event.dict())
        if event.type == IssueEventType.ERROR and metadata:
            title = sentry_event.get_title(metadata)
        else:
            message = event.message if event.message else event.logentry
            title = (
                transform_parameterized_message(message) if message else "<untitled>"
            )
        # The culprit is ErrorEvent.get_location() for every error/default
        # event. A transaction / generate_culprit() value used to be computed
        # in the branch above, but this assignment always overwrote it.
        culprit = sentry_event.get_location(event.dict())
    elif event.type == IssueEventType.CSP:
        humanized_directive = event.csp.effective_directive.replace("-src", "")
        uri = urlparse(event.csp.blocked_uri).netloc
        title = f"Blocked '{humanized_directive}' from '{uri}'"
        # NOTE(review): placeholder value; it also feeds the grouping hash.
        culprit = "fake culprit"
        event_data["csp"] = event.csp.dict()

    issue_hash = generate_hash(title, culprit, event.type, event.fingerprint)

    if metadata:
        event_data["metadata"] = metadata
    if platform := event.platform:
        event_data["platform"] = platform
    if modules := event.modules:
        event_data["modules"] = modules
    if sdk := event.sdk:
        event_data["sdk"] = sdk.dict(exclude_none=True)
    if request := event.request:
        event_data["request"] = request.dict(exclude_none=True)
    if environment := event.environment:
        event_data["environment"] = environment

    # "message" may be a plain str or a structured object (with .formatted);
    # "logentry" is the structured form. Prefer logentry, fall back to message.
    if logentry := event.logentry:
        event_data["logentry"] = logentry.dict(exclude_none=True)
    elif message := event.message:
        if isinstance(message, str):
            event_data["logentry"] = {"formatted": message}
        else:
            event_data["logentry"] = message.dict(exclude_none=True)
    if message := event.message:
        event_data["message"] = (
            message if isinstance(message, str) else message.formatted
        )

    if breadcrumbs := event.breadcrumbs:
        event_data["breadcrumbs"] = devalue(breadcrumbs)
    if exception := event.exception:
        event_data["exception"] = devalue(exception)

    return ProcessingEvent(
        event=ingest_event,
        issue_hash=issue_hash,
        title=title,
        transaction=culprit,
        metadata=metadata,
        event_data=event_data,
        event_tags={},
    )


def process_issue_events(ingest_events: list[InterchangeIssueEvent]):
    """
    Accepts a list of events to ingest. Events should:
    - Be few enough to save in a single DB call
    - Already be permission-checked; these events are to be written to the DB
    - Some invalid events are tolerated (ignored), including duplicate event ids

    When there is an error in this function, care should be taken as to when to
    log, error, or ignore. If the SDK sends "weird" data, we want to log that.
    It's better to save a minimal event than to ignore it.

    Each event is grouped into an Issue by (project_id, issue hash). An existing
    IssueHash is reused, and its Issue is reopened if it was resolved. Otherwise
    a new Issue and IssueHash are created. The IssueEvents are then
    bulk-inserted.
    """
    if not ingest_events:
        # Nothing to save. This also avoids filtering IssueHash with an empty
        # Q(), which would match every row.
        return

    processing_events = [_build_processing_event(e) for e in ingest_events]

    # Fetch every known (project, hash) pair for the batch in one query.
    hash_filter = Q()
    for processing_event in processing_events:
        hash_filter |= Q(
            project_id=processing_event.event.project_id,
            value=processing_event.issue_hash,
        )
    known_issues: dict[tuple[Any, str], tuple[int, Any]] = {}
    for row in IssueHash.objects.filter(hash_filter).values(
        "value", "project_id", "issue_id", "issue__status"
    ):
        # setdefault keeps the first matching row, as a linear scan would.
        known_issues.setdefault(
            (row["project_id"], row["value"].hex),
            (row["issue_id"], row["issue__status"]),
        )

    issue_events: list[IssueEvent] = []
    issues_to_reopen: set[int] = set()
    for processing_event in processing_events:
        event_type = processing_event.event.payload.type
        project_id = processing_event.event.project_id
        # Use this event's own hash (not a variable left over from another loop).
        issue_hash = processing_event.issue_hash
        lookup_key = (project_id, issue_hash)

        if lookup_key in known_issues:
            issue_id, issue_status = known_issues[lookup_key]
            processing_event.issue_id = issue_id
            if issue_status == EventStatus.RESOLVED:
                issues_to_reopen.add(issue_id)
        else:
            try:
                with transaction.atomic():
                    issue = Issue.objects.create(
                        project_id=project_id,
                        type=event_type,
                        title=processing_event.title,
                        metadata=processing_event.metadata,
                    )
                    IssueHash.objects.create(
                        issue=issue, value=issue_hash, project_id=project_id
                    )
                processing_event.issue_id = issue.id
                processing_event.issue_created = True
            except IntegrityError:
                # Presumably the (project, hash) pair was inserted concurrently.
                # The atomic block rolled back our Issue, so reuse the stored one.
                processing_event.issue_id = IssueHash.objects.get(
                    project_id=project_id, value=issue_hash
                ).issue_id
            # Later events in this batch with the same hash reuse this issue
            # instead of attempting another insert.
            known_issues[lookup_key] = (processing_event.issue_id, None)

        issue_events.append(
            IssueEvent(
                id=processing_event.event.event_id,
                issue_id=processing_event.issue_id,
                type=event_type,
                timestamp=processing_event.event.payload.timestamp,
                received=processing_event.event.received,
                title=processing_event.title,
                transaction=processing_event.transaction,
                data=processing_event.event_data,
                tags=processing_event.event_tags,
            )
        )

    if issues_to_reopen:
        Issue.objects.filter(id__in=issues_to_reopen).update(
            status=EventStatus.UNRESOLVED
        )
        Notification.objects.filter(issues__in=issues_to_reopen).delete()

    # ignore_conflicts because we could have an invalid duplicate event_id
    IssueEvent.objects.bulk_create(issue_events, ignore_conflicts=True)
|