# process_event.py (5.4 KB)
  1. from dataclasses import dataclass
  2. from typing import Any, Optional, Union
  3. from urllib.parse import urlparse
  4. from django.db import transaction
  5. from django.db.models import Q
  6. from django.db.utils import IntegrityError
  7. from alerts.models import Notification
  8. from apps.issue_events.constants import EventStatus
  9. from apps.issue_events.models import Issue, IssueEvent, IssueEventType, IssueHash
  10. from sentry.culprit import generate_culprit
  11. from .schema import (
  12. EventMessage,
  13. InterchangeIssueEvent,
  14. )
  15. from .utils import generate_hash
  16. @dataclass
  17. class ProcessingEvent:
  18. event: InterchangeIssueEvent
  19. issue_hash: str
  20. title: str
  21. event_data: dict[str, Any]
  22. issue_id: Optional[int] = None
  23. issue_created = False
  24. def transform_message(message: Union[str, EventMessage]) -> str:
  25. if isinstance(message, str):
  26. return message
  27. if not message.formatted and message.message:
  28. params = message.params
  29. if isinstance(params, list):
  30. return message.message % tuple(params)
  31. elif isinstance(params, dict):
  32. return message.message.format(**params)
  33. return message.formatted
  34. def process_issue_events(ingest_events: list[InterchangeIssueEvent]):
  35. """
  36. Accepts a list of events to ingest. Events should:
  37. - Few enough to save in a single DB call
  38. - Permission is already checked, these events are to write to the DB
  39. - Some invalid events are tolerated (ignored), including duplicate event id
  40. When there is an error in this function, care should be taken as to when to log,
  41. error, or ignore. If the SDK sends "weird" data, we want to log that.
  42. It's better to save a minimal event than to ignore it.
  43. """
  44. # Collected/calculated event data while processing
  45. processing_events: list[ProcessingEvent] = []
  46. # Collect Q objects for bulk issue hash lookup
  47. q_objects = Q()
  48. for ingest_event in ingest_events:
  49. event_data: dict[str, Any] = {}
  50. event = ingest_event.payload
  51. title = ""
  52. culprit = ""
  53. if event.type == IssueEventType.ERROR:
  54. # sentry_event = ErrorEvent()
  55. # metadata = eventtype.get_metadata(data)
  56. title = "fake title"
  57. culprit = "fake culprit"
  58. elif event.type == IssueEventType.CSP:
  59. humanized_directive = event.csp.effective_directive.replace("-src", "")
  60. uri = urlparse(event.csp.blocked_uri).netloc
  61. title = f"Blocked '{humanized_directive}' from '{uri}'"
  62. culprit = "fake culprit"
  63. event_data["csp"] = event.csp.dict()
  64. else: # Default Event Type
  65. title = transform_message(event.message) if event.message else "<untitled>"
  66. culprit = (
  67. event.transaction
  68. if event.transaction
  69. else generate_culprit(event.dict())
  70. )
  71. issue_hash = generate_hash(title, culprit, event.type, event.fingerprint)
  72. processing_events.append(
  73. ProcessingEvent(
  74. event=ingest_event,
  75. issue_hash=issue_hash,
  76. title=title,
  77. event_data=event_data,
  78. )
  79. )
  80. q_objects |= Q(project_id=ingest_event.project_id, value=issue_hash)
  81. hash_queryset = IssueHash.objects.filter(q_objects).values(
  82. "value", "project_id", "issue_id", "issue__status"
  83. )
  84. issue_events: list[IssueEvent] = []
  85. issues_to_reopen = []
  86. for processing_event in processing_events:
  87. event_type = processing_event.event.payload.type
  88. project_id = processing_event.event.project_id
  89. issue_defaults = {
  90. "type": event_type,
  91. "title": processing_event.title,
  92. }
  93. for hash_obj in hash_queryset:
  94. if (
  95. hash_obj["value"].hex == issue_hash
  96. and hash_obj["project_id"] == project_id
  97. ):
  98. processing_event.issue_id = hash_obj["issue_id"]
  99. if hash_obj["issue__status"] == EventStatus.RESOLVED:
  100. issues_to_reopen.append(hash_obj["issue_id"])
  101. break
  102. if not processing_event.issue_id:
  103. try:
  104. with transaction.atomic():
  105. issue = Issue.objects.create(
  106. project_id=project_id, **issue_defaults
  107. )
  108. IssueHash.objects.create(
  109. issue=issue, value=issue_hash, project_id=project_id
  110. )
  111. processing_event.issue_id = issue.id
  112. processing_event.issue_created = True
  113. except IntegrityError:
  114. processing_event.issue_id = IssueHash.objects.get(
  115. project_id=project_id, value=issue_hash
  116. ).issue_id
  117. issue_events.append(
  118. IssueEvent(
  119. id=processing_event.event.event_id,
  120. date_received=processing_event.event.received_at,
  121. issue_id=processing_event.issue_id,
  122. type=event_type,
  123. data=processing_event.event_data,
  124. )
  125. )
  126. if issues_to_reopen:
  127. Issue.objects.filter(id__in=issues_to_reopen).update(
  128. status=EventStatus.UNRESOLVED
  129. )
  130. Notification.objects.filter(issues__in=issues_to_reopen).delete()
  131. # ignore_conflicts because we could have an invalid duplicate event_id
  132. IssueEvent.objects.bulk_create(issue_events, ignore_conflicts=True)