# process_event.py

from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from operator import itemgetter
from typing import Any, Optional
from urllib.parse import urlparse

from django.conf import settings
from django.contrib.postgres.search import SearchVector
from django.db import connection, transaction
from django.db.models import (
    Exists,
    F,
    OuterRef,
    Q,
    QuerySet,
    Value,
)
from django.db.models.functions import Coalesce, Greatest
from django.db.utils import IntegrityError
from django.utils import timezone
from django_redis import get_redis_connection
from ninja import Schema
from user_agents import parse

from apps.alerts.constants import ISSUE_IDS_KEY
from apps.alerts.models import Notification
from apps.difs.models import DebugInformationFile
from apps.difs.tasks import event_difs_resolve_stacktrace
from apps.environments.models import Environment, EnvironmentProject
from apps.issue_events.constants import EventStatus, LogLevel
from apps.issue_events.models import (
    Issue,
    IssueEvent,
    IssueEventType,
    IssueHash,
    TagKey,
    TagValue,
)
from apps.performance.models import TransactionEvent, TransactionGroup
from apps.projects.models import Project
from apps.releases.models import Release
from apps.sourcecode.models import DebugSymbolBundle
from sentry.culprit import generate_culprit
from sentry.eventtypes.error import ErrorEvent
from sentry.utils.strings import truncatechars

from ..shared.schema.contexts import (
    BrowserContext,
    Contexts,
    DeviceContext,
    OSContext,
)
from .javascript_event_processor import JavascriptEventProcessor
from .model_functions import PipeConcat
from .schema import (
    ErrorIssueEventSchema,
    IngestIssueEvent,
    InterchangeIssueEvent,
    InterchangeTransactionEvent,
    IssueEventSchema,
    SourceMapImage,
)
from .utils import generate_hash, remove_bad_chars, transform_parameterized_message


@dataclass
class ProcessingEvent:
    event: InterchangeIssueEvent
    issue_hash: str
    title: str
    transaction: str
    metadata: dict[str, Any]
    event_data: dict[str, Any]
    event_tags: dict[str, str]
    level: Optional[LogLevel] = None
    issue_id: Optional[int] = None
    issue_created: bool = False
    release_id: Optional[int] = None


@dataclass
class IssueUpdate:
    last_seen: datetime
    search_vector: str
    added_count: int = 1


def get_search_vector(event: ProcessingEvent) -> str:
    return f"{event.title} {event.transaction}"


def update_issues(processing_events: list[ProcessingEvent]):
    """
    Update any existing issues based on new statistics
    """
    issues_to_update: dict[int, IssueUpdate] = {}
    for processing_event in processing_events:
        if processing_event.issue_created:
            # Newly created issues already have fresh stats; skip them
            continue
        issue_id = processing_event.issue_id
        if issue_id in issues_to_update:
            issues_to_update[issue_id].added_count += 1
            issues_to_update[
                issue_id
            ].search_vector += f" {get_search_vector(processing_event)}"
            if issues_to_update[issue_id].last_seen < processing_event.event.received:
                issues_to_update[issue_id].last_seen = processing_event.event.received
        elif issue_id:
            issues_to_update[issue_id] = IssueUpdate(
                last_seen=processing_event.event.received,
                search_vector=get_search_vector(processing_event),
            )

    for issue_id, value in issues_to_update.items():
        Issue.objects.filter(id=issue_id).update(
            count=F("count") + value.added_count,
            search_vector=PipeConcat(
                F("search_vector"), SearchVector(Value(value.search_vector))
            ),
            last_seen=Greatest(F("last_seen"), value.last_seen),
        )
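

# Illustrative sketch (hypothetical values, not called by the pipeline): three
# duplicate events for one existing issue collapse into a single accumulated
# IssueUpdate, which update_issues() then writes with one UPDATE per issue.
def _example_issue_update_accumulation() -> IssueUpdate:
    update = IssueUpdate(last_seen=timezone.now(), search_vector="TypeError /checkout")
    update.added_count += 2  # two more duplicate events arrived in the batch
    update.search_vector += " TypeError /checkout"
    return update  # -> count=F("count")+3, search_vector ||= to_tsvector(...)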


def devalue(obj: Schema | list[Schema]) -> dict | list[dict] | None:
    """
    Convert a Schema like {"values": []} into a list or dict,
    dropping the unnecessary "values" wrapper
    """
    if isinstance(obj, Schema) and hasattr(obj, "values"):
        return obj.dict(mode="json", exclude_none=True, exclude_defaults=True)[
            "values"
        ]
    elif isinstance(obj, list):
        return [
            x.dict(mode="json", exclude_none=True, exclude_defaults=True) for x in obj
        ]
    return None
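

# Illustrative sketch: devalue() unwraps the Sentry-style {"values": [...]}
# envelope. _Wrapped is a stand-in schema for this example only.
def _example_devalue() -> dict | list[dict] | None:
    class _Wrapped(Schema):
        values: list[dict] = []

    return devalue(_Wrapped(values=[{"type": "TypeError"}]))  # -> [{"type": "TypeError"}]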


def generate_contexts(event: IngestIssueEvent) -> Contexts:
    """
    Add additional contexts if they aren't already set
    """
    contexts = event.contexts if event.contexts else Contexts({})
    if request := event.request:
        if isinstance(request.headers, list):
            if ua_string := next(
                (x[1] for x in request.headers if x[0] == "User-Agent"), None
            ):
                user_agent = parse(ua_string)
                if "browser" not in contexts:
                    contexts["browser"] = BrowserContext(
                        name=user_agent.browser.family,
                        version=user_agent.browser.version_string,
                    )
                if "os" not in contexts:
                    contexts["os"] = OSContext(
                        name=user_agent.os.family, version=user_agent.os.version_string
                    )
                if "device" not in contexts:
                    device = user_agent.device
                    contexts["device"] = DeviceContext(
                        family=device.family,
                        model=device.model,
                        brand=device.brand,
                    )
    return contexts
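

# Illustrative sketch: the browser context generate_contexts() derives from a
# User-Agent request header via the user_agents library.
def _example_browser_context() -> BrowserContext:
    ua_string = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        " (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )
    user_agent = parse(ua_string)
    return BrowserContext(
        name=user_agent.browser.family,  # e.g. "Chrome"
        version=user_agent.browser.version_string,  # e.g. "120.0.0"
    )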


def generate_tags(event: IngestIssueEvent) -> dict[str, str]:
    """Generate key-value tags based on context and other event data"""
    tags: dict[str, Optional[str]] = event.tags if isinstance(event.tags, dict) else {}

    if contexts := event.contexts:
        if browser := contexts.get("browser"):
            if isinstance(browser, BrowserContext):
                tags["browser.name"] = browser.name
                tags["browser"] = f"{browser.name} {browser.version}"
        if os := contexts.get("os"):
            if isinstance(os, OSContext):
                tags["os.name"] = os.name
        if device := contexts.get("device"):
            if isinstance(device, DeviceContext) and device.model:
                tags["device"] = device.model
    if user := event.user:
        if user.id:
            tags["user.id"] = user.id
        if user.email:
            tags["user.email"] = user.email
        if user.username:
            tags["user.username"] = user.username
    if environment := event.environment:
        tags["environment"] = environment
    if release := event.release:
        tags["release"] = release
    if server_name := event.server_name:
        tags["server_name"] = server_name
    # Exclude None values
    return {key: value for key, value in tags.items() if value}
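

# Illustrative sketch (hypothetical values): the flattened tag mapping
# generate_tags() produces for a typical browser event.
def _example_event_tags() -> dict[str, str]:
    return {
        "browser.name": "Chrome",
        "browser": "Chrome 120.0",
        "os.name": "Windows",
        "environment": "production",
        "release": "1.0.0",
    }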


def check_set_issue_id(
    processing_events: list[ProcessingEvent],
    project_id: int,
    issue_hash: str,
    issue_id: int,
):
    """
    It's common to receive two duplicate events at the same time,
    where the issue has never been seen before. This is an optimization
    that checks if there is a known project/hash. If so, we can infer the
    issue_id.
    """
    for event in processing_events:
        if (
            event.issue_id is None
            and event.event.project_id == project_id
            and event.issue_hash == issue_hash
        ):
            event.issue_id = issue_id
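

# Illustrative sketch (hypothetical ids/hash): after the first event in a batch
# creates an Issue, backfill issue_id on any other queued events in the same
# batch that share the project/hash, avoiding a redundant lookup.
def _example_backfill(processing_events: list[ProcessingEvent]) -> None:
    check_set_issue_id(
        processing_events, project_id=7, issue_hash="c0ffee", issue_id=42
    )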


def create_environments(
    environment_set: set[tuple[str, int, int]], projects_with_data: QuerySet
):
    """
    Create newly seen environments.
    Function determines which, if any, environments are present in event data
    but not the database. Optimized to do as much work in Python as possible
    and reduce queries.
    """
    environments_to_create = [
        Environment(name=name, organization_id=organization_id)
        for name, project_id, organization_id in environment_set
        if not next(
            (
                x
                for x in projects_with_data
                if x["environment_name"] == name and x["id"] == project_id
            ),
            None,
        )
    ]
    if environments_to_create:
        Environment.objects.bulk_create(environments_to_create, ignore_conflicts=True)
        query = Q()
        for environment in environments_to_create:
            query |= Q(
                name=environment.name, organization_id=environment.organization_id
            )
        environments = Environment.objects.filter(query)
        environment_projects: list = []
        for environment in environments:
            project_id = next(
                project_id
                for (name, project_id, organization_id) in environment_set
                if environment.name == name
                and environment.organization_id == organization_id
            )
            environment_projects.append(
                EnvironmentProject(project_id=project_id, environment=environment)
            )
        EnvironmentProject.objects.bulk_create(
            environment_projects, ignore_conflicts=True
        )
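

# Note: bulk_create(ignore_conflicts=True) on Postgres does not return ids for
# rows that already existed, which is why create_environments() above re-selects
# the affected rows before building the EnvironmentProject through-table entries.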


def get_and_create_releases(
    release_set: set[tuple[str, int, int]], projects_with_data: QuerySet
) -> list[tuple[str, int, int]]:
    """
    Create newly seen releases.
    Function determines which, if any, releases are present in event data
    but not the database. Optimized to do as much work in Python as possible
    and reduce queries.
    Returns a list of tuples: (release version, project_id, release_id)
    """
    releases_to_create = [
        Release(version=release_name, organization_id=organization_id)
        for release_name, project_id, organization_id in release_set
        if not next(
            (
                x
                for x in projects_with_data
                if x["release_name"] == release_name and x["id"] == project_id
            ),
            None,
        )
    ]
    releases: list | QuerySet = []
    if releases_to_create:
        # Create database records for any release that doesn't exist
        Release.objects.bulk_create(releases_to_create, ignore_conflicts=True)
        query = Q()
        for release in releases_to_create:
            query |= Q(version=release.version, organization_id=release.organization_id)
        releases = Release.objects.filter(query)
        ReleaseProject = Release.projects.through
        release_projects = [
            ReleaseProject(
                release=release,
                project_id=next(
                    project_id
                    for (version, project_id, organization_id) in release_set
                    if release.version == version
                    and release.organization_id == organization_id
                ),
            )
            for release in releases
        ]
        ReleaseProject.objects.bulk_create(release_projects, ignore_conflicts=True)
    return [
        (
            version,
            project_id,
            next(
                (
                    project["release_id"]
                    for project in projects_with_data
                    if project["release_name"] == version
                    and project["id"] == project_id
                ),
                next(
                    (
                        release.id
                        for release in releases
                        if release.version == version
                        and release.organization_id == organization_id
                    ),
                    0,
                ),
            ),
        )
        for version, project_id, organization_id in release_set
    ]
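

# Illustrative sketch of the return shape (hypothetical ids): one tuple per
# unique (version, project) pair, with release_id 0 as the not-found fallback.
def _example_release_tuples() -> list[tuple[str, int, int]]:
    return [("1.0.0", 7, 42), ("2.0.0", 7, 0)]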


def process_issue_events(ingest_events: list[InterchangeIssueEvent]):
    """
    Accepts a list of events to ingest. Events should be:
    - Few enough to save in a single DB call
    - Already permission checked; these events are cleared to be written to the DB
    - Some invalid events are tolerated (ignored), including duplicate event ids
    When there is an error in this function, care should be taken as to when to
    log, error, or ignore. If the SDK sends "weird" data, we want to log that.
    It's better to save a minimal event than to ignore it.
    """
    # Fetch any needed releases, environments, and whether there is a dif file
    # association. Get unique release/environment for each project_id
    release_set = {
        (event.payload.release, event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.release
    }
    environment_set = {
        (event.payload.environment[:255], event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.environment
    }
    project_set = {project_id for _, project_id, _ in release_set}.union(
        {project_id for _, project_id, _ in environment_set}
    )
    release_version_set = {version for version, _, _ in release_set}
    environment_name_set = {name for name, _, _ in environment_set}

    projects_with_data = (
        Project.objects.filter(id__in=project_set)
        .annotate(
            has_difs=Exists(
                DebugInformationFile.objects.filter(project_id=OuterRef("pk"))
            ),
            release_id=Coalesce("releases__id", Value(None)),
            release_name=Coalesce("releases__version", Value(None)),
            environment_id=Coalesce("environment__id", Value(None)),
            environment_name=Coalesce("environment__name", Value(None)),
        )
        .filter(release_name__in=release_version_set.union({None}))
        .filter(environment_name__in=environment_name_set.union({None}))
        .values(
            "id",
            "has_difs",
            "release_id",
            "release_name",
            "environment_id",
            "environment_name",
        )
    )
    releases = get_and_create_releases(release_set, projects_with_data)
    create_environments(environment_set, projects_with_data)
    sourcemap_images = [
        image
        for event in ingest_events
        if isinstance(event.payload, ErrorIssueEventSchema) and event.payload.debug_meta
        for image in event.payload.debug_meta.images
        if isinstance(image, SourceMapImage)
    ]
    # Get each unique filename from each stacktrace frame
    # The nesting comes from the various ways ingest data is accepted
    # IMO it's even harder to read unnested...
    filename_set = {
        frame.filename.split("/")[-1]
        for event in ingest_events
        if isinstance(event.payload, (ErrorIssueEventSchema, IssueEventSchema))
        and event.payload.exception
        for exception in (
            event.payload.exception
            if isinstance(event.payload.exception, list)
            else event.payload.exception.values
        )
        if exception.stacktrace
        for frame in exception.stacktrace.frames
        if frame.filename
    }
    debug_files = (
        DebugSymbolBundle.objects.filter(
            organization__in={event.organization_id for event in ingest_events}
        )
        .filter(
            Q(
                release__version__in=release_version_set,
                release__projects__in=project_set,
                file__name__in=filename_set,
            )
            | Q(debug_id__in={image.debug_id for image in sourcemap_images})
        )
        .select_related("file", "sourcemap_file", "release")
    )
    now = timezone.now()
    # Update last_used only when it's more than 1 day stale, to minimize queries
    if debug_files:
        debug_files.filter(last_used__lt=now - timedelta(days=1)).update(last_used=now)

    # Collected/calculated event data while processing
    processing_events: list[ProcessingEvent] = []
    # Collect Q objects for bulk issue hash lookup
    q_objects = Q()
    for ingest_event in ingest_events:
        event_data: dict[str, Any] = {}
        event = ingest_event.payload
        event.contexts = generate_contexts(event)
        event_tags = generate_tags(event)
        title = ""
        culprit = ""
        metadata: dict[str, Any] = {}
        release_id = next(
            (
                release_id
                for version, project_id, release_id in releases
                if version == event_tags.get("release")
                and ingest_event.project_id == project_id
            ),
            None,
        )
        if event.platform in ("javascript", "node"):
            event_debug_files = [
                debug_file
                for debug_file in debug_files
                if debug_file.organization_id == ingest_event.organization_id
            ]
            # Assign code_file to file headers
            if event.debug_meta:
                for sourcemap_image in [
                    image
                    for image in event.debug_meta.images
                    if isinstance(image, SourceMapImage)
                ]:
                    for debug_file in event_debug_files:
                        if sourcemap_image.debug_id == debug_file.debug_id:
                            debug_file.data["code_file"] = sourcemap_image.code_file
            JavascriptEventProcessor(
                release_id,
                event,
                [
                    debug_file
                    for debug_file in event_debug_files
                    if debug_file.release_id == release_id
                    or debug_file.data.get("code_file")
                ],
            ).transform()
        elif (
            isinstance(event, ErrorIssueEventSchema)
            and event.exception
            and next(
                (
                    project["has_difs"]
                    for project in projects_with_data
                    if project["id"] == ingest_event.project_id
                ),
                False,
            )
        ):
            event_difs_resolve_stacktrace(event, ingest_event.project_id)

        if event.type in [IssueEventType.ERROR, IssueEventType.DEFAULT]:
            sentry_event = ErrorEvent()
            metadata = sentry_event.get_metadata(event.dict())
            if event.type == IssueEventType.ERROR and metadata:
                full_title = sentry_event.get_title(metadata)
            else:
                message = event.message if event.message else event.logentry
                full_title = (
                    transform_parameterized_message(message)
                    if message
                    else "<untitled>"
                )
                culprit = (
                    event.transaction
                    if event.transaction
                    else generate_culprit(event.dict())
                )
            title = truncatechars(full_title)
            culprit = sentry_event.get_location(event.dict())
        elif event.type == IssueEventType.CSP:
            humanized_directive = event.csp.effective_directive.replace("-src", "")
            uri = urlparse(event.csp.blocked_uri).netloc
            full_title = title = f"Blocked '{humanized_directive}' from '{uri}'"
            culprit = event.csp.effective_directive
            event_data["csp"] = event.csp.dict()

        issue_hash = generate_hash(title, culprit, event.type, event.fingerprint)
        if metadata:
            event_data["metadata"] = metadata
        if platform := event.platform:
            event_data["platform"] = platform
        if modules := event.modules:
            event_data["modules"] = modules
        if sdk := event.sdk:
            event_data["sdk"] = sdk.dict(exclude_none=True)
        if request := event.request:
            event_data["request"] = request.dict(exclude_none=True)
        if environment := event.environment:
            event_data["environment"] = environment
        # Message is a str; logentry is a Message schema like {"params": ...}
        if logentry := event.logentry:
            event_data["logentry"] = logentry.dict(exclude_none=True)
        elif message := event.message:
            if isinstance(message, str):
                event_data["logentry"] = {"formatted": message}
            else:
                event_data["logentry"] = message.dict(exclude_none=True)
        if message := event.message:
            event_data["message"] = (
                message if isinstance(message, str) else message.formatted
            )
        # When blank, the API will default to the title anyway
        elif title != full_title:
            # If the title is truncated, store the full title
            event_data["message"] = full_title
        if breadcrumbs := event.breadcrumbs:
            event_data["breadcrumbs"] = devalue(breadcrumbs)
        if exception := event.exception:
            event_data["exception"] = devalue(exception)
        if extra := event.extra:
            event_data["extra"] = extra
        if user := event.user:
            event_data["user"] = user.dict(exclude_none=True)
        if contexts := event.contexts:
            # Contexts may contain dict or Schema
            event_data["contexts"] = {
                key: value.dict(exclude_none=True)
                if isinstance(value, Schema)
                else value
                for key, value in contexts.items()
            }

        processing_events.append(
            ProcessingEvent(
                event=ingest_event,
                issue_hash=issue_hash,
                title=title,
                level=LogLevel.from_string(event.level) if event.level else None,
                transaction=culprit,
                metadata=metadata,
                event_data=event_data,
                event_tags=event_tags,
                release_id=release_id,
            )
        )
        q_objects |= Q(project_id=ingest_event.project_id, value=issue_hash)

    hash_queryset = IssueHash.objects.filter(q_objects).values(
        "value", "project_id", "issue_id", "issue__status"
    )
    issue_events: list[IssueEvent] = []
    issues_to_reopen = []
    for processing_event in processing_events:
        event_type = processing_event.event.payload.type
        project_id = processing_event.event.project_id
        issue_defaults = {
            "type": event_type,
            "title": remove_bad_chars(processing_event.title),
            "metadata": remove_bad_chars(processing_event.metadata),
            "first_seen": processing_event.event.received,
            "last_seen": processing_event.event.received,
        }
        if level := processing_event.level:
            issue_defaults["level"] = level
        for hash_obj in hash_queryset:
            if (
                hash_obj["value"].hex == processing_event.issue_hash
                and hash_obj["project_id"] == project_id
            ):
                processing_event.issue_id = hash_obj["issue_id"]
                if hash_obj["issue__status"] == EventStatus.RESOLVED:
                    issues_to_reopen.append(hash_obj["issue_id"])
                break

        if not processing_event.issue_id:
            # Atomically increment the per-project short_id counter via upsert
            with connection.cursor() as cursor:
                cursor.execute(
                    """
                    INSERT INTO projects_projectcounter (project_id, value)
                    VALUES (%s, 1)
                    ON CONFLICT (project_id) DO UPDATE
                    SET value = projects_projectcounter.value + 1
                    RETURNING value;
                    """,
                    [project_id],
                )
                issue_defaults["short_id"] = cursor.fetchone()[0]
            try:
                with transaction.atomic():
                    issue = Issue.objects.create(
                        project_id=project_id,
                        search_vector=SearchVector(Value(issue_defaults["title"])),
                        **issue_defaults,
                    )
                    new_issue_hash = IssueHash.objects.create(
                        issue=issue,
                        value=processing_event.issue_hash,
                        project_id=project_id,
                    )
                    check_set_issue_id(
                        processing_events,
                        issue.project_id,
                        new_issue_hash.value,
                        issue.id,
                    )
                processing_event.issue_id = issue.id
                processing_event.issue_created = True
            except IntegrityError:
                # A concurrent worker created the same issue hash; look it up
                processing_event.issue_id = IssueHash.objects.get(
                    project_id=project_id, value=processing_event.issue_hash
                ).issue_id
        issue_events.append(
            IssueEvent(
                id=processing_event.event.event_id,
                issue_id=processing_event.issue_id,
                type=event_type,
                level=processing_event.level
                if processing_event.level
                else LogLevel.ERROR,
                timestamp=processing_event.event.payload.timestamp,
                received=processing_event.event.received,
                title=processing_event.title,
                transaction=processing_event.transaction,
                data=remove_bad_chars(processing_event.event_data),
                tags=processing_event.event_tags,
                release_id=processing_event.release_id,
            )
        )

    update_issues(processing_events)

    if settings.CACHE_IS_REDIS:
        # Add set of issue_ids for alerts to process later
        with get_redis_connection("default") as con:
            if (
                con.sadd(
                    ISSUE_IDS_KEY, *{event.issue_id for event in processing_events}
                )
                > 0
            ):
                # Set a long expiration time when a key is added.
                # We want all keys to have a long "sanity check" TTL to avoid
                # redis out-of-memory errors (we can't ensure end users enable
                # allkeys-lru eviction)
                con.expire(ISSUE_IDS_KEY, 3600)

    if issues_to_reopen:
        Issue.objects.filter(id__in=issues_to_reopen).update(
            status=EventStatus.UNRESOLVED
        )
        Notification.objects.filter(issues__in=issues_to_reopen).delete()

    # ignore_conflicts because we could receive an invalid duplicate
    # (event_id, received) pair
    IssueEvent.objects.bulk_create(issue_events, ignore_conflicts=True)

    # Group events by time and project for event count statistics
    data_stats: defaultdict[datetime, defaultdict[int, int]] = defaultdict(
        lambda: defaultdict(int)
    )
    for processing_event in processing_events:
        hour_received = processing_event.event.received.replace(
            minute=0, second=0, microsecond=0
        )
        data_stats[hour_received][processing_event.event.project_id] += 1
    update_tags(processing_events)
    update_statistics(data_stats)


def update_statistics(
    project_event_stats: defaultdict[datetime, defaultdict[int, int]], is_issue=True
):
    # Flatten data into an SQL-param-friendly format and sort to mitigate deadlocks
    data = sorted(
        [
            [date, project_id, count]
            for date, inner_dict in project_event_stats.items()
            for project_id, count in inner_dict.items()
        ],
        key=itemgetter(0, 1),
    )
    table = (
        "projects_issueeventprojecthourlystatistic"
        if is_issue
        else "projects_transactioneventprojecthourlystatistic"
    )
    # Django ORM cannot support F functions in a bulk_update
    # psycopg does not support execute_values
    # https://github.com/psycopg/psycopg/issues/114
    with connection.cursor() as cursor:
        args_str = ",".join(cursor.mogrify("(%s,%s,%s)", x) for x in data)
        sql = (
            f"INSERT INTO {table} (date, project_id, count)\n"
            f"VALUES {args_str}\n"
            "ON CONFLICT (project_id, date)\n"
            f"DO UPDATE SET count = {table}.count + EXCLUDED.count;"
        )
        cursor.execute(sql)
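

# Illustrative sketch (hypothetical project id): the nested-defaultdict shape
# consumed by update_statistics(); one counter per (hour, project) bucket.
def _example_hourly_stats() -> defaultdict[datetime, defaultdict[int, int]]:
    stats: defaultdict[datetime, defaultdict[int, int]] = defaultdict(
        lambda: defaultdict(int)
    )
    hour = timezone.now().replace(minute=0, second=0, microsecond=0)
    stats[hour][7] += 1  # one event for project 7 in the current hour
    return stats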


TagStats = defaultdict[
    datetime,
    defaultdict[int, defaultdict[int, defaultdict[int, int]]],
]


def update_tags(processing_events: list[ProcessingEvent]):
    keys = sorted({key for d in processing_events for key in d.event_tags.keys()})
    values = sorted(
        {value for d in processing_events for value in d.event_tags.values()}
    )
    TagKey.objects.bulk_create([TagKey(key=key) for key in keys], ignore_conflicts=True)
    TagValue.objects.bulk_create(
        [TagValue(value=value) for value in values], ignore_conflicts=True
    )
    # Postgres cannot return ids with ignore_conflicts
    tag_keys = {
        tag["key"]: tag["id"] for tag in TagKey.objects.filter(key__in=keys).values()
    }
    tag_values = {
        tag["value"]: tag["id"]
        for tag in TagValue.objects.filter(value__in=values).values()
    }
    tag_stats: TagStats = defaultdict(
        lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
    )
    for processing_event in processing_events:
        if processing_event.issue_id is None:
            continue
        # Group by day. More granular allows for a better search;
        # less granular yields better tag filter performance
        day_received = processing_event.event.received.replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        for key, value in processing_event.event_tags.items():
            key_id = tag_keys[key]
            value_id = tag_values[value]
            tag_stats[day_received][processing_event.issue_id][key_id][value_id] += 1

    if not tag_stats:
        return

    # Sort to mitigate deadlocks
    data = sorted(
        [
            [date, issue_id, key_id, value_id, count]
            for date, d1 in tag_stats.items()
            for issue_id, d2 in d1.items()
            for key_id, d3 in d2.items()
            for value_id, count in d3.items()
        ],
        key=itemgetter(0, 1, 2, 3),
    )
    with connection.cursor() as cursor:
        args_str = ",".join(cursor.mogrify("(%s,%s,%s,%s,%s)", x) for x in data)
        sql = (
            "INSERT INTO issue_events_issuetag"
            " (date, issue_id, tag_key_id, tag_value_id, count)\n"
            f"VALUES {args_str}\n"
            "ON CONFLICT (issue_id, date, tag_key_id, tag_value_id)\n"
            "DO UPDATE SET count = issue_events_issuetag.count + EXCLUDED.count;"
        )
        cursor.execute(sql)
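

# Illustrative sketch (hypothetical ids): tag_stats nests
# date -> issue_id -> tag_key_id -> tag_value_id -> count, so one flattened row
# per unique combination reaches the INSERT ... ON CONFLICT upsert above.
def _example_tag_stats_row() -> list:
    return [datetime(2024, 1, 1), 42, 1, 2, 3]  # date, issue, key, value, count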


# Transactions
def process_transaction_events(ingest_events: list[InterchangeTransactionEvent]):
    release_set = {
        (event.payload.release, event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.release
    }
    environment_set = {
        (event.payload.environment[:255], event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.environment
    }
    project_set = {project_id for _, project_id, _ in release_set}.union(
        {project_id for _, project_id, _ in environment_set}
    )
    release_version_set = {version for version, _, _ in release_set}
    environment_name_set = {name for name, _, _ in environment_set}

    projects_with_data = (
        Project.objects.filter(id__in=project_set)
        .annotate(
            release_id=Coalesce("releases__id", Value(None)),
            release_name=Coalesce("releases__version", Value(None)),
            environment_id=Coalesce("environment__id", Value(None)),
            environment_name=Coalesce("environment__name", Value(None)),
        )
        .filter(release_name__in=release_version_set.union({None}))
        .filter(environment_name__in=environment_name_set.union({None}))
        .values(
            "id",
            "release_id",
            "release_name",
            "environment_id",
            "environment_name",
        )
    )
    get_and_create_releases(release_set, projects_with_data)
    create_environments(environment_set, projects_with_data)

    transactions = []
    for ingest_event in ingest_events:
        event = ingest_event.payload
        contexts = event.contexts
        request = event.request
        trace_id = contexts["trace"]["trace_id"]
        op = ""
        if isinstance(contexts, dict):
            trace = contexts.get("trace", {})
            if isinstance(trace, dict):
                op = str(trace.get("op", ""))
        method: str | None = None
        if request:
            method = request.method

        # TODO tags
        group, group_created = TransactionGroup.objects.get_or_create(
            project_id=ingest_event.project_id,
            transaction=event.transaction[:1024],  # Truncate
            op=op,
            method=method,
        )
        transactions.append(
            TransactionEvent(
                group=group,
                data={
                    "request": request.dict() if request else None,
                    "sdk": event.sdk.dict() if event.sdk else None,
                    "platform": event.platform,
                },
                trace_id=trace_id,
                event_id=event.event_id,
                timestamp=event.timestamp,
                start_timestamp=event.start_timestamp,
                duration=(event.timestamp - event.start_timestamp).total_seconds()
                * 1000,  # Store duration in milliseconds
            )
        )
    TransactionEvent.objects.bulk_create(transactions, ignore_conflicts=True)

    # Group transactions by time and project for event count statistics
    data_stats: defaultdict[datetime, defaultdict[int, int]] = defaultdict(
        lambda: defaultdict(int)
    )
    for perf_transaction in transactions:
        hour_received = perf_transaction.start_timestamp.replace(
            minute=0, second=0, microsecond=0
        )
        data_stats[hour_received][perf_transaction.group.project_id] += 1
    update_statistics(data_stats, False)