Browse Source

ref(issue-platform): Generate and process dummy Kafka messages in dev (#61362)

Generate and process dummy Kafka messages in dev instead of duplicating
the logic to evaluate messages of various payload types.

Fixes #61361
Snigdha Sharma 1 year ago
parent
commit
3f702ce610
2 changed files with 25 additions and 47 deletions
  1. 8 36
      src/sentry/issues/producer.py
  2. 17 11
      tests/sentry/issues/test_producer.py

+ 8 - 36
src/sentry/issues/producer.py

@@ -5,11 +5,12 @@ from typing import Any, Dict, MutableMapping, Optional, cast
 
 from arroyo import Topic
 from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
+from arroyo.types import Message, Value
 from django.conf import settings
 
 from sentry import features
 from sentry.issues.issue_occurrence import IssueOccurrence
-from sentry.issues.status_change_consumer import bulk_get_groups_from_fingerprints, update_status
+from sentry.issues.run import process_message
 from sentry.issues.status_change_message import StatusChangeMessage
 from sentry.models.project import Project
 from sentry.services.hybrid_cloud import ValueEqualityEnum
@@ -56,6 +57,12 @@ def produce_occurrence_to_kafka(
         return
 
     payload = KafkaPayload(None, json.dumps(payload_data).encode("utf-8"), [])
+    if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
+        # If we're not running Kafka then we're just in dev.
+        # Skip producing to Kafka and just process the message directly.
+        process_message(Message(Value(payload=payload, committable={})))
+        return
+
     _occurrence_producer.produce(Topic(settings.KAFKA_INGEST_OCCURRENCES), payload)
 
 
@@ -66,22 +73,6 @@ def _prepare_occurrence_message(
         raise ValueError("occurrence must be provided")
     if event_data and occurrence.event_id != event_data["event_id"]:
         raise ValueError("Event id on occurrence and event_data must be the same")
-    if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
-        # If we're not running Kafka then we're just in dev. Skip producing to Kafka and just
-        # write to the issue platform directly
-        from sentry.issues.ingest import process_occurrence_data
-        from sentry.issues.occurrence_consumer import (
-            lookup_event_and_process_issue_occurrence,
-            process_event_and_issue_occurrence,
-        )
-
-        occurrence_dict = occurrence.to_dict()
-        process_occurrence_data(occurrence_dict)
-        if event_data:
-            process_event_and_issue_occurrence(occurrence_dict, event_data)
-        else:
-            lookup_event_and_process_issue_occurrence(occurrence_dict)
-        return None
 
     payload_data = cast(MutableMapping[str, Any], occurrence.to_dict())
     payload_data["payload_type"] = PayloadType.OCCURRENCE.value
@@ -101,25 +92,6 @@ def _prepare_status_change_message(
     if not features.has("organizations:issue-platform-api-crons-sd", organization):
         return None
 
-    if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
-        # Do the change
-        # If we're not running Kafka then we're just in dev. Skip producing to Kafka and just
-        # write to the issue platform directly
-        from sentry.issues.ingest import process_occurrence_data
-
-        process_occurrence_data(status_change.to_dict())
-        fingerprint = status_change.fingerprint
-        groups_by_fingerprints = bulk_get_groups_from_fingerprints(
-            [(status_change.project_id, fingerprint)]
-        )
-
-        key = (status_change.project_id, fingerprint[0])
-        group = groups_by_fingerprints.get(key, None)
-        if not group:
-            return None
-        update_status(group, status_change.to_dict())
-        return None
-
     payload_data = cast(MutableMapping[str, Any], status_change.to_dict())
     payload_data["payload_type"] = PayloadType.STATUS_CHANGE.value
     return payload_data

+ 17 - 11
tests/sentry/issues/test_producer.py

@@ -4,12 +4,12 @@ from unittest.mock import MagicMock, patch
 
 import pytest
 
+from sentry.issues.ingest import process_occurrence_data
 from sentry.issues.issue_occurrence import IssueOccurrence
 from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
 from sentry.issues.status_change_message import StatusChangeMessage
 from sentry.models.activity import Activity
 from sentry.models.group import GroupStatus
-from sentry.models.grouphash import GroupHash
 from sentry.models.grouphistory import STRING_TO_STATUS_LOOKUP, GroupHistory, GroupHistoryStatus
 from sentry.testutils.cases import TestCase
 from sentry.testutils.helpers.datetime import before_now, iso_format
@@ -23,6 +23,7 @@ from tests.sentry.issues.test_utils import OccurrenceTestMixin
 pytestmark = [requires_snuba]
 
 
+@apply_feature_flag_on_cls("organizations:profile-file-io-main-thread-ingest")
 class TestProduceOccurrenceToKafka(TestCase, OccurrenceTestMixin):
     def test_event_id_mismatch(self) -> None:
         with self.assertRaisesMessage(
@@ -35,7 +36,7 @@ class TestProduceOccurrenceToKafka(TestCase, OccurrenceTestMixin):
             )
 
     def test_with_event(self) -> None:
-        occurrence = self.build_occurrence()
+        occurrence = self.build_occurrence(project_id=self.project.id)
         produce_occurrence_to_kafka(
             payload_type=PayloadType.OCCURRENCE,
             occurrence=occurrence,
@@ -68,18 +69,18 @@ class TestProduceOccurrenceToKafka(TestCase, OccurrenceTestMixin):
 @apply_feature_flag_on_cls("organizations:issue-platform-api-crons-sd")
 class TestProduceOccurrenceForStatusChange(TestCase, OccurrenceTestMixin):
     def setUp(self):
+        self.fingerprint = ["group-1"]
         self.event = self.store_event(
             data={
                 "event_id": "a" * 32,
                 "message": "oh no",
                 "timestamp": iso_format(datetime.now()),
-                "fingerprint": ["group-1"],
+                "fingerprint": self.fingerprint,
             },
             project_id=self.project.id,
         )
         self.group = self.event.group
         assert self.group
-        self.group_hash = GroupHash.objects.filter(group=self.group, project=self.project).first()
         self.initial_status = self.group.status
         self.initial_substatus = self.group.substatus
 
@@ -102,7 +103,7 @@ class TestProduceOccurrenceForStatusChange(TestCase, OccurrenceTestMixin):
 
     def test_with_no_status_change(self) -> None:
         status_change = StatusChangeMessage(
-            fingerprint=[self.group_hash.hash],
+            fingerprint=self.fingerprint,
             project_id=self.group.project_id,
             new_status=self.initial_status,
             new_substatus=self.initial_substatus,
@@ -120,7 +121,7 @@ class TestProduceOccurrenceForStatusChange(TestCase, OccurrenceTestMixin):
 
     def test_with_status_change_resolved(self) -> None:
         status_change = StatusChangeMessage(
-            fingerprint=[self.group_hash.hash],
+            fingerprint=self.fingerprint,
             project_id=self.group.project_id,
             new_status=GroupStatus.RESOLVED,
             new_substatus=None,
@@ -147,7 +148,7 @@ class TestProduceOccurrenceForStatusChange(TestCase, OccurrenceTestMixin):
             GroupSubStatus.FOREVER,
         ]:
             status_change = StatusChangeMessage(
-                fingerprint=[self.group_hash.hash],
+                fingerprint=self.fingerprint,
                 project_id=self.group.project_id,
                 new_status=GroupStatus.IGNORED,
                 new_substatus=substatus,
@@ -177,7 +178,7 @@ class TestProduceOccurrenceForStatusChange(TestCase, OccurrenceTestMixin):
             (GroupSubStatus.REGRESSED, ActivityType.SET_REGRESSION),
         ]:
             status_change = StatusChangeMessage(
-                fingerprint=[self.group_hash.hash],
+                fingerprint=self.fingerprint,
                 project_id=self.group.project_id,
                 new_status=GroupStatus.UNRESOLVED,
                 new_substatus=substatus,
@@ -215,7 +216,7 @@ class TestProduceOccurrenceForStatusChange(TestCase, OccurrenceTestMixin):
             (GroupStatus.UNRESOLVED, GroupSubStatus.NEW, "group.update_status.invalid_substatus"),
         ]:
             bad_status_change = StatusChangeMessage(
-                fingerprint=[self.group_hash.hash],
+                fingerprint=self.fingerprint,
                 project_id=self.group.project_id,
                 new_status=status,
                 new_substatus=substatus,
@@ -224,12 +225,15 @@ class TestProduceOccurrenceForStatusChange(TestCase, OccurrenceTestMixin):
                 payload_type=PayloadType.STATUS_CHANGE,
                 status_change=bad_status_change,
             )
+            processed_fingerprint = {"fingerprint": ["group-1"]}
+            process_occurrence_data(processed_fingerprint)
+
             self.group.refresh_from_db()
             mock_logger_error.assert_called_with(
                 error_msg,
                 extra={
                     "project_id": self.group.project_id,
-                    "fingerprint": [self.group_hash.hash],
+                    "fingerprint": processed_fingerprint["fingerprint"],
                     "new_status": status,
                     "new_substatus": substatus,
                 },
@@ -252,6 +256,8 @@ class TestProduceOccurrenceForStatusChange(TestCase, OccurrenceTestMixin):
         assert group
         initial_status = group.status
         initial_substatus = group.substatus
+        wrong_fingerprint = {"fingerprint": ["wronghash"]}
+        process_occurrence_data(wrong_fingerprint)
 
         bad_status_change_resolve = StatusChangeMessage(
             fingerprint=["wronghash"],
@@ -268,7 +274,7 @@ class TestProduceOccurrenceForStatusChange(TestCase, OccurrenceTestMixin):
             "grouphash.not_found",
             extra={
                 "project_id": group.project_id,
-                "fingerprint": "wronghash",
+                "fingerprint": wrong_fingerprint["fingerprint"][0],
             },
         )
         assert group.status == initial_status