Browse Source

fix(issues): Add partition key for status change kafka msg (copy) (#71673)

Copied from https://github.com/getsentry/sentry/pull/71628, which I
reverted due to CI problems after merging yday afternoon. Will be around
to monitor this one fully

Previously status change had no key. There are no order guarantees
across partitions, resulting in status changes being processed before
the first occurrence/group was created.
Andrew Liu 9 months ago
parent
commit
ce4fd9ea68
2 changed files with 80 additions and 0 deletions
  1. 2 0
      src/sentry/issues/producer.py
  2. 78 0
      tests/sentry/issues/test_producer.py

+ 2 - 0
src/sentry/issues/producer.py

@@ -67,6 +67,8 @@ def produce_occurrence_to_kafka(
     partition_key = None
     if occurrence and occurrence.fingerprint:
         partition_key = occurrence.fingerprint[0].encode()
+    elif status_change and status_change.fingerprint:
+        partition_key = status_change.fingerprint[0].encode()
     payload = KafkaPayload(partition_key, json.dumps(payload_data).encode("utf-8"), [])
     if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
         # If we're not running Kafka then we're just in dev.

+ 78 - 0
tests/sentry/issues/test_producer.py

@@ -376,3 +376,81 @@ class TestProduceOccurrenceForStatusChange(TestCase, OccurrenceTestMixin):
         )
         assert status_change_1.id
         assert status_change_1.id != status_change_2.id
+
+    @patch(
+        "sentry.issues.producer._prepare_status_change_message", return_value={"mock_data": "great"}
+    )
+    @patch("sentry.issues.producer._occurrence_producer.produce")
+    @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
+    def test_payload_sent_to_kafka_with_partition_key(
+        self, mock_produce: MagicMock, mock_prepare_status_change_message: MagicMock
+    ) -> None:
+        status_change = StatusChangeMessage(
+            project_id=self.project.id,
+            fingerprint=["group-1"],
+            new_status=GroupStatus.RESOLVED,
+            new_substatus=GroupSubStatus.FOREVER,
+        )
+        produce_occurrence_to_kafka(
+            payload_type=PayloadType.STATUS_CHANGE,
+            status_change=status_change,
+            event_data={},
+        )
+        mock_produce.assert_called_once_with(
+            ArroyoTopic(name="ingest-occurrences"),
+            KafkaPayload(
+                status_change.fingerprint[0].encode(),
+                json.dumps({"mock_data": "great"}).encode("utf-8"),
+                [],
+            ),
+        )
+
+    @patch(
+        "sentry.issues.producer._prepare_status_change_message", return_value={"mock_data": "great"}
+    )
+    @patch("sentry.issues.producer._occurrence_producer.produce")
+    @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
+    def test_payload_sent_to_kafka_with_partition_key_no_fingerprint(
+        self, mock_produce: MagicMock, mock_prepare_status_change_message: MagicMock
+    ) -> None:
+        status_change = StatusChangeMessage(
+            project_id=self.project.id,
+            fingerprint=[],
+            new_status=GroupStatus.RESOLVED,
+            new_substatus=GroupSubStatus.FOREVER,
+        )
+        produce_occurrence_to_kafka(
+            payload_type=PayloadType.STATUS_CHANGE,
+            status_change=status_change,
+            event_data={},
+        )
+        mock_produce.assert_called_once_with(
+            ArroyoTopic(name="ingest-occurrences"),
+            KafkaPayload(
+                None,
+                json.dumps({"mock_data": "great"}).encode("utf-8"),
+                [],
+            ),
+        )
+
+    @patch(
+        "sentry.issues.producer._prepare_status_change_message", return_value={"mock_data": "great"}
+    )
+    @patch("sentry.issues.producer._occurrence_producer.produce")
+    @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
+    def test_payload_sent_to_kafka_with_partition_key_no_status_change(
+        self, mock_produce: MagicMock, mock_prepare_status_change_message: MagicMock
+    ) -> None:
+        produce_occurrence_to_kafka(
+            payload_type=PayloadType.STATUS_CHANGE,
+            status_change=None,
+            event_data={},
+        )
+        mock_produce.assert_called_once_with(
+            ArroyoTopic(name="ingest-occurrences"),
+            KafkaPayload(
+                None,
+                json.dumps({"mock_data": "great"}).encode("utf-8"),
+                [],
+            ),
+        )