Browse Source

ref: Continue typing eventstream (#38858)

Type the eventstream Snuba backend. Also fixes some incorrect types in
the eventstream base class.
Lyn Nagara 2 years ago
parent
commit
02176bef9a
3 changed files with 111 additions and 66 deletions
  1. 1 0
      mypy.ini
  2. 7 5
      src/sentry/eventstream/base.py
  3. 103 61
      src/sentry/eventstream/snuba.py

+ 1 - 0
mypy.ini

@@ -43,6 +43,7 @@ files = src/sentry/analytics/,
         src/sentry/db/models/utils.py,
         src/sentry/digests/,
         src/sentry/eventstream/base.py,
+        src/sentry/eventstream/snuba.py,
         src/sentry/features/,
         src/sentry/grouping/result.py,
         src/sentry/grouping/strategies/base.py,

+ 7 - 5
src/sentry/eventstream/base.py

@@ -71,7 +71,7 @@ class EventStream(Service):
         is_new: bool,
         is_regression: bool,
         is_new_group_environment: bool,
-        primary_hash: str,
+        primary_hash: Optional[str],
         received_timestamp: float,
         skip_consume: bool = False,
     ) -> None:
@@ -86,7 +86,9 @@ class EventStream(Service):
             skip_consume,
         )
 
-    def start_delete_groups(self, project_id: int, group_ids: Sequence[int]) -> Mapping[str, Any]:
+    def start_delete_groups(
+        self, project_id: int, group_ids: Sequence[int]
+    ) -> Optional[Mapping[str, Any]]:
         pass
 
     def end_delete_groups(self, state: Mapping[str, Any]) -> None:
@@ -94,7 +96,7 @@ class EventStream(Service):
 
     def start_merge(
         self, project_id: int, previous_group_ids: Sequence[int], new_group_id: int
-    ) -> Mapping[str, Any]:
+    ) -> Optional[Mapping[str, Any]]:
         pass
 
     def end_merge(self, state: Mapping[str, Any]) -> None:
@@ -102,13 +104,13 @@ class EventStream(Service):
 
     def start_unmerge(
         self, project_id: int, hashes: Collection[str], previous_group_id: int, new_group_id: int
-    ) -> Mapping[str, Any]:
+    ) -> Optional[Mapping[str, Any]]:
         pass
 
     def end_unmerge(self, state: Mapping[str, Any]) -> None:
         pass
 
-    def start_delete_tag(self, project_id: int, tag: str) -> Mapping[str, Any]:
+    def start_delete_tag(self, project_id: int, tag: str) -> Optional[Mapping[str, Any]]:
         pass
 
     def end_delete_tag(self, state: Mapping[str, Any]) -> None:

+ 103 - 61
src/sentry/eventstream/snuba.py

@@ -1,6 +1,18 @@
+from __future__ import annotations
+
 import logging
 from datetime import datetime
-from typing import Any, Mapping, Optional, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Collection,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+)
 from uuid import uuid4
 
 import pytz
@@ -16,6 +28,9 @@ KW_SKIP_SEMANTIC_PARTITIONING = "skip_semantic_partitioning"
 
 logger = logging.getLogger(__name__)
 
+if TYPE_CHECKING:
+    from sentry.eventstore.models import BaseEvent
+
 
 # Version 1 format: (1, TYPE, [...REST...])
 #   Insert: (1, 'insert', {
@@ -78,31 +93,31 @@ class SnubaProtocolEventStream(EventStream):
 
     def _get_headers_for_insert(
         self,
-        event,
-        is_new,
-        is_regression,
-        is_new_group_environment,
-        primary_hash,
-        received_timestamp,  # type: float
-        skip_consume,
+        event: BaseEvent,
+        is_new: bool,
+        is_regression: bool,
+        is_new_group_environment: bool,
+        primary_hash: Optional[str],
+        received_timestamp: float,
+        skip_consume: bool,
     ) -> Mapping[str, str]:
         return {"Received-Timestamp": str(received_timestamp)}
 
     @staticmethod
-    def _is_transaction_event(event) -> bool:
-        return event.get_event_type() == "transaction"
+    def _is_transaction_event(event: BaseEvent) -> bool:
+        return event.get_event_type() == "transaction"  # type: ignore
 
     def insert(
         self,
-        event,
-        is_new,
-        is_regression,
-        is_new_group_environment,
-        primary_hash,
-        received_timestamp,  # type: float
-        skip_consume=False,
-        **kwargs,
-    ):
+        event: BaseEvent,
+        is_new: bool,
+        is_regression: bool,
+        is_new_group_environment: bool,
+        primary_hash: Optional[str],
+        received_timestamp: float,
+        skip_consume: bool = False,
+        **kwargs: Any,
+    ) -> None:
         project = event.project
         set_current_event_project(project.id)
         retention_days = quotas.get_event_retention(organization=project.organization)
@@ -166,9 +181,11 @@ class SnubaProtocolEventStream(EventStream):
             is_transaction_event=is_transaction_event,
         )
 
-    def start_delete_groups(self, project_id: int, group_ids):
+    def start_delete_groups(
+        self, project_id: int, group_ids: Sequence[int]
+    ) -> Optional[Mapping[str, Any]]:
         if not group_ids:
-            return
+            return None
 
         state = {
             "transaction_id": uuid4().hex,
@@ -181,16 +198,21 @@ class SnubaProtocolEventStream(EventStream):
 
         return state
 
-    def end_delete_groups(self, state):
-        state = state.copy()
-        state["datetime"] = datetime.now(tz=pytz.utc)
+    def end_delete_groups(self, state: Mapping[str, Any]) -> None:
+        state_copy: MutableMapping[str, Any] = {**state}
+        state_copy["datetime"] = datetime.now(tz=pytz.utc)
         self._send(
-            state["project_id"], "end_delete_groups", extra_data=(state,), asynchronous=False
+            state_copy["project_id"],
+            "end_delete_groups",
+            extra_data=(state_copy,),
+            asynchronous=False,
         )
 
-    def start_merge(self, project_id, previous_group_ids, new_group_id):
+    def start_merge(
+        self, project_id: int, previous_group_ids: Sequence[int], new_group_id: int
+    ) -> Optional[Mapping[str, Any]]:
         if not previous_group_ids:
-            return
+            return None
 
         state = {
             "transaction_id": uuid4().hex,
@@ -204,14 +226,18 @@ class SnubaProtocolEventStream(EventStream):
 
         return state
 
-    def end_merge(self, state):
-        state = state.copy()
-        state["datetime"] = datetime.now(tz=pytz.utc)
-        self._send(state["project_id"], "end_merge", extra_data=(state,), asynchronous=False)
+    def end_merge(self, state: Mapping[str, Any]) -> None:
+        state_copy: MutableMapping[str, Any] = {**state}
+        state_copy["datetime"] = datetime.now(tz=pytz.utc)
+        self._send(
+            state_copy["project_id"], "end_merge", extra_data=(state_copy,), asynchronous=False
+        )
 
-    def start_unmerge(self, project_id, hashes, previous_group_id, new_group_id):
+    def start_unmerge(
+        self, project_id: int, hashes: Collection[str], previous_group_id: int, new_group_id: int
+    ) -> Optional[Mapping[str, Any]]:
         if not hashes:
-            return
+            return None
 
         state = {
             "transaction_id": uuid4().hex,
@@ -226,14 +252,16 @@ class SnubaProtocolEventStream(EventStream):
 
         return state
 
-    def end_unmerge(self, state):
-        state = state.copy()
-        state["datetime"] = datetime.now(tz=pytz.utc)
-        self._send(state["project_id"], "end_unmerge", extra_data=(state,), asynchronous=False)
+    def end_unmerge(self, state: Mapping[str, Any]) -> None:
+        state_copy: MutableMapping[str, Any] = {**state}
+        state_copy["datetime"] = datetime.now(tz=pytz.utc)
+        self._send(
+            state_copy["project_id"], "end_unmerge", extra_data=(state_copy,), asynchronous=False
+        )
 
-    def start_delete_tag(self, project_id, tag):
+    def start_delete_tag(self, project_id: int, tag: str) -> Optional[Mapping[str, Any]]:
         if not tag:
-            return
+            return None
 
         state = {
             "transaction_id": uuid4().hex,
@@ -246,14 +274,22 @@ class SnubaProtocolEventStream(EventStream):
 
         return state
 
-    def end_delete_tag(self, state):
-        state = state.copy()
-        state["datetime"] = datetime.now(tz=pytz.utc)
-        self._send(state["project_id"], "end_delete_tag", extra_data=(state,), asynchronous=False)
+    def end_delete_tag(self, state: Mapping[str, Any]) -> None:
+        state_copy: MutableMapping[str, Any] = {**state}
+        state_copy["datetime"] = datetime.now(tz=pytz.utc)
+        self._send(
+            state_copy["project_id"], "end_delete_tag", extra_data=(state_copy,), asynchronous=False
+        )
 
     def tombstone_events_unsafe(
-        self, project_id, event_ids, old_primary_hash=False, from_timestamp=None, to_timestamp=None
-    ):
+        self,
+        project_id: int,
+        event_ids: Sequence[str],
+        old_primary_hash: Union[str, bool] = False,
+        from_timestamp: Optional[datetime] = None,
+        to_timestamp: Optional[datetime] = None,
+    ) -> None:
+
         """
         Tell Snuba to eventually delete these events.
 
@@ -286,8 +322,14 @@ class SnubaProtocolEventStream(EventStream):
         self._send(project_id, "tombstone_events", extra_data=(state,), asynchronous=False)
 
     def replace_group_unsafe(
-        self, project_id, event_ids, new_group_id, from_timestamp=None, to_timestamp=None
-    ):
+        self,
+        project_id: int,
+        event_ids: Sequence[str],
+        new_group_id: int,
+        from_timestamp: Optional[datetime] = None,
+        to_timestamp: Optional[datetime] = None,
+    ) -> None:
+        pass
         """
         Tell Snuba to move events into a new group ID
 
@@ -303,7 +345,7 @@ class SnubaProtocolEventStream(EventStream):
         }
         self._send(project_id, "replace_group", extra_data=(state,), asynchronous=False)
 
-    def exclude_groups(self, project_id, group_ids):
+    def exclude_groups(self, project_id: int, group_ids: Sequence[int]) -> None:
         """
         Exclude a group from queries for a while until event tombstoning takes
         effect. See docstring of `tombstone_events`.
@@ -323,7 +365,7 @@ class SnubaProtocolEventStream(EventStream):
         headers: Optional[Mapping[str, str]] = None,
         skip_semantic_partitioning: bool = False,
         is_transaction_event: bool = False,
-    ):
+    ) -> None:
         raise NotImplementedError
 
 
@@ -337,7 +379,7 @@ class SnubaEventStream(SnubaProtocolEventStream):
         headers: Optional[Mapping[str, str]] = None,
         skip_semantic_partitioning: bool = False,
         is_transaction_event: bool = False,
-    ):
+    ) -> None:
         if headers is None:
             headers = {}
 
@@ -355,24 +397,24 @@ class SnubaEventStream(SnubaProtocolEventStream):
             )
             if resp.status != 200:
                 raise snuba.SnubaError("HTTP %s response from Snuba!" % resp.status)
-            return resp
+            return None
         except urllib3.exceptions.HTTPError as err:
             raise snuba.SnubaError(err)
 
-    def requires_post_process_forwarder(self):
+    def requires_post_process_forwarder(self) -> bool:
         return False
 
     def insert(
         self,
-        event,
-        is_new,
-        is_regression,
-        is_new_group_environment,
-        primary_hash,
-        received_timestamp,  # type: float
-        skip_consume=False,
-        **kwargs,
-    ):
+        event: BaseEvent,
+        is_new: bool,
+        is_regression: bool,
+        is_new_group_environment: bool,
+        primary_hash: Optional[str],
+        received_timestamp: float,
+        skip_consume: bool = False,
+        **kwargs: Any,
+    ) -> None:
         super().insert(
             event,
             is_new,