Browse Source

ref: fix types for sentry.tasks.store (#71006)

<!-- Describe your PR here. -->
anthony sottile 10 months ago
parent
commit
48d7024e64

+ 1 - 1
pyproject.toml

@@ -449,7 +449,6 @@ module = [
     "sentry.tasks.integrations.sync_status_outbound",
     "sentry.tasks.process_buffer",
     "sentry.tasks.sentry_apps",
-    "sentry.tasks.store",
     "sentry.templatetags.sentry_assets",
     "sentry.templatetags.sentry_helpers",
     "sentry.templatetags.sentry_plugins",
@@ -635,6 +634,7 @@ module = [
     "sentry.tasks.commit_context",
     "sentry.tasks.on_demand_metrics",
     "sentry.tasks.reprocessing2",
+    "sentry.tasks.store",
     "sentry.types.actor",
     "sentry.types.region",
     "sentry.utils.arroyo",

+ 2 - 2
src/sentry/event_manager.py

@@ -409,7 +409,7 @@ class EventManager:
         project_id: int | None,
         raw: bool = False,
         assume_normalized: bool = False,
-        start_time: int | None = None,
+        start_time: float | None = None,
         cache_key: str | None = None,
         skip_send_first_transaction: bool = False,
         has_attachments: bool = False,
@@ -2664,7 +2664,7 @@ def save_attachment(
     event_id: str,
     key_id: int | None = None,
     group_id: int | None = None,
-    start_time: float | int | None = None,
+    start_time: float | None = None,
 ) -> None:
     """
     Persists a cached event attachments into the file store.

+ 3 - 1
src/sentry/reprocessing.py

@@ -1,5 +1,7 @@
 import logging
 import uuid
+from collections.abc import Mapping
+from typing import Any
 
 import sentry_sdk
 
@@ -15,7 +17,7 @@ SENT_NOTIFICATION_OPTION = "sentry:sent_failed_event_hint"
 logger = logging.getLogger("sentry.events")
 
 
-def event_supports_reprocessing(data):
+def event_supports_reprocessing(data: Mapping[str, Any]) -> bool:
     """Only events of a certain format support reprocessing."""
     from sentry.lang.native.utils import NATIVE_PLATFORMS
     from sentry.stacktraces.platform import JAVASCRIPT_PLATFORMS

+ 40 - 40
src/sentry/tasks/store.py

@@ -1,5 +1,6 @@
 import logging
 import random
+from collections.abc import Mapping
 from dataclasses import dataclass
 from datetime import datetime, timezone
 from time import time
@@ -41,7 +42,7 @@ class RetryProcessing(Exception):
     pass
 
 
-def should_process(data: CanonicalKeyDict) -> bool:
+def should_process(data: CanonicalKeyDict[Any]) -> bool:
     """Quick check if processing is needed at all."""
     from sentry.plugins.base import plugins
 
@@ -65,7 +66,7 @@ def submit_process(
     from_reprocessing: bool,
     cache_key: str,
     event_id: str | None,
-    start_time: int | None,
+    start_time: float | None,
     data_has_changed: bool = False,
     from_symbolicate: bool = False,
     has_attachments: bool = False,
@@ -99,7 +100,7 @@ def submit_save_event(
     project_id: int,
     cache_key: str | None,
     event_id: str | None,
-    start_time: int | None,
+    start_time: float | None,
     data: Event | None,
 ) -> None:
     if cache_key:
@@ -125,7 +126,7 @@ def submit_save_event(
 def _do_preprocess_event(
     cache_key: str,
     data: Event | None,
-    start_time: int | None,
+    start_time: float | None,
     event_id: str | None,
     from_reprocessing: bool,
     project: Project | None,
@@ -188,13 +189,12 @@ def _do_preprocess_event(
             reprocessing2.backup_unprocessed_event(data=original_data)
 
             is_low_priority = should_demote_symbolication(project_id)
-            task_kind = SymbolicatorTaskKind(
-                platform=first_platform,
-                is_low_priority=is_low_priority,
-                is_reprocessing=from_reprocessing,
-            )
             submit_symbolicate(
-                task_kind,
+                SymbolicatorTaskKind(
+                    platform=first_platform,
+                    is_low_priority=is_low_priority,
+                    is_reprocessing=from_reprocessing,
+                ),
                 cache_key=cache_key,
                 event_id=event_id,
                 start_time=start_time,
@@ -217,12 +217,11 @@ def _do_preprocess_event(
         )
         return
 
-    task_kind = SaveEventTaskKind(
-        has_attachments=has_attachments,
-        from_reprocessing=from_reprocessing,
-    )
     submit_save_event(
-        task_kind,
+        SaveEventTaskKind(
+            has_attachments=has_attachments,
+            from_reprocessing=from_reprocessing,
+        ),
         project_id=project_id,
         cache_key=cache_key,
         event_id=event_id,
@@ -241,7 +240,7 @@ def _do_preprocess_event(
 def preprocess_event(
     cache_key: str,
     data: Event | None = None,
-    start_time: int | None = None,
+    start_time: float | None = None,
     event_id: str | None = None,
     project: Project | None = None,
     has_attachments: bool = False,
@@ -315,7 +314,7 @@ def normalize_event(data: Any) -> Any:
 
 def do_process_event(
     cache_key: str,
-    start_time: int | None,
+    start_time: float | None,
     event_id: str | None,
     from_reprocessing: bool,
     data: Event | None = None,
@@ -340,7 +339,7 @@ def do_process_event(
     project_id = data["project"]
     set_current_event_project(project_id)
 
-    event_id = data["event_id"]
+    data_event_id = data["event_id"]
 
     def _continue_to_save_event() -> None:
         task_kind = SaveEventTaskKind(
@@ -351,12 +350,12 @@ def do_process_event(
             task_kind,
             project_id=project_id,
             cache_key=cache_key,
-            event_id=event_id,
+            event_id=data_event_id,
             start_time=start_time,
             data=data,
         )
 
-    if is_process_disabled(project_id, event_id, data.get("platform") or "null"):
+    if is_process_disabled(project_id, data_event_id, data.get("platform") or "null"):
         return _continue_to_save_event()
 
     # NOTE: This span ranges in the 1-2ms range.
@@ -450,7 +449,7 @@ def do_process_event(
                 data,
                 project_id,
                 list(issues.values()),
-                event_id=event_id,
+                event_id=data_event_id,
                 start_time=start_time,
                 reprocessing_rev=reprocessing_rev,
             ):
@@ -463,7 +462,7 @@ def do_process_event(
                 cache_key=cache_key,
                 data=data,
                 start_time=start_time,
-                event_id=event_id,
+                event_id=data_event_id,
                 from_reprocessing=from_reprocessing,
                 project=project,
             )
@@ -483,7 +482,7 @@ def do_process_event(
 )
 def process_event(
     cache_key: str,
-    start_time: int | None = None,
+    start_time: float | None = None,
     event_id: str | None = None,
     data_has_changed: bool = False,
     from_symbolicate: bool = False,
@@ -520,7 +519,7 @@ def process_event(
 )
 def process_event_proguard(
     cache_key: str,
-    start_time: int | None = None,
+    start_time: float | None = None,
     event_id: str | None = None,
     data_has_changed: bool = False,
     from_symbolicate: bool = False,
@@ -555,7 +554,7 @@ def process_event_proguard(
 )
 def process_event_from_reprocessing(
     cache_key: str,
-    start_time: int | None = None,
+    start_time: float | None = None,
     event_id: str | None = None,
     data_has_changed: bool = False,
     from_symbolicate: bool = False,
@@ -600,11 +599,11 @@ def delete_raw_event(project_id: int, event_id: str | None) -> None:
 @sentry_sdk.tracing.trace
 def create_failed_event(
     cache_key: str,
-    data: Event | None,
+    data: Mapping[str, Any],
     project_id: int,
     issues: list[dict[str, str]],
     event_id: str | None,
-    start_time: int | None = None,
+    start_time: float | None = None,
     reprocessing_rev: Any = None,
 ) -> bool:
     """If processing failed we put the original data from the cache into a
@@ -657,22 +656,22 @@ def create_failed_event(
     # from the last processing step because we do not want any
     # modifications to take place.
     delete_raw_event(project_id, event_id)
-    data = processing.event_processing_store.get(cache_key)
+    original_data = processing.event_processing_store.get(cache_key)
 
-    if data is None:
+    if original_data is None:
         metrics.incr("events.failed", tags={"reason": "cache", "stage": "raw"}, skip_internal=False)
         error_logger.error("process.failed_raw.empty", extra={"cache_key": cache_key})
         return True
 
-    data = CanonicalKeyDict(data)
+    original_data = CanonicalKeyDict(original_data)
     from sentry.models.processingissue import ProcessingIssue
     from sentry.models.rawevent import RawEvent
 
     raw_event = RawEvent.objects.create(
         project_id=project_id,
         event_id=event_id,
-        datetime=datetime.fromtimestamp(data["timestamp"], timezone.utc),
-        data=data,
+        datetime=datetime.fromtimestamp(original_data["timestamp"], timezone.utc),
+        data=original_data,
     )
 
     for issue in issues:
@@ -692,7 +691,7 @@ def create_failed_event(
 def _do_save_event(
     cache_key: str | None = None,
     data: Event | None = None,
-    start_time: int | None = None,
+    start_time: float | None = None,
     event_id: str | None = None,
     project_id: int | None = None,
     has_attachments: bool = False,
@@ -861,7 +860,7 @@ def time_synthetic_monitoring_event(data: Event, project_id: int, start_time: fl
 def save_event(
     cache_key: str | None = None,
     data: Event | None = None,
-    start_time: int | None = None,
+    start_time: float | None = None,
     event_id: str | None = None,
     project_id: int | None = None,
     **kwargs: Any,
@@ -879,7 +878,7 @@ def save_event(
 def save_event_transaction(
     cache_key: str | None = None,
     data: Event | None = None,
-    start_time: int | None = None,
+    start_time: float | None = None,
     event_id: str | None = None,
     project_id: int | None = None,
     **kwargs: Any,
@@ -895,10 +894,11 @@ def save_event_transaction(
 )
 def save_event_feedback(
     cache_key: str | None = None,
-    data: Event | None = None,
-    start_time: int | None = None,
+    start_time: float | None = None,
     event_id: str | None = None,
-    project_id: int | None = None,
+    *,
+    data: Mapping[str, Any],
+    project_id: int,
     **kwargs: Any,
 ) -> None:
     create_feedback_issue(data, project_id, FeedbackCreationSource.NEW_FEEDBACK_ENVELOPE)
@@ -914,7 +914,7 @@ def save_event_feedback(
 def save_event_attachments(
     cache_key: str | None = None,
     data: Event | None = None,
-    start_time: int | None = None,
+    start_time: float | None = None,
     event_id: str | None = None,
     project_id: int | None = None,
     **kwargs: Any,
@@ -934,7 +934,7 @@ def save_event_attachments(
 def save_event_highcpu(
     cache_key: str | None = None,
     data: Event | None = None,
-    start_time: int | None = None,
+    start_time: float | None = None,
     event_id: str | None = None,
     project_id: int | None = None,
     **kwargs: Any,

+ 3 - 3
src/sentry/tasks/symbolication.py

@@ -135,7 +135,7 @@ class SymbolicationTimeout(Exception):
 def _do_symbolicate_event(
     task_kind: SymbolicatorTaskKind,
     cache_key: str,
-    start_time: int | None,
+    start_time: float | None,
     event_id: str | None,
     data: Event | None = None,
     queue_switches: int = 0,
@@ -355,7 +355,7 @@ def submit_symbolicate(
     task_kind: SymbolicatorTaskKind,
     cache_key: str,
     event_id: str | None,
-    start_time: int | None,
+    start_time: float | None,
     queue_switches: int = 0,
     has_attachments: bool = False,
     symbolicate_platforms: list[SymbolicatorPlatform] | None = None,
@@ -403,7 +403,7 @@ def make_task_fn(name: str, queue: str, task_kind: SymbolicatorTaskKind) -> Symb
     )
     def symbolication_fn(
         cache_key: str,
-        start_time: int | None = None,
+        start_time: float | None = None,
         event_id: str | None = None,
         data: Event | None = None,
         queue_switches: int = 0,