Browse Source

ref(sentry_metrics): Use sentry.utils.arroyo to configure subprocesses. (#50719)

Markus Unterwaditzer 1 year ago
parent
commit
bfbc9c2cfb

+ 13 - 11
src/sentry/sentry_metrics/configuration.py

@@ -113,27 +113,29 @@ def get_ingest_config(
     return _METRICS_INGEST_CONFIG_BY_USE_CASE[(use_case_key, db_backend)]
 
 
-def initialize_sentry_and_global_consumer_state(config: MetricsIngestConfiguration) -> None:
+def initialize_subprocess_state(config: MetricsIngestConfiguration) -> None:
     """
-    Initialization function for subprocesses spawned by the parallel indexer.
-
-    It does the same thing as `initialize_global_consumer_state` except it
-    initializes the Sentry Django app from scratch as well.
+    Initialization function for the subprocesses of the metrics indexer.
 
     `config` is pickleable, and this function lives in a module that can be
     imported without any upfront initialization of the Django app. Meaning that
     an object like
     `functools.partial(initialize_subprocess_state, config)` is
     pickleable as well (which we pass as initialization callback to arroyo).
-    """
-    from sentry.runner import configure
 
-    configure()
+    This function should ideally be kept minimal and not contain too much
+    logic. Commonly reusable bits should be added to
+    sentry.utils.arroyo.RunTaskWithMultiprocessing.
 
-    initialize_global_consumer_state(config)
+    We already rely on sentry.utils.arroyo.RunTaskWithMultiprocessing to copy
+    statsd tags into the subprocess, eventually we should do the same for
+    Sentry tags.
+    """
+
+    sentry_sdk.set_tag("sentry_metrics.use_case_key", config.use_case_id.value)
 
 
-def initialize_global_consumer_state(config: MetricsIngestConfiguration) -> None:
+def initialize_main_process_state(config: MetricsIngestConfiguration) -> None:
     """
     Initialization function for the main process of the metrics indexer.
 
@@ -151,5 +153,5 @@ def initialize_global_consumer_state(config: MetricsIngestConfiguration) -> None
 
     from sentry.utils.arroyo import MetricsWrapper
 
-    metrics_wrapper = MetricsWrapper(backend, name="sentry_metrics.indexer", tags=global_tag_map)
+    metrics_wrapper = MetricsWrapper(backend, name="sentry_metrics.indexer")
     configure_metrics(metrics_wrapper)

+ 7 - 9
src/sentry/sentry_metrics/consumers/indexer/parallel.py

@@ -10,12 +10,11 @@ from arroyo.processing import StreamProcessor
 from arroyo.processing.strategies import ProcessingStrategy
 from arroyo.processing.strategies import ProcessingStrategy as ProcessingStep
 from arroyo.processing.strategies import ProcessingStrategyFactory
-from arroyo.processing.strategies.transform import ParallelTransformStep
 from arroyo.types import Commit, FilteredPayload, Message, Partition, Topic
 
 from sentry.sentry_metrics.configuration import (
     MetricsIngestConfiguration,
-    initialize_sentry_and_global_consumer_state,
+    initialize_subprocess_state,
 )
 from sentry.sentry_metrics.consumers.indexer.common import (
     BatchMessages,
@@ -29,6 +28,7 @@ from sentry.sentry_metrics.consumers.indexer.routing_producer import (
     RoutingProducerStep,
 )
 from sentry.sentry_metrics.consumers.indexer.slicing_router import SlicingRouter
+from sentry.utils.arroyo import RunTaskWithMultiprocessing
 
 logger = logging.getLogger(__name__)
 
@@ -124,10 +124,10 @@ class MetricsConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
             commit=commit,
             slicing_router=self.__slicing_router,
         )
-        parallel_strategy = ParallelTransformStep(
-            MessageProcessor(self.__config).process_messages,
-            Unbatcher(next_step=producer),
-            self.__processes,
+        parallel_strategy = RunTaskWithMultiprocessing(
+            function=MessageProcessor(self.__config).process_messages,
+            next_step=Unbatcher(next_step=producer),
+            num_processes=self.__processes,
             max_batch_size=self.__max_parallel_batch_size,
             # This is in seconds
             max_batch_time=self.__max_parallel_batch_time / 1000,
@@ -140,9 +140,7 @@ class MetricsConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
             # this module, and pass that function here, it would attempt to
             # pull in a bunch of modules that try to read django settings at
             # import time
-            initializer=functools.partial(
-                initialize_sentry_and_global_consumer_state, self.__config
-            ),
+            initializer=functools.partial(initialize_subprocess_state, self.__config),
         )
 
         strategy = BatchMessages(

+ 20 - 6
src/sentry/utils/arroyo.py

@@ -7,9 +7,12 @@ from arroyo.backends.kafka.configuration import build_kafka_consumer_configurati
 from arroyo.backends.kafka.consumer import KafkaConsumer
 from arroyo.commit import ONCE_PER_SECOND
 from arroyo.processing.processor import StreamProcessor
-from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
+from arroyo.processing.strategies.abstract import ProcessingStrategyFactory
+from arroyo.processing.strategies.run_task_with_multiprocessing import (
+    RunTaskWithMultiprocessing as ArroyoRunTaskWithMultiprocessing,
+)
 from arroyo.processing.strategies.run_task_with_multiprocessing import TResult
-from arroyo.types import FilteredPayload, Message, Topic, TStrategyPayload
+from arroyo.types import Topic, TStrategyPayload
 from arroyo.utils.metrics import Metrics
 
 from sentry.metrics.base import MetricsBackend
@@ -99,20 +102,31 @@ def _initialize_arroyo_main() -> None:
     configure_metrics(metrics_wrapper)
 
 
-class RunTaskWithMultiprocessing(ProcessingStrategy[Union[FilteredPayload, TStrategyPayload]]):
+class RunTaskWithMultiprocessing(ArroyoRunTaskWithMultiprocessing[TStrategyPayload, TResult]):
+    """
+    A variant of arroyo's RunTaskWithMultiprocessing that initializes Sentry
+    for you, and ensures global metric tags in the subprocess are inherited
+    from the main process.
+    """
+
     def __new__(
         cls,
-        *function: Callable[[Message[TStrategyPayload]], TResult],
-        next_step: ProcessingStrategy[Union[FilteredPayload, TResult]],
+        *,
         initializer: Optional[Callable[[], None]] = None,
         **kwargs: Any,
-    ) -> RunTaskWithMultiprocessing[Union[FilteredPayload, TStrategyPayload]]:
+    ) -> RunTaskWithMultiprocessing:
 
         from django.conf import settings
 
         if settings.KAFKA_CONSUMER_FORCE_DISABLE_MULTIPROCESSING:
             from arroyo.processing.strategies.run_task import RunTask
 
+            kwargs.pop("num_processes", None)
+            kwargs.pop("input_block_size", None)
+            kwargs.pop("output_block_size", None)
+            kwargs.pop("max_batch_size", None)
+            kwargs.pop("max_batch_time", None)
+
             return RunTask(**kwargs)  # type: ignore[return-value]
         else:
             from arroyo.processing.strategies.run_task_with_multiprocessing import (