# process_event.py

from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from operator import itemgetter
from typing import Any, Optional, Union
from urllib.parse import urlparse

from django.conf import settings
from django.contrib.postgres.search import SearchVector
from django.db import connection, transaction
from django.db.models import (
    Exists,
    F,
    OuterRef,
    Q,
    QuerySet,
    Value,
)
from django.db.models.functions import Coalesce, Greatest
from django.db.utils import IntegrityError
from django_redis import get_redis_connection
from ninja import Schema
from user_agents import parse

from apps.alerts.constants import ISSUE_IDS_KEY
from apps.alerts.models import Notification
from apps.difs.models import DebugInformationFile
from apps.difs.tasks import event_difs_resolve_stacktrace
from apps.environments.models import Environment, EnvironmentProject
from apps.issue_events.constants import EventStatus, LogLevel
from apps.issue_events.models import (
    Issue,
    IssueEvent,
    IssueEventType,
    IssueHash,
    TagKey,
    TagValue,
)
from apps.performance.models import TransactionEvent, TransactionGroup
from apps.projects.models import Project
from apps.releases.models import Release
from sentry.culprit import generate_culprit
from sentry.eventtypes.error import ErrorEvent
from sentry.utils.strings import truncatechars

from ..shared.schema.contexts import (
    BrowserContext,
    Contexts,
    DeviceContext,
    OSContext,
)
from .javascript_event_processor import JavascriptEventProcessor
from .model_functions import PipeConcat
from .schema import (
    ErrorIssueEventSchema,
    IngestIssueEvent,
    InterchangeIssueEvent,
    InterchangeTransactionEvent,
)
from .utils import generate_hash, remove_bad_chars, transform_parameterized_message


@dataclass
class ProcessingEvent:
    event: InterchangeIssueEvent
    issue_hash: str
    title: str
    transaction: str
    metadata: dict[str, Any]
    event_data: dict[str, Any]
    event_tags: dict[str, str]
    level: Optional[LogLevel] = None
    issue_id: Optional[int] = None
    issue_created: bool = False
    release_id: Optional[int] = None


@dataclass
class IssueUpdate:
    last_seen: datetime
    search_vector: str
    added_count: int = 1


def get_search_vector(event: ProcessingEvent) -> str:
    return f"{event.title} {event.transaction}"


def update_issues(processing_events: list[ProcessingEvent]):
    """
    Update any existing issues based on new statistics
    """
    issues_to_update: dict[int, IssueUpdate] = {}
    for processing_event in processing_events:
        if processing_event.issue_created:
            # Newly created issues already have accurate statistics
            continue
        issue_id = processing_event.issue_id
        if issue_id in issues_to_update:
            issues_to_update[issue_id].added_count += 1
            issues_to_update[
                issue_id
            ].search_vector += f" {get_search_vector(processing_event)}"
            if issues_to_update[issue_id].last_seen < processing_event.event.received:
                issues_to_update[issue_id].last_seen = processing_event.event.received
        elif issue_id:
            issues_to_update[issue_id] = IssueUpdate(
                last_seen=processing_event.event.received,
                search_vector=get_search_vector(processing_event),
            )

    for issue_id, value in issues_to_update.items():
        Issue.objects.filter(id=issue_id).update(
            count=F("count") + value.added_count,
            search_vector=PipeConcat(
                F("search_vector"), SearchVector(Value(value.search_vector))
            ),
            last_seen=Greatest(F("last_seen"), value.last_seen),
        )
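
# For reference, the bulk update above is roughly equivalent to the following
# SQL per issue (illustrative only; assumes PipeConcat renders the Postgres
# "||" tsvector-append operator and that the Issue model's table is named
# "issue_events_issue"):
#   UPDATE issue_events_issue
#   SET count = count + <added_count>,
#       search_vector = search_vector || to_tsvector(<new search text>),
#       last_seen = GREATEST(last_seen, <last_seen>)
#   WHERE id = <issue_id>;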


def devalue(obj: Union[Schema, list]) -> Optional[Union[dict, list]]:
    """
    Convert a Schema like {"values": []} into a list or dict without the
    unnecessary "values" wrapper
    """
    if isinstance(obj, Schema) and hasattr(obj, "values"):
        return obj.dict(mode="json", exclude_none=True, exclude_defaults=True)["values"]
    elif isinstance(obj, list):
        return [
            x.dict(mode="json", exclude_none=True, exclude_defaults=True) for x in obj
        ]
    return None
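
# Illustrative example (hypothetical data): given an exception Schema whose
# dict() form is {"values": [{"type": "ValueError", "value": "bad input"}]},
# devalue() returns [{"type": "ValueError", "value": "bad input"}]; a plain
# list of Schemas is serialized element by element.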


def generate_contexts(event: IngestIssueEvent) -> Contexts:
    """
    Add additional contexts if they aren't already set
    """
    contexts = event.contexts if event.contexts else Contexts({})

    if request := event.request:
        if isinstance(request.headers, list):
            if ua_string := next(
                (x[1] for x in request.headers if x[0] == "User-Agent"), None
            ):
                user_agent = parse(ua_string)
                if "browser" not in contexts:
                    contexts["browser"] = BrowserContext(
                        name=user_agent.browser.family,
                        version=user_agent.browser.version_string,
                    )
                if "os" not in contexts:
                    contexts["os"] = OSContext(
                        name=user_agent.os.family, version=user_agent.os.version_string
                    )
                if "device" not in contexts:
                    device = user_agent.device
                    contexts["device"] = DeviceContext(
                        family=device.family,
                        model=device.model,
                        brand=device.brand,
                    )
    return contexts
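
# Illustrative example (hypothetical header): a request header such as
#   ("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ... Chrome/120.0.0.0 ...")
# would fill in missing contexts roughly as:
#   contexts["browser"] = BrowserContext(name="Chrome", version="120.0.0.0")
#   contexts["os"] = OSContext(name="Windows", version="10")
#   contexts["device"] = DeviceContext(family="Other", model=None, brand=None)
# Contexts already provided by the SDK are never overwritten.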


def generate_tags(event: IngestIssueEvent) -> dict[str, str]:
    """Generate key-value tags based on context and other event data"""
    tags: dict[str, Optional[str]] = event.tags if isinstance(event.tags, dict) else {}

    if contexts := event.contexts:
        if browser := contexts.get("browser"):
            if isinstance(browser, BrowserContext):
                tags["browser.name"] = browser.name
                tags["browser"] = f"{browser.name} {browser.version}"
        if os := contexts.get("os"):
            if isinstance(os, OSContext):
                tags["os.name"] = os.name
        if device := contexts.get("device"):
            if isinstance(device, DeviceContext) and device.model:
                tags["device"] = device.model
    if user := event.user:
        if user.id:
            tags["user.id"] = user.id
        if user.email:
            tags["user.email"] = user.email
        if user.username:
            tags["user.username"] = user.username
    if environment := event.environment:
        tags["environment"] = environment
    if release := event.release:
        tags["release"] = release
    if server_name := event.server_name:
        tags["server_name"] = server_name

    # Exclude None values
    return {key: value for key, value in tags.items() if value}
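
# Illustrative example (hypothetical event): an event with a parsed browser
# context, environment "production", and release "1.0.0" yields tags like:
#   {"browser.name": "Chrome", "browser": "Chrome 120.0.0.0",
#    "os.name": "Windows", "environment": "production", "release": "1.0.0"}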


def check_set_issue_id(
    processing_events: list[ProcessingEvent],
    project_id: int,
    issue_hash: str,
    issue_id: int,
):
    """
    It's common to receive two duplicate events at the same time,
    where the issue has never been seen before. This is an optimization
    that checks if there is a known project/hash. If so, we can infer the
    issue_id.
    """
    for event in processing_events:
        if (
            event.issue_id is None
            and event.event.project_id == project_id
            and event.issue_hash == issue_hash
        ):
            event.issue_id = issue_id
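
# Example scenario: two events with the same previously unseen hash arrive in
# one batch. The first event creates the Issue; check_set_issue_id() then
# back-fills issue_id onto the second event so it reuses that Issue instead of
# attempting a duplicate create.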


def create_environments(
    environment_set: set[tuple[str, int, int]], projects_with_data: QuerySet
):
    """
    Create newly seen environments.
    Determines which, if any, environments are present in the event data
    but not in the database. Optimized to do as much work in Python as
    possible, to reduce queries.
    """
    environments_to_create = [
        Environment(name=name, organization_id=organization_id)
        for name, project_id, organization_id in environment_set
        if not next(
            (
                x
                for x in projects_with_data
                if x["environment_name"] == name and x["id"] == project_id
            ),
            None,
        )
    ]

    if environments_to_create:
        Environment.objects.bulk_create(environments_to_create, ignore_conflicts=True)
        query = Q()
        for environment in environments_to_create:
            query |= Q(
                name=environment.name, organization_id=environment.organization_id
            )
        environments = Environment.objects.filter(query)
        environment_projects: list = []
        for environment in environments:
            project_id = next(
                project_id
                for (name, project_id, organization_id) in environment_set
                if environment.name == name
                and environment.organization_id == organization_id
            )
            environment_projects.append(
                EnvironmentProject(project_id=project_id, environment=environment)
            )
        EnvironmentProject.objects.bulk_create(
            environment_projects, ignore_conflicts=True
        )


def get_and_create_releases(
    release_set: set[tuple[str, int, int]], projects_with_data: QuerySet
) -> list[tuple[str, int, int]]:
    """
    Create newly seen releases.
    Determines which, if any, releases are present in the event data
    but not in the database. Optimized to do as much work in Python as
    possible, to reduce queries.
    Returns a list of tuples: (release version, project_id, release_id)
    """
    releases_to_create = [
        Release(version=release_name, organization_id=organization_id)
        for release_name, project_id, organization_id in release_set
        if not next(
            (
                x
                for x in projects_with_data
                if x["release_name"] == release_name and x["id"] == project_id
            ),
            None,
        )
    ]
    releases: Union[list, QuerySet] = []
    if releases_to_create:
        # Create database records for any release that doesn't exist
        Release.objects.bulk_create(releases_to_create, ignore_conflicts=True)
        query = Q()
        for release in releases_to_create:
            query |= Q(version=release.version, organization_id=release.organization_id)
        releases = Release.objects.filter(query)
        ReleaseProject = Release.projects.through
        release_projects = [
            ReleaseProject(
                release=release,
                project_id=next(
                    project_id
                    for (version, project_id, organization_id) in release_set
                    if release.version == version
                    and release.organization_id == organization_id
                ),
            )
            for release in releases
        ]
        ReleaseProject.objects.bulk_create(release_projects, ignore_conflicts=True)
    return [
        (
            version,
            project_id,
            next(
                (
                    project["release_id"]
                    for project in projects_with_data
                    if project["release_name"] == version
                    and project["id"] == project_id
                ),
                next(
                    (
                        release.id
                        for release in releases
                        if release.version == version
                        and release.organization_id == organization_id
                    ),
                    0,
                ),
            ),
        )
        for version, project_id, organization_id in release_set
    ]
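
# Illustrative return value (hypothetical ids):
#   [("1.0.0", 11, 3), ("2024.1", 12, 4)]
# where each tuple is (version, project_id, release_id). The release_id falls
# back to 0 only when the release appears in neither the prefetched project
# data nor the newly created rows.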


def process_issue_events(ingest_events: list[InterchangeIssueEvent]):
    """
    Accepts a list of events to ingest. Events should be:
    - Few enough to save in a single DB call
    - Permission already checked; these events are cleared to be written to the DB
    - Some invalid events are tolerated (ignored), including duplicate event ids
    When there is an error in this function, care should be taken as to whether to
    log, raise, or ignore it. If the SDK sends "weird" data, we want to log that.
    It's better to save a minimal event than to ignore it.
    """
    # Fetch any needed releases, environments, and whether there is a dif file
    # association. Get the unique release/environment for each project_id.
    release_set = {
        (event.payload.release, event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.release
    }
    environment_set = {
        (event.payload.environment[:255], event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.environment
    }
    project_set = {project_id for _, project_id, _ in release_set}.union(
        {project_id for _, project_id, _ in environment_set}
    )
    release_version_set = {version for version, _, _ in release_set}
    environment_name_set = {name for name, _, _ in environment_set}

    projects_with_data = (
        Project.objects.filter(id__in=project_set)
        .annotate(
            has_difs=Exists(
                DebugInformationFile.objects.filter(project_id=OuterRef("pk"))
            ),
            release_id=Coalesce("releases__id", Value(None)),
            release_name=Coalesce("releases__version", Value(None)),
            environment_id=Coalesce("environment__id", Value(None)),
            environment_name=Coalesce("environment__name", Value(None)),
        )
        .filter(release_name__in=release_version_set.union({None}))
        .filter(environment_name__in=environment_name_set.union({None}))
        .values(
            "id",
            "has_difs",
            "release_id",
            "release_name",
            "environment_id",
            "environment_name",
        )
    )

    releases = get_and_create_releases(release_set, projects_with_data)
    create_environments(environment_set, projects_with_data)

    # Collected/calculated event data while processing
    processing_events: list[ProcessingEvent] = []
    # Collect Q objects for bulk issue hash lookup
    q_objects = Q()
    for ingest_event in ingest_events:
        event_data: dict[str, Any] = {}
        event = ingest_event.payload
        event.contexts = generate_contexts(event)
        event_tags = generate_tags(event)
        title = ""
        culprit = ""
        metadata: dict[str, Any] = {}

        release_id = next(
            (
                release_id
                for version, project_id, release_id in releases
                if version == event_tags.get("release")
                and ingest_event.project_id == project_id
            ),
            None,
        )
        if event.platform in ("javascript", "node") and release_id:
            JavascriptEventProcessor(release_id, event).transform()
        elif (
            isinstance(event, ErrorIssueEventSchema)
            and event.exception
            and next(
                (
                    project["has_difs"]
                    for project in projects_with_data
                    if project["id"] == ingest_event.project_id
                ),
                False,
            )
        ):
            event_difs_resolve_stacktrace(event, ingest_event.project_id)

        if event.type in [IssueEventType.ERROR, IssueEventType.DEFAULT]:
            sentry_event = ErrorEvent()
            metadata = sentry_event.get_metadata(event.dict())
            if event.type == IssueEventType.ERROR and metadata:
                full_title = sentry_event.get_title(metadata)
            else:
                message = event.message if event.message else event.logentry
                full_title = (
                    transform_parameterized_message(message)
                    if message
                    else "<untitled>"
                )
                culprit = (
                    event.transaction
                    if event.transaction
                    else generate_culprit(event.dict())
                )
            title = truncatechars(full_title)
            culprit = sentry_event.get_location(event.dict())
        elif event.type == IssueEventType.CSP:
            humanized_directive = event.csp.effective_directive.replace("-src", "")
            uri = urlparse(event.csp.blocked_uri).netloc
            full_title = title = f"Blocked '{humanized_directive}' from '{uri}'"
            culprit = event.csp.effective_directive
            event_data["csp"] = event.csp.dict()

        issue_hash = generate_hash(title, culprit, event.type, event.fingerprint)
        if metadata:
            event_data["metadata"] = metadata
        if platform := event.platform:
            event_data["platform"] = platform
        if modules := event.modules:
            event_data["modules"] = modules
        if sdk := event.sdk:
            event_data["sdk"] = sdk.dict(exclude_none=True)
        if request := event.request:
            event_data["request"] = request.dict(exclude_none=True)
        if environment := event.environment:
            event_data["environment"] = environment

        # "message" is a plain string; "logentry" is a Message object with
        # "params" and a format string
        if logentry := event.logentry:
            event_data["logentry"] = logentry.dict(exclude_none=True)
        elif message := event.message:
            if isinstance(message, str):
                event_data["logentry"] = {"formatted": message}
            else:
                event_data["logentry"] = message.dict(exclude_none=True)
        if message := event.message:
            event_data["message"] = (
                message if isinstance(message, str) else message.formatted
            )
        # When blank, the API will default to the title anyway
        elif title != full_title:
            # If the title is truncated, store the full title
            event_data["message"] = full_title

        if breadcrumbs := event.breadcrumbs:
            event_data["breadcrumbs"] = devalue(breadcrumbs)
        if exception := event.exception:
            event_data["exception"] = devalue(exception)
        if extra := event.extra:
            event_data["extra"] = extra
        if user := event.user:
            event_data["user"] = user.dict(exclude_none=True)
        if contexts := event.contexts:
            # Contexts may contain dicts or Schemas
            event_data["contexts"] = {
                key: value.dict(exclude_none=True)
                if isinstance(value, Schema)
                else value
                for key, value in contexts.items()
            }

        processing_events.append(
            ProcessingEvent(
                event=ingest_event,
                issue_hash=issue_hash,
                title=title,
                level=LogLevel.from_string(event.level) if event.level else None,
                transaction=culprit,
                metadata=metadata,
                event_data=event_data,
                event_tags=event_tags,
                release_id=release_id,
            )
        )
        q_objects |= Q(project_id=ingest_event.project_id, value=issue_hash)

    hash_queryset = IssueHash.objects.filter(q_objects).values(
        "value", "project_id", "issue_id", "issue__status"
    )
    issue_events: list[IssueEvent] = []
    issues_to_reopen = []
    for processing_event in processing_events:
        event_type = processing_event.event.payload.type
        project_id = processing_event.event.project_id
        issue_defaults = {
            "type": event_type,
            "title": remove_bad_chars(processing_event.title),
            "metadata": remove_bad_chars(processing_event.metadata),
            "first_seen": processing_event.event.received,
            "last_seen": processing_event.event.received,
        }
        if level := processing_event.level:
            issue_defaults["level"] = level
        for hash_obj in hash_queryset:
            if (
                hash_obj["value"].hex == processing_event.issue_hash
                and hash_obj["project_id"] == project_id
            ):
                processing_event.issue_id = hash_obj["issue_id"]
                if hash_obj["issue__status"] == EventStatus.RESOLVED:
                    issues_to_reopen.append(hash_obj["issue_id"])
                break

        if not processing_event.issue_id:
            try:
                with transaction.atomic():
                    issue = Issue.objects.create(
                        project_id=project_id,
                        search_vector=SearchVector(Value(issue_defaults["title"])),
                        **issue_defaults,
                    )
                    new_issue_hash = IssueHash.objects.create(
                        issue=issue,
                        value=processing_event.issue_hash,
                        project_id=project_id,
                    )
                    check_set_issue_id(
                        processing_events,
                        issue.project_id,
                        new_issue_hash.value,
                        issue.id,
                    )
                processing_event.issue_id = issue.id
                processing_event.issue_created = True
            except IntegrityError:
                # A concurrent create won the race; look up the existing hash
                processing_event.issue_id = IssueHash.objects.get(
                    project_id=project_id, value=processing_event.issue_hash
                ).issue_id
        issue_events.append(
            IssueEvent(
                id=processing_event.event.event_id,
                issue_id=processing_event.issue_id,
                type=event_type,
                level=processing_event.level
                if processing_event.level
                else LogLevel.ERROR,
                timestamp=processing_event.event.payload.timestamp,
                received=processing_event.event.received,
                title=processing_event.title,
                transaction=processing_event.transaction,
                data=remove_bad_chars(processing_event.event_data),
                tags=processing_event.event_tags,
                release_id=processing_event.release_id,
            )
        )

    update_issues(processing_events)

    if settings.CACHE_IS_REDIS:
        # Add the set of issue_ids for alerts to process later
        with get_redis_connection("default") as con:
            if (
                con.sadd(
                    ISSUE_IDS_KEY, *{event.issue_id for event in processing_events}
                )
                > 0
            ):
                # Set a long expiration time when a key is added.
                # We want all keys to have a long "sanity check" TTL to avoid
                # redis out-of-memory errors (we can't ensure end users run an
                # LRU eviction policy for all keys)
                con.expire(ISSUE_IDS_KEY, 3600)

    if issues_to_reopen:
        Issue.objects.filter(id__in=issues_to_reopen).update(
            status=EventStatus.UNRESOLVED
        )
        Notification.objects.filter(issues__in=issues_to_reopen).delete()

    # ignore_conflicts because we could have received an invalid duplicate event_id
    IssueEvent.objects.bulk_create(issue_events, ignore_conflicts=True)

    # Group events by time and project for event count statistics
    data_stats: defaultdict[datetime, defaultdict[int, int]] = defaultdict(
        lambda: defaultdict(int)
    )
    for processing_event in processing_events:
        hour_received = processing_event.event.received.replace(
            minute=0, second=0, microsecond=0
        )
        data_stats[hour_received][processing_event.event.project_id] += 1

    update_tags(processing_events)
    update_statistics(data_stats)


def update_statistics(
    project_event_stats: defaultdict[datetime, defaultdict[int, int]],
    is_issue: bool = True,
):
    # Nothing to do for an empty batch (and an empty VALUES list is invalid SQL)
    if not project_event_stats:
        return
    # Flatten data into a SQL-param friendly format and sort to mitigate deadlocks
    data = sorted(
        [
            [hour, project_id, count]
            for hour, project_counts in project_event_stats.items()
            for project_id, count in project_counts.items()
        ],
        key=itemgetter(0, 1),
    )
    table = (
        "projects_issueeventprojecthourlystatistic"
        if is_issue
        else "projects_transactioneventprojecthourlystatistic"
    )
    # Django ORM cannot support F functions in a bulk_update
    # psycopg does not support execute_values
    # https://github.com/psycopg/psycopg/issues/114
    with connection.cursor() as cursor:
        args_str = ",".join(cursor.mogrify("(%s,%s,%s)", x) for x in data)
        sql = (
            f"INSERT INTO {table} (date, project_id, count)\n"
            f"VALUES {args_str}\n"
            "ON CONFLICT (project_id, date)\n"
            f"DO UPDATE SET count = {table}.count + EXCLUDED.count;"
        )
        cursor.execute(sql)
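
# Illustrative statement built above (hypothetical values, as inlined by
# cursor.mogrify):
#   INSERT INTO projects_issueeventprojecthourlystatistic (date, project_id, count)
#   VALUES ('2024-01-01 10:00:00+00',11,2),('2024-01-01 10:00:00+00',12,1)
#   ON CONFLICT (project_id, date)
#   DO UPDATE SET count = projects_issueeventprojecthourlystatistic.count + EXCLUDED.count;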


TagStats = defaultdict[
    datetime,
    defaultdict[int, defaultdict[int, defaultdict[int, int]]],
]
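
# TagStats nesting (illustrative): day -> issue_id -> tag_key_id -> tag_value_id
# -> count. For example, tag_stats[day][42][1][7] == 3 means issue 42 received
# the tag pair (key id 1, value id 7) three times on that day.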


def update_tags(processing_events: list[ProcessingEvent]):
    keys = sorted({key for d in processing_events for key in d.event_tags.keys()})
    values = sorted(
        {value for d in processing_events for value in d.event_tags.values()}
    )
    TagKey.objects.bulk_create([TagKey(key=key) for key in keys], ignore_conflicts=True)
    TagValue.objects.bulk_create(
        [TagValue(value=value) for value in values], ignore_conflicts=True
    )
    # Postgres cannot return ids with ignore_conflicts
    tag_keys = {
        tag["key"]: tag["id"] for tag in TagKey.objects.filter(key__in=keys).values()
    }
    tag_values = {
        tag["value"]: tag["id"]
        for tag in TagValue.objects.filter(value__in=values).values()
    }

    tag_stats: TagStats = defaultdict(
        lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
    )
    for processing_event in processing_events:
        if processing_event.issue_id is None:
            continue
        # Group by day. More granular allows for a better search;
        # less granular yields better tag filter performance.
        day_received = processing_event.event.received.replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        for key, value in processing_event.event_tags.items():
            key_id = tag_keys[key]
            value_id = tag_values[value]
            tag_stats[day_received][processing_event.issue_id][key_id][value_id] += 1

    if not tag_stats:
        return

    # Sort to mitigate deadlocks
    data = sorted(
        [
            [date, issue_id, key_id, value_id, count]
            for date, d1 in tag_stats.items()
            for issue_id, d2 in d1.items()
            for key_id, d3 in d2.items()
            for value_id, count in d3.items()
        ],
        key=itemgetter(0, 1, 2, 3),
    )
    with connection.cursor() as cursor:
        args_str = ",".join(cursor.mogrify("(%s,%s,%s,%s,%s)", x) for x in data)
        sql = (
            "INSERT INTO issue_events_issuetag (date, issue_id, tag_key_id, tag_value_id, count)\n"
            f"VALUES {args_str}\n"
            "ON CONFLICT (issue_id, date, tag_key_id, tag_value_id)\n"
            "DO UPDATE SET count = issue_events_issuetag.count + EXCLUDED.count;"
        )
        cursor.execute(sql)


# Transactions
def process_transaction_events(ingest_events: list[InterchangeTransactionEvent]):
    release_set = {
        (event.payload.release, event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.release
    }
    environment_set = {
        (event.payload.environment[:255], event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.environment
    }
    project_set = {project_id for _, project_id, _ in release_set}.union(
        {project_id for _, project_id, _ in environment_set}
    )
    release_version_set = {version for version, _, _ in release_set}
    environment_name_set = {name for name, _, _ in environment_set}

    projects_with_data = (
        Project.objects.filter(id__in=project_set)
        .annotate(
            release_id=Coalesce("releases__id", Value(None)),
            release_name=Coalesce("releases__version", Value(None)),
            environment_id=Coalesce("environment__id", Value(None)),
            environment_name=Coalesce("environment__name", Value(None)),
        )
        .filter(release_name__in=release_version_set.union({None}))
        .filter(environment_name__in=environment_name_set.union({None}))
        .values(
            "id",
            "release_id",
            "release_name",
            "environment_id",
            "environment_name",
        )
    )
    get_and_create_releases(release_set, projects_with_data)
    create_environments(environment_set, projects_with_data)

    transactions = []
    for ingest_event in ingest_events:
        event = ingest_event.payload
        contexts = event.contexts
        request = event.request
        trace_id = contexts["trace"]["trace_id"]
        op = ""
        if isinstance(contexts, dict):
            trace = contexts.get("trace", {})
            if isinstance(trace, dict):
                op = str(trace.get("op", ""))
        method: Optional[str] = None
        if request:
            method = request.method

        # TODO tags
        group, group_created = TransactionGroup.objects.get_or_create(
            project_id=ingest_event.project_id,
            transaction=event.transaction[:1024],  # Truncate
            op=op,
            method=method,
        )
        transactions.append(
            TransactionEvent(
                group=group,
                data={
                    "request": request.dict() if request else None,
                    "sdk": event.sdk.dict() if event.sdk else None,
                    "platform": event.platform,
                },
                trace_id=trace_id,
                event_id=event.event_id,
                timestamp=event.timestamp,
                start_timestamp=event.start_timestamp,
                duration=(event.timestamp - event.start_timestamp).total_seconds()
                * 1000,
            )
        )
    TransactionEvent.objects.bulk_create(transactions, ignore_conflicts=True)

    # Group events by time and project for event count statistics
    data_stats: defaultdict[datetime, defaultdict[int, int]] = defaultdict(
        lambda: defaultdict(int)
    )
    for perf_transaction in transactions:
        hour_received = perf_transaction.start_timestamp.replace(
            minute=0, second=0, microsecond=0
        )
        data_stats[hour_received][perf_transaction.group.project_id] += 1
    update_statistics(data_stats, False)