Browse Source

fix(replays): add exception logging for async tasks (#37892)

* fix(replays): add exception logging for async tasks

* correct usage of logger functions
Josh Ferge 2 years ago
parent
commit
643ed77a54

+ 12 - 6
src/sentry/replays/consumers/recording/process_recording.py

@@ -8,7 +8,6 @@ from io import BytesIO
 from typing import Callable, Deque, Mapping, MutableMapping, NamedTuple, Optional, cast
 
 import msgpack
-import sentry_sdk
 from arroyo import Partition
 from arroyo.backends.kafka.consumer import KafkaPayload
 from arroyo.processing.strategies.abstract import ProcessingStrategy
@@ -168,10 +167,11 @@ class ProcessRecordingSegmentStrategy(ProcessingStrategy[KafkaPayload]):  # type
                 self._process_chunk(cast(RecordingSegmentChunkMessage, message_dict), message)
             if message_dict["type"] == "replay_recording":
                 self._process_recording(cast(RecordingSegmentMessage, message_dict), message)
-        except Exception as e:
+        except Exception:
             # avoid crash looping on bad messages for now
-            logger.exception("Failed to process message")
-            sentry_sdk.capture_exception(e)
+            logger.exception(
+                "Failed to process replay recording message", extra={"offset": message.offset}
+            )
 
     def join(self, timeout: Optional[float] = None) -> None:
         wait([f for _, f in self.__futures], timeout=timeout, return_when=ALL_COMPLETED)
@@ -189,11 +189,17 @@ class ProcessRecordingSegmentStrategy(ProcessingStrategy[KafkaPayload]):  # type
         committable: MutableMapping[Partition, Message[KafkaPayload]] = {}
 
         while self.__futures and self.__futures[0].future.done():
-            message, _ = self.__futures.popleft()
+            message, result = self.__futures.popleft()
+
+            if result.exception() is not None:
+                logger.error(
+                    "replay recording error in async future",
+                    exc_info=result.exception(),
+                    extra={"offset": message.offset},
+                )
             # overwrite any existing message as we assume the deque is in order
             # committing offset x means all offsets up to and including x are processed
             committable[message.partition] = message
-
         # Commit the latest offset that has its corresponding produce finished, per partition
 
         if committable:

+ 59 - 1
tests/sentry/replays/consumers/recording_consumer/test_consumer.py

@@ -10,7 +10,7 @@ from arroyo.backends.kafka import KafkaPayload
 from sentry.models import File
 from sentry.replays.consumers.recording.factory import ProcessReplayRecordingStrategyFactory
 from sentry.replays.models import ReplayRecordingSegment
-from sentry.testutils.cases import TestCase
+from sentry.testutils import TestCase
 
 
 class TestRecordingsConsumerEndToEnd(TestCase):
@@ -70,3 +70,61 @@ class TestRecordingsConsumerEndToEnd(TestCase):
         assert recording
         assert recording.checksum == sha1(b"testfoobar").hexdigest()
         assert ReplayRecordingSegment.objects.get(replay_id=self.replay_id)
+
+    def test_duplicate_segment_flow(self):
+        processing_strategy = self.processing_factory().create_with_partitions(lambda x: None, None)
+        segment_id = 0
+        consumer_messages = [
+            {
+                "payload": f'{{"segment_id":{segment_id}}}\ntest'.encode(),
+                "replay_id": self.replay_id,
+                "project_id": self.project.id,
+                "id": self.replay_recording_id,
+                "chunk_index": 0,
+                "type": "replay_recording_chunk",
+            },
+            {
+                "type": "replay_recording",
+                "replay_id": self.replay_id,
+                "replay_recording": {
+                    "chunks": 1,
+                    "id": self.replay_recording_id,
+                },
+                "project_id": self.project.id,
+            },
+            {
+                "payload": f'{{"segment_id":{segment_id}}}\nduplicatedyadada'.encode(),
+                "replay_id": self.replay_id,
+                "project_id": self.project.id,
+                "id": self.replay_recording_id,
+                "chunk_index": 0,
+                "type": "replay_recording_chunk",
+            },
+            {
+                "type": "replay_recording",
+                "replay_id": self.replay_id,
+                "replay_recording": {
+                    "chunks": 1,
+                    "id": self.replay_recording_id,
+                },
+                "project_id": self.project.id,
+            },
+        ]
+        for message in consumer_messages:
+            processing_strategy.submit(
+                Message(
+                    Partition(Topic("ingest-replay-recordings"), 1),
+                    1,
+                    KafkaPayload(b"key", msgpack.packb(message), [("should_drop", b"1")]),
+                    datetime.now(),
+                )
+            )
+        processing_strategy.poll()
+        processing_strategy.join(1)
+        processing_strategy.terminate()
+        recording_file_name = f"rr:{self.replay_id}:{segment_id}"
+
+        assert len(File.objects.filter(name=recording_file_name)) == 2
+        # right now both files should be inserted, but only one segment is created,
+        # so the second one is "lost".
+        assert ReplayRecordingSegment.objects.get(replay_id=self.replay_id)