ref(typing): add typing to event_manager (#40697)

Add typing for clarity.
Gilbert Szeto 2 years ago
parent
commit 3c612383d0

+ 1 - 0
mypy.ini

@@ -46,6 +46,7 @@ files = fixtures/mypy-stubs,
         src/sentry/db/models/utils.py,
         src/sentry/digests/,
         src/sentry/dynamic_sampling/,
+        src/sentry/event_manager.py,
         src/sentry/eventstream/base.py,
         src/sentry/eventstream/snuba.py,
         src/sentry/eventstream/kafka/consumer_strategy.py,
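
The hunk above opts src/sentry/event_manager.py into the set of paths mypy checks by default: when run without explicit arguments, mypy reads the "files =" list from mypy.ini. A minimal, self-contained sketch of what that buys, assuming the pop_tag signature added later in this commit (the calls shown are illustrative, not from the commit):

    from __future__ import annotations
    from typing import Any

    def pop_tag(data: dict[str, Any], key: str) -> None:  # signature as annotated in this commit
        if "tags" not in data:
            return
        data["tags"] = [kv for kv in data["tags"] if kv is None or kv[0] != key]

    pop_tag({"tags": [("environment", "prod")]}, "environment")   # well-typed call
    # pop_tag(["not", "a", "dict"], "environment")                # mypy would now flag: expected "dict[str, Any]"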

+ 198 - 114
src/sentry/event_manager.py

@@ -10,7 +10,19 @@ from dataclasses import dataclass
 from datetime import datetime, timedelta
 from hashlib import md5
 from io import BytesIO
-from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, TypedDict
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Sequence,
+    Type,
+    TypedDict,
+    Union,
+    cast,
+)
 
 import sentry_sdk
 from django.conf import settings
@@ -31,7 +43,7 @@ from sentry import (
     reprocessing2,
     tsdb,
 )
-from sentry.attachments import MissingAttachmentChunks, attachment_cache
+from sentry.attachments import CachedAttachment, MissingAttachmentChunks, attachment_cache
 from sentry.constants import (
     DEFAULT_STORE_NORMALIZER_ARGS,
     LOG_LEVELS_MAP,
@@ -46,8 +58,18 @@ from sentry.dynamic_sampling.latest_release_booster import (
     observe_release,
 )
 from sentry.eventstore.processing import event_processing_store
+from sentry.eventtypes import (
+    CspEvent,
+    DefaultEvent,
+    ErrorEvent,
+    ExpectCTEvent,
+    ExpectStapleEvent,
+    HpkpEvent,
+    TransactionEvent,
+)
 from sentry.grouping.api import (
     BackgroundGroupingConfigLoader,
+    GroupingConfig,
     GroupingConfigNotFound,
     SecondaryGroupingConfigLoader,
     apply_server_fingerprinting,
@@ -108,6 +130,7 @@ from sentry.utils import json, metrics
 from sentry.utils.cache import cache_key_for_event
 from sentry.utils.canonical import CanonicalKeyDict
 from sentry.utils.dates import to_datetime, to_timestamp
+from sentry.utils.metrics import MutableTags
 from sentry.utils.outcomes import Outcome, track_outcome
 from sentry.utils.performance_issues.performance_detection import (
     EventPerformanceProblem,
@@ -141,37 +164,38 @@ class GroupInfo:
     is_new_group_environment: bool = False
 
 
-def pop_tag(data, key):
+def pop_tag(data: dict[str, Any], key: str) -> None:
     if "tags" not in data:
         return
 
     data["tags"] = [kv for kv in data["tags"] if kv is None or kv[0] != key]
 
 
-def set_tag(data, key, value):
+def set_tag(data: dict[str, Any], key: str, value: Any) -> None:
     pop_tag(data, key)
     if value is not None:
         data.setdefault("tags", []).append((key, trim(value, MAX_TAG_VALUE_LENGTH)))
 
 
-def get_tag(data, key):
+def get_tag(data: dict[str, Any], key: str) -> Optional[Any]:
     for k, v in get_path(data, "tags", filter=True) or ():
         if k == key:
             return v
+    return None
 
 
-def plugin_is_regression(group, event):
+def plugin_is_regression(group: Group, event: Event) -> bool:
     project = event.project
     for plugin in plugins.for_project(project):
         result = safe_execute(
             plugin.is_regression, group, event, version=1, _with_transaction=False
         )
         if result is not None:
-            return result
+            return bool(result)
     return True
 
 
-def has_pending_commit_resolution(group):
+def has_pending_commit_resolution(group: Group) -> bool:
     """
     Checks that the most recent commit that fixes a group has had a chance to release
     """
@@ -203,18 +227,20 @@ def has_pending_commit_resolution(group):
         return True
 
 
-def get_max_crashreports(model, allow_none=False):
+def get_max_crashreports(
+    model: Union[Project, Organization], allow_none: bool = False
+) -> Optional[int]:
     value = model.get_option("sentry:store_crash_reports")
-    return convert_crashreport_count(value, allow_none=allow_none)
+    return convert_crashreport_count(value, allow_none=allow_none)  # type: ignore
 
 
-def crashreports_exceeded(current_count, max_count):
+def crashreports_exceeded(current_count: int, max_count: int) -> bool:
     if max_count == STORE_CRASH_REPORTS_ALL:
         return False
     return current_count >= max_count
 
 
-def get_stored_crashreports(cache_key, event, max_crashreports):
+def get_stored_crashreports(cache_key: Optional[str], event: Event, max_crashreports: int) -> int:
     # There are two common cases: Storing crash reports is disabled, or is
     # unbounded. In both cases, there is no need in caching values or querying
     # the database.
@@ -223,13 +249,13 @@ def get_stored_crashreports(cache_key, event, max_crashreports):
 
     cached_reports = cache.get(cache_key, None)
     if cached_reports is not None and cached_reports >= max_crashreports:
-        return cached_reports
+        return cached_reports  # type: ignore
 
     # Fall-through if max_crashreports was bumped to get a more accurate number.
     # We don't need the actual number, but just whether it's more or equal to
     # the currently allowed maximum.
     query = EventAttachment.objects.filter(group_id=event.group_id, type__in=CRASH_REPORT_TYPES)
-    return query[:max_crashreports].count()
+    return query[:max_crashreports].count()  # type: ignore
 
 
 class HashDiscarded(Exception):
@@ -241,8 +267,8 @@ class HashDiscarded(Exception):
         self.tombstone_id = tombstone_id
 
 
-class ScoreClause(Func):
-    def __init__(self, group=None, last_seen=None, times_seen=None, *args, **kwargs):
+class ScoreClause(Func):  # type: ignore
+    def __init__(self, group=None, last_seen=None, times_seen=None, *args, **kwargs):  # type: ignore
         self.group = group
         self.last_seen = last_seen
         self.times_seen = times_seen
@@ -251,12 +277,12 @@ class ScoreClause(Func):
             self.times_seen = self.times_seen.rhs.value
         super().__init__(*args, **kwargs)
 
-    def __int__(self):
+    def __int__(self):  # type: ignore
         # Calculate the score manually when coercing to an int.
         # This is used within create_or_update and friends
         return self.group.get_score() if self.group else 0
 
-    def as_sql(self, compiler, connection, function=None, template=None):
+    def as_sql(self, compiler, connection, function=None, template=None):  # type: ignore
         has_values = self.last_seen is not None and self.times_seen is not None
         if has_values:
             sql = "log(times_seen + %d) * 600 + %d" % (
@@ -269,6 +295,11 @@ class ScoreClause(Func):
         return (sql, [])
 
 
+ProjectsMapping = Mapping[int, Project]
+
+Job = MutableMapping[str, Any]
+
+
 class EventManager:
     """
     Handles normalization in both the store endpoint and the save task. The
@@ -277,19 +308,19 @@ class EventManager:
 
     def __init__(
         self,
-        data,
-        version="5",
-        project=None,
-        grouping_config=None,
-        client_ip=None,
-        user_agent=None,
-        auth=None,
-        key=None,
-        content_encoding=None,
-        is_renormalize=False,
-        remove_other=None,
-        project_config=None,
-        sent_at=None,
+        data: dict[str, Any],
+        version: str = "5",
+        project: Optional[Project] = None,
+        grouping_config: Optional[GroupingConfig] = None,
+        client_ip: Optional[str] = None,
+        user_agent: Optional[str] = None,
+        auth: Optional[Any] = None,
+        key: Optional[Any] = None,
+        content_encoding: Optional[str] = None,
+        is_renormalize: bool = False,
+        remove_other: Optional[bool] = None,
+        project_config: Optional[Any] = None,
+        sent_at: Optional[datetime] = None,
     ):
         self._data = CanonicalKeyDict(data)
         self.version = version
@@ -312,11 +343,11 @@ class EventManager:
         self.project_config = project_config
         self.sent_at = sent_at
 
-    def normalize(self, project_id=None):
+    def normalize(self, project_id: Optional[int] = None) -> None:
         with metrics.timer("events.store.normalize.duration"):
             self._normalize_impl(project_id=project_id)
 
-    def _normalize_impl(self, project_id=None):
+    def _normalize_impl(self, project_id: Optional[int] = None) -> None:
         if self._project and project_id and project_id != self._project.id:
             raise RuntimeError(
                 "Initialized EventManager with one project ID and called save() with another one"
@@ -345,20 +376,20 @@ class EventManager:
 
         self._data = CanonicalKeyDict(rust_normalizer.normalize_event(dict(self._data)))
 
-    def get_data(self):
+    def get_data(self) -> CanonicalKeyDict:
         return self._data
 
     @metrics.wraps("event_manager.save")
     def save(
         self,
-        project_id,
-        raw=False,
-        assume_normalized=False,
-        start_time=None,
-        cache_key=None,
-        skip_send_first_transaction=False,
-        auto_upgrade_grouping=False,
-    ):
+        project_id: Optional[int],
+        raw: bool = False,
+        assume_normalized: bool = False,
+        start_time: Optional[int] = None,
+        cache_key: Optional[str] = None,
+        skip_send_first_transaction: bool = False,
+        auto_upgrade_grouping: bool = False,
+    ) -> Event:
         """
         After normalizing and processing an event, save adjacent models such as
         releases and environments to postgres and write the event into
@@ -472,7 +503,7 @@ class EventManager:
             hashes = _calculate_event_grouping(project, job["event"], grouping_config)
 
         hashes = CalculatedHashes(
-            hashes=hashes.hashes + (secondary_hashes and secondary_hashes.hashes or []),
+            hashes=list(hashes.hashes) + list(secondary_hashes and secondary_hashes.hashes or []),
             hierarchical_hashes=hashes.hierarchical_hashes,
             tree_labels=hashes.tree_labels,
         )
@@ -609,14 +640,14 @@ class EventManager:
         return job["event"]
 
 
-def _project_should_update_grouping(project):
+def _project_should_update_grouping(project: Project) -> bool:
     should_update_org = (
         project.organization_id % 1000 < float(settings.SENTRY_GROUPING_AUTO_UPDATE_ENABLED) * 1000
     )
-    return project.get_option("sentry:grouping_auto_update") and should_update_org
+    return bool(project.get_option("sentry:grouping_auto_update")) and should_update_org
 
 
-def _auto_update_grouping(project):
+def _auto_update_grouping(project: Project) -> None:
     old_grouping = project.get_option("sentry:grouping_config")
     new_grouping = DEFAULT_GROUPING_CONFIG
 
@@ -660,11 +691,13 @@ def _auto_update_grouping(project):
 
 
 @metrics.wraps("event_manager.background_grouping")
-def _calculate_background_grouping(project, event, config):
+def _calculate_background_grouping(
+    project: Project, event: Event, config: GroupingConfig
+) -> CalculatedHashes:
     return _calculate_event_grouping(project, event, config)
 
 
-def _run_background_grouping(project, job):
+def _run_background_grouping(project: Project, job: Job) -> None:
     """Optionally run a fraction of events with a third grouping config
     This can be helpful to measure its performance impact.
     This does not affect actual grouping.
@@ -680,7 +713,7 @@ def _run_background_grouping(project, job):
         sentry_sdk.capture_exception()
 
 
-def _get_job_category(data):
+def _get_job_category(data: Mapping[str, Any]) -> DataCategory:
     event_type = data.get("type")
     if event_type == "transaction":
         # TODO: This logic should move into sentry-relay, but I'm not sure
@@ -693,7 +726,7 @@ def _get_job_category(data):
 
 
 @metrics.wraps("save_event.pull_out_data")
-def _pull_out_data(jobs, projects):
+def _pull_out_data(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
     """
     A bunch of (probably) CPU bound stuff.
     """
@@ -746,11 +779,11 @@ def _pull_out_data(jobs, projects):
         )
 
 
-def _is_commit_sha(version: str):
+def _is_commit_sha(version: str) -> bool:
     return re.match(r"[0-9a-f]{40}", version) is not None
 
 
-def _associate_commits_with_release(release: Release, project: Project):
+def _associate_commits_with_release(release: Release, project: Project) -> None:
     previous_release = release.get_previous_release(project)
     possible_repos = (
         RepositoryProjectPathConfig.objects.select_related(
@@ -794,9 +827,9 @@ def _associate_commits_with_release(release: Release, project: Project):
 
 
 @metrics.wraps("save_event.get_or_create_release_many")
-def _get_or_create_release_many(jobs, projects):
-    jobs_with_releases = {}
-    release_date_added = {}
+def _get_or_create_release_many(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
+    jobs_with_releases: dict[tuple[int, Release], list[Job]] = {}
+    release_date_added: dict[tuple[int, Release], datetime] = {}
 
     for job in jobs:
         if not job["release"]:
@@ -888,7 +921,7 @@ def _get_or_create_release_many(jobs, projects):
 
 
 @metrics.wraps("save_event.get_event_user_many")
-def _get_event_user_many(jobs, projects):
+def _get_event_user_many(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
     for job in jobs:
         data = job["data"]
         user = _get_event_user(projects[job["project_id"]], data)
@@ -901,7 +934,7 @@ def _get_event_user_many(jobs, projects):
 
 
 @metrics.wraps("save_event.derive_plugin_tags_many")
-def _derive_plugin_tags_many(jobs, projects):
+def _derive_plugin_tags_many(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
     # XXX: We ought to inline or remove this one for sure
     plugins_for_projects = {p.id: plugins.for_project(p, version=None) for p in projects.values()}
 
@@ -917,7 +950,7 @@ def _derive_plugin_tags_many(jobs, projects):
 
 
 @metrics.wraps("save_event.derive_interface_tags_many")
-def _derive_interface_tags_many(jobs):
+def _derive_interface_tags_many(jobs: Sequence[Job]) -> None:
     # XXX: We ought to inline or remove this one for sure
     for job in jobs:
         data = job["data"]
@@ -931,7 +964,7 @@ def _derive_interface_tags_many(jobs):
 
 
 @metrics.wraps("save_event.materialize_metadata_many")
-def _materialize_metadata_many(jobs):
+def _materialize_metadata_many(jobs: Sequence[Job]) -> None:
     for job in jobs:
         # we want to freeze not just the metadata and type in but also the
         # derived attributes.  The reason for this is that we push this
@@ -958,7 +991,7 @@ def _materialize_metadata_many(jobs):
         job["culprit"] = data["culprit"]
 
 
-def _create_kwargs(job):
+def _create_kwargs(job: Union[Job, PerformanceJob]) -> dict[str, Any]:
     kwargs = {
         "platform": job["platform"],
         "message": job["event"].search_message,
@@ -976,7 +1009,7 @@ def _create_kwargs(job):
 
 
 @metrics.wraps("save_event.get_or_create_environment_many")
-def _get_or_create_environment_many(jobs, projects):
+def _get_or_create_environment_many(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
     for job in jobs:
         job["environment"] = Environment.get_or_create(
             project=projects[job["project_id"]], name=job["environment"]
@@ -984,7 +1017,7 @@ def _get_or_create_environment_many(jobs, projects):
 
 
 @metrics.wraps("save_event.get_or_create_group_environment_many")
-def _get_or_create_group_environment_many(jobs, projects):
+def _get_or_create_group_environment_many(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
     for job in jobs:
         for group_info in job["groups"]:
             group_info.is_new_group_environment = GroupEnvironment.get_or_create(
@@ -995,7 +1028,9 @@ def _get_or_create_group_environment_many(jobs, projects):
 
 
 @metrics.wraps("save_event.get_or_create_release_associated_models")
-def _get_or_create_release_associated_models(jobs, projects):
+def _get_or_create_release_associated_models(
+    jobs: Sequence[Job], projects: ProjectsMapping
+) -> None:
     # XXX: This is possibly unnecessarily detached from
     # _get_or_create_release_many, but we do not want to destroy order of
     # execution right now
@@ -1041,7 +1076,7 @@ def _get_or_create_release_associated_models(jobs, projects):
 
 
 @metrics.wraps("save_event.get_or_create_group_release_many")
-def _get_or_create_group_release_many(jobs, projects):
+def _get_or_create_group_release_many(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
     for job in jobs:
         if job["release"]:
             for group_info in job["groups"]:
@@ -1054,7 +1089,7 @@ def _get_or_create_group_release_many(jobs, projects):
 
 
 @metrics.wraps("save_event.tsdb_record_all_metrics")
-def _tsdb_record_all_metrics(jobs):
+def _tsdb_record_all_metrics(jobs: Sequence[Job]) -> None:
     """
     Do all tsdb-related things for save_event in here s.t. we can potentially
     put everything in a single redis pipeline someday.
@@ -1111,7 +1146,7 @@ def _tsdb_record_all_metrics(jobs):
 
 
 @metrics.wraps("save_event.nodestore_save_many")
-def _nodestore_save_many(jobs):
+def _nodestore_save_many(jobs: Sequence[Job]) -> None:
     inserted_time = datetime.utcnow().replace(tzinfo=UTC).timestamp()
     for job in jobs:
         # Write the event to Nodestore
@@ -1132,7 +1167,7 @@ def _nodestore_save_many(jobs):
 
 
 @metrics.wraps("save_event.eventstream_insert_many")
-def _eventstream_insert_many(jobs):
+def _eventstream_insert_many(jobs: Sequence[Job]) -> None:
     for job in jobs:
         if job["event"].project_id == settings.SENTRY_PROJECT:
             metrics.incr(
@@ -1184,7 +1219,7 @@ def _eventstream_insert_many(jobs):
 
 
 @metrics.wraps("save_event.track_outcome_accepted_many")
-def _track_outcome_accepted_many(jobs):
+def _track_outcome_accepted_many(jobs: Sequence[Job]) -> None:
     for job in jobs:
         event = job["event"]
 
@@ -1201,7 +1236,7 @@ def _track_outcome_accepted_many(jobs):
 
 
 @metrics.wraps("event_manager.get_event_instance")
-def _get_event_instance(data, project_id):
+def _get_event_instance(data: Mapping[str, Any], project_id: int) -> Event:
     event_id = data.get("event_id")
 
     return eventstore.create_event(
@@ -1212,16 +1247,18 @@ def _get_event_instance(data, project_id):
     )
 
 
-def _get_event_user(project, data):
+def _get_event_user(project: Project, data: Mapping[str, Any]) -> Optional[EventUser]:
     with metrics.timer("event_manager.get_event_user") as metrics_tags:
         return _get_event_user_impl(project, data, metrics_tags)
 
 
-def _get_event_user_impl(project, data, metrics_tags):
+def _get_event_user_impl(
+    project: Project, data: Mapping[str, Any], metrics_tags: MutableTags
+) -> Optional[EventUser]:
     user_data = data.get("user")
     if not user_data:
         metrics_tags["event_has_user"] = "false"
-        return
+        return None
 
     metrics_tags["event_has_user"] = "true"
 
@@ -1243,7 +1280,7 @@ def _get_event_user_impl(project, data, metrics_tags):
     )
     euser.set_hash()
     if not euser.hash:
-        return
+        return None
 
     cache_key = f"euserid:1:{project.id}:{euser.hash}"
     euser_id = cache.get(cache_key)
@@ -1279,11 +1316,27 @@ def _get_event_user_impl(project, data, metrics_tags):
     return euser
 
 
-def get_event_type(data):
+EventType = Union[
+    DefaultEvent,
+    ErrorEvent,
+    CspEvent,
+    HpkpEvent,
+    ExpectCTEvent,
+    ExpectStapleEvent,
+    TransactionEvent,
+]
+
+
+def get_event_type(data: Mapping[str, Any]) -> EventType:
     return eventtypes.get(data.get("type", "default"))()
 
 
-def materialize_metadata(data, event_type, event_metadata):
+EventMetadata = Dict[str, Any]
+
+
+def materialize_metadata(
+    data: Mapping[str, Any], event_type: EventType, event_metadata: Mapping[str, Any]
+) -> EventMetadata:
     """Returns the materialized metadata to be merged with group or
     event data.  This currently produces the keys `type`, `culprit`,
     `metadata`, `title` and `location`.
@@ -1301,21 +1354,30 @@ def materialize_metadata(data, event_type, event_metadata):
     }
 
 
-def inject_performance_problem_metadata(metadata, problem: PerformanceProblem):
+def inject_performance_problem_metadata(
+    metadata: dict[str, Any], problem: PerformanceProblem
+) -> dict[str, Any]:
     # TODO make type here dynamic, pull it from group type
     metadata["value"] = problem.desc
     metadata["title"] = "N+1 Query"
     return metadata
 
 
-def get_culprit(data):
+def get_culprit(data: Mapping[str, Any]) -> str:
     """Helper to calculate the default culprit"""
-    return force_text(
-        data.get("culprit") or data.get("transaction") or generate_culprit(data) or ""
+    return str(
+        force_text(data.get("culprit") or data.get("transaction") or generate_culprit(data) or "")
     )
 
 
-def _save_aggregate(event, hashes, release, metadata, received_timestamp, **kwargs) -> GroupInfo:
+def _save_aggregate(
+    event: Event,
+    hashes: CalculatedHashes,
+    release: Release,
+    metadata: dict[str, Any],
+    received_timestamp: Union[int, float],
+    **kwargs: dict[str, Any],
+) -> Optional[GroupInfo]:
     project = event.project
 
     flat_grouphashes = [
@@ -1434,7 +1496,7 @@ def _save_aggregate(event, hashes, release, metadata, received_timestamp, **kwar
                 "event_type": "error",
             },
         )
-        return
+        return None
 
     is_new = False
 
@@ -1486,10 +1548,10 @@ def _save_aggregate(event, hashes, release, metadata, received_timestamp, **kwar
 
 
 def _find_existing_grouphash(
-    project,
-    flat_grouphashes,
-    hierarchical_hashes,
-):
+    project: Project,
+    flat_grouphashes: Sequence[GroupHash],
+    hierarchical_hashes: Optional[Sequence[str]],
+) -> tuple[Optional[GroupHash], Optional[str]]:
     all_grouphashes = []
     root_hierarchical_hash = None
 
@@ -1565,7 +1627,7 @@ def _find_existing_grouphash(
     return None, root_hierarchical_hash
 
 
-def _create_group(project, event, **kwargs):
+def _create_group(project: Project, event: Event, **kwargs: dict[str, Any]) -> Group:
     try:
         short_id = project.next_short_id()
     except OperationalError:
@@ -1580,33 +1642,36 @@ def _create_group(project, event, **kwargs):
     # when we queried for the release and now, so
     # make sure it still exists
     first_release = kwargs.pop("first_release", None)
+    first_release_id = (
+        Release.objects.filter(id=cast(Release, first_release).id)
+        .values_list("id", flat=True)
+        .first()
+        if first_release
+        else None
+    )
 
     return Group.objects.create(
         project=project,
         short_id=short_id,
-        first_release_id=Release.objects.filter(id=first_release.id)
-        .values_list("id", flat=True)
-        .first()
-        if first_release
-        else None,
+        first_release_id=first_release_id,
         **kwargs,
     )
 
 
-def _handle_regression(group, event, release):
+def _handle_regression(group: Group, event: Event, release: Release) -> Optional[bool]:
     if not group.is_resolved():
-        return
+        return None
 
     # we only mark it as a regression if the event's release is newer than
     # the release which we originally marked this as resolved
     elif GroupResolution.has_resolution(group, release):
-        return
+        return None
 
     elif has_pending_commit_resolution(group):
-        return
+        return None
 
     if not plugin_is_regression(group, event):
-        return
+        return None
 
     # we now think its a regression, rely on the database to validate that
     # no one beat us to this
@@ -1696,7 +1761,9 @@ def _handle_regression(group, event, release):
     return is_regression
 
 
-def _process_existing_aggregate(group, event, data, release):
+def _process_existing_aggregate(
+    group: Group, event: Event, data: Mapping[str, Any], release: Release
+) -> bool:
     date = max(event.datetime, group.last_seen)
     extra = {"last_seen": date, "data": data["data"]}
     if event.search_message and event.search_message != group.message:
@@ -1716,10 +1783,13 @@ def _process_existing_aggregate(group, event, data, release):
 
     buffer_incr(Group, update_kwargs, {"id": group.id}, extra)
 
-    return is_regression
+    return bool(is_regression)
+
 
+Attachment = Type[CachedAttachment]
 
-def discard_event(job, attachments):
+
+def discard_event(job: Job, attachments: Sequence[Attachment]) -> None:
     """
     Refunds consumed quotas for an event and its attachments.
 
@@ -1784,7 +1854,7 @@ def discard_event(job, attachments):
     )
 
 
-def get_attachments(cache_key, job):
+def get_attachments(cache_key: Optional[str], job: Job) -> list[Attachment]:
     """
     Retrieves the list of attachments for this event.
 
@@ -1809,7 +1879,7 @@ def get_attachments(cache_key, job):
     return [attachment for attachment in attachments if not attachment.rate_limited]
 
 
-def filter_attachments_for_group(attachments, job):
+def filter_attachments_for_group(attachments: list[Attachment], job: Job) -> list[Attachment]:
     """
     Removes crash reports exceeding the group-limit.
 
@@ -1835,6 +1905,10 @@ def filter_attachments_for_group(attachments, job):
     if max_crashreports is None:
         max_crashreports = get_max_crashreports(project.organization)
 
+    max_crashreports = cast(
+        int, max_crashreports
+    )  # this is safe since the second call doesn't allow None
+
     # The number of crash reports is cached per group
     crashreports_key = get_crashreport_key(event.group_id)
 
@@ -1901,8 +1975,14 @@ def filter_attachments_for_group(attachments, job):
 
 
 def save_attachment(
-    cache_key, attachment, project, event_id, key_id=None, group_id=None, start_time=None
-):
+    cache_key: Optional[str],
+    attachment: Attachment,
+    project: Project,
+    event_id: str,
+    key_id: Optional[int] = None,
+    group_id: Optional[int] = None,
+    start_time: Optional[Union[float, int]] = None,
+) -> None:
     """
     Persists a cached event attachments into the file store.
 
@@ -1974,13 +2054,14 @@ def save_attachment(
     )
 
 
-def save_attachments(cache_key, attachments, job):
+def save_attachments(cache_key: Optional[str], attachments: list[Attachment], job: Job) -> None:
     """
     Persists cached event attachments into the file store.
 
     Emits one outcome per attachment, either ACCEPTED on success or
     INVALID(missing_chunks) if retrieving the attachment fails.
-
+    :param cache_key:  The cache key at which the attachment is stored for
+                       debugging purposes.
     :param attachments: A filtered list of attachments to save.
     :param job:         The job context container.
     """
@@ -2000,7 +2081,7 @@ def save_attachments(cache_key, attachments, job):
 
 
 @metrics.wraps("event_manager.save_transactions.materialize_event_metrics")
-def _materialize_event_metrics(jobs):
+def _materialize_event_metrics(jobs: Sequence[Job]) -> None:
     for job in jobs:
         # Ensure the _metrics key exists. This is usually created during
         # and prefilled with ingestion sizes.
@@ -2018,7 +2099,9 @@ def _materialize_event_metrics(jobs):
 
 
 @metrics.wraps("save_event.calculate_event_grouping")
-def _calculate_event_grouping(project, event, grouping_config) -> CalculatedHashes:
+def _calculate_event_grouping(
+    project: Project, event: Event, grouping_config: GroupingConfig
+) -> CalculatedHashes:
     """
     Main entrypoint for modifying/enhancing and grouping an event, writes
     hashes back into event payload.
@@ -2061,11 +2144,11 @@ def _calculate_event_grouping(project, event, grouping_config) -> CalculatedHash
             hashes = event.get_hashes()
 
     hashes.write_to_event(event.data)
-    return hashes
+    return cast(CalculatedHashes, hashes)
 
 
 @metrics.wraps("save_event.calculate_span_grouping")
-def _calculate_span_grouping(jobs, projects):
+def _calculate_span_grouping(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
     for job in jobs:
         # Make sure this snippet doesn't crash ingestion
         # as the feature is under development.
@@ -2112,7 +2195,7 @@ def _calculate_span_grouping(jobs, projects):
 
 
 @metrics.wraps("save_event.detect_performance_problems")
-def _detect_performance_problems(jobs, projects):
+def _detect_performance_problems(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
     for job in jobs:
         job["performance_problems"] = detect_performance_problems(job["data"])
 
@@ -2120,6 +2203,7 @@ def _detect_performance_problems(jobs, projects):
 class PerformanceJob(TypedDict, total=False):
     performance_problems: Sequence[PerformanceProblem]
     event: Event
+    groups: list[GroupInfo]
     culprit: str
     received_timestamp: float
     event_metadata: Mapping[str, Any]
@@ -2130,7 +2214,7 @@ class PerformanceJob(TypedDict, total=False):
 
 
 def _save_grouphash_and_group(
-    project: Project, event: Event, new_grouphash: str, **group_kwargs
+    project: Project, event: Event, new_grouphash: str, **group_kwargs: dict[str, Any]
 ) -> Group:
     group = None
     with transaction.atomic():
@@ -2149,7 +2233,7 @@ def _save_grouphash_and_group(
 
 
 @metrics.wraps("save_event.save_aggregate_performance")
-def _save_aggregate_performance(jobs: Sequence[PerformanceJob], projects):
+def _save_aggregate_performance(jobs: Sequence[PerformanceJob], projects: ProjectsMapping) -> None:
 
     MAX_GROUPS = (
         10  # safety check in case we are passed too many. constant will live somewhere else tbd
@@ -2289,7 +2373,7 @@ def _save_aggregate_performance(jobs: Sequence[PerformanceJob], projects):
 
 
 @metrics.wraps("event_manager.save_transaction_events")
-def save_transaction_events(jobs, projects):
+def save_transaction_events(jobs: Sequence[Job], projects: ProjectsMapping) -> Sequence[Job]:
     with metrics.timer("event_manager.save_transactions.collect_organization_ids"):
         organization_ids = {project.organization_id for project in projects.values()}
 
@@ -2315,7 +2399,7 @@ def save_transaction_events(jobs, projects):
     _calculate_span_grouping(jobs, projects)
     _detect_performance_problems(jobs, projects)
     _materialize_metadata_many(jobs)
-    _save_aggregate_performance(jobs, projects)
+    _save_aggregate_performance(jobs, projects)  # type: ignore
     _get_or_create_environment_many(jobs, projects)
     _get_or_create_group_environment_many(jobs, projects)
     _get_or_create_release_associated_models(jobs, projects)
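
Most of the event_manager.py diff annotates the save_event pipeline helpers against the two aliases it introduces, Job = MutableMapping[str, Any] and ProjectsMapping = Mapping[int, Project]. A minimal, self-contained sketch of that pattern; FakeProject and _derive_tags_many are illustrative stand-ins, not code from the commit:

    from __future__ import annotations
    from typing import Any, Mapping, MutableMapping, Sequence

    class FakeProject:
        """Stand-in for sentry.models.Project, only to keep the sketch runnable."""
        def __init__(self, pk: int, organization_id: int) -> None:
            self.id = pk
            self.organization_id = organization_id

    Job = MutableMapping[str, Any]               # one event's mutable processing state
    ProjectsMapping = Mapping[int, FakeProject]  # project_id -> project lookup

    def _derive_tags_many(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
        # same shape as the _*_many(jobs, projects) helpers annotated in this commit
        for job in jobs:
            project = projects[job["project_id"]]
            job.setdefault("tags", []).append(("org", str(project.organization_id)))

    jobs = [{"project_id": 1}]
    _derive_tags_many(jobs, {1: FakeProject(1, 42)})

Using Sequence for the job list keeps the helpers from reassigning the list itself, while MutableMapping still lets each job dict be updated in place, which is how these helpers are used.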

+ 7 - 1
src/sentry/grouping/api.py

@@ -1,4 +1,5 @@
 import re
+from typing import TypedDict
 
 from sentry import options
 from sentry.grouping.component import GroupingComponent
@@ -47,12 +48,17 @@ class GroupingConfigNotFound(LookupError):
     pass
 
 
+class GroupingConfig(TypedDict):
+    id: str
+    enhancements: Enhancements
+
+
 class GroupingConfigLoader:
     """Load a grouping config based on global or project options"""
 
     cache_prefix: str  # Set in subclasses
 
-    def get_config_dict(self, project):
+    def get_config_dict(self, project) -> GroupingConfig:
         return {
             "id": self._get_config_id(project),
             "enhancements": self._get_enhancements(project),

+ 2 - 1
src/sentry/lang/native/utils.py

@@ -1,5 +1,6 @@
 import logging
 import re
+from typing import Optional
 
 from sentry.attachments import attachment_cache
 from sentry.stacktraces.processing import find_stacktraces_in_data
@@ -114,7 +115,7 @@ def get_event_attachment(data, attachment_type):
     return next((a for a in attachments if a.type == attachment_type), None)
 
 
-def convert_crashreport_count(value, allow_none=False):
+def convert_crashreport_count(value, allow_none=False) -> Optional[int]:
     """
     Shim to read both legacy and new `sentry:store_crash_reports` project and
     organization options.
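
Annotating convert_crashreport_count as returning Optional[int] is what forces the cast added in filter_attachments_for_group above: the project-level lookup may return None, the organization-level fallback never does. A minimal, self-contained sketch of that narrowing pattern (max_crashreports_option is an illustrative stand-in, not the real helper):

    from typing import Optional, cast

    def max_crashreports_option(raw: Optional[int], allow_none: bool = False) -> Optional[int]:
        # stand-in for get_max_crashreports/convert_crashreport_count:
        # None means "option unset", which only the first lookup allows
        if raw is None:
            return None if allow_none else 0
        return raw

    max_crashreports = max_crashreports_option(None, allow_none=True)  # project option unset
    if max_crashreports is None:
        max_crashreports = max_crashreports_option(5)                  # organization fallback
    max_crashreports = cast(int, max_crashreports)  # safe: the fallback call never returns None
    assert max_crashreports == 5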

+ 1 - 1
tests/sentry/event_manager/test_event_manager.py

@@ -1342,7 +1342,7 @@ class EventManagerTest(TestCase, SnubaTestCase, EventManagerTestMixin):
 
         group_states2 = {
             "is_new": False,
-            "is_regression": None,  # XXX: wut
+            "is_regression": False,
             "is_new_group_environment": False,
         }
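
The test update follows from the return-type changes above: _process_existing_aggregate now returns bool(is_regression), so the None that _handle_regression yields on its early exits surfaces in the recorded group state as a plain False rather than None. A one-line illustration of that coercion:

    is_regression = None              # e.g. the group was never resolved
    assert bool(is_regression) is False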