
fix(issues): Add partition key for status change kafka msg (#71628)

Reopened from https://github.com/getsentry/sentry/pull/71620
Previously, status change messages were produced without a partition key. Kafka provides no ordering guarantees across partitions, so a status change could be processed before the first occurrence/group was created.
Andrew Liu, 9 months ago
parent commit 488e686f5b

2 changed files with 80 additions and 0 deletions:
  src/sentry/issues/producer.py         +2  -0
  tests/sentry/issues/test_producer.py  +78 -0

src/sentry/issues/producer.py  +2 -0

@@ -67,6 +67,8 @@ def produce_occurrence_to_kafka(
     partition_key = None
     if occurrence and occurrence.fingerprint:
         partition_key = occurrence.fingerprint[0].encode()
+    elif status_change and status_change.fingerprint:
+        partition_key = status_change.fingerprint[0].encode()
     payload = KafkaPayload(partition_key, json.dumps(payload_data).encode("utf-8"), [])
     if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
         # If we're not running Kafka then we're just in dev.
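
For context, Kafka only guarantees message ordering within a single partition, and keyed messages are routed by hashing the key. The minimal sketch below (illustrative only; pick_partition and num_partitions are made-up names, not the actual Arroyo/Kafka partitioner) shows why keying both the occurrence and its later status change on fingerprint[0] keeps them on the same partition, and therefore in order:

    # Illustrative only: messages routed to the same partition are consumed
    # in the order they were produced; keyless messages can land anywhere.
    def pick_partition(key: bytes | None, num_partitions: int) -> int:
        if key is None:
            # No key: the client may spread these across partitions,
            # so no relative ordering is guaranteed.
            import random
            return random.randrange(num_partitions)
        # Stand-in for Kafka's key hash (the Java client's default uses murmur2).
        return hash(key) % num_partitions

    fingerprint_key = b"group-1"
    # Same key -> same partition -> the occurrence and the status change stay ordered.
    assert pick_partition(fingerprint_key, 8) == pick_partition(fingerprint_key, 8)

With the elif added above, a status change with an empty fingerprint still falls back to a None key, which the second test below exercises.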

tests/sentry/issues/test_producer.py  +78 -0

@@ -376,3 +376,81 @@ class TestProduceOccurrenceForStatusChange(TestCase, OccurrenceTestMixin):
         )
         assert status_change_1.id
         assert status_change_1.id != status_change_2.id
+
+    @patch(
+        "sentry.issues.producer._prepare_status_change_message", return_value={"mock_data": "great"}
+    )
+    @patch("sentry.issues.producer._occurrence_producer.produce")
+    @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
+    def test_payload_sent_to_kafka_with_partition_key(
+        self, mock_produce: MagicMock, mock_prepare_status_change_message: MagicMock
+    ) -> None:
+        status_change = StatusChangeMessage(
+            project_id=self.project.id,
+            fingerprint=["group-1"],
+            new_status=GroupStatus.RESOLVED,
+            new_substatus=GroupSubStatus.FOREVER,
+        )
+        produce_occurrence_to_kafka(
+            payload_type=PayloadType.STATUS_CHANGE,
+            status_change=status_change,
+            event_data={},
+        )
+        mock_produce.assert_called_once_with(
+            ArroyoTopic(name="ingest-occurrences"),
+            KafkaPayload(
+                status_change.fingerprint[0].encode(),
+                json.dumps({"mock_data": "great"}).encode("utf-8"),
+                [],
+            ),
+        )
+
+    @patch(
+        "sentry.issues.producer._prepare_status_change_message", return_value={"mock_data": "great"}
+    )
+    @patch("sentry.issues.producer._occurrence_producer.produce")
+    @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
+    def test_payload_sent_to_kafka_with_partition_key_no_fingerprint(
+        self, mock_produce: MagicMock, mock_prepare_status_change_message: MagicMock
+    ) -> None:
+        status_change = StatusChangeMessage(
+            project_id=self.project.id,
+            fingerprint=[],
+            new_status=GroupStatus.RESOLVED,
+            new_substatus=GroupSubStatus.FOREVER,
+        )
+        produce_occurrence_to_kafka(
+            payload_type=PayloadType.STATUS_CHANGE,
+            status_change=status_change,
+            event_data={},
+        )
+        mock_produce.assert_called_once_with(
+            ArroyoTopic(name="ingest-occurrences"),
+            KafkaPayload(
+                None,
+                json.dumps({"mock_data": "great"}).encode("utf-8"),
+                [],
+            ),
+        )
+
+    @patch(
+        "sentry.issues.producer._prepare_status_change_message", return_value={"mock_data": "great"}
+    )
+    @patch("sentry.issues.producer._occurrence_producer.produce")
+    @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
+    def test_payload_sent_to_kafka_with_partition_key_no_status_change(
+        self, mock_produce: MagicMock, mock_prepare_status_change_message: MagicMock
+    ) -> None:
+        produce_occurrence_to_kafka(
+            payload_type=PayloadType.STATUS_CHANGE,
+            status_change=None,
+            event_data={},
+        )
+        mock_produce.assert_called_once_with(
+            ArroyoTopic(name="ingest-occurrences"),
+            KafkaPayload(
+                None,
+                json.dumps({"mock_data": "great"}).encode("utf-8"),
+                [],
+            ),
+        )