# process_event.py

from dataclasses import dataclass
from typing import Any, Optional, Union
from urllib.parse import urlparse

from django.db import transaction
from django.db.models import Q
from django.db.utils import IntegrityError

from alerts.models import Notification
from apps.issue_events.constants import EventStatus
from apps.issue_events.models import Issue, IssueEvent, IssueEventType, IssueHash
from sentry.culprit import generate_culprit
from sentry.eventtypes.error import ErrorEvent

from .schema import (
    EventMessage,
    InterchangeIssueEvent,
)
from .utils import generate_hash


@dataclass
class ProcessingEvent:
    """Carries an ingest event plus data derived while processing it."""

    event: InterchangeIssueEvent
    issue_hash: str
    title: str
    metadata: dict[str, Any]
    event_data: dict[str, Any]
    issue_id: Optional[int] = None
    # Annotated so the dataclass treats this as a field, not a class attribute
    issue_created: bool = False


def transform_message(message: Union[str, EventMessage]) -> str:
    """Return a displayable message string from a str or EventMessage."""
    if isinstance(message, str):
        return message
    if not message.formatted and message.message:
        params = message.params
        if isinstance(params, list):
            return message.message % tuple(params)
        elif isinstance(params, dict):
            return message.message.format(**params)
    return message.formatted


def process_issue_events(ingest_events: list[InterchangeIssueEvent]):
    """
    Accepts a list of events to ingest. Events should be:

    - Few enough to save in a single DB call
    - Already permission checked; they are cleared to be written to the DB
    - Tolerated when partially invalid (ignored), including duplicate event ids

    When this function errors, take care deciding whether to log, raise, or
    ignore. If the SDK sends "weird" data, we want to log that. It's better
    to save a minimal event than to ignore it.
    """
    # Collected/calculated event data while processing
    processing_events: list[ProcessingEvent] = []
    # Collect Q objects for a bulk issue hash lookup
    q_objects = Q()
    for ingest_event in ingest_events:
        event_data: dict[str, Any] = {}
        event = ingest_event.payload
        title = ""
        culprit = ""
        metadata: dict[str, Any] = {}
        if event.type == IssueEventType.ERROR:
            sentry_event = ErrorEvent()
            metadata = sentry_event.get_metadata(event.dict())
            title = sentry_event.get_title(metadata)
            culprit = sentry_event.get_location(event.dict())
        elif event.type == IssueEventType.CSP:
            humanized_directive = event.csp.effective_directive.replace("-src", "")
            uri = urlparse(event.csp.blocked_uri).netloc
            title = f"Blocked '{humanized_directive}' from '{uri}'"
            culprit = "fake culprit"  # Placeholder; CSP reports carry no real culprit
            event_data["csp"] = event.csp.dict()
        else:  # Default event type
            title = transform_message(event.message) if event.message else "<untitled>"
            culprit = (
                event.transaction
                if event.transaction
                else generate_culprit(event.dict())
            )
        issue_hash = generate_hash(title, culprit, event.type, event.fingerprint)
        event_data["culprit"] = culprit
        event_data["metadata"] = metadata
        # if breadcrumbs := event.breadcrumbs:
        #     event_data["breadcrumbs"] = [
        #         breadcrumb.dict() for breadcrumb in breadcrumbs
        #     ]
        if exception := event.exception:
            event_data["exception"] = exception.dict()
        processing_events.append(
            ProcessingEvent(
                event=ingest_event,
                issue_hash=issue_hash,
                title=title,
                metadata=metadata,
                event_data=event_data,
            )
        )
        q_objects |= Q(project_id=ingest_event.project_id, value=issue_hash)
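
    # One query resolves every collected (project_id, hash) pair to any
    # existing issue, including the issue's status for reopen handling.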
    hash_queryset = IssueHash.objects.filter(q_objects).values(
        "value", "project_id", "issue_id", "issue__status"
    )
    issue_events: list[IssueEvent] = []
    issues_to_reopen = []
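
    # Pass 2: attach each event to its existing issue, or create the issue
    # (and its hash) when the hash was not found.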
    for processing_event in processing_events:
        event_type = processing_event.event.payload.type
        project_id = processing_event.event.project_id
        issue_defaults = {
            "type": event_type,
            "title": processing_event.title,
            "metadata": processing_event.metadata,
        }
        for hash_obj in hash_queryset:
            if (
                hash_obj["value"].hex == processing_event.issue_hash
                and hash_obj["project_id"] == project_id
            ):
                processing_event.issue_id = hash_obj["issue_id"]
                if hash_obj["issue__status"] == EventStatus.RESOLVED:
                    issues_to_reopen.append(hash_obj["issue_id"])
                break
        if not processing_event.issue_id:
            try:
                with transaction.atomic():
                    issue = Issue.objects.create(
                        project_id=project_id, **issue_defaults
                    )
                    IssueHash.objects.create(
                        issue=issue,
                        value=processing_event.issue_hash,
                        project_id=project_id,
                    )
                processing_event.issue_id = issue.id
                processing_event.issue_created = True
            except IntegrityError:
                # Likely raced with a concurrent ingest that saved this hash
                # first; reuse the issue it created.
                processing_event.issue_id = IssueHash.objects.get(
                    project_id=project_id, value=processing_event.issue_hash
                ).issue_id
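        # Persist the computed title with the event's data as well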
        processing_event.event_data["title"] = processing_event.title
        issue_events.append(
            IssueEvent(
                id=processing_event.event.event_id,
                date_created=processing_event.event.payload.timestamp,
                date_received=processing_event.event.received_at,
                issue_id=processing_event.issue_id,
                type=event_type,
                data=processing_event.event_data,
            )
        )

    if issues_to_reopen:
        Issue.objects.filter(id__in=issues_to_reopen).update(
            status=EventStatus.UNRESOLVED
        )
        # Drop stale notifications tied to the reopened issues
        Notification.objects.filter(issues__in=issues_to_reopen).delete()

    # ignore_conflicts because we could have an invalid duplicate event_id
    IssueEvent.objects.bulk_create(issue_events, ignore_conflicts=True)
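

# Usage sketch (illustrative only; the payload construction below is an
# assumption, not part of this module):
#
#     events = [InterchangeIssueEvent(...), InterchangeIssueEvent(...)]
#     process_issue_events(events)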