process_event.py 13 KB


  1. from dataclasses import dataclass
  2. from datetime import datetime
  3. from typing import Any, Optional, Union
  4. from urllib.parse import urlparse
  5. from django.contrib.postgres.search import SearchVector
  6. from django.db import transaction
  7. from django.db.models import F, Q, Value
  8. from django.db.models.functions import Greatest
  9. from django.db.utils import IntegrityError
  10. from ninja import Schema
  11. from user_agents import parse
  12. from alerts.models import Notification
  13. from apps.issue_events.constants import EventStatus
  14. from apps.issue_events.models import Issue, IssueEvent, IssueEventType, IssueHash
  15. from sentry.culprit import generate_culprit
  16. from sentry.eventtypes.error import ErrorEvent
  17. from sentry.utils.strings import truncatechars
  18. from ..shared.schema.contexts import (
  19. BrowserContext,
  20. ContextsSchema,
  21. DeviceContext,
  22. OSContext,
  23. )
  24. from .model_functions import PipeConcat
  25. from .schema import IngestIssueEvent, InterchangeIssueEvent
  26. from .utils import generate_hash, transform_parameterized_message
  27. @dataclass
  28. class ProcessingEvent:
  29. event: InterchangeIssueEvent
  30. issue_hash: str
  31. title: str
  32. transaction: str
  33. metadata: dict[str, Any]
  34. event_data: dict[str, Any]
  35. event_tags: dict[str, str]
  36. issue_id: Optional[int] = None
  37. issue_created = False
  38. @dataclass
  39. class IssueUpdate:
  40. last_seen: datetime
  41. added_count: int = 1
  42. search_vector: str = ""
  43. def update_issues(processing_events: list[ProcessingEvent]):
  44. """
  45. Update any existing issues based on new statistics
  46. """
  47. issues_to_update: dict[int, IssueUpdate] = {}
  48. for processing_event in processing_events:
  49. if processing_event.issue_created:
  50. break
  51. issue_id = processing_event.issue_id
  52. if issue_id in issues_to_update:
  53. issues_to_update[issue_id].added_count += 1
  54. issues_to_update[issue_id].search_vector += f" {processing_event.title}"
  55. if issues_to_update[issue_id].last_seen < processing_event.event.received:
  56. issues_to_update[issue_id].last_seen = processing_event.event.received
  57. elif issue_id:
  58. issues_to_update[issue_id] = IssueUpdate(
  59. last_seen=processing_event.event.received
  60. )
  61. for issue_id, value in issues_to_update.items():
  62. Issue.objects.filter(id=issue_id).update(
  63. count=F("count") + value.added_count,
  64. search_vector=SearchVector(
  65. PipeConcat(F("search_vector"), SearchVector(Value(value.search_vector)))
  66. ),
  67. last_seen=Greatest(F("last_seen"), value.last_seen),
  68. )
  69. def devalue(obj: Union[Schema, list]) -> Optional[Union[dict, list]]:
  70. """
  71. Convert Schema like {"values": []} into list or dict without unnecessary 'values'
  72. """
  73. if isinstance(obj, Schema) and hasattr(obj, "values"):
  74. return obj.dict(mode="json", exclude_none=True, exclude_defaults=True)["values"]
  75. elif isinstance(obj, list):
  76. return [
  77. x.dict(mode="json", exclude_none=True, exclude_defaults=True) for x in obj
  78. ]
  79. return None
  80. def generate_contexts(event: IngestIssueEvent) -> ContextsSchema:
  81. """
  82. Add additional contexts if they aren't already set
  83. """
  84. contexts = event.contexts if event.contexts else ContextsSchema(root={})
  85. if request := event.request:
  86. if isinstance(request.headers, list):
  87. if ua_string := next(
  88. (x[1] for x in request.headers if x[0] == "User-Agent"), None
  89. ):
  90. user_agent = parse(ua_string)
  91. if "browser" not in contexts.root:
  92. contexts.root["browser"] = BrowserContext(
  93. name=user_agent.browser.family,
  94. version=user_agent.browser.version_string,
  95. )
  96. if "os" not in contexts.root:
  97. contexts.root["os"] = OSContext(
  98. name=user_agent.os.family, version=user_agent.os.version_string
  99. )
  100. if "device" not in contexts.root:
  101. device = user_agent.device
  102. contexts.root["device"] = DeviceContext(
  103. family=device.family,
  104. model=device.model,
  105. brand=device.brand,
  106. )
  107. return contexts
  108. def generate_tags(event: IngestIssueEvent) -> dict[str, str]:
  109. """Generate key-value tags based on context and other event data"""
  110. tags: dict[str, Optional[str]] = event.tags if isinstance(event.tags, dict) else {}
  111. if contexts := event.contexts:
  112. if browser := contexts.root.get("browser"):
  113. if isinstance(browser, BrowserContext):
  114. tags["browser.name"] = browser.name
  115. tags["browser"] = f"{browser.name} {browser.version}"
  116. if os := contexts.root.get("os"):
  117. if isinstance(os, OSContext):
  118. tags["os.name"] = os.name
  119. if device := contexts.root.get("device"):
  120. if isinstance(device, DeviceContext) and device.model:
  121. tags["device"] = device.model
  122. if user := event.user:
  123. if user.id:
  124. tags["user.id"] = user.id
  125. if user.email:
  126. tags["user.email"] = user.email
  127. if user.username:
  128. tags["user.username"] = user.username
  129. if environment := event.environment:
  130. tags["environment"] = environment
  131. if release := event.release:
  132. tags["release"] = release
  133. if server_name := event.server_name:
  134. tags["server_name"] = server_name
  135. # Exclude None values
  136. return {key: value for key, value in tags.items() if value}
  137. def process_issue_events(ingest_events: list[InterchangeIssueEvent]):
  138. """
  139. Accepts a list of events to ingest. Events should be:
  140. - Few enough to save in a single DB call
  141. - Permission is already checked, these events are to write to the DB
  142. - Some invalid events are tolerated (ignored), including duplicate event id
  143. When there is an error in this function, care should be taken as to when to log,
  144. error, or ignore. If the SDK sends "weird" data, we want to log that.
  145. It's better to save a minimal event than to ignore it.
  146. """
  147. # Collected/calculated event data while processing
  148. processing_events: list[ProcessingEvent] = []
  149. # Collect Q objects for bulk issue hash lookup
  150. q_objects = Q()
  151. for ingest_event in ingest_events:
  152. event_data: dict[str, Any] = {}
  153. event = ingest_event.payload
  154. event.contexts = generate_contexts(event)
  155. event_tags = generate_tags(event)
  156. title = ""
  157. culprit = ""
  158. metadata: dict[str, Any] = {}
  159. if event.type in [IssueEventType.ERROR, IssueEventType.DEFAULT]:
  160. sentry_event = ErrorEvent()
  161. metadata = sentry_event.get_metadata(event.dict())
  162. if event.type == IssueEventType.ERROR and metadata:
  163. full_title = sentry_event.get_title(metadata)
  164. else:
  165. message = event.message if event.message else event.logentry
  166. full_title = (
  167. transform_parameterized_message(message)
  168. if message
  169. else "<untitled>"
  170. )
  171. culprit = (
  172. event.transaction
  173. if event.transaction
  174. else generate_culprit(event.dict())
  175. )
  176. title = truncatechars(full_title)
  177. culprit = sentry_event.get_location(event.dict())
  178. elif event.type == IssueEventType.CSP:
  179. humanized_directive = event.csp.effective_directive.replace("-src", "")
  180. uri = urlparse(event.csp.blocked_uri).netloc
  181. full_title = title = f"Blocked '{humanized_directive}' from '{uri}'"
  182. culprit = "fake culprit"
  183. event_data["csp"] = event.csp.dict()
  184. issue_hash = generate_hash(title, culprit, event.type, event.fingerprint)
  185. if metadata:
  186. event_data["metadata"] = metadata
  187. if platform := event.platform:
  188. event_data["platform"] = platform
  189. if modules := event.modules:
  190. event_data["modules"] = modules
  191. if sdk := event.sdk:
  192. event_data["sdk"] = sdk.dict(exclude_none=True)
  193. if request := event.request:
  194. event_data["request"] = request.dict(exclude_none=True)
  195. if environment := event.environment:
  196. event_data["environment"] = environment
  197. # Message is str
  198. # Logentry is {"params": etc} Message format
  199. if logentry := event.logentry:
  200. event_data["logentry"] = logentry.dict(exclude_none=True)
  201. elif message := event.message:
  202. if isinstance(message, str):
  203. event_data["logentry"] = {"formatted": message}
  204. else:
  205. event_data["logentry"] = message.dict(exclude_none=True)
  206. if message := event.message:
  207. event_data["message"] = (
  208. message if isinstance(message, str) else message.formatted
  209. )
  210. # When blank, the API will default to the title anyway
  211. elif title != full_title:
  212. # If the title is truncated, store the full title
  213. event_data["message"] = full_title
  214. if breadcrumbs := event.breadcrumbs:
  215. event_data["breadcrumbs"] = devalue(breadcrumbs)
  216. if exception := event.exception:
  217. event_data["exception"] = devalue(exception)
  218. if extra := event.extra:
  219. event_data["extra"] = extra
  220. if user := event.user:
  221. event_data["user"] = user.dict(exclude_none=True)
  222. if contexts := event.contexts:
  223. event_data["contexts"] = contexts.dict(exclude_none=True)
  224. processing_events.append(
  225. ProcessingEvent(
  226. event=ingest_event,
  227. issue_hash=issue_hash,
  228. title=title,
  229. transaction=culprit,
  230. metadata=metadata,
  231. event_data=event_data,
  232. event_tags=event_tags,
  233. )
  234. )
  235. q_objects |= Q(project_id=ingest_event.project_id, value=issue_hash)
  236. hash_queryset = IssueHash.objects.filter(q_objects).values(
  237. "value", "project_id", "issue_id", "issue__status"
  238. )
  239. issue_events: list[IssueEvent] = []
  240. issues_to_reopen = []
  241. for processing_event in processing_events:
  242. event_type = processing_event.event.payload.type
  243. project_id = processing_event.event.project_id
  244. issue_defaults = {
  245. "type": event_type,
  246. "title": processing_event.title,
  247. "metadata": processing_event.metadata,
  248. "first_seen": processing_event.event.received,
  249. "last_seen": processing_event.event.received,
  250. }
  251. for hash_obj in hash_queryset:
  252. if (
  253. hash_obj["value"].hex == issue_hash
  254. and hash_obj["project_id"] == project_id
  255. ):
  256. processing_event.issue_id = hash_obj["issue_id"]
  257. if hash_obj["issue__status"] == EventStatus.RESOLVED:
  258. issues_to_reopen.append(hash_obj["issue_id"])
  259. break
  260. if not processing_event.issue_id:
  261. try:
  262. with transaction.atomic():
  263. issue = Issue.objects.create(
  264. project_id=project_id,
  265. search_vector=SearchVector(Value(issue_defaults["title"])),
  266. **issue_defaults,
  267. )
  268. IssueHash.objects.create(
  269. issue=issue, value=issue_hash, project_id=project_id
  270. )
  271. processing_event.issue_id = issue.id
  272. processing_event.issue_created = True
  273. except IntegrityError:
  274. processing_event.issue_id = IssueHash.objects.get(
  275. project_id=project_id, value=issue_hash
  276. ).issue_id
  277. issue_events.append(
  278. IssueEvent(
  279. id=processing_event.event.event_id,
  280. issue_id=processing_event.issue_id,
  281. type=event_type,
  282. timestamp=processing_event.event.payload.timestamp,
  283. received=processing_event.event.received,
  284. title=processing_event.title,
  285. transaction=processing_event.transaction,
  286. data=processing_event.event_data,
  287. tags=processing_event.event_tags,
  288. )
  289. )
  290. update_issues(processing_events)
  291. if issues_to_reopen:
  292. Issue.objects.filter(id__in=issues_to_reopen).update(
  293. status=EventStatus.UNRESOLVED
  294. )
  295. Notification.objects.filter(issues__in=issues_to_reopen).delete()
  296. # ignore_conflicts because we could have an invalid duplicate event_id
  297. IssueEvent.objects.bulk_create(issue_events, ignore_conflicts=True)