# process_event.py
  1. from dataclasses import dataclass
  2. from typing import Any, Optional, Union
  3. from urllib.parse import urlparse
  4. from django.db import transaction
  5. from django.db.models import Q
  6. from django.db.utils import IntegrityError
  7. from alerts.models import Notification
  8. from apps.issue_events.constants import EventStatus
  9. from apps.issue_events.models import Issue, IssueEvent, IssueEventType, IssueHash
  10. # from apps.issue_events.schema import CSPIssueEventDataSchema, IssueEventDataSchema
  11. from sentry.culprit import generate_culprit
  12. from sentry.eventtypes.error import ErrorEvent
  13. from .schema import (
  14. EventMessage,
  15. InterchangeIssueEvent,
  16. ValueEventException,
  17. )
  18. from .utils import generate_hash
  19. @dataclass
  20. class ProcessingEvent:
  21. event: InterchangeIssueEvent
  22. issue_hash: str
  23. title: str
  24. transaction: str
  25. metadata: dict[str, Any]
  26. event_data: dict[str, Any]
  27. issue_id: Optional[int] = None
  28. issue_created = False
  29. def transform_message(message: Union[str, EventMessage]) -> str:
  30. if isinstance(message, str):
  31. return message
  32. if not message.formatted and message.message:
  33. params = message.params
  34. if isinstance(params, list):
  35. return message.message % tuple(params)
  36. elif isinstance(params, dict):
  37. return message.message.format(**params)
  38. return message.formatted
  39. def process_issue_events(ingest_events: list[InterchangeIssueEvent]):
  40. """
  41. Accepts a list of events to ingest. Events should:
  42. - Few enough to save in a single DB call
  43. - Permission is already checked, these events are to write to the DB
  44. - Some invalid events are tolerated (ignored), including duplicate event id
  45. When there is an error in this function, care should be taken as to when to log,
  46. error, or ignore. If the SDK sends "weird" data, we want to log that.
  47. It's better to save a minimal event than to ignore it.
  48. """
  49. # Collected/calculated event data while processing
  50. processing_events: list[ProcessingEvent] = []
  51. # Collect Q objects for bulk issue hash lookup
  52. q_objects = Q()
  53. for ingest_event in ingest_events:
  54. event_data: dict[str, Any] = {}
  55. event = ingest_event.payload
  56. title = ""
  57. culprit = ""
  58. metadata: dict[str, Any] = {}
  59. if event.type in [IssueEventType.ERROR, IssueEventType.DEFAULT]:
  60. sentry_event = ErrorEvent()
  61. metadata = sentry_event.get_metadata(event.dict())
  62. if event.type == IssueEventType.ERROR:
  63. title = sentry_event.get_title(metadata)
  64. else:
  65. title = (
  66. transform_message(event.message) if event.message else "<untitled>"
  67. )
  68. culprit = (
  69. event.transaction
  70. if event.transaction
  71. else generate_culprit(event.dict())
  72. )
  73. culprit = sentry_event.get_location(event.dict())
  74. elif event.type == IssueEventType.CSP:
  75. humanized_directive = event.csp.effective_directive.replace("-src", "")
  76. uri = urlparse(event.csp.blocked_uri).netloc
  77. title = f"Blocked '{humanized_directive}' from '{uri}'"
  78. culprit = "fake culprit"
  79. event_data["csp"] = event.csp.dict()
  80. issue_hash = generate_hash(title, culprit, event.type, event.fingerprint)
  81. event_data["metadata"] = metadata
  82. # if breadcrumbs := event.breadcrumbs:
  83. # event_data["breadcrumbs"] = [
  84. # breadcrumb.dict() for breadcrumb in breadcrumbs
  85. # ]
  86. if exception := event.exception:
  87. if isinstance(exception, ValueEventException):
  88. event_data["exception"] = exception.dict()["values"]
  89. else:
  90. event_data["exception"] = exception
  91. processing_events.append(
  92. ProcessingEvent(
  93. event=ingest_event,
  94. issue_hash=issue_hash,
  95. title=title,
  96. transaction=culprit,
  97. metadata=metadata,
  98. event_data=event_data,
  99. )
  100. )
  101. q_objects |= Q(project_id=ingest_event.project_id, value=issue_hash)
  102. hash_queryset = IssueHash.objects.filter(q_objects).values(
  103. "value", "project_id", "issue_id", "issue__status"
  104. )
  105. issue_events: list[IssueEvent] = []
  106. issues_to_reopen = []
  107. for processing_event in processing_events:
  108. event_type = processing_event.event.payload.type
  109. project_id = processing_event.event.project_id
  110. issue_defaults = {
  111. "type": event_type,
  112. "title": processing_event.title,
  113. "metadata": processing_event.metadata,
  114. }
  115. for hash_obj in hash_queryset:
  116. if (
  117. hash_obj["value"].hex == issue_hash
  118. and hash_obj["project_id"] == project_id
  119. ):
  120. processing_event.issue_id = hash_obj["issue_id"]
  121. if hash_obj["issue__status"] == EventStatus.RESOLVED:
  122. issues_to_reopen.append(hash_obj["issue_id"])
  123. break
  124. if not processing_event.issue_id:
  125. try:
  126. with transaction.atomic():
  127. issue = Issue.objects.create(
  128. project_id=project_id, **issue_defaults
  129. )
  130. IssueHash.objects.create(
  131. issue=issue, value=issue_hash, project_id=project_id
  132. )
  133. processing_event.issue_id = issue.id
  134. processing_event.issue_created = True
  135. except IntegrityError:
  136. processing_event.issue_id = IssueHash.objects.get(
  137. project_id=project_id, value=issue_hash
  138. ).issue_id
  139. issue_events.append(
  140. IssueEvent(
  141. id=processing_event.event.event_id,
  142. issue_id=processing_event.issue_id,
  143. type=event_type,
  144. timestamp=processing_event.event.payload.timestamp,
  145. received=processing_event.event.received,
  146. title=processing_event.title,
  147. transaction=processing_event.transaction,
  148. message="",
  149. data=processing_event.event_data,
  150. )
  151. )
  152. if issues_to_reopen:
  153. Issue.objects.filter(id__in=issues_to_reopen).update(
  154. status=EventStatus.UNRESOLVED
  155. )
  156. Notification.objects.filter(issues__in=issues_to_reopen).delete()
  157. # ignore_conflicts because we could have an invalid duplicate event_id
  158. IssueEvent.objects.bulk_create(issue_events, ignore_conflicts=True)