process_event.py
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from operator import itemgetter
from typing import Any, Optional, Union
from urllib.parse import urlparse

from django.contrib.postgres.search import SearchVector
from django.db import connection, transaction
from django.db.models import (
    Exists,
    F,
    OuterRef,
    Q,
    QuerySet,
    Value,
)
from django.db.models.functions import Coalesce, Greatest
from django.db.utils import IntegrityError
from ninja import Schema
from user_agents import parse

from apps.alerts.models import Notification
from apps.difs.models import DebugInformationFile
from apps.difs.tasks import event_difs_resolve_stacktrace
from apps.environments.models import Environment, EnvironmentProject
from apps.issue_events.constants import EventStatus, LogLevel
from apps.issue_events.models import (
    Issue,
    IssueEvent,
    IssueEventType,
    IssueHash,
    TagKey,
    TagValue,
)
from apps.projects.models import Project
from apps.releases.models import Release
from sentry.culprit import generate_culprit
from sentry.eventtypes.error import ErrorEvent
from sentry.utils.strings import truncatechars

from ..shared.schema.contexts import (
    BrowserContext,
    ContextsSchema,
    DeviceContext,
    OSContext,
)
from .javascript_event_processor import JavascriptEventProcessor
from .model_functions import PipeConcat
from .schema import ErrorIssueEventSchema, IngestIssueEvent, InterchangeIssueEvent
from .utils import generate_hash, transform_parameterized_message


@dataclass
class ProcessingEvent:
    event: InterchangeIssueEvent
    issue_hash: str
    title: str
    transaction: str
    metadata: dict[str, Any]
    event_data: dict[str, Any]
    event_tags: dict[str, str]
    level: Optional[LogLevel] = None
    issue_id: Optional[int] = None
    issue_created: bool = False
    release_id: Optional[int] = None


@dataclass
class IssueUpdate:
    last_seen: datetime
    search_vector: str
    added_count: int = 1


def get_search_vector(event: ProcessingEvent) -> str:
    return f"{event.title} {event.transaction}"


Replacable = Union[str, dict, list]


def replace(data: Replacable, match: str, repl: str) -> Replacable:
    """A recursive replace function"""
    if isinstance(data, dict):
        return {k: replace(v, match, repl) for k, v in data.items()}
    elif isinstance(data, list):
        return [replace(i, match, repl) for i in data]
    elif isinstance(data, str):
        return data.replace(match, repl)
    return data
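

# Illustrative use of the recursive replace (example payload, not a real event):
#
#     replace({"logentry": ["bad\u0000value"]}, "\u0000", " ")
#     # -> {"logentry": ["bad value"]}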


def sanitize_bad_postgres_chars(data: str):
    """
    Remove values which are not supported by the postgres string data types
    """
    known_bads = ["\x00"]
    for known_bad in known_bads:
        data = data.replace(known_bad, " ")
    return data


def sanitize_bad_postgres_json(data: Replacable) -> Replacable:
    """
    Remove values which are not supported by the postgres JSONB data type
    """
    known_bads = ["\u0000"]
    for known_bad in known_bads:
        data = replace(data, known_bad, " ")
    return data
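

# Both sanitizers target the NUL code point: Postgres rejects it in text
# columns and in jsonb. In Python source, "\x00" and the JSON escape
# "\u0000" are the same character. Illustrative use:
#
#     sanitize_bad_postgres_json({"message": "boom\u0000"})
#     # -> {"message": "boom "}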


def update_issues(processing_events: list[ProcessingEvent]):
    """
    Update any existing issues based on new statistics
    """
    issues_to_update: dict[int, IssueUpdate] = {}
    for processing_event in processing_events:
        # Freshly created issues already have correct stats; skip them
        if processing_event.issue_created:
            continue

        issue_id = processing_event.issue_id
        if issue_id in issues_to_update:
            issues_to_update[issue_id].added_count += 1
            issues_to_update[
                issue_id
            ].search_vector += f" {get_search_vector(processing_event)}"
            if issues_to_update[issue_id].last_seen < processing_event.event.received:
                issues_to_update[issue_id].last_seen = processing_event.event.received
        elif issue_id:
            issues_to_update[issue_id] = IssueUpdate(
                last_seen=processing_event.event.received,
                search_vector=get_search_vector(processing_event),
            )

    for issue_id, value in issues_to_update.items():
        Issue.objects.filter(id=issue_id).update(
            count=F("count") + value.added_count,
            search_vector=PipeConcat(
                F("search_vector"), SearchVector(Value(value.search_vector))
            ),
            last_seen=Greatest(F("last_seen"), value.last_seen),
        )
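

# update_issues() emits a single UPDATE per issue in the batch: the event
# count is incremented with an F expression, last_seen can only move forward
# thanks to Greatest(), and the new search terms are appended to the stored
# search_vector via the custom PipeConcat expression.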


def devalue(obj: Union[Schema, list]) -> Optional[Union[dict, list]]:
    """
    Convert a Schema like {"values": []} into a list or dict,
    dropping the unnecessary "values" wrapper
    """
    if isinstance(obj, Schema) and hasattr(obj, "values"):
        return obj.dict(mode="json", exclude_none=True, exclude_defaults=True)["values"]
    elif isinstance(obj, list):
        return [
            x.dict(mode="json", exclude_none=True, exclude_defaults=True) for x in obj
        ]
    return None
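

# Sketch of devalue's behavior: a Schema whose payload looks like
# {"values": [...]} collapses to just the inner list, a plain list of Schemas
# becomes a list of dicts, and anything else yields None.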


def generate_contexts(event: IngestIssueEvent) -> ContextsSchema:
    """
    Add additional contexts if they aren't already set
    """
    contexts = event.contexts if event.contexts else ContextsSchema(root={})
    if request := event.request:
        if isinstance(request.headers, list):
            if ua_string := next(
                (x[1] for x in request.headers if x[0] == "User-Agent"), None
            ):
                user_agent = parse(ua_string)
                if "browser" not in contexts.root:
                    contexts.root["browser"] = BrowserContext(
                        name=user_agent.browser.family,
                        version=user_agent.browser.version_string,
                    )
                if "os" not in contexts.root:
                    contexts.root["os"] = OSContext(
                        name=user_agent.os.family, version=user_agent.os.version_string
                    )
                if "device" not in contexts.root:
                    device = user_agent.device
                    contexts.root["device"] = DeviceContext(
                        family=device.family,
                        model=device.model,
                        brand=device.brand,
                    )
    return contexts
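

# Illustrative result for a desktop Chrome User-Agent header (exact values
# depend on what the user_agents parser returns):
#
#     contexts.root["browser"]  # BrowserContext(name="Chrome", version="124.0.0")
#     contexts.root["os"]       # OSContext(name="Windows", version="10")
#     contexts.root["device"]   # DeviceContext(family="Other", model=None, brand=None)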


def generate_tags(event: IngestIssueEvent) -> dict[str, str]:
    """Generate key-value tags based on context and other event data"""
    tags: dict[str, Optional[str]] = event.tags if isinstance(event.tags, dict) else {}

    if contexts := event.contexts:
        if browser := contexts.root.get("browser"):
            if isinstance(browser, BrowserContext):
                tags["browser.name"] = browser.name
                tags["browser"] = f"{browser.name} {browser.version}"
        if os := contexts.root.get("os"):
            if isinstance(os, OSContext):
                tags["os.name"] = os.name
        if device := contexts.root.get("device"):
            if isinstance(device, DeviceContext) and device.model:
                tags["device"] = device.model
    if user := event.user:
        if user.id:
            tags["user.id"] = user.id
        if user.email:
            tags["user.email"] = user.email
        if user.username:
            tags["user.username"] = user.username
    if environment := event.environment:
        tags["environment"] = environment
    if release := event.release:
        tags["release"] = release
    if server_name := event.server_name:
        tags["server_name"] = server_name

    # Exclude None values
    return {key: value for key, value in tags.items() if value}
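

# Example output (illustrative values): an event with a browser context, a
# user, and a release might yield
#
#     {"browser.name": "Firefox", "browser": "Firefox 126.0",
#      "user.email": "dev@example.com", "release": "1.4.2"}
#
# None values are filtered out, so they never reach the tag tables.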


def check_set_issue_id(
    processing_events: list[ProcessingEvent],
    project_id: int,
    issue_hash: str,
    issue_id: int,
):
    """
    It's common to receive two duplicate events at the same time,
    where the issue has never been seen before. This is an optimization
    that checks if there is a known project/hash. If so, we can infer the
    issue_id.
    """
    for event in processing_events:
        if (
            event.issue_id is None
            and event.event.project_id == project_id
            and event.issue_hash == issue_hash
        ):
            event.issue_id = issue_id


def create_environments(
    environment_set: set[tuple[str, int, int]], projects_with_data: QuerySet
):
    """
    Create newly seen environments.
    Determines which, if any, environments are present in the event data
    but not in the database. Optimized to do as much work in Python as
    possible and keep the query count low.
    """
    environments_to_create = [
        Environment(name=name, organization_id=organization_id)
        for name, project_id, organization_id in environment_set
        if not next(
            (
                x
                for x in projects_with_data
                if x["environment_name"] == name and x["id"] == project_id
            ),
            None,
        )
    ]

    if environments_to_create:
        Environment.objects.bulk_create(environments_to_create, ignore_conflicts=True)
        query = Q()
        for environment in environments_to_create:
            query |= Q(
                name=environment.name, organization_id=environment.organization_id
            )
        environments = Environment.objects.filter(query)
        environment_projects: list = []
        for environment in environments:
            project_id = next(
                project_id
                for (name, project_id, organization_id) in environment_set
                if environment.name == name
                and environment.organization_id == organization_id
            )
            environment_projects.append(
                EnvironmentProject(project_id=project_id, environment=environment)
            )
        EnvironmentProject.objects.bulk_create(
            environment_projects, ignore_conflicts=True
        )


def get_and_create_releases(
    release_set: set[tuple[str, int, int]], projects_with_data: QuerySet
) -> list[tuple[str, int, int]]:
    """
    Create newly seen releases.
    Determines which, if any, releases are present in the event data
    but not in the database. Optimized to do as much work in Python as
    possible and keep the query count low.
    Returns a list of tuples: (release version, project_id, release_id)
    """
    releases_to_create = [
        Release(version=release_name, organization_id=organization_id)
        for release_name, project_id, organization_id in release_set
        if not next(
            (
                x
                for x in projects_with_data
                if x["release_name"] == release_name and x["id"] == project_id
            ),
            None,
        )
    ]
    releases: Union[list, QuerySet] = []
    if releases_to_create:
        # Create database records for any release that doesn't exist
        Release.objects.bulk_create(releases_to_create, ignore_conflicts=True)
        query = Q()
        for release in releases_to_create:
            query |= Q(version=release.version, organization_id=release.organization_id)
        releases = Release.objects.filter(query)
        ReleaseProject = Release.projects.through
        release_projects = [
            ReleaseProject(
                release=release,
                project_id=next(
                    project_id
                    for (version, project_id, organization_id) in release_set
                    if release.version == version
                    and release.organization_id == organization_id
                ),
            )
            for release in releases
        ]
        ReleaseProject.objects.bulk_create(release_projects, ignore_conflicts=True)
    return [
        (
            version,
            project_id,
            next(
                (
                    project["release_id"]
                    for project in projects_with_data
                    if project["release_name"] == version
                    and project["id"] == project_id
                ),
                next(
                    (
                        release.id
                        for release in releases
                        if release.version == version
                        and release.organization_id == organization_id
                    ),
                    0,
                ),
            ),
        )
        for version, project_id, organization_id in release_set
    ]
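

# For each member of release_set the release_id is resolved in two passes:
# first from projects_with_data (releases already in the database), then from
# the rows created above; the 0 fallback only applies if neither source has
# the release.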


def process_issue_events(ingest_events: list[InterchangeIssueEvent]):
    """
    Accepts a list of events to ingest. Events should be:
    - Few enough to save in a single DB call
    - Permission is already checked; these events are to be written to the DB
    - Some invalid events are tolerated (ignored), including duplicate event ids

    When there is an error in this function, care should be taken as to when to
    log, error, or ignore. If the SDK sends "weird" data, we want to log that.
    It's better to save a minimal event than to ignore it.
    """
    # Fetch any needed releases and environments, and whether there is a dif
    # file association. Get the unique release/environment for each project_id.
    release_set = {
        (event.payload.release, event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.release
    }
    environment_set = {
        (event.payload.environment[:255], event.project_id, event.organization_id)
        for event in ingest_events
        if event.payload.environment
    }
    project_set = {project_id for _, project_id, _ in release_set}.union(
        {project_id for _, project_id, _ in environment_set}
    )
    release_version_set = {version for version, _, _ in release_set}
    environment_name_set = {name for name, _, _ in environment_set}

    projects_with_data = (
        Project.objects.filter(id__in=project_set)
        .annotate(
            has_difs=Exists(
                DebugInformationFile.objects.filter(project_id=OuterRef("pk"))
            ),
            release_id=Coalesce("releases__id", Value(None)),
            release_name=Coalesce("releases__version", Value(None)),
            environment_id=Coalesce("environment__id", Value(None)),
            environment_name=Coalesce("environment__name", Value(None)),
        )
        .filter(release_name__in=release_version_set.union({None}))
        .filter(environment_name__in=environment_name_set.union({None}))
        .values(
            "id",
            "has_difs",
            "release_id",
            "release_name",
            "environment_id",
            "environment_name",
        )
    )
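
    # Note: annotating across the releases/environment relations fans out to
    # one row per project/release/environment combination, so a project can
    # appear several times in projects_with_data; the __in filters limit the
    # fan-out to releases and environments actually present in this batch.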

    releases = get_and_create_releases(release_set, projects_with_data)
    create_environments(environment_set, projects_with_data)

    # Collected/calculated event data while processing
    processing_events: list[ProcessingEvent] = []
    # Collect Q objects for bulk issue hash lookup
    q_objects = Q()
    for ingest_event in ingest_events:
        event_data: dict[str, Any] = {}
        event = ingest_event.payload
        event.contexts = generate_contexts(event)
        event_tags = generate_tags(event)
        title = ""
        culprit = ""
        metadata: dict[str, Any] = {}

        release_id = next(
            (
                release_id
                for version, project_id, release_id in releases
                if version == event_tags.get("release")
                and ingest_event.project_id == project_id
            ),
            None,
        )
        if event.platform in ("javascript", "node") and release_id:
            JavascriptEventProcessor(release_id, event).transform()
        elif (
            isinstance(event, ErrorIssueEventSchema)
            and event.exception
            and next(
                (
                    project["has_difs"]
                    for project in projects_with_data
                    if project["id"] == ingest_event.project_id
                ),
                False,
            )
        ):
            event_difs_resolve_stacktrace(event, ingest_event.project_id)

        if event.type in [IssueEventType.ERROR, IssueEventType.DEFAULT]:
            sentry_event = ErrorEvent()
            metadata = sentry_event.get_metadata(event.dict())
            if event.type == IssueEventType.ERROR and metadata:
                full_title = sentry_event.get_title(metadata)
            else:
                message = event.message if event.message else event.logentry
                full_title = (
                    transform_parameterized_message(message)
                    if message
                    else "<untitled>"
                )
                culprit = (
                    event.transaction
                    if event.transaction
                    else generate_culprit(event.dict())
                )
            title = truncatechars(full_title)
            culprit = sentry_event.get_location(event.dict())
        elif event.type == IssueEventType.CSP:
            humanized_directive = event.csp.effective_directive.replace("-src", "")
            uri = urlparse(event.csp.blocked_uri).netloc
            full_title = title = f"Blocked '{humanized_directive}' from '{uri}'"
            culprit = event.csp.effective_directive
            event_data["csp"] = event.csp.dict()

        issue_hash = generate_hash(title, culprit, event.type, event.fingerprint)
        if metadata:
            event_data["metadata"] = metadata
        if platform := event.platform:
            event_data["platform"] = platform
        if modules := event.modules:
            event_data["modules"] = modules
        if sdk := event.sdk:
            event_data["sdk"] = sdk.dict(exclude_none=True)
        if request := event.request:
            event_data["request"] = request.dict(exclude_none=True)
        if environment := event.environment:
            event_data["environment"] = environment

        # Message is str
        # Logentry is {"params": etc} Message format
        if logentry := event.logentry:
            event_data["logentry"] = logentry.dict(exclude_none=True)
        elif message := event.message:
            if isinstance(message, str):
                event_data["logentry"] = {"formatted": message}
            else:
                event_data["logentry"] = message.dict(exclude_none=True)
        if message := event.message:
            event_data["message"] = (
                message if isinstance(message, str) else message.formatted
            )
        # When blank, the API will default to the title anyway
        elif title != full_title:
            # If the title is truncated, store the full title
            event_data["message"] = full_title

        if breadcrumbs := event.breadcrumbs:
            event_data["breadcrumbs"] = devalue(breadcrumbs)
        if exception := event.exception:
            event_data["exception"] = devalue(exception)
        if extra := event.extra:
            event_data["extra"] = extra
        if user := event.user:
            event_data["user"] = user.dict(exclude_none=True)
        if contexts := event.contexts:
            event_data["contexts"] = contexts.dict(exclude_none=True)

        processing_events.append(
            ProcessingEvent(
                event=ingest_event,
                issue_hash=issue_hash,
                title=title,
                level=LogLevel.from_string(event.level) if event.level else None,
                transaction=culprit,
                metadata=metadata,
                event_data=event_data,
                event_tags=event_tags,
                release_id=release_id,
            )
        )
        q_objects |= Q(project_id=ingest_event.project_id, value=issue_hash)

    hash_queryset = IssueHash.objects.filter(q_objects).values(
        "value", "project_id", "issue_id", "issue__status"
    )
    issue_events: list[IssueEvent] = []
    issues_to_reopen = []
    for processing_event in processing_events:
        event_type = processing_event.event.payload.type
        project_id = processing_event.event.project_id
        issue_defaults = {
            "type": event_type,
            "title": sanitize_bad_postgres_chars(processing_event.title),
            "metadata": sanitize_bad_postgres_json(processing_event.metadata),
            "first_seen": processing_event.event.received,
            "last_seen": processing_event.event.received,
        }
        if level := processing_event.level:
            issue_defaults["level"] = level
        for hash_obj in hash_queryset:
            if (
                hash_obj["value"].hex == processing_event.issue_hash
                and hash_obj["project_id"] == project_id
            ):
                processing_event.issue_id = hash_obj["issue_id"]
                if hash_obj["issue__status"] == EventStatus.RESOLVED:
                    issues_to_reopen.append(hash_obj["issue_id"])
                break

        if not processing_event.issue_id:
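            # Two ingest workers may race to create the same project/hash pair;
            # the unique constraint on IssueHash aborts the loser's atomic block
            # and the IntegrityError handler below recovers the winner's issue_id.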
            try:
                with transaction.atomic():
                    issue = Issue.objects.create(
                        project_id=project_id,
                        search_vector=SearchVector(Value(issue_defaults["title"])),
                        **issue_defaults,
                    )
                    new_issue_hash = IssueHash.objects.create(
                        issue=issue,
                        value=processing_event.issue_hash,
                        project_id=project_id,
                    )
                    check_set_issue_id(
                        processing_events,
                        issue.project_id,
                        new_issue_hash.value,
                        issue.id,
                    )
                processing_event.issue_id = issue.id
                processing_event.issue_created = True
            except IntegrityError:
                processing_event.issue_id = IssueHash.objects.get(
                    project_id=project_id, value=processing_event.issue_hash
                ).issue_id
        issue_events.append(
            IssueEvent(
                id=processing_event.event.event_id,
                issue_id=processing_event.issue_id,
                type=event_type,
                level=processing_event.level
                if processing_event.level
                else LogLevel.ERROR,
                timestamp=processing_event.event.payload.timestamp,
                received=processing_event.event.received,
                title=processing_event.title,
                transaction=processing_event.transaction,
                data=sanitize_bad_postgres_json(processing_event.event_data),
                tags=processing_event.event_tags,
                release_id=processing_event.release_id,
            )
        )

    update_issues(processing_events)

    if issues_to_reopen:
        Issue.objects.filter(id__in=issues_to_reopen).update(
            status=EventStatus.UNRESOLVED
        )
        Notification.objects.filter(issues__in=issues_to_reopen).delete()

    # ignore_conflicts because we could have received an invalid duplicate event_id
    IssueEvent.objects.bulk_create(issue_events, ignore_conflicts=True)

    # Group events by time and project for event count statistics
    data_stats: defaultdict[datetime, defaultdict[int, int]] = defaultdict(
        lambda: defaultdict(int)
    )
    for processing_event in processing_events:
        hour_received = processing_event.event.received.replace(
            minute=0, second=0, microsecond=0
        )
        data_stats[hour_received][processing_event.event.project_id] += 1

    update_tags(processing_events)
    update_statistics(data_stats)


def update_statistics(
    project_event_stats: defaultdict[datetime, defaultdict[int, int]],
):
    # Flatten data into a SQL-param-friendly format and sort to mitigate deadlocks
    data = sorted(
        [
            [hour, project_id, count]
            for hour, project_counts in project_event_stats.items()
            for project_id, count in project_counts.items()
        ],
        key=itemgetter(0, 1),
    )
    # The Django ORM cannot use F expressions in a bulk_update,
    # and psycopg3 does not support execute_values
    # https://github.com/psycopg/psycopg/issues/114
    with connection.cursor() as cursor:
        args_str = ",".join(cursor.mogrify("(%s,%s,%s)", x) for x in data)
        sql = (
            "INSERT INTO projects_issueeventprojecthourlystatistic (date, project_id, count)\n"
            f"VALUES {args_str}\n"
            "ON CONFLICT (project_id, date)\n"
            "DO UPDATE SET count = projects_issueeventprojecthourlystatistic.count + EXCLUDED.count;"
        )
        cursor.execute(sql)
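

# With two hourly buckets the rendered upsert looks roughly like this
# (values illustrative):
#
#     INSERT INTO projects_issueeventprojecthourlystatistic (date, project_id, count)
#     VALUES ('2024-05-01 13:00:00+00',7,3),('2024-05-01 14:00:00+00',7,1)
#     ON CONFLICT (project_id, date)
#     DO UPDATE SET count = projects_issueeventprojecthourlystatistic.count + EXCLUDED.count;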


# Nested mapping: date -> issue_id -> tag_key_id -> tag_value_id -> count
TagStats = defaultdict[
    datetime,
    defaultdict[int, defaultdict[int, defaultdict[int, int]]],
]


def update_tags(processing_events: list[ProcessingEvent]):
    keys = sorted({key for d in processing_events for key in d.event_tags.keys()})
    values = sorted(
        {value for d in processing_events for value in d.event_tags.values()}
    )
    TagKey.objects.bulk_create([TagKey(key=key) for key in keys], ignore_conflicts=True)
    TagValue.objects.bulk_create(
        [TagValue(value=value) for value in values], ignore_conflicts=True
    )
    # Postgres cannot return ids when using ignore_conflicts, so fetch them back
    tag_keys = {
        tag["key"]: tag["id"] for tag in TagKey.objects.filter(key__in=keys).values()
    }
    tag_values = {
        tag["value"]: tag["id"]
        for tag in TagValue.objects.filter(value__in=values).values()
    }

    tag_stats: TagStats = defaultdict(
        lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
    )
    for processing_event in processing_events:
        if processing_event.issue_id is None:
            continue
        # Group by day. More granular allows for a better search;
        # less granular yields better tag filter performance.
        day_received = processing_event.event.received.replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        for key, value in processing_event.event_tags.items():
            key_id = tag_keys[key]
            value_id = tag_values[value]
            tag_stats[day_received][processing_event.issue_id][key_id][value_id] += 1

    if not tag_stats:
        return

    # Sort to mitigate deadlocks
    data = sorted(
        [
            [date, issue_id, key_id, value_id, count]
            for date, d1 in tag_stats.items()
            for issue_id, d2 in d1.items()
            for key_id, d3 in d2.items()
            for value_id, count in d3.items()
        ],
        key=itemgetter(0, 1, 2, 3),
    )
    with connection.cursor() as cursor:
        args_str = ",".join(cursor.mogrify("(%s,%s,%s,%s,%s)", x) for x in data)
        sql = (
            "INSERT INTO issue_events_issuetag (date, issue_id, tag_key_id, tag_value_id, count)\n"
            f"VALUES {args_str}\n"
            "ON CONFLICT (issue_id, date, tag_key_id, tag_value_id)\n"
            "DO UPDATE SET count = issue_events_issuetag.count + EXCLUDED.count;"
        )
        cursor.execute(sql)