Browse Source

fix(ingest-metrics-consumer): Handle back pressure correctly (#31562)

* fix(ingest-metrics-consumer): Handle back pressure correctly
MeredithAnya 3 years ago
parent
commit
4374932762

+ 31 - 3
src/sentry/sentry_metrics/multiprocess.py

@@ -3,6 +3,7 @@ import logging
 import time
 from collections import deque
 from concurrent.futures import Future
+from copy import deepcopy
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -13,6 +14,7 @@ from typing import (
     MutableMapping,
     NamedTuple,
     Optional,
+    Set,
     Union,
 )
 
@@ -81,6 +83,10 @@ def get_config(topic: str, group_id: str, auto_offset_reset: str) -> MutableMapp
     return consumer_config
 
 
+class DuplicateMessage(Exception):
+    pass
+
+
 class MetricsBatchBuilder:
     """
     Batches up individual messages - type: Message[KafkaPayload] - into a
@@ -94,6 +100,7 @@ class MetricsBatchBuilder:
         self.__messages: MessageBatch = []
         self.__max_batch_size = max_batch_size
         self.__deadline = time.time() + max_batch_time
+        self.__offsets: Set[int] = set()
 
     def __len__(self) -> int:
         return len(self.__messages)
@@ -103,7 +110,10 @@ class MetricsBatchBuilder:
         return self.__messages
 
     def append(self, message: Message[KafkaPayload]) -> None:
+        if message.offset in self.__offsets:
+            raise DuplicateMessage
         self.__messages.append(message)
+        self.__offsets.add(message.offset)
 
     def ready(self) -> bool:
         if len(self.messages) >= self.__max_batch_size:
@@ -145,13 +155,31 @@ class BatchMessages(ProcessingStep[KafkaPayload]):  # type: ignore
         self.__next_step.poll()
 
         if self.__batch and self.__batch.ready():
-            self.__flush()
+            try:
+                self.__flush()
+            except MessageRejected:
+                # Probably means that we have received back pressure due to the
+                # ParallelTransformStep.
+                logger.debug("Attempt to flush batch failed...Re-trying in next poll")
+                pass
 
     def submit(self, message: Message[KafkaPayload]) -> None:
         if self.__batch is None:
             self.__batch = MetricsBatchBuilder(self.__max_batch_size, self.__max_batch_time)
 
-        self.__batch.append(message)
+        try:
+            self.__batch.append(message)
+        except DuplicateMessage:
+            # If we are getting back pressure from the next_step (ParallelTransformStep),
+            # the consumer will keep trying to submit the same carried over message
+            # until it succeeds and stops throwing the MessageRejected error. In this
+            # case we don't want to keep adding the same message to the batch over and
+            # over again
+            logger.debug(f"Message already added to batch with offset: {message.offset}")
+            pass
+
+        if self.__batch and self.__batch.ready():
+            self.__flush()
 
     def __flush(self) -> None:
         if not self.__batch:
@@ -348,7 +376,7 @@ def process_messages(
 
     for message in outer_message.payload:
         parsed_payload_value = parsed_payloads_by_offset[message.offset]
-        new_payload_value = parsed_payload_value
+        new_payload_value = deepcopy(parsed_payload_value)
 
         metric_name = parsed_payload_value["name"]
         tags = parsed_payload_value.get("tags", {})

+ 49 - 7
tests/sentry/sentry_metrics/test_multiprocess_steps.py

@@ -3,15 +3,18 @@ from datetime import datetime, timezone
 from typing import Dict, List, Mapping, MutableMapping, Union
 from unittest.mock import Mock, call, patch
 
+import pytest
 from arroyo.backends.kafka import KafkaPayload
 from arroyo.backends.local.backend import LocalBroker as Broker
 from arroyo.backends.local.storages.memory import MemoryMessageStorage
+from arroyo.processing.strategies import MessageRejected
 from arroyo.types import Message, Partition, Position, Topic
 from arroyo.utils.clock import TestingClock as Clock
 
 from sentry.sentry_metrics.indexer.mock import MockIndexer
 from sentry.sentry_metrics.multiprocess import (
     BatchMessages,
+    DuplicateMessage,
     MetricsBatchBuilder,
     ProduceStep,
     process_messages,
@@ -51,15 +54,10 @@ def test_batch_messages() -> None:
     assert not next_step.submit.called
 
     # submit the second message, message should be added to the batch
+    # which will now saturate the batch_size (2). This will trigger
+    # __flush which in turn calls next.submit and reset the batch to None
     batch_messages_step.submit(message=message2)
 
-    assert len(batch_messages_step._BatchMessages__batch) == 2
-
-    # now the batch_size (2) has been reached, poll should call
-    # self.flush which will call the next step's submit and then
-    # reset the batch to None
-    batch_messages_step.poll()
-
     assert next_step.submit.call_args == call(
         Message(message2.partition, message2.offset, [message1, message2], message2.timestamp),
     )
@@ -67,6 +65,39 @@ def test_batch_messages() -> None:
     assert batch_messages_step._BatchMessages__batch is None
 
 
+def test_batch_messages_rejected_message():
+    next_step = Mock()
+    next_step.submit.side_effect = MessageRejected()
+
+    max_batch_time = 100.0  # seconds
+    max_batch_size = 2
+
+    batch_messages_step = BatchMessages(
+        next_step=next_step, max_batch_time=max_batch_time, max_batch_size=max_batch_size
+    )
+
+    message1 = Message(
+        Partition(Topic("topic"), 0), 1, KafkaPayload(None, b"some value", []), datetime.now()
+    )
+    message2 = Message(
+        Partition(Topic("topic"), 0), 2, KafkaPayload(None, b"another value", []), datetime.now()
+    )
+    batch_messages_step.poll()
+    batch_messages_step.submit(message=message1)
+
+    # if we try to submit a batch when the next step is
+    # not ready to accept more messages we'll get a
+    # MessageRejected error which will bubble up to the
+    # StreamProcessor.
+    with pytest.raises(MessageRejected):
+        batch_messages_step.submit(message=message2)
+
+    # when poll is called, we still try to flush the batch
+    # caust its full but we handled the MessageRejected error
+    batch_messages_step.poll()
+    assert next_step.submit.called
+
+
 def test_metrics_batch_builder():
     max_batch_time = 3.0  # seconds
     max_batch_size = 2
@@ -106,6 +137,17 @@ def test_metrics_batch_builder():
     time.sleep(3)
     assert batch_builder_time.ready()
 
+    # 3. Adding the same message twice to the same batch
+    batch_builder_time = MetricsBatchBuilder(
+        max_batch_size=max_batch_size, max_batch_time=max_batch_time
+    )
+    message1 = Message(
+        Partition(Topic("topic"), 0), 1, KafkaPayload(None, b"some value", []), datetime.now()
+    )
+    batch_builder_time.append(message1)
+    with pytest.raises(DuplicateMessage):
+        batch_builder_time.append(message1)
+
 
 ts = int(datetime.now(tz=timezone.utc).timestamp())
 counter_payload = {