
fix(post-process-forwarder): Fix parsing of non-insert messages (#27939)

This change ensures that the `operation` and `version` headers are present on
all messages sent on the events topic, not just "insert" messages.

`get_task_kwargs_for_message_from_headers` should no longer throw an exception
on non-insert messages that are missing the insert-only headers, so we no longer
need to fall back to the previous implementation for them. Since we don't need
`event_data` or `task_state` for other message types, we can also skip parsing
the message values for those types.

Fixes SENTRY-RWM
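
As a rough illustration of the new behaviour (a sketch, not a test from this change: the import path and function name come from the diff below, while the header values, the "start_delete_groups" operation name, and the assumption that protocol version 2 is accepted are illustrative):

from sentry.eventstream.kafka.protocol import get_task_kwargs_for_message_from_headers

# Kafka headers arrive as a sequence of (str, Optional[bytes]) pairs.
insert_headers = [
    ("operation", b"insert"),
    ("version", b"2"),  # str(EVENT_PROTOCOL_VERSION); value assumed
    ("event_id", b"fe0e7f3f0e5d4a8f8a8c0f5f3b1c2d3e"),
    ("project_id", b"1"),
    ("group_id", b"123"),
    ("primary_hash", b"0c4a0a0d"),
    ("is_new", b"1"),
    ("is_new_group_environment", b"0"),
    ("is_regression", b"0"),
    ("skip_consume", b"0"),
]

# A non-insert message only needs "operation" and "version" now; previously the
# missing insert-only headers triggered a KeyError inside the try block and the
# function raised InvalidPayload.
non_insert_headers = [
    ("operation", b"start_delete_groups"),  # operation name is illustrative
    ("version", b"2"),
]

get_task_kwargs_for_message_from_headers(insert_headers)
get_task_kwargs_for_message_from_headers(non_insert_headers)  # should no longer raise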
Lyn Nagara · 3 years ago · commit 5df7be5285
2 changed files with 52 additions and 46 deletions:
  1. src/sentry/eventstream/kafka/backend.py (+2, -2)
  2. src/sentry/eventstream/kafka/protocol.py (+50, -44)

src/sentry/eventstream/kafka/backend.py (+2, -2)

@@ -73,8 +73,6 @@ class KafkaEventStream(SnubaProtocolEventStream):
                     "is_new": encode_bool(is_new),
                     "is_new_group_environment": encode_bool(is_new_group_environment),
                     "is_regression": encode_bool(is_regression),
-                    "version": str(self.EVENT_PROTOCOL_VERSION),
-                    "operation": "insert",
                     "skip_consume": encode_bool(skip_consume),
                 }
             )
@@ -100,6 +98,8 @@ class KafkaEventStream(SnubaProtocolEventStream):
     ):
         if headers is None:
             headers = {}
+        headers["operation"] = _type
+        headers["version"] = str(self.EVENT_PROTOCOL_VERSION)
 
         # Polling the producer is required to ensure callbacks are fired. This
         # means that the latency between a message being delivered (or failing

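In other words, header stamping moves out of the insert-only path and into the common send path. A minimal sketch of the resulting shape, assuming a simplified stand-in for `KafkaEventStream._send` (everything except the `operation`/`version` keys and `EVENT_PROTOCOL_VERSION` is illustrative):

from typing import MutableMapping, Optional

EVENT_PROTOCOL_VERSION = 2  # stand-in for the class constant; value assumed


def build_headers(
    _type: str,
    extra_headers: Optional[MutableMapping[str, str]] = None,
) -> MutableMapping[str, str]:
    # Every message, regardless of operation type, now carries "operation" and
    # "version"; insert-specific headers are merged in by the insert path
    # before reaching this point.
    headers: MutableMapping[str, str] = dict(extra_headers or {})
    headers["operation"] = _type
    headers["version"] = str(EVENT_PROTOCOL_VERSION)
    return headers


print(build_headers("insert", {"is_new": "1", "skip_consume": "0"}))
print(build_headers("start_delete_groups"))  # non-insert: just the two required headers
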
src/sentry/eventstream/kafka/protocol.py (+50, -44)

@@ -104,59 +104,65 @@ def get_task_kwargs_for_message_from_headers(headers: Sequence[Tuple[str, Option
     Same as get_task_kwargs_for_message but gets the required information from
     the kafka message headers.
     """
-    try:
-        header_data = {k: v for k, v in headers}
-        if "group_id" not in header_data:
-            header_data["group_id"] = None
-        if "primary_hash" not in header_data:
-            header_data["primary_hash"] = None
 
-        def decode_str(value: Optional[bytes]) -> str:
-            assert isinstance(value, bytes)
-            return value.decode("utf-8")
+    def decode_str(value: Optional[bytes]) -> str:
+        assert isinstance(value, bytes)
+        return value.decode("utf-8")
 
-        def decode_optional_str(value: Optional[bytes]) -> Optional[str]:
-            if value is None:
-                return None
-            return decode_str(value)
+    def decode_optional_str(value: Optional[bytes]) -> Optional[str]:
+        if value is None:
+            return None
+        return decode_str(value)
 
-        def decode_int(value: Optional[bytes]) -> int:
-            assert isinstance(value, bytes)
-            return int(value)
+    def decode_int(value: Optional[bytes]) -> int:
+        assert isinstance(value, bytes)
+        return int(value)
 
-        def decode_optional_int(value: Optional[bytes]) -> Optional[int]:
-            if value is None:
-                return None
-            return decode_int(value)
+    def decode_optional_int(value: Optional[bytes]) -> Optional[int]:
+        if value is None:
+            return None
+        return decode_int(value)
 
-        def decode_bool(value: bytes) -> bool:
-            return bool(int(decode_str(value)))
+    def decode_bool(value: bytes) -> bool:
+        return bool(int(decode_str(value)))
 
+    try:
+        header_data = {k: v for k, v in headers}
         version = decode_int(header_data["version"])
         operation = decode_str(header_data["operation"])
-        primary_hash = decode_optional_str(header_data["primary_hash"])
-        event_id = decode_str(header_data["event_id"])
-        group_id = decode_optional_int(header_data["group_id"])
-        project_id = decode_int(header_data["project_id"])
-
-        event_data = {
-            "event_id": event_id,
-            "group_id": group_id,
-            "project_id": project_id,
-            "primary_hash": primary_hash,
-        }
-
-        skip_consume = decode_bool(header_data["skip_consume"])
-        is_new = decode_bool(header_data["is_new"])
-        is_regression = decode_bool(header_data["is_regression"])
-        is_new_group_environment = decode_bool(header_data["is_new_group_environment"])
 
-        task_state = {
-            "skip_consume": skip_consume,
-            "is_new": is_new,
-            "is_regression": is_regression,
-            "is_new_group_environment": is_new_group_environment,
-        }
+        if operation == "insert":
+            if "group_id" not in header_data:
+                header_data["group_id"] = None
+            if "primary_hash" not in header_data:
+                header_data["primary_hash"] = None
+
+            primary_hash = decode_optional_str(header_data["primary_hash"])
+            event_id = decode_str(header_data["event_id"])
+            group_id = decode_optional_int(header_data["group_id"])
+            project_id = decode_int(header_data["project_id"])
+
+            event_data = {
+                "event_id": event_id,
+                "group_id": group_id,
+                "project_id": project_id,
+                "primary_hash": primary_hash,
+            }
+
+            skip_consume = decode_bool(header_data["skip_consume"])
+            is_new = decode_bool(header_data["is_new"])
+            is_regression = decode_bool(header_data["is_regression"])
+            is_new_group_environment = decode_bool(header_data["is_new_group_environment"])
+
+            task_state = {
+                "skip_consume": skip_consume,
+                "is_new": is_new,
+                "is_regression": is_regression,
+                "is_new_group_environment": is_new_group_environment,
+            }
+        else:
+            event_data = {}
+            task_state = {}
 
     except Exception:
         raise InvalidPayload("Received event payload with unexpected structure")
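
For reference, roughly what the two branches produce before the task kwargs are assembled (values illustrative, matching the insert headers sketched under the commit message above):

# operation == "insert": both dicts are decoded from the headers.
insert_event_data = {
    "event_id": "fe0e7f3f0e5d4a8f8a8c0f5f3b1c2d3e",
    "group_id": 123,              # None when the "group_id" header is absent
    "project_id": 1,
    "primary_hash": "0c4a0a0d",   # None when the "primary_hash" header is absent
}
insert_task_state = {
    "skip_consume": False,
    "is_new": True,
    "is_regression": False,
    "is_new_group_environment": False,
}

# Any other operation: nothing beyond "operation" and "version" is decoded.
other_event_data = {}
other_task_state = {}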