from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from operator import itemgetter
from typing import Any, Optional, Union
from urllib.parse import urlparse

from django.contrib.postgres.search import SearchVector
from django.db import connection, transaction
from django.db.models import (
    Exists,
    F,
    OuterRef,
    Q,
    QuerySet,
    Value,
)
from django.db.models.functions import Coalesce, Greatest
from django.db.utils import IntegrityError
from ninja import Schema
from user_agents import parse

from apps.alerts.models import Notification
from apps.difs.models import DebugInformationFile
from apps.difs.tasks import event_difs_resolve_stacktrace
from apps.environments.models import Environment, EnvironmentProject
from apps.issue_events.constants import EventStatus, LogLevel
from apps.issue_events.models import (
    Issue,
    IssueEvent,
    IssueEventType,
    IssueHash,
    TagKey,
    TagValue,
)
from apps.performance.models import TransactionEvent, TransactionGroup
from apps.projects.models import Project
from apps.releases.models import Release
from sentry.culprit import generate_culprit
from sentry.eventtypes.error import ErrorEvent
from sentry.utils.strings import truncatechars

from ..shared.schema.contexts import (
    BrowserContext,
    ContextsSchema,
    DeviceContext,
    OSContext,
)
from .javascript_event_processor import JavascriptEventProcessor
from .model_functions import PipeConcat
from .schema import (
    ErrorIssueEventSchema,
    IngestIssueEvent,
    InterchangeIssueEvent,
    InterchangeTransactionEvent,
)
from .utils import generate_hash, transform_parameterized_message


@dataclass
class ProcessingEvent:
    event: InterchangeIssueEvent
    issue_hash: str
    title: str
    transaction: str
    metadata: dict[str, Any]
    event_data: dict[str, Any]
    event_tags: dict[str, str]
    level: Optional[LogLevel] = None
    issue_id: Optional[int] = None
    # Annotated so the dataclass treats this as a per-instance field rather
    # than a shared class attribute.
    issue_created: bool = False
    release_id: Optional[int] = None


@dataclass
class IssueUpdate:
    last_seen: datetime
    search_vector: str
    added_count: int = 1


def get_search_vector(event: ProcessingEvent) -> str:
    return f"{event.title} {event.transaction}"


Replacable = Union[str, dict, list]


def replace(data: Replacable, match: str, repl: str) -> Replacable:
    """A recursive replace function"""
    if isinstance(data, dict):
        return {k: replace(v, match, repl) for k, v in data.items()}
    elif isinstance(data, list):
        return [replace(i, match, repl) for i in data]
    elif isinstance(data, str):
        return data.replace(match, repl)
    return data
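
# Illustrative: replace({"msg": ["a\u0000b"]}, "\u0000", " ") -> {"msg": ["a b"]}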


def sanitize_bad_postgres_chars(data: str):
    """
    Remove values which are not supported by the postgres string data types
    """
    known_bads = ["\x00"]
    for known_bad in known_bads:
        data = data.replace(known_bad, " ")
    return data


def sanitize_bad_postgres_json(data: Replacable) -> Replacable:
    """
    Remove values which are not supported by the postgres JSONB data type
    """
    known_bads = ["\u0000"]
    for known_bad in known_bads:
        data = replace(data, known_bad, " ")
    return data
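
# Postgres TEXT columns reject NUL (0x00) bytes and JSONB rejects the escaped
# \u0000 sequence, hence the two sanitizers above.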


def update_issues(processing_events: list[ProcessingEvent]):
    """
    Update any existing issues based on new statistics
    """
    issues_to_update: dict[int, IssueUpdate] = {}
    for processing_event in processing_events:
        if processing_event.issue_created:
            # A newly created issue already has correct stats; skip it without
            # abandoning the rest of the batch.
            continue
        issue_id = processing_event.issue_id
        if issue_id in issues_to_update:
            issues_to_update[issue_id].added_count += 1
            issues_to_update[
                issue_id
            ].search_vector += f" {get_search_vector(processing_event)}"
            if issues_to_update[issue_id].last_seen < processing_event.event.received:
                issues_to_update[issue_id].last_seen = processing_event.event.received
        elif issue_id:
            issues_to_update[issue_id] = IssueUpdate(
                last_seen=processing_event.event.received,
                search_vector=get_search_vector(processing_event),
            )
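
    # Apply the accumulated deltas with one UPDATE per issue: bump count, move
    # last_seen forward only, and append the new search terms to the stored
    # search_vector (PipeConcat presumably wraps Postgres's || concatenation;
    # see .model_functions).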
    for issue_id, value in issues_to_update.items():
        Issue.objects.filter(id=issue_id).update(
            count=F("count") + value.added_count,
            search_vector=PipeConcat(
                F("search_vector"), SearchVector(Value(value.search_vector))
            ),
            last_seen=Greatest(F("last_seen"), value.last_seen),
        )


def devalue(obj: Union[Schema, list]) -> Optional[Union[dict, list]]:
    """
    Convert Schema like {"values": []} into list or dict without unnecessary 'values'
    """
    if isinstance(obj, Schema) and hasattr(obj, "values"):
        return obj.dict(mode="json", exclude_none=True, exclude_defaults=True)["values"]
    elif isinstance(obj, list):
        return [
            x.dict(mode="json", exclude_none=True, exclude_defaults=True) for x in obj
        ]
    return None
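
# Illustrative (hypothetical schema): devalue(Exceptions(values=[frame, ...]))
# returns the inner list, dropping the Sentry-style {"values": [...]} wrapper.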


def generate_contexts(event: IngestIssueEvent) -> ContextsSchema:
    """
    Add additional contexts if they aren't already set
    """
    contexts = event.contexts if event.contexts else ContextsSchema(root={})

    if request := event.request:
        if isinstance(request.headers, list):
            if ua_string := next(
                (x[1] for x in request.headers if x[0] == "User-Agent"), None
            ):
                user_agent = parse(ua_string)
                if "browser" not in contexts.root:
                    contexts.root["browser"] = BrowserContext(
                        name=user_agent.browser.family,
                        version=user_agent.browser.version_string,
                    )
                if "os" not in contexts.root:
                    contexts.root["os"] = OSContext(
                        name=user_agent.os.family, version=user_agent.os.version_string
                    )
                if "device" not in contexts.root:
                    device = user_agent.device
                    contexts.root["device"] = DeviceContext(
                        family=device.family,
                        model=device.model,
                        brand=device.brand,
                    )
    return contexts
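
# Example: when an SDK omits browser/os/device contexts but the request carries
# a User-Agent header, the parsed UA (user_agents package) fills them in.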


def generate_tags(event: IngestIssueEvent) -> dict[str, str]:
    """Generate key-value tags based on context and other event data"""
    tags: dict[str, Optional[str]] = event.tags if isinstance(event.tags, dict) else {}

    if contexts := event.contexts:
        if browser := contexts.root.get("browser"):
            if isinstance(browser, BrowserContext):
                tags["browser.name"] = browser.name
                tags["browser"] = f"{browser.name} {browser.version}"
        if os := contexts.root.get("os"):
            if isinstance(os, OSContext):
                tags["os.name"] = os.name
        if device := contexts.root.get("device"):
            if isinstance(device, DeviceContext) and device.model:
                tags["device"] = device.model
    if user := event.user:
        if user.id:
            tags["user.id"] = user.id
        if user.email:
            tags["user.email"] = user.email
        if user.username:
            tags["user.username"] = user.username
    if environment := event.environment:
        tags["environment"] = environment
    if release := event.release:
        tags["release"] = release
    if server_name := event.server_name:
        tags["server_name"] = server_name
    # Exclude None values
    return {key: value for key, value in tags.items() if value}
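
# Illustrative output for a browser event:
#   {"browser": "Chrome 120.0.0", "browser.name": "Chrome",
#    "os.name": "Mac OS X", "environment": "production", "release": "1.2.3"}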


def check_set_issue_id(
    processing_events: list[ProcessingEvent],
    project_id: int,
    issue_hash: str,
    issue_id: int,
):
    """
    It's common to receive two duplicate events at the same time,
    where the issue has never been seen before. This is an optimization
    that checks if there is a known project/hash. If so, we can infer the
    issue_id.
    """
    for event in processing_events:
        if (
            event.issue_id is None
            and event.event.project_id == project_id
            and event.issue_hash == issue_hash
        ):
            event.issue_id = issue_id


def create_environments(
    environment_set: set[tuple[str, int, int]], projects_with_data: QuerySet
):
    """
    Create newly seen environments.
    Determines which environments, if any, are present in the event data but
    not in the database. Optimized to do as much work as possible in Python
    and reduce queries.
    """
    environments_to_create = [
        Environment(name=name, organization_id=organization_id)
        for name, project_id, organization_id in environment_set
        if not next(
            (
                x
                for x in projects_with_data
                if x["environment_name"] == name and x["id"] == project_id
            ),
            None,
        )
    ]
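    # bulk_create(ignore_conflicts=True) silently skips rows that already
    # exist and cannot return ids, so re-select to build the project links.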
    if environments_to_create:
        Environment.objects.bulk_create(environments_to_create, ignore_conflicts=True)
        query = Q()
        for environment in environments_to_create:
            query |= Q(
                name=environment.name, organization_id=environment.organization_id
            )
        environments = Environment.objects.filter(query)
        environment_projects: list = []
        for environment in environments:
            project_id = next(
                project_id
                for (name, project_id, organization_id) in environment_set
                if environment.name == name
                and environment.organization_id == organization_id
            )
            environment_projects.append(
                EnvironmentProject(project_id=project_id, environment=environment)
            )
        EnvironmentProject.objects.bulk_create(
            environment_projects, ignore_conflicts=True
        )


def get_and_create_releases(
    release_set: set[tuple[str, int, int]], projects_with_data: QuerySet
) -> list[tuple[str, int, int]]:
    """
    Create newly seen releases.
    Determines which releases, if any, are present in the event data but not
    in the database. Optimized to do as much work as possible in Python and
    reduce queries.
    Returns a list of tuples: (release version, project_id, release_id)
    """
    releases_to_create = [
        Release(version=release_name, organization_id=organization_id)
        for release_name, project_id, organization_id in release_set
        if not next(
            (
                x
                for x in projects_with_data
                if x["release_name"] == release_name and x["id"] == project_id
            ),
            None,
        )
    ]
    releases: Union[list, QuerySet] = []
    if releases_to_create:
        # Create database records for any release that doesn't exist
        Release.objects.bulk_create(releases_to_create, ignore_conflicts=True)
        query = Q()
        for release in releases_to_create:
            query |= Q(version=release.version, organization_id=release.organization_id)
        releases = Release.objects.filter(query)
        ReleaseProject = Release.projects.through
        release_projects = [
            ReleaseProject(
                release=release,
                project_id=next(
                    project_id
                    for (version, project_id, organization_id) in release_set
                    if release.version == version
                    and release.organization_id == organization_id
                ),
            )
            for release in releases
        ]
        ReleaseProject.objects.bulk_create(release_projects, ignore_conflicts=True)
    return [
        (
            version,
            project_id,
            next(
                (
                    project["release_id"]
                    for project in projects_with_data
                    if project["release_name"] == version
                    and project["id"] == project_id
                ),
                next(
                    (
                        release.id
                        for release in releases
                        if release.version == version
                        and release.organization_id == organization_id
                    ),
                    0,
                ),
            ),
        )
        for version, project_id, organization_id in release_set
    ]


def process_issue_events(ingest_events: list[InterchangeIssueEvent]):
    """
    Accepts a list of events to ingest. Events should be:
    - Few enough to save in a single DB call
    - Already permission checked; these events are cleared to be written to the DB
    - Some invalid events are tolerated (ignored), including duplicate event ids
    When there is an error in this function, care should be taken as to when to
    log, error, or ignore. If the SDK sends "weird" data, we want to log that.
    It's better to save a minimal event than to ignore it.
    """
    # Fetch any needed releases, environments, and whether there is a dif file association
    # Get unique release/environment for each project_id
    release_set = {
        (event.payload.release, event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.release
    }
    environment_set = {
        (event.payload.environment[:255], event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.environment
    }
    project_set = {project_id for _, project_id, _ in release_set}.union(
        {project_id for _, project_id, _ in environment_set}
    )
    release_version_set = {version for version, _, _ in release_set}
    environment_name_set = {name for name, _, _ in environment_set}

    projects_with_data = (
        Project.objects.filter(id__in=project_set)
        .annotate(
            has_difs=Exists(
                DebugInformationFile.objects.filter(project_id=OuterRef("pk"))
            ),
            release_id=Coalesce("releases__id", Value(None)),
            release_name=Coalesce("releases__version", Value(None)),
            environment_id=Coalesce("environment__id", Value(None)),
            environment_name=Coalesce("environment__name", Value(None)),
        )
        .filter(release_name__in=release_version_set.union({None}))
        .filter(environment_name__in=environment_name_set.union({None}))
        .values(
            "id",
            "has_difs",
            "release_id",
            "release_name",
            "environment_id",
            "environment_name",
        )
    )
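    # Note: the joins behind these annotations mean a project can appear in
    # multiple rows, one per matching release/environment combination.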
    releases = get_and_create_releases(release_set, projects_with_data)
    create_environments(environment_set, projects_with_data)

    # Collected/calculated event data while processing
    processing_events: list[ProcessingEvent] = []
    # Collect Q objects for bulk issue hash lookup
    q_objects = Q()
    for ingest_event in ingest_events:
        event_data: dict[str, Any] = {}
        event = ingest_event.payload
        event.contexts = generate_contexts(event)
        event_tags = generate_tags(event)
        title = ""
        culprit = ""
        metadata: dict[str, Any] = {}

        release_id = next(
            (
                release_id
                for version, project_id, release_id in releases
                if version == event_tags.get("release")
                and ingest_event.project_id == project_id
            ),
            None,
        )
        if event.platform in ("javascript", "node") and release_id:
            JavascriptEventProcessor(release_id, event).transform()
        elif (
            isinstance(event, ErrorIssueEventSchema)
            and event.exception
            and next(
                (
                    project["has_difs"]
                    for project in projects_with_data
                    if project["id"] == ingest_event.project_id
                ),
                False,
            )
        ):
            event_difs_resolve_stacktrace(event, ingest_event.project_id)

        if event.type in [IssueEventType.ERROR, IssueEventType.DEFAULT]:
            sentry_event = ErrorEvent()
            metadata = sentry_event.get_metadata(event.dict())
            if event.type == IssueEventType.ERROR and metadata:
                full_title = sentry_event.get_title(metadata)
            else:
                message = event.message if event.message else event.logentry
                full_title = (
                    transform_parameterized_message(message)
                    if message
                    else "<untitled>"
                )
                culprit = (
                    event.transaction
                    if event.transaction
                    else generate_culprit(event.dict())
                )
            title = truncatechars(full_title)
            culprit = sentry_event.get_location(event.dict())
        elif event.type == IssueEventType.CSP:
            humanized_directive = event.csp.effective_directive.replace("-src", "")
            uri = urlparse(event.csp.blocked_uri).netloc
            full_title = title = f"Blocked '{humanized_directive}' from '{uri}'"
            culprit = event.csp.effective_directive
            event_data["csp"] = event.csp.dict()

        issue_hash = generate_hash(title, culprit, event.type, event.fingerprint)
        if metadata:
            event_data["metadata"] = metadata
        if platform := event.platform:
            event_data["platform"] = platform
        if modules := event.modules:
            event_data["modules"] = modules
        if sdk := event.sdk:
            event_data["sdk"] = sdk.dict(exclude_none=True)
        if request := event.request:
            event_data["request"] = request.dict(exclude_none=True)
        if environment := event.environment:
            event_data["environment"] = environment
        # Message is str
        # Logentry is {"params": etc} Message format
        if logentry := event.logentry:
            event_data["logentry"] = logentry.dict(exclude_none=True)
        elif message := event.message:
            if isinstance(message, str):
                event_data["logentry"] = {"formatted": message}
            else:
                event_data["logentry"] = message.dict(exclude_none=True)
        if message := event.message:
            event_data["message"] = (
                message if isinstance(message, str) else message.formatted
            )
        # When blank, the API will default to the title anyway
        elif title != full_title:
            # If the title is truncated, store the full title
            event_data["message"] = full_title
        if breadcrumbs := event.breadcrumbs:
            event_data["breadcrumbs"] = devalue(breadcrumbs)
        if exception := event.exception:
            event_data["exception"] = devalue(exception)
        if extra := event.extra:
            event_data["extra"] = extra
        if user := event.user:
            event_data["user"] = user.dict(exclude_none=True)
        if contexts := event.contexts:
            event_data["contexts"] = contexts.dict(exclude_none=True)

        processing_events.append(
            ProcessingEvent(
                event=ingest_event,
                issue_hash=issue_hash,
                title=title,
                level=LogLevel.from_string(event.level) if event.level else None,
                transaction=culprit,
                metadata=metadata,
                event_data=event_data,
                event_tags=event_tags,
                release_id=release_id,
            )
        )
        q_objects |= Q(project_id=ingest_event.project_id, value=issue_hash)
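
    # Resolve every (project_id, hash) pair from the batch in a single query.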
    hash_queryset = IssueHash.objects.filter(q_objects).values(
        "value", "project_id", "issue_id", "issue__status"
    )
    issue_events: list[IssueEvent] = []
    issues_to_reopen = []
    for processing_event in processing_events:
        event_type = processing_event.event.payload.type
        project_id = processing_event.event.project_id
        issue_defaults = {
            "type": event_type,
            "title": sanitize_bad_postgres_chars(processing_event.title),
            "metadata": sanitize_bad_postgres_json(processing_event.metadata),
            "first_seen": processing_event.event.received,
            "last_seen": processing_event.event.received,
        }
        if level := processing_event.level:
            issue_defaults["level"] = level
        for hash_obj in hash_queryset:
            if (
                hash_obj["value"].hex == processing_event.issue_hash
                and hash_obj["project_id"] == project_id
            ):
                processing_event.issue_id = hash_obj["issue_id"]
                if hash_obj["issue__status"] == EventStatus.RESOLVED:
                    issues_to_reopen.append(hash_obj["issue_id"])
                break

        if not processing_event.issue_id:
            try:
                with transaction.atomic():
                    issue = Issue.objects.create(
                        project_id=project_id,
                        search_vector=SearchVector(Value(issue_defaults["title"])),
                        **issue_defaults,
                    )
                    new_issue_hash = IssueHash.objects.create(
                        issue=issue,
                        value=processing_event.issue_hash,
                        project_id=project_id,
                    )
                    check_set_issue_id(
                        processing_events,
                        issue.project_id,
                        new_issue_hash.value,
                        issue.id,
                    )
                processing_event.issue_id = issue.id
                processing_event.issue_created = True
            except IntegrityError:
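                # Raced with another worker that created the same issue/hash
                # first; fetch and reuse its issue_id.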
                processing_event.issue_id = IssueHash.objects.get(
                    project_id=project_id, value=processing_event.issue_hash
                ).issue_id
        issue_events.append(
            IssueEvent(
                id=processing_event.event.event_id,
                issue_id=processing_event.issue_id,
                type=event_type,
                level=processing_event.level
                if processing_event.level
                else LogLevel.ERROR,
                timestamp=processing_event.event.payload.timestamp,
                received=processing_event.event.received,
                title=processing_event.title,
                transaction=processing_event.transaction,
                data=sanitize_bad_postgres_json(processing_event.event_data),
                tags=processing_event.event_tags,
                release_id=processing_event.release_id,
            )
        )

    update_issues(processing_events)

    if issues_to_reopen:
        Issue.objects.filter(id__in=issues_to_reopen).update(
            status=EventStatus.UNRESOLVED
        )
        Notification.objects.filter(issues__in=issues_to_reopen).delete()

    # ignore_conflicts because we could have an invalid duplicate event_id, received
    IssueEvent.objects.bulk_create(issue_events, ignore_conflicts=True)

    # Group events by time and project for event count statistics
    data_stats: defaultdict[datetime, defaultdict[int, int]] = defaultdict(
        lambda: defaultdict(int)
    )
    for processing_event in processing_events:
        hour_received = processing_event.event.received.replace(
            minute=0, second=0, microsecond=0
        )
        data_stats[hour_received][processing_event.event.project_id] += 1
    update_tags(processing_events)
    update_statistics(data_stats)


def update_statistics(
    project_event_stats: defaultdict[datetime, defaultdict[int, int]], is_issue=True
):
    # Flatten data for a sql param friendly format and sort to mitigate deadlocks
    data = sorted(
        [
            [year, key, value]
            for year, inner_dict in project_event_stats.items()
            for key, value in inner_dict.items()
        ],
        key=itemgetter(0, 1),
    )
    table = (
        "projects_issueeventprojecthourlystatistic"
        if is_issue
        else "projects_transactioneventprojecthourlystatistic"
    )
    # Django ORM cannot support F functions in a bulk_update
    # psycopg does not support execute_values
    # https://github.com/psycopg/psycopg/issues/114
    with connection.cursor() as cursor:
        args_str = ",".join(cursor.mogrify("(%s,%s,%s)", x) for x in data)
        sql = (
            f"INSERT INTO {table} (date, project_id, count)\n"
            f"VALUES {args_str}\n"
            "ON CONFLICT (project_id, date)\n"
            f"DO UPDATE SET count = {table}.count + EXCLUDED.count;"
        )
        cursor.execute(sql)
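
# Illustrative upsert generated above (one VALUES tuple per project/hour):
#   INSERT INTO projects_issueeventprojecthourlystatistic (date, project_id, count)
#   VALUES ('2024-05-01 13:00:00+00', 42, 3)
#   ON CONFLICT (project_id, date)
#   DO UPDATE SET count = projects_issueeventprojecthourlystatistic.count + EXCLUDED.count;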


TagStats = defaultdict[
    datetime,
    defaultdict[int, defaultdict[int, defaultdict[int, int]]],
]
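# Nesting: received date -> issue_id -> tag_key_id -> tag_value_id -> count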


def update_tags(processing_events: list[ProcessingEvent]):
    keys = sorted({key for d in processing_events for key in d.event_tags.keys()})
    values = sorted(
        {value for d in processing_events for value in d.event_tags.values()}
    )
    TagKey.objects.bulk_create([TagKey(key=key) for key in keys], ignore_conflicts=True)
    TagValue.objects.bulk_create(
        [TagValue(value=value) for value in values], ignore_conflicts=True
    )
    # Postgres cannot return ids with ignore_conflicts
    tag_keys = {
        tag["key"]: tag["id"] for tag in TagKey.objects.filter(key__in=keys).values()
    }
    tag_values = {
        tag["value"]: tag["id"]
        for tag in TagValue.objects.filter(value__in=values).values()
    }

    tag_stats: TagStats = defaultdict(
        lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
    )
    for processing_event in processing_events:
        if processing_event.issue_id is None:
            continue
        # Group by day. More granular allows for a better search
        # Less granular yields better tag filter performance
        day_received = processing_event.event.received.replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        for key, value in processing_event.event_tags.items():
            key_id = tag_keys[key]
            value_id = tag_values[value]
            tag_stats[day_received][processing_event.issue_id][key_id][value_id] += 1

    if not tag_stats:
        return

    # Sort to mitigate deadlocks
    data = sorted(
        [
            [date, issue_id, key_id, value_id, count]
            for date, d1 in tag_stats.items()
            for issue_id, d2 in d1.items()
            for key_id, d3 in d2.items()
            for value_id, count in d3.items()
        ],
        key=itemgetter(0, 1, 2, 3),
    )
    with connection.cursor() as cursor:
        args_str = ",".join(cursor.mogrify("(%s,%s,%s,%s,%s)", x) for x in data)
        sql = (
            "INSERT INTO issue_events_issuetag (date, issue_id, tag_key_id, tag_value_id, count)\n"
            f"VALUES {args_str}\n"
            "ON CONFLICT (issue_id, date, tag_key_id, tag_value_id)\n"
            "DO UPDATE SET count = issue_events_issuetag.count + EXCLUDED.count;"
        )
        cursor.execute(sql)


# Transactions
def process_transaction_events(ingest_events: list[InterchangeTransactionEvent]):
    release_set = {
        (event.payload.release, event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.release
    }
    environment_set = {
        (event.payload.environment[:255], event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.environment
    }
    project_set = {project_id for _, project_id, _ in release_set}.union(
        {project_id for _, project_id, _ in environment_set}
    )
    release_version_set = {version for version, _, _ in release_set}
    environment_name_set = {name for name, _, _ in environment_set}

    projects_with_data = (
        Project.objects.filter(id__in=project_set)
        .annotate(
            release_id=Coalesce("releases__id", Value(None)),
            release_name=Coalesce("releases__version", Value(None)),
            environment_id=Coalesce("environment__id", Value(None)),
            environment_name=Coalesce("environment__name", Value(None)),
        )
        .filter(release_name__in=release_version_set.union({None}))
        .filter(environment_name__in=environment_name_set.union({None}))
        .values(
            "id",
            "release_id",
            "release_name",
            "environment_id",
            "environment_name",
        )
    )
    get_and_create_releases(release_set, projects_with_data)
    create_environments(environment_set, projects_with_data)

    transactions = []
    for ingest_event in ingest_events:
        event = ingest_event.payload
        contexts = event.contexts
        request = event.request
        trace_id = contexts["trace"]["trace_id"]
        op = ""
        if isinstance(contexts, dict):
            trace = contexts.get("trace", {})
            if isinstance(trace, dict):
                op = str(trace.get("op", ""))
        method: str | None = None
        if request:
            method = request.method

        # TODO tags
        group, group_created = TransactionGroup.objects.get_or_create(
            project_id=ingest_event.project_id,
            transaction=event.transaction,
            op=op,
            method=method,
        )
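
        # Duration is stored in milliseconds: the datetime delta's
        # total_seconds() is scaled by 1000 below.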
        transactions.append(
            TransactionEvent(
                group=group,
                data={
                    "request": request.dict() if request else None,
                    "sdk": event.sdk.dict() if event.sdk else None,
                    "platform": event.platform,
                },
                trace_id=trace_id,
                event_id=event.event_id,
                timestamp=event.timestamp,
                start_timestamp=event.start_timestamp,
                duration=(event.timestamp - event.start_timestamp).total_seconds()
                * 1000,
            )
        )
    TransactionEvent.objects.bulk_create(transactions, ignore_conflicts=True)

    data_stats: defaultdict[datetime, defaultdict[int, int]] = defaultdict(
        lambda: defaultdict(int)
    )
    for perf_transaction in transactions:
        hour_received = perf_transaction.start_timestamp.replace(
            minute=0, second=0, microsecond=0
        )
        data_stats[hour_received][perf_transaction.group.project_id] += 1
    update_statistics(data_stats, False)