
perf: Arroyo 2.15.2 and reusable multiprocessing pools (#61221)

Arroyo 2.15.0 makes it possible to reuse the multiprocessing pool instance across rebalances in Sentry consumers.

Since Sentry is deployed in a rolling manner, each deploy currently triggers a large number of rebalances across the consumer group. During this time we end up initializing and closing the multiprocessing pools over and over, which is slow because every subprocess has to import all of Sentry. This causes consumers to build up an unnecessary backlog. This PR aims to reduce the impact of rebalancing on consumer throughput.
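The pattern applied to each consumer factory below is roughly the following. This is a minimal sketch, assuming the MultiprocessingPool wrapper and RunTaskWithMultiprocessing exported by sentry.utils.arroyo as used in the hunks; the factory name, message handler and batch settings are illustrative, not taken from any file in this diff.

    from collections.abc import Mapping

    from arroyo.backends.kafka import KafkaPayload
    from arroyo.processing.strategies import (
        CommitOffsets,
        ProcessingStrategy,
        ProcessingStrategyFactory,
    )
    from arroyo.types import Commit, Message, Partition

    from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing


    def process_message(message: Message[KafkaPayload]) -> None:
        ...  # illustrative message handler


    class ExampleStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
        def __init__(self, num_processes: int) -> None:
            # Created once per factory, so the worker subprocesses (and the
            # expensive "import all of Sentry" they do on startup) survive
            # consumer rebalances.
            self.pool = MultiprocessingPool(num_processes)

        def create_with_partitions(
            self, commit: Commit, partitions: Mapping[Partition, int]
        ) -> ProcessingStrategy[KafkaPayload]:
            # The strategy receives the existing pool instead of num_processes,
            # so the strategy built after a rebalance reuses the same workers.
            return RunTaskWithMultiprocessing(
                function=process_message,
                next_step=CommitOffsets(commit),
                max_batch_size=100,
                max_batch_time=1.0,
                pool=self.pool,
            )

        def shutdown(self) -> None:
            # Close the pool only when the consumer itself shuts down.
            self.pool.close()

Each factory in this diff gains a shutdown() hook along these lines, so the pool is only torn down when the consumer stops rather than on every rebalance.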
Lyn Nagara · 1 year ago
commit a918666674

+ 1 - 2
requirements-base.txt

@@ -62,8 +62,7 @@ requests>=2.25.1
 rfc3339-validator>=0.1.2
 rfc3986-validator>=0.1.1
 # [end] jsonschema format validators
-sentry-arroyo>=2.14.25
-sentry-kafka-schemas>=0.1.38
+sentry-arroyo>=2.15.2
 sentry-kafka-schemas>=0.1.38
 sentry-redis-tools>=0.1.7
 sentry-relay>=0.8.39

+ 1 - 1
requirements-dev-frozen.txt

@@ -171,7 +171,7 @@ rfc3986-validator==0.1.1
 rsa==4.8
 s3transfer==0.6.1
 selenium==4.16.0
-sentry-arroyo==2.14.25
+sentry-arroyo==2.15.2
 sentry-cli==2.16.0
 sentry-forked-django-stubs==4.2.7.post1
 sentry-forked-djangorestframework-stubs==3.14.5.post1

+ 1 - 1
requirements-frozen.txt

@@ -118,7 +118,7 @@ rfc3339-validator==0.1.2
 rfc3986-validator==0.1.1
 rsa==4.8
 s3transfer==0.6.1
-sentry-arroyo==2.14.25
+sentry-arroyo==2.15.2
 sentry-kafka-schemas==0.1.38
 sentry-redis-tools==0.1.7
 sentry-relay==0.8.39

+ 2 - 1
src/sentry/eventstream/kafka/dispatch.py

@@ -91,5 +91,6 @@ def _get_task_kwargs_and_dispatch(message: Message[KafkaPayload]) -> None:
 
 
 class EventPostProcessForwarderStrategyFactory(PostProcessForwarderStrategyFactory):
-    def _dispatch_function(self, message: Message[KafkaPayload]) -> None:
+    @staticmethod
+    def _dispatch_function(message: Message[KafkaPayload]) -> None:
         return _get_task_kwargs_and_dispatch(message)
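One plausible reason for the staticmethod change here: RunTaskWithMultiprocessing ships the callable to its worker processes by pickling it, and a bound method would also pickle the strategy factory instance, which now owns a multiprocessing pool that cannot be pickled. A standalone illustration of that constraint using the stdlib pool (the Forwarder class and its attributes are hypothetical, not Sentry code):

    import multiprocessing


    class Forwarder:
        def __init__(self) -> None:
            # Stand-in for an attribute that cannot be pickled, like a pool.
            self.lock = multiprocessing.Lock()

        def dispatch_bound(self, value: str) -> str:
            return value.upper()

        @staticmethod
        def dispatch_static(value: str) -> str:
            return value.upper()


    if __name__ == "__main__":
        forwarder = Forwarder()
        with multiprocessing.Pool(2) as pool:
            # Fine: only a reference to the plain function is sent to the workers.
            print(pool.map(Forwarder.dispatch_static, ["a", "b"]))
            # Raises: pickling the bound method also pickles `forwarder`,
            # including its unpicklable Lock.
            # pool.map(forwarder.dispatch_bound, ["a", "b"])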

+ 23 - 5
src/sentry/ingest/consumer/factory.py

@@ -20,7 +20,7 @@ from django.conf import settings
 from sentry.ingest.types import ConsumerType
 from sentry.processing.backpressure.arroyo import HealthChecker, create_backpressure_step
 from sentry.utils import kafka_config
-from sentry.utils.arroyo import RunTaskWithMultiprocessing
+from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing
 
 from .attachment_event import decode_and_process_chunks, process_attachments_and_events
 from .simple_event import process_simple_event_message
@@ -42,14 +42,16 @@ def maybe_multiprocess_step(
     mp: MultiProcessConfig | None,
     function: Callable[[Message[TInput]], TOutput],
     next_step: ProcessingStrategy[FilteredPayload | TOutput],
+    pool: Optional[MultiprocessingPool],
 ) -> ProcessingStrategy[FilteredPayload | TInput]:
     if mp is not None:
+        assert pool is not None
         return RunTaskWithMultiprocessing(
             function=function,
             next_step=next_step,
-            num_processes=mp.num_processes,
             max_batch_size=mp.max_batch_size,
             max_batch_time=mp.max_batch_time,
+            pool=pool,
             input_block_size=mp.input_block_size,
             output_block_size=mp.output_block_size,
         )
@@ -74,6 +76,12 @@ class IngestStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         self.is_attachment_topic = consumer_type == ConsumerType.Attachments
 
         self.multi_process = None
+        self._pool = MultiprocessingPool(num_processes)
+
+        # XXX: Attachment topic has two multiprocessing strategies chained together so we use
+        # two pools.
+        if self.is_attachment_topic:
+            self._attachments_pool = MultiprocessingPool(num_processes)
         if num_processes > 1:
             self.multi_process = MultiProcessConfig(
                 num_processes, max_batch_size, max_batch_time, input_block_size, output_block_size
@@ -91,7 +99,9 @@ class IngestStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         final_step = CommitOffsets(commit)
 
         if not self.is_attachment_topic:
-            next_step = maybe_multiprocess_step(mp, process_simple_event_message, final_step)
+            next_step = maybe_multiprocess_step(
+                mp, process_simple_event_message, final_step, self._pool
+            )
             return create_backpressure_step(health_checker=self.health_checker, next_step=next_step)
 
         # The `attachments` topic is a bit different, as it allows multiple event types:
@@ -104,7 +114,9 @@ class IngestStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         # are being handled in a step before the event depending on them is processed in a
         # later step.
 
-        step_2 = maybe_multiprocess_step(mp, process_attachments_and_events, final_step)
+        step_2 = maybe_multiprocess_step(
+            mp, process_attachments_and_events, final_step, self._attachments_pool
+        )
         # This `FilterStep` will skip over processing `None` (aka already handled attachment chunks)
         # in the second step. We filter this here explicitly,
         # to avoid arroyo from needlessly dispatching `None` messages.
@@ -113,10 +125,16 @@ class IngestStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         # As the steps are defined (and types inferred) in reverse order, we would get a type error here,
         # as `step_1` outputs an `| None`, but the `filter_step` does not mention that in its type,
         # as it is inferred from the `step_2` input type which does not mention `| None`.
-        step_1 = maybe_multiprocess_step(mp, decode_and_process_chunks, filter_step)  # type:ignore
+        step_1 = maybe_multiprocess_step(
+            mp, decode_and_process_chunks, filter_step, self._pool  # type:ignore
+        )
 
         return create_backpressure_step(health_checker=self.health_checker, next_step=step_1)
 
+    def shutdown(self) -> None:
+        self._pool.close()
+        if self.is_attachment_topic:
+            self._attachments_pool.close()
+
 
 def get_ingest_consumer(
     consumer_type: str,

+ 6 - 3
src/sentry/issues/run.py

@@ -13,7 +13,7 @@ from arroyo.processing.strategies import (
 )
 from arroyo.types import Commit, Message, Partition
 
-from sentry.utils.arroyo import RunTaskWithMultiprocessing
+from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing
 from sentry.utils.kafka_config import get_topic_definition
 
 logger = logging.getLogger(__name__)
@@ -95,9 +95,9 @@ class OccurrenceStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         super().__init__()
         self.max_batch_size = max_batch_size
         self.max_batch_time = max_batch_time
-        self.num_processes = num_processes
         self.input_block_size = input_block_size
         self.output_block_size = output_block_size
+        self.pool = MultiprocessingPool(num_processes)
 
     def create_with_partitions(
         self,
@@ -107,13 +107,16 @@ class OccurrenceStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         return RunTaskWithMultiprocessing(
             function=process_message,
             next_step=CommitOffsets(commit),
-            num_processes=self.num_processes,
             max_batch_size=self.max_batch_size,
             max_batch_time=self.max_batch_time,
+            pool=self.pool,
             input_block_size=self.input_block_size,
             output_block_size=self.output_block_size,
         )
 
+    def shutdown(self) -> None:
+        self.pool.close()
+
 
 def process_message(message: Message[KafkaPayload]) -> None:
     from sentry.issues.occurrence_consumer import (

+ 8 - 4
src/sentry/post_process_forwarder/post_process_forwarder.py

@@ -11,14 +11,15 @@ from arroyo.processing.strategies import (
 )
 from arroyo.types import Commit, Message, Partition
 
-from sentry.utils.arroyo import RunTaskWithMultiprocessing
+from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing
 
 logger = logging.getLogger(__name__)
 
 
 class PostProcessForwarderStrategyFactory(ProcessingStrategyFactory[KafkaPayload], ABC):
+    @staticmethod
     @abstractmethod
-    def _dispatch_function(self, message: Message[KafkaPayload]) -> None:
+    def _dispatch_function(message: Message[KafkaPayload]) -> None:
         raise NotImplementedError()
 
     def __init__(
@@ -32,13 +33,13 @@ class PostProcessForwarderStrategyFactory(ProcessingStrategyFactory[KafkaPayload
         concurrency: int,
     ) -> None:
         self.mode = mode
-        self.num_processes = num_processes
         self.input_block_size = input_block_size
         self.output_block_size = output_block_size
         self.max_batch_size = max_batch_size
         self.max_batch_time = max_batch_time
         self.concurrency = concurrency
         self.max_pending_futures = concurrency + 1000
+        self.pool = MultiprocessingPool(num_processes)
 
     def create_with_partitions(
         self,
@@ -58,11 +59,14 @@ class PostProcessForwarderStrategyFactory(ProcessingStrategyFactory[KafkaPayload
             return RunTaskWithMultiprocessing(
                 function=self._dispatch_function,
                 next_step=CommitOffsets(commit),
-                num_processes=self.num_processes,
                 max_batch_size=self.max_batch_size,
                 max_batch_time=self.max_batch_time,
+                pool=self.pool,
                 input_block_size=self.input_block_size,
                 output_block_size=self.output_block_size,
             )
         else:
             raise ValueError(f"Invalid mode {self.mode}")
+
+    def shutdown(self) -> None:
+        self.pool.close()

+ 8 - 2
src/sentry/replays/consumers/recording.py

@@ -15,7 +15,7 @@ from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import Replay
 from sentry_sdk.tracing import Span
 
 from sentry.replays.usecases.ingest import ingest_recording
-from sentry.utils.arroyo import RunTaskWithMultiprocessing
+from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing
 
 logger = logging.getLogger(__name__)
 
@@ -60,6 +60,7 @@ class ProcessReplayRecordingStrategyFactory(ProcessingStrategyFactory[KafkaPaylo
         self.output_block_size = output_block_size
         self.use_processes = self.num_processes > 1
         self.force_synchronous = force_synchronous
+        self.pool = MultiprocessingPool(num_processes) if self.use_processes else None
 
     def create_with_partitions(
         self,
@@ -72,12 +73,13 @@ class ProcessReplayRecordingStrategyFactory(ProcessingStrategyFactory[KafkaPaylo
                 next_step=CommitOffsets(commit),
             )
         elif self.use_processes:
+            assert self.pool is not None
             return RunTaskWithMultiprocessing(
                 function=process_message,
                 next_step=CommitOffsets(commit),
-                num_processes=self.num_processes,
                 max_batch_size=self.max_batch_size,
                 max_batch_time=self.max_batch_time,
+                pool=self.pool,
                 input_block_size=self.input_block_size,
                 output_block_size=self.output_block_size,
             )
@@ -93,6 +95,10 @@ class ProcessReplayRecordingStrategyFactory(ProcessingStrategyFactory[KafkaPaylo
                 ),
             )
 
+    def shutdown(self) -> None:
+        if self.pool:
+            self.pool.close()
+
 
 def initialize_threaded_context(message: Message[KafkaPayload]) -> MessageContext:
     """Initialize a Sentry transaction and unpack the message."""

+ 17 - 12
src/sentry/sentry_metrics/consumers/indexer/parallel.py

@@ -25,7 +25,7 @@ from sentry.sentry_metrics.consumers.indexer.routing_producer import (
     RoutingProducerStep,
 )
 from sentry.sentry_metrics.consumers.indexer.slicing_router import SlicingRouter
-from sentry.utils.arroyo import RunTaskWithMultiprocessing
+from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing
 
 logger = logging.getLogger(__name__)
 
@@ -137,11 +137,20 @@ class MetricsConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         self.__max_parallel_batch_size = max_parallel_batch_size
         self.__max_parallel_batch_time = max_parallel_batch_time
 
-        self.__processes = processes
-
         self.__input_block_size = input_block_size
         self.__output_block_size = output_block_size
         self.__slicing_router = slicing_router
+        self.__pool = MultiprocessingPool(
+            num_processes=processes,
+            # It is absolutely crucial that we pass a function reference here
+            # where the function lives in a module that does not depend on
+            # Django settings. `sentry.sentry_metrics.configuration` fulfills
+            # that requirement, but if you were to create a wrapper function in
+            # this module, and pass that function here, it would attempt to
+            # pull in a bunch of modules that try to read django settings at
+            # import time
+            initializer=functools.partial(initialize_subprocess_state, self.config),
+        )
 
     def create_with_partitions(
         self,
@@ -153,23 +162,16 @@ class MetricsConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
             commit=commit,
             slicing_router=self.__slicing_router,
         )
+
         parallel_strategy = RunTaskWithMultiprocessing(
             function=MessageProcessor(self.config).process_messages,
             next_step=Unbatcher(next_step=producer),
-            num_processes=self.__processes,
+            pool=self.__pool,
             max_batch_size=self.__max_parallel_batch_size,
             # This is in seconds
             max_batch_time=self.__max_parallel_batch_time / 1000,
             input_block_size=self.__input_block_size,
             output_block_size=self.__output_block_size,
-            # It is absolutely crucial that we pass a function reference here
-            # where the function lives in a module that does not depend on
-            # Django settings. `sentry.sentry_metrics.configuration` fulfills
-            # that requirement, but if you were to create a wrapper function in
-            # this module, and pass that function here, it would attempt to
-            # pull in a bunch of modules that try to read django settings at
-            # import time
-            initializer=functools.partial(initialize_subprocess_state, self.config),
         )
 
         strategy = BatchMessages(
@@ -178,6 +180,9 @@ class MetricsConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
 
         return strategy
 
+    def shutdown(self) -> None:
+        self.__pool.close()
+
 
 def get_metrics_producer_strategy(
     config: MetricsIngestConfiguration,
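What is different in this file compared to the others: the subprocess initializer moves from the RunTaskWithMultiprocessing call into the pool constructor, so it runs once per worker process when the pool starts instead of every time a strategy is recreated after a rebalance. A condensed sketch of that piece, assuming initialize_subprocess_state lives in sentry.sentry_metrics.configuration as the inline comment indicates (build_indexer_pool is a hypothetical helper, not part of this diff):

    import functools

    from sentry.sentry_metrics.configuration import (
        MetricsIngestConfiguration,
        initialize_subprocess_state,
    )
    from sentry.utils.arroyo import MultiprocessingPool


    def build_indexer_pool(config: MetricsIngestConfiguration, processes: int) -> MultiprocessingPool:
        # The initializer is bound to the pool, so it runs once when each worker
        # process starts and is not repeated when strategies are rebuilt on
        # rebalance. It must come from a module that is safe to import without
        # Django settings being configured.
        return MultiprocessingPool(
            num_processes=processes,
            initializer=functools.partial(initialize_subprocess_state, config),
        )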

+ 6 - 4
src/sentry/snuba/query_subscriptions/run.py

@@ -20,7 +20,7 @@ from sentry_kafka_schemas import get_codec
 
 from sentry.snuba.dataset import Dataset
 from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic, topic_to_dataset
-from sentry.utils.arroyo import RunTaskWithMultiprocessing
+from sentry.utils.arroyo import MultiprocessingPool, RunTaskWithMultiprocessing
 
 logger = logging.getLogger(__name__)
 
@@ -41,10 +41,10 @@ class QuerySubscriptionStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         self.logical_topic = dataset_to_logical_topic[self.dataset]
         self.max_batch_size = max_batch_size
         self.max_batch_time = max_batch_time
-        self.num_processes = num_processes
         self.input_block_size = input_block_size
         self.output_block_size = output_block_size
         self.multi_proc = multi_proc
+        self.pool = MultiprocessingPool(num_processes)
 
     def create_with_partitions(
         self,
@@ -56,15 +56,18 @@ class QuerySubscriptionStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
             return RunTaskWithMultiprocessing(
                 function=callable,
                 next_step=CommitOffsets(commit),
-                num_processes=self.num_processes,
                 max_batch_size=self.max_batch_size,
                 max_batch_time=self.max_batch_time,
+                pool=self.pool,
                 input_block_size=self.input_block_size,
                 output_block_size=self.output_block_size,
             )
         else:
             return RunTask(callable, CommitOffsets(commit))
 
+    def shutdown(self) -> None:
+        self.pool.close()
+
 
 def process_message(
     dataset: Dataset, topic: str, logical_topic: str, message: Message[KafkaPayload]
@@ -118,7 +121,6 @@ def get_query_subscription_consumer(
     output_block_size: Optional[int],
     multi_proc: bool = False,
 ) -> StreamProcessor[KafkaPayload]:
-
     from sentry.utils import kafka_config
 
     cluster_name = kafka_config.get_topic_definition(topic)["cluster"]

Some files were not shown because too many files changed in this diff