# process_event.py
from dataclasses import dataclass
from typing import Any, Optional, Union
from urllib.parse import urlparse

from django.db import transaction
from django.db.models import Q
from django.db.utils import IntegrityError
from ninja import Schema

from alerts.models import Notification
from apps.issue_events.constants import EventStatus
from apps.issue_events.models import Issue, IssueEvent, IssueEventType, IssueHash
from sentry.culprit import generate_culprit
from sentry.eventtypes.error import ErrorEvent

from .schema import InterchangeIssueEvent
from .utils import generate_hash, transform_parameterized_message
  18. @dataclass
  19. class ProcessingEvent:
  20. event: InterchangeIssueEvent
  21. issue_hash: str
  22. title: str
  23. transaction: str
  24. metadata: dict[str, Any]
  25. event_data: dict[str, Any]
  26. event_tags: dict[str, str]
  27. issue_id: Optional[int] = None
  28. issue_created = False
  29. def devalue(obj: Union[Schema, list]) -> Optional[Union[dict, list]]:
  30. """
  31. Convert Schema like {"values": []} into list or dict without unnecessary 'values'
  32. """
  33. if isinstance(obj, Schema) and hasattr(obj, "values"):
  34. return obj.dict(mode="json", exclude_none=True, exclude_defaults=True)["values"]
  35. elif isinstance(obj, list):
  36. return [
  37. x.dict(mode="json", exclude_none=True, exclude_defaults=True) for x in obj
  38. ]
  39. return None
  40. def process_issue_events(ingest_events: list[InterchangeIssueEvent]):
  41. """
  42. Accepts a list of events to ingest. Events should:
  43. - Few enough to save in a single DB call
  44. - Permission is already checked, these events are to write to the DB
  45. - Some invalid events are tolerated (ignored), including duplicate event id
  46. When there is an error in this function, care should be taken as to when to log,
  47. error, or ignore. If the SDK sends "weird" data, we want to log that.
  48. It's better to save a minimal event than to ignore it.
  49. """
  50. # Collected/calculated event data while processing
  51. processing_events: list[ProcessingEvent] = []
  52. # Collect Q objects for bulk issue hash lookup
  53. q_objects = Q()
  54. for ingest_event in ingest_events:
  55. event_data: dict[str, Any] = {}
  56. event = ingest_event.payload
  57. title = ""
  58. culprit = ""
  59. metadata: dict[str, Any] = {}
  60. if event.type in [IssueEventType.ERROR, IssueEventType.DEFAULT]:
  61. sentry_event = ErrorEvent()
  62. metadata = sentry_event.get_metadata(event.dict())
  63. if event.type == IssueEventType.ERROR and metadata:
  64. title = sentry_event.get_title(metadata)
  65. else:
  66. message = event.message if event.message else event.logentry
  67. title = (
  68. transform_parameterized_message(message)
  69. if message
  70. else "<untitled>"
  71. )
  72. culprit = (
  73. event.transaction
  74. if event.transaction
  75. else generate_culprit(event.dict())
  76. )
  77. culprit = sentry_event.get_location(event.dict())
  78. elif event.type == IssueEventType.CSP:
  79. humanized_directive = event.csp.effective_directive.replace("-src", "")
  80. uri = urlparse(event.csp.blocked_uri).netloc
  81. title = f"Blocked '{humanized_directive}' from '{uri}'"
  82. culprit = "fake culprit"
  83. event_data["csp"] = event.csp.dict()
  84. issue_hash = generate_hash(title, culprit, event.type, event.fingerprint)
  85. if metadata:
  86. event_data["metadata"] = metadata
  87. if platform := event.platform:
  88. event_data["platform"] = platform
  89. if modules := event.modules:
  90. event_data["modules"] = modules
  91. if sdk := event.sdk:
  92. event_data["sdk"] = sdk.dict(exclude_none=True)
  93. if request := event.request:
  94. event_data["request"] = request.dict(exclude_none=True)
  95. if environment := event.environment:
  96. event_data["environment"] = environment
  97. # Message is str
  98. # Logentry is {"params": etc} Message format
  99. if logentry := event.logentry:
  100. event_data["logentry"] = logentry.dict(exclude_none=True)
  101. elif message := event.message:
  102. if isinstance(message, str):
  103. event_data["logentry"] = {"formatted": message}
  104. else:
  105. event_data["logentry"] = message.dict(exclude_none=True)
  106. if message := event.message:
  107. event_data["message"] = (
  108. message if isinstance(message, str) else message.formatted
  109. )
  110. if breadcrumbs := event.breadcrumbs:
  111. event_data["breadcrumbs"] = devalue(breadcrumbs)
  112. if exception := event.exception:
  113. event_data["exception"] = devalue(exception)
  114. processing_events.append(
  115. ProcessingEvent(
  116. event=ingest_event,
  117. issue_hash=issue_hash,
  118. title=title,
  119. transaction=culprit,
  120. metadata=metadata,
  121. event_data=event_data,
  122. event_tags={},
  123. )
  124. )
  125. q_objects |= Q(project_id=ingest_event.project_id, value=issue_hash)
  126. hash_queryset = IssueHash.objects.filter(q_objects).values(
  127. "value", "project_id", "issue_id", "issue__status"
  128. )
  129. issue_events: list[IssueEvent] = []
  130. issues_to_reopen = []
  131. for processing_event in processing_events:
  132. event_type = processing_event.event.payload.type
  133. project_id = processing_event.event.project_id
  134. issue_defaults = {
  135. "type": event_type,
  136. "title": processing_event.title,
  137. "metadata": processing_event.metadata,
  138. }
  139. for hash_obj in hash_queryset:
  140. if (
  141. hash_obj["value"].hex == issue_hash
  142. and hash_obj["project_id"] == project_id
  143. ):
  144. processing_event.issue_id = hash_obj["issue_id"]
  145. if hash_obj["issue__status"] == EventStatus.RESOLVED:
  146. issues_to_reopen.append(hash_obj["issue_id"])
  147. break
  148. if not processing_event.issue_id:
  149. try:
  150. with transaction.atomic():
  151. issue = Issue.objects.create(
  152. project_id=project_id, **issue_defaults
  153. )
  154. IssueHash.objects.create(
  155. issue=issue, value=issue_hash, project_id=project_id
  156. )
  157. processing_event.issue_id = issue.id
  158. processing_event.issue_created = True
  159. except IntegrityError:
  160. processing_event.issue_id = IssueHash.objects.get(
  161. project_id=project_id, value=issue_hash
  162. ).issue_id
  163. issue_events.append(
  164. IssueEvent(
  165. id=processing_event.event.event_id,
  166. issue_id=processing_event.issue_id,
  167. type=event_type,
  168. timestamp=processing_event.event.payload.timestamp,
  169. received=processing_event.event.received,
  170. title=processing_event.title,
  171. transaction=processing_event.transaction,
  172. data=processing_event.event_data,
  173. tags=processing_event.event_tags,
  174. )
  175. )
  176. if issues_to_reopen:
  177. Issue.objects.filter(id__in=issues_to_reopen).update(
  178. status=EventStatus.UNRESOLVED
  179. )
  180. Notification.objects.filter(issues__in=issues_to_reopen).delete()
  181. # ignore_conflicts because we could have an invalid duplicate event_id
  182. IssueEvent.objects.bulk_create(issue_events, ignore_conflicts=True)