Browse Source

feat(eventstream): Guard against `GroupEvent` models being passed to `EventStream.insert`. (#38188)

`Groupevent` is intended to be a view and not written to eventstream.
Guarding against that just to be overcautious.
Dan Fuller 2 years ago
parent
commit
632d6701ef

+ 2 - 2
src/sentry/eventstream/base.py

@@ -12,7 +12,7 @@ logger = logging.getLogger(__name__)
 
 
 if TYPE_CHECKING:
-    from sentry.eventstore.models import BaseEvent
+    from sentry.eventstore.models import Event
 
 
 class ForwarderNotRequired(NotImplementedError):
@@ -67,7 +67,7 @@ class EventStream(Service):
 
     def insert(
         self,
-        event: BaseEvent,
+        event: Event,
         is_new: bool,
         is_regression: bool,
         is_new_group_environment: bool,

+ 12 - 5
src/sentry/eventstream/snuba.py

@@ -19,6 +19,7 @@ import pytz
 import urllib3
 
 from sentry import quotas
+from sentry.eventstore.models import GroupEvent
 from sentry.eventstream.base import EventStream
 from sentry.utils import json, snuba
 from sentry.utils.safe import get_path
@@ -29,7 +30,7 @@ KW_SKIP_SEMANTIC_PARTITIONING = "skip_semantic_partitioning"
 logger = logging.getLogger(__name__)
 
 if TYPE_CHECKING:
-    from sentry.eventstore.models import BaseEvent
+    from sentry.eventstore.models import Event
 
 
 # Version 1 format: (1, TYPE, [...REST...])
@@ -93,7 +94,7 @@ class SnubaProtocolEventStream(EventStream):
 
     def _get_headers_for_insert(
         self,
-        event: BaseEvent,
+        event: Event,
         is_new: bool,
         is_regression: bool,
         is_new_group_environment: bool,
@@ -104,12 +105,12 @@ class SnubaProtocolEventStream(EventStream):
         return {"Received-Timestamp": str(received_timestamp)}
 
     @staticmethod
-    def _is_transaction_event(event: BaseEvent) -> bool:
+    def _is_transaction_event(event: Event) -> bool:
         return event.get_event_type() == "transaction"  # type: ignore
 
     def insert(
         self,
-        event: BaseEvent,
+        event: Event,
         is_new: bool,
         is_regression: bool,
         is_new_group_environment: bool,
@@ -118,6 +119,12 @@ class SnubaProtocolEventStream(EventStream):
         skip_consume: bool = False,
         **kwargs: Any,
     ) -> None:
+        if isinstance(event, GroupEvent):
+            logger.error(
+                "`GroupEvent` passed to `EventStream.insert`. Only `Event` is allowed here.",
+                exc_info=True,
+            )
+            return
         project = event.project
         set_current_event_project(project.id)
         retention_days = quotas.get_event_retention(organization=project.organization)
@@ -406,7 +413,7 @@ class SnubaEventStream(SnubaProtocolEventStream):
 
     def insert(
         self,
-        event: BaseEvent,
+        event: Event,
         is_new: bool,
         is_regression: bool,
         is_new_group_environment: bool,

+ 22 - 0
tests/snuba/eventstream/test_eventstream.py

@@ -154,3 +154,25 @@ class SnubaEventStreamTest(TestCase, SnubaTestCase):
         )
         assert len(result["data"]) == 1
         assert result["data"][0]["group_ids"] == [self.group.id]
+
+    @patch("sentry.eventstream.snuba.logger")
+    def test_invalid_groupevent_passed(self, logger):
+        event = self.__build_transaction_event()
+        event.group_id = None
+        event.group_ids = [self.group.id]
+        insert_args = ()
+        insert_kwargs = {
+            "event": event.for_group(self.group),
+            "is_new_group_environment": True,
+            "is_new": True,
+            "is_regression": False,
+            "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
+            "skip_consume": False,
+            "received_timestamp": event.data["received"],
+        }
+        self.kafka_eventstream.insert(*insert_args, **insert_kwargs)
+        assert not self.producer_mock.produce.called
+        logger.error.assert_called_with(
+            "`GroupEvent` passed to `EventStream.insert`. Only `Event` is allowed here.",
+            exc_info=True,
+        )