
feat(metrics): Reintroduce parallel consumer [sns-1602] (#37762)

Co-authored-by: Filippo Pacifici <fpacifici@sentry.io>
Author: Markus Unterwaditzer
Commit: d4d34feafc

+ 42 - 0
src/sentry/runner/commands/run.py

@@ -587,6 +587,48 @@ def metrics_streaming_consumer(**options):
         streamer.run()
 
 
+@run.command("ingest-metrics-parallel-consumer")
+@log_options()
+@batching_kafka_options("ingest-metrics-consumer")
+@configuration
+@click.option(
+    "--processes",
+    default=1,
+    type=int,
+)
+@click.option("--input-block-size", type=int, default=DEFAULT_BLOCK_SIZE)
+@click.option("--output-block-size", type=int, default=DEFAULT_BLOCK_SIZE)
+@click.option("--ingest-profile", required=True)
+@click.option("max_msg_batch_size", "--max-msg-batch-size", type=int, default=50)
+@click.option("max_msg_batch_time", "--max-msg-batch-time-ms", type=int, default=10000)
+@click.option("max_parallel_batch_size", "--max-parallel-batch-size", type=int, default=50)
+@click.option("max_parallel_batch_time", "--max-parallel-batch-time-ms", type=int, default=10000)
+def metrics_parallel_consumer(**options):
+    import sentry_sdk
+
+    from sentry.sentry_metrics.configuration import UseCaseKey, get_ingest_config
+    from sentry.sentry_metrics.consumers.indexer.parallel import get_parallel_metrics_consumer
+    from sentry.sentry_metrics.metrics_wrapper import MetricsWrapper
+    from sentry.utils.metrics import backend, global_tags
+
+    use_case = UseCaseKey(options["ingest_profile"])
+    sentry_sdk.set_tag("sentry_metrics.use_case_key", use_case.value)
+    ingest_config = get_ingest_config(use_case)
+    metrics_wrapper = MetricsWrapper(backend, "sentry_metrics.indexer")
+    configure_metrics(metrics_wrapper)
+
+    streamer = get_parallel_metrics_consumer(indexer_profile=ingest_config, **options)
+
+    def handler(signum, frame):
+        streamer.signal_shutdown()
+
+    signal.signal(signal.SIGINT, handler)
+    signal.signal(signal.SIGTERM, handler)
+
+    with global_tags(_all_threads=True, pipeline=ingest_config.internal_metrics_tag):
+        streamer.run()
+
+
 @run.command("ingest-profiles")
 @log_options()
 @click.option("--topic", default="profiles", help="Topic to get profiles data from.")
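The new command wires shutdown the same way as the existing streaming consumer: SIGINT and SIGTERM both call signal_shutdown() on the processor, and run() blocks inside a global_tags(_all_threads=True, pipeline=...) context so metrics from every thread carry the ingest pipeline tag. Below is a minimal, self-contained sketch of that shutdown pattern; the FakeProcessor class is a stand-in for the Arroyo StreamProcessor returned by get_parallel_metrics_consumer, not the real thing.

    import signal
    import threading


    class FakeProcessor:
        """Stand-in for the Arroyo StreamProcessor built by get_parallel_metrics_consumer."""

        def __init__(self) -> None:
            self._shutdown = threading.Event()

        def signal_shutdown(self) -> None:
            # The real processor stops its consume loop on the next iteration;
            # here we just set a flag.
            self._shutdown.set()

        def run(self) -> None:
            # The real processor polls Kafka until shutdown is requested.
            while not self._shutdown.is_set():
                self._shutdown.wait(0.1)


    streamer = FakeProcessor()


    def handler(signum, frame):
        streamer.signal_shutdown()


    # SIGINT (Ctrl-C) and SIGTERM (e.g. from the process supervisor) both
    # trigger a clean stop instead of killing the consumer mid-batch.
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)

    streamer.run()  # blocks until one of the signals above arrives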

+ 2 - 11
src/sentry/sentry_metrics/consumers/indexer/common.py

@@ -1,4 +1,3 @@
-import functools
 import logging
 import time
 from typing import Any, List, MutableMapping, Optional, Set
@@ -10,7 +9,7 @@ from arroyo.processing.strategies import ProcessingStrategy as ProcessingStep
 from arroyo.types import Message
 from django.conf import settings
 
-from sentry.utils import kafka_config
+from sentry.utils import kafka_config, metrics
 
 MessageBatch = List[Message[KafkaPayload]]
 
@@ -40,13 +39,6 @@ def get_config(topic: str, group_id: str, auto_offset_reset: str) -> MutableMapp
     return consumer_config
 
 
-@functools.lru_cache(maxsize=10)
-def get_metrics():  # type: ignore
-    from sentry.utils import metrics
-
-    return metrics
-
-
 class DuplicateMessage(Exception):
     pass
 
@@ -113,7 +105,6 @@ class BatchMessages(ProcessingStep[KafkaPayload]):  # type: ignore
         self.__batch: Optional[MetricsBatchBuilder] = None
         self.__closed = False
         self.__batch_start: Optional[float] = None
-        self.__metrics = get_metrics()
 
     def poll(self) -> None:
         assert not self.__closed
@@ -156,7 +147,7 @@ class BatchMessages(ProcessingStep[KafkaPayload]):  # type: ignore
         new_message = Message(last.partition, last.offset, self.__batch.messages, last.timestamp)
         if self.__batch_start is not None:
             elapsed_time = time.time() - self.__batch_start
-            self.__metrics.timing("batch_messages.build_time", elapsed_time)
+            metrics.timing("batch_messages.build_time", elapsed_time)
             self.__batch_start = None
 
         self.__next_step.submit(new_message)

+ 6 - 18
src/sentry/sentry_metrics/consumers/indexer/multiprocess.py

@@ -1,4 +1,3 @@
-import functools
 import logging
 import time
 from dataclasses import dataclass
@@ -18,19 +17,12 @@ from django.conf import settings
 from sentry.sentry_metrics.configuration import MetricsIngestConfiguration
 from sentry.sentry_metrics.consumers.indexer.common import BatchMessages, MessageBatch, get_config
 from sentry.sentry_metrics.consumers.indexer.processing import process_messages
-from sentry.utils import kafka_config
+from sentry.utils import kafka_config, metrics
 from sentry.utils.batching_kafka_consumer import create_topics
 
 logger = logging.getLogger(__name__)
 
 
-@functools.lru_cache(maxsize=10)
-def get_metrics():  # type: ignore
-    from sentry.utils import metrics
-
-    return metrics
-
-
 class BatchConsumerStrategyFactory(ProcessingStrategyFactory):  # type: ignore
     """
     Batching Consumer Strategy
@@ -82,7 +74,6 @@ class TransformStep(ProcessingStep[MessageBatch]):  # type: ignore
         )
         self.__next_step = next_step
         self.__closed = False
-        self.__metrics = get_metrics()
 
     def poll(self) -> None:
         self.__next_step.poll()
@@ -90,7 +81,7 @@ class TransformStep(ProcessingStep[MessageBatch]):  # type: ignore
     def submit(self, message: Message[MessageBatch]) -> None:
         assert not self.__closed
 
-        with self.__metrics.timer("transform_step.process_messages"):
+        with metrics.timer("transform_step.process_messages"):
             transformed_message_batch = self.__process_messages(message)
 
         for transformed_message in transformed_message_batch:
@@ -141,7 +132,6 @@ class SimpleProduceStep(ProcessingStep[KafkaPayload]):  # type: ignore
         self.__commit_function = commit_function
 
         self.__closed = False
-        self.__metrics = get_metrics()
         self.__produced_message_offsets: MutableMapping[Partition, Position] = {}
         self.__callbacks = 0
         self.__started = time.time()
@@ -176,12 +166,12 @@ class SimpleProduceStep(ProcessingStep[KafkaPayload]):  # type: ignore
 
         # record poll time durations every 5 seconds
         if (self.__poll_start_time + 5) < time.time():
-            self.__metrics.timing("simple_produce_step.join_duration", self.__poll_duration_sum)
+            metrics.timing("simple_produce_step.join_duration", self.__poll_duration_sum)
             self.__poll_duration_sum = 0
             self.__poll_start_time = time.time()
 
     def poll_producer(self, timeout: float) -> None:
-        with self.__metrics.timer("simple_produce_step.producer_poll_duration", sample_rate=0.05):
+        with metrics.timer("simple_produce_step.producer_poll_duration", sample_rate=0.05):
             start = time.time()
             self.__producer.poll(timeout)
             end = time.time()
@@ -192,9 +182,7 @@ class SimpleProduceStep(ProcessingStep[KafkaPayload]):  # type: ignore
     def poll(self) -> None:
         timeout = 0.0
         if len(self.__producer) >= self.__producer_queue_max_size:
-            self.__metrics.incr(
-                "simple_produce_step.producer_queue_backup", amount=len(self.__producer)
-            )
+            metrics.incr("simple_produce_step.producer_queue_backup", amount=len(self.__producer))
             timeout = self.__producer_long_poll_timeout
 
         self.poll_producer(timeout)
@@ -229,7 +217,7 @@ class SimpleProduceStep(ProcessingStep[KafkaPayload]):  # type: ignore
         self.__closed = True
 
     def join(self, timeout: Optional[float]) -> None:
-        with self.__metrics.timer("simple_produce_step.join_duration"):
+        with metrics.timer("simple_produce_step.join_duration"):
             if not timeout:
                 timeout = 5.0
             self.__producer.flush(timeout)
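SimpleProduceStep sums producer poll durations and reports a timing at most once every five seconds instead of on every poll, which keeps metric volume low on a hot loop. A minimal sketch of that accumulate-and-flush pattern follows; emit_timing is a hypothetical callable standing in for sentry.utils.metrics.timing.

    import time
    from typing import Callable


    class PollTimer:
        """Accumulates poll durations and flushes one timing at most every 5 seconds."""

        def __init__(self, emit_timing: Callable[[str, float], None]) -> None:
            self._emit_timing = emit_timing
            self._poll_start_time = time.time()
            self._poll_duration_sum = 0.0

        def record_poll(self, duration: float) -> None:
            self._poll_duration_sum += duration

        def maybe_flush(self) -> None:
            # Emit the summed duration at most once per 5-second window so a
            # hot polling loop does not flood the metrics backend.
            if (self._poll_start_time + 5) < time.time():
                self._emit_timing("simple_produce_step.join_duration", self._poll_duration_sum)
                self._poll_duration_sum = 0.0
                self._poll_start_time = time.time()


    timer = PollTimer(lambda key, value: print(f"{key}={value:.4f}"))
    start = time.time()
    time.sleep(0.01)  # pretend this is producer.poll(timeout)
    timer.record_poll(time.time() - start)
    timer.maybe_flush()  # prints nothing yet: the 5-second window has not elapsed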

+ 102 - 37
src/sentry/sentry_metrics/consumers/indexer/parallel.py

@@ -1,60 +1,112 @@
 from __future__ import annotations
 
-import functools
+import logging
 from functools import partial
-from typing import Callable, Mapping, Union
+from typing import Callable, Mapping, Optional, Union
 
 from arroyo.backends.kafka import KafkaConsumer, KafkaPayload
 from arroyo.processing import StreamProcessor
-from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory
+from arroyo.processing.strategies import ProcessingStrategy
+from arroyo.processing.strategies import ProcessingStrategy as ProcessingStep
+from arroyo.processing.strategies import ProcessingStrategyFactory
 from arroyo.processing.strategies.streaming.transform import ParallelTransformStep
-from arroyo.types import Partition, Position, Topic
+from arroyo.types import Message, Partition, Position, Topic
 from django.conf import settings
 
+from sentry.runner import configure
 from sentry.sentry_metrics.configuration import MetricsIngestConfiguration
-from sentry.sentry_metrics.consumers.indexer.common import BatchMessages, get_config
+from sentry.sentry_metrics.consumers.indexer.common import BatchMessages, MessageBatch, get_config
 from sentry.sentry_metrics.consumers.indexer.multiprocess import SimpleProduceStep
 from sentry.sentry_metrics.consumers.indexer.processing import process_messages
 from sentry.utils.batching_kafka_consumer import create_topics
 
+logger = logging.getLogger(__name__)
 
-@functools.lru_cache(maxsize=10)
-def get_metrics():  # type: ignore
-    from sentry.utils import metrics
 
-    return metrics
+class Unbatcher(ProcessingStep[MessageBatch]):  # type: ignore
+    def __init__(
+        self,
+        next_step: ProcessingStep[KafkaPayload],
+    ) -> None:
+        self.__next_step = next_step
+        self.__closed = False
+
+    def poll(self) -> None:
+        self.__next_step.poll()
+
+    def submit(self, message: Message[MessageBatch]) -> None:
+        assert not self.__closed
+
+        for transformed_message in message.payload:
+            self.__next_step.submit(transformed_message)
+
+    def close(self) -> None:
+        self.__closed = True
 
+    def terminate(self) -> None:
+        self.__closed = True
 
-def initializer() -> None:
-    from sentry.runner import configure
+        logger.debug("Terminating %r...", self.__next_step)
+        self.__next_step.terminate()
 
-    configure()
+    def join(self, timeout: Optional[float] = None) -> None:
+        self.__next_step.close()
+        self.__next_step.join(timeout)
 
 
 class MetricsConsumerStrategyFactory(ProcessingStrategyFactory):  # type: ignore
+    """
+    Builds an indexer consumer based on the multiprocess transform Arroyo step.
+
+    Multiprocessing happens in batches: the parallel step batches messages and
+    then dispatches each batch to a worker process. This avoids the lock
+    contention that transferring one message at a time would cause. The
+    parallel transform function is then applied to the messages one by one.
+
+    The indexer must resolve batches of messages. It cannot resolve messages
+    in isolation, otherwise the number of roundtrips to the cache would be
+    enormous. So the pipeline works this way:
+    - the indexer batches messages as before.
+    - each batch becomes a single message for the parallel transform step.
+    - the parallel transform step may or may not re-batch those message
+      batches together. Load tests show this is still useful.
+    - messages are exploded back into individual ones after the parallel
+      transform step.
+    """
+
     def __init__(
         self,
+        max_msg_batch_size: int,
+        max_msg_batch_time: float,
+        max_parallel_batch_size: int,
+        max_parallel_batch_time: float,
         max_batch_size: int,
         max_batch_time: float,
         processes: int,
         input_block_size: int,
         output_block_size: int,
-        commit_max_batch_size: int,
-        commit_max_batch_time: float,
         config: MetricsIngestConfiguration,
     ):
         self.__config = config
-        self.__max_batch_time = max_batch_time
-        self.__max_batch_size = max_batch_size
+
+        # This is the size of the initial message batching the indexer does
+        self.__max_msg_batch_size = max_msg_batch_size
+        self.__max_msg_batch_time = max_msg_batch_time
+
+        # This is the size of the batches sent to the parallel processes.
+        # These are batches of batches.
+        self.__max_parallel_batch_size = max_parallel_batch_size
+        self.__max_parallel_batch_time = max_parallel_batch_time
+
+        # This is the batch size used to decide when to commit on Kafka.
+        self.__commit_max_batch_size = max_batch_size
+        self.__commit_max_batch_time = max_batch_time
 
         self.__processes = processes
 
         self.__input_block_size = input_block_size
         self.__output_block_size = output_block_size
 
-        self.__commit_max_batch_size = commit_max_batch_size
-        self.__commit_max_batch_time = commit_max_batch_time
-
     def create_with_partitions(
         self,
         commit: Callable[[Mapping[Partition, Position]], None],
@@ -62,30 +114,43 @@ class MetricsConsumerStrategyFactory(ProcessingStrategyFactory):  # type: ignore
     ) -> ProcessingStrategy[KafkaPayload]:
         parallel_strategy = ParallelTransformStep(
             partial(process_messages, self.__config.use_case_id),
-            SimpleProduceStep(
-                commit_function=commit,
-                commit_max_batch_size=self.__commit_max_batch_size,
-                # convert to seconds
-                commit_max_batch_time=self.__commit_max_batch_time / 1000,
-                output_topic=self.__config.output_topic,
+            Unbatcher(
+                SimpleProduceStep(
+                    commit_function=commit,
+                    commit_max_batch_size=self.__commit_max_batch_size,
+                    # This is in seconds
+                    commit_max_batch_time=self.__commit_max_batch_time / 1000,
+                    output_topic=self.__config.output_topic,
+                ),
             ),
             self.__processes,
-            max_batch_size=self.__max_batch_size,
-            max_batch_time=self.__max_batch_time,
+            max_batch_size=self.__max_parallel_batch_size,
+            # This is in seconds
+            max_batch_time=self.__max_parallel_batch_time / 1000,
             input_block_size=self.__input_block_size,
             output_block_size=self.__output_block_size,
-            initializer=initializer,
+            # It is absolutely crucial that we pass a function reference here
+            # where the function lives in a module that does not depend on
+            # Django settings. `sentry.runner` fulfills that requirement, but
+            # if you were to create a wrapper function in this module that
+            # calls configure(), and pass that function here, it would attempt
+            # to pull in a bunch of modules that try to read Django settings at
+            # import time.
+            initializer=configure,
         )
 
-        strategy = BatchMessages(parallel_strategy, self.__max_batch_time, self.__max_batch_size)
+        strategy = BatchMessages(
+            parallel_strategy, self.__max_msg_batch_time, self.__max_msg_batch_size
+        )
 
         return strategy
 
 
-def get_streaming_metrics_consumer(
-    topic: str,
-    commit_max_batch_size: int,
-    commit_max_batch_time: float,
+def get_parallel_metrics_consumer(
+    max_msg_batch_size: int,
+    max_msg_batch_time: float,
+    max_parallel_batch_size: int,
+    max_parallel_batch_time: float,
     max_batch_size: int,
     max_batch_time: float,
     processes: int,
@@ -93,19 +158,19 @@ def get_streaming_metrics_consumer(
     output_block_size: int,
     group_id: str,
     auto_offset_reset: str,
-    factory_name: str,
     indexer_profile: MetricsIngestConfiguration,
     **options: Mapping[str, Union[str, int]],
 ) -> StreamProcessor:
-    assert factory_name == "multiprocess"
     processing_factory = MetricsConsumerStrategyFactory(
+        max_msg_batch_size=max_msg_batch_size,
+        max_msg_batch_time=max_msg_batch_time,
+        max_parallel_batch_size=max_parallel_batch_size,
+        max_parallel_batch_time=max_parallel_batch_time,
         max_batch_size=max_batch_size,
         max_batch_time=max_batch_time,
         processes=processes,
         input_block_size=input_block_size,
         output_block_size=output_block_size,
-        commit_max_batch_size=commit_max_batch_size,
-        commit_max_batch_time=commit_max_batch_time,
         config=indexer_profile,
     )
 

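The comment on initializer=configure is about import order: the callable handed to the worker pool is imported inside each worker process, so it must live in a module that does not read Django settings at import time. A hedged sketch of the same constraint using the standard multiprocessing module directly; configure_worker and resolve are stand-ins, not Sentry functions.

    import multiprocessing


    def configure_worker() -> None:
        # Stand-in for sentry.runner.configure: set up settings and connections
        # inside each worker process. It must live in a module whose import does
        # not itself require settings to be configured already.
        print("worker configured")


    def resolve(batch):
        # Stand-in for process_messages: by the time this runs, the worker has
        # already been configured by the initializer above.
        return [s.lower() for s in batch]


    if __name__ == "__main__":
        # Each worker imports and runs configure_worker before handling batches;
        # a wrapper defined in a settings-heavy module would drag those imports
        # into the worker before configuration had happened.
        with multiprocessing.Pool(processes=2, initializer=configure_worker) as pool:
            print(pool.map(resolve, [["A", "B"], ["C"]]))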
+ 2 - 25
src/sentry/sentry_metrics/consumers/indexer/processing.py

@@ -1,30 +1,16 @@
-import functools
 import logging
-from typing import TYPE_CHECKING
 
 from arroyo.types import Message
 
+from sentry.sentry_metrics import indexer
 from sentry.sentry_metrics.configuration import UseCaseKey
 from sentry.sentry_metrics.consumers.indexer.batch import IndexerBatch
 from sentry.sentry_metrics.consumers.indexer.common import MessageBatch
+from sentry.utils import metrics
 
 logger = logging.getLogger(__name__)
 
 
-@functools.lru_cache(maxsize=10)
-def get_metrics():  # type: ignore
-    from sentry.utils import metrics
-
-    return metrics
-
-
-@functools.lru_cache(maxsize=10)
-def get_indexer():  # type: ignore
-    from sentry.sentry_metrics import indexer
-
-    return indexer
-
-
 def process_messages(
     use_case_id: UseCaseKey,
     outer_message: Message[MessageBatch],
@@ -47,15 +33,6 @@ def process_messages(
     The value of the message is what we need to parse and then translate
     using the indexer.
     """
-    if TYPE_CHECKING:
-        from sentry.sentry_metrics import indexer
-        from sentry.utils import metrics
-    else:
-        # This, instead of importing the normal way, was likely done to prevent
-        # fork-safety issues.
-        indexer = get_indexer()
-        metrics = get_metrics()
-
     batch = IndexerBatch(use_case_id, outer_message)
 
     org_strings = batch.extract_strings()
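With configure as the worker initializer, process_messages can now import indexer and metrics at module level; the batching itself is what keeps indexer roundtrips cheap. A minimal sketch of that idea under assumed names (resolve_many and the message dicts are hypothetical, not the real indexer API): collect every string across the batch, resolve them in one call, then translate each message using the shared mapping.

    from typing import Dict, List, Set


    def resolve_many(strings: Set[str]) -> Dict[str, int]:
        # Hypothetical stand-in for the indexer: one bulk lookup for the whole
        # batch instead of one cache/DB roundtrip per message.
        return {s: i for i, s in enumerate(sorted(strings))}


    def process_batch(messages: List[Dict[str, str]]) -> List[Dict[str, int]]:
        # 1. Extract every string used by any message in the batch
        #    (the role batch.extract_strings() plays above).
        org_strings: Set[str] = {m["metric_name"] for m in messages}
        # 2. Resolve them all in a single call.
        mapping = resolve_many(org_strings)
        # 3. Translate each message using the shared mapping.
        return [{"metric_id": mapping[m["metric_name"]]} for m in messages]


    print(process_batch([{"metric_name": "d:sessions/duration"}, {"metric_name": "c:sessions/count"}]))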