Revert "ref: Add generic consumers CLI for almost all consumers (#50738)"

This reverts commit 9645ace440193da6fcc6d1061b55e8f63cdf12fa.

Co-authored-by: untitaker <837573+untitaker@users.noreply.github.com>
getsentry-bot · 1 year ago
commit 1e3ecad585

+ 7 - 143
src/sentry/consumers/__init__.py

@@ -1,55 +1,11 @@
-from __future__ import annotations
+from typing import Mapping, TypedDict
 
-from typing import Any, Mapping, Optional, Sequence, TypedDict
-
-import click
 from django.conf import settings
-from typing_extensions import Required
-
-DEFAULT_BLOCK_SIZE = int(32 * 1e6)
-
-
-class ConsumerDefinition(TypedDict, total=False):
-    topic: Required[str]
-    strategy_factory: Required[str]
-
-    # Additional CLI options the consumer should accept. These arguments are
-    # passed as kwargs to the strategy_factory.
-    click_options: Sequence[click.Option]
-
-    # Hardcoded additional kwargs for strategy_factory
-    static_args: Mapping[str, Any]
 
 
-def convert_max_batch_time(ctx, param, value):
-    if value <= 0:
-        raise click.BadParameter("--max-batch-time must be greater than 0")
-
-    # Our CLI arguments are written in ms, but the strategy requires seconds
-    return int(value / 1000.0)
-
-
-def multiprocessing_options(
-    default_max_batch_size: Optional[int] = None, default_max_batch_time_ms: Optional[int] = 1000
-):
-    return [
-        click.Option(["--processes", "num_processes"], default=1, type=int),
-        click.Option(["--input-block-size"], type=int, default=DEFAULT_BLOCK_SIZE),
-        click.Option(["--output-block-size"], type=int, default=DEFAULT_BLOCK_SIZE),
-        click.Option(
-            ["--max-batch-size"],
-            default=default_max_batch_size,
-            type=int,
-            help="Maximum number of messages to batch before flushing.",
-        ),
-        click.Option(
-            ["--max-batch-time-ms", "max_batch_time"],
-            default=default_max_batch_time_ms,
-            callback=convert_max_batch_time,
-            type=int,
-            help="Maximum time (in seconds) to wait before flushing a batch.",
-        ),
-    ]
+class ConsumerDefinition(TypedDict):
+    topic: str
+    strategy_factory: str
 
 
 # consumer name -> consumer definition
@@ -57,100 +13,8 @@ KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = {
     "ingest-profiles": {
         "topic": settings.KAFKA_PROFILES,
         "strategy_factory": "sentry.profiles.consumers.process.factory.ProcessProfileStrategyFactory",
-    },
-    "ingest-replay-recordings": {
-        "topic": settings.KAFKA_INGEST_REPLAYS_RECORDINGS,
-        "strategy_factory": "sentry.replays.consumers.recording.ProcessReplayRecordingStrategyFactory",
-    },
-    "ingest-monitors": {
-        "topic": settings.KAFKA_INGEST_MONITORS,
-        "strategy_factory": "sentry.monitors.consumers.monitor_consumer.StoreMonitorCheckInStrategyFactory",
-    },
-    "billing-metrics-consumer": {
-        "topic": settings.KAFKA_SNUBA_GENERIC_METRICS,
-        "strategy_factory": "sentry.ingest.billing_metrics_consumer.BillingMetricsConsumerStrategyFactory",
-    },
-    # Known differences to 'sentry run occurrences-ingest-consumer':
-    # - ingest_consumer_types metric tag is missing. Use the kafka_topic and
-    #   group_id tags provided by run_basic_consumer instead
-    "ingest-occurrences": {
-        "topic": settings.KAFKA_INGEST_OCCURRENCES,
-        "strategy_factory": "sentry.issues.run.OccurrenceStrategyFactory",
-        "click_options": multiprocessing_options(default_max_batch_size=20),
-    },
-    "events-subscription-results": {
-        "topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
-        "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
-        "click_options": multiprocessing_options(default_max_batch_size=100),
-        "static_args": {
-            "topic": settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS,
-        },
-    },
-    "transactions-subscription-results": {
-        "topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
-        "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
-        "click_options": multiprocessing_options(default_max_batch_size=100),
-        "static_args": {
-            "topic": settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS,
-        },
-    },
-    "generic-metrics-subscription-results": {
-        "topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
-        "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
-        "click_options": multiprocessing_options(default_max_batch_size=100),
-        "static_args": {
-            "topic": settings.KAFKA_GENERIC_METRICS_SUBSCRIPTIONS_RESULTS,
-        },
-    },
-    "sessions-subscription-results": {
-        "topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
-        "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
-        "click_options": multiprocessing_options(),
-        "static_args": {
-            "topic": settings.KAFKA_SESSIONS_SUBSCRIPTIONS_RESULTS,
-        },
-    },
-    "metrics-subscription-results": {
-        "topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
-        "strategy_factory": "sentry.snuba.query_subscriptions.run.QuerySubscriptionStrategyFactory",
-        "click_options": multiprocessing_options(default_max_batch_size=100),
-        "static_args": {
-            "topic": settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS,
-        },
-    },
-    "ingest-events": {
-        "topic": settings.KAFKA_INGEST_EVENTS,
-        "strategy_factory": "sentry.ingest.consumer_v2.factory.IngestStrategyFactory",
-        "click_options": multiprocessing_options(default_max_batch_size=100),
-        "static_args": {
-            "consumer_type": "events",
-        },
-    },
-    "ingest-attachments": {
-        "topic": settings.KAFKA_INGEST_ATTACHMENTS,
-        "strategy_factory": "sentry.ingest.consumer_v2.factory.IngestStrategyFactory",
-        "click_options": multiprocessing_options(default_max_batch_size=100),
-        "static_args": {
-            "consumer_type": "attachments",
-        },
-    },
-    "ingest-transactions": {
-        "topic": settings.KAFKA_INGEST_TRANSACTIONS,
-        "strategy_factory": "sentry.ingest.consumer_v2.factory.IngestStrategyFactory",
-        "click_options": multiprocessing_options(default_max_batch_size=100),
-        "static_args": {
-            "consumer_type": "transactions",
-        },
-    },
+    }
 }
 
-
-def print_deprecation_warning(name, group_id):
-    assert name in KAFKA_CONSUMERS, name
-
-    import click
-
-    click.echo(
-        f"WARNING: Deprecated command, use sentry run consumer {name} "
-        f"--consumer-group {group_id} ..."
-    )
+for consumer in KAFKA_CONSUMERS:
+    assert KAFKA_CONSUMERS[consumer]["topic"] in settings.KAFKA_TOPICS, consumer
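
Taken together, the revert restores the minimal registry: each consumer is just two required strings, and topics are validated once at import time. A runnable sketch of that shape, with a hypothetical KAFKA_TOPICS mapping standing in for Django settings:

from typing import Mapping, TypedDict

# Hypothetical stand-in for settings.KAFKA_TOPICS.
KAFKA_TOPICS: Mapping[str, dict] = {"profiles": {}}

class ConsumerDefinition(TypedDict):
    topic: str
    strategy_factory: str  # dotted import path, resolved at consumer startup

KAFKA_CONSUMERS: Mapping[str, ConsumerDefinition] = {
    "ingest-profiles": {
        "topic": "profiles",
        "strategy_factory": "sentry.profiles.consumers.process.factory.ProcessProfileStrategyFactory",
    }
}

# The import-time check the revert adds: an unknown topic fails at startup
# rather than at consume time.
for consumer in KAFKA_CONSUMERS:
    assert KAFKA_CONSUMERS[consumer]["topic"] in KAFKA_TOPICS, consumer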

+ 38 - 33
src/sentry/ingest/consumer_v2/factory.py

@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-from typing import Any, Mapping, MutableMapping
+from typing import Any, Mapping, MutableMapping, NamedTuple
 
 from arroyo import Topic
 from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
@@ -24,43 +24,37 @@ from sentry.snuba.utils import initialize_consumer_state
 from sentry.utils import kafka_config
 
 
+class MultiProcessConfig(NamedTuple):
+    num_processes: int
+    max_batch_size: int
+    max_batch_time: int
+    input_block_size: int
+    output_block_size: int
+
+
 class IngestStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
     def __init__(
         self,
-        consumer_type: str,
-        num_processes: int,
-        max_batch_size: int,
-        max_batch_time: int,
-        input_block_size: int,
-        output_block_size: int,
+        health_checker: HealthChecker,
+        multi_process: MultiProcessConfig | None = None,
     ):
-        self.consumer_type = consumer_type
-        self.num_processes = num_processes
-        self.max_batch_size = max_batch_size
-        self.max_batch_time = max_batch_time
-        self.input_block_size = input_block_size
-        self.output_block_size = output_block_size
-        self.health_checker = HealthChecker()
+        self.health_checker = health_checker
+        self.multi_process = multi_process
 
     def create_with_partitions(
         self,
         commit: Commit,
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
-
-        # The attachments consumer that is used for multiple message types needs
-        # ordering guarantees: Attachments have to be written before the event using
-        # them is being processed. We will use a simple serial `RunTask` for those
-        # for now.
-        if self.num_processes > 1 and self.consumer_type != ConsumerType.Attachments:
-            return RunTaskWithMultiprocessing(
+        if (mp := self.multi_process) is not None:
+            next_step = RunTaskWithMultiprocessing(
                 process_ingest_message,
                 CommitOffsets(commit),
-                num_processes=self.num_processes,
-                max_batch_size=self.max_batch_size,
-                max_batch_time=self.max_batch_time,
-                input_block_size=self.input_block_size,
-                output_block_size=self.output_block_size,
+                mp.num_processes,
+                mp.max_batch_size,
+                mp.max_batch_time,
+                mp.input_block_size,
+                mp.output_block_size,
                 initializer=initialize_consumer_state,
             )
         else:
@@ -79,7 +73,7 @@ def get_ingest_consumer(
     strict_offset_reset: bool,
     max_batch_size: int,
     max_batch_time: int,
-    num_processes: int,
+    processes: int,
     input_block_size: int,
     output_block_size: int,
     force_topic: str | None,
@@ -95,16 +89,27 @@ def get_ingest_consumer(
     )
     consumer = KafkaConsumer(consumer_config)
 
-    return StreamProcessor(
-        consumer=consumer,
-        topic=Topic(topic),
-        processor_factory=IngestStrategyFactory(
-            consumer_type=consumer_type,
-            num_processes=num_processes,
+    # The attachments consumer that is used for multiple message types needs
+    # ordering guarantees: Attachments have to be written before the event using
+    # them is being processed. We will use a simple serial `RunTask` for those
+    # for now.
+    multi_process = None
+    if processes > 1 and consumer_type != ConsumerType.Attachments:
+        multi_process = MultiProcessConfig(
+            num_processes=processes,
             max_batch_size=max_batch_size,
             max_batch_time=max_batch_time,
             input_block_size=input_block_size,
             output_block_size=output_block_size,
+        )
+
+    health_checker = HealthChecker()
+
+    return StreamProcessor(
+        consumer=consumer,
+        topic=Topic(topic),
+        processor_factory=IngestStrategyFactory(
+            health_checker=health_checker, multi_process=multi_process
         ),
         commit_policy=ONCE_PER_SECOND,
     )
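
The factory now takes the serial-vs-multiprocessing decision as data: callers pass a MultiProcessConfig or None, so the factory itself no longer needs to know about consumer types. A sketch of the decision get_ingest_consumer makes, with hypothetical batch defaults:

from typing import NamedTuple, Optional

class MultiProcessConfig(NamedTuple):
    num_processes: int
    max_batch_size: int
    max_batch_time: int
    input_block_size: int
    output_block_size: int

ATTACHMENTS = "attachments"  # stand-in for ConsumerType.Attachments

def decide_multi_process(processes: int, consumer_type: str) -> Optional[MultiProcessConfig]:
    # Attachments stay serial: an attachment must be written before the
    # event referencing it is processed, so ordering matters.
    if processes > 1 and consumer_type != ATTACHMENTS:
        return MultiProcessConfig(processes, 100, 1, 32_000_000, 32_000_000)
    return None

assert decide_multi_process(4, ATTACHMENTS) is None   # serial path
assert decide_multi_process(4, "events") is not None  # parallel path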

+ 6 - 6
src/sentry/issues/run.py

@@ -27,7 +27,7 @@ def get_occurrences_ingest_consumer(
     strict_offset_reset: bool,
     max_batch_size: int,
     max_batch_time: int,
-    num_processes: int,
+    processes: int,
     input_block_size: int,
     output_block_size: int,
 ) -> StreamProcessor[KafkaPayload]:
@@ -38,7 +38,7 @@ def get_occurrences_ingest_consumer(
         strict_offset_reset,
         max_batch_size,
         max_batch_time,
-        num_processes,
+        processes,
         input_block_size,
         output_block_size,
     )
@@ -51,7 +51,7 @@ def create_ingest_occurences_consumer(
     strict_offset_reset: bool,
     max_batch_size: int,
     max_batch_time: int,
-    num_processes: int,
+    processes: int,
     input_block_size: int,
     output_block_size: int,
 ) -> StreamProcessor[KafkaPayload]:
@@ -75,7 +75,7 @@ def create_ingest_occurences_consumer(
     strategy_factory = OccurrenceStrategyFactory(
         max_batch_size,
         max_batch_time,
-        num_processes,
+        processes,
         input_block_size,
         output_block_size,
     )
@@ -93,14 +93,14 @@ class OccurrenceStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         self,
         max_batch_size: int,
         max_batch_time: int,
-        num_processes: int,
+        processes: int,
         input_block_size: int,
         output_block_size: int,
     ):
         super().__init__()
         self.max_batch_size = max_batch_size
         self.max_batch_time = max_batch_time
-        self.num_processes = num_processes
+        self.num_processes = processes
         self.input_block_size = input_block_size
         self.output_block_size = output_block_size
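
Every change in this file is the same mechanical rename, and it follows from the click change later in this commit (src/sentry/runner/commands/run.py), where the --processes option loses its "num_processes" alias. A small illustration with a hypothetical command:

import click

# Without an explicit second name, click derives the kwarg from the flag,
# so "--processes" arrives as `processes`. The pre-revert form,
# @click.option("--processes", "num_processes", ...), produced num_processes,
# which is why every downstream signature renames in lockstep.
@click.command()
@click.option("--processes", default=1, type=int)
def run(processes: int) -> None:
    click.echo(f"processes={processes}")

if __name__ == "__main__":
    run()  # `python example.py --processes 4` prints processes=4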
 

+ 5 - 52
src/sentry/runner/commands/run.py

@@ -478,18 +478,8 @@ def post_process_forwarder(**options):
 @log_options()
 @configuration
 def query_subscription_consumer(**options):
-    from sentry.consumers import print_deprecation_warning
-    from sentry.snuba.query_subscriptions.constants import (
-        dataset_to_logical_topic,
-        topic_to_dataset,
-    )
     from sentry.snuba.query_subscriptions.run import get_query_subscription_consumer
 
-    dataset = topic_to_dataset[options["topic"]]
-    logical_topic = dataset_to_logical_topic[dataset]
-    # name of new consumer == name of logical topic
-    print_deprecation_warning(logical_topic, options["group_id"])
-
     subscriber = get_query_subscription_consumer(
         topic=options["topic"],
         group_id=options["group"],
@@ -498,7 +488,7 @@ def query_subscription_consumer(**options):
         max_batch_size=options["max_batch_size"],
         # Our batcher expects the time in seconds
         max_batch_time=int(options["max_batch_time"] / 1000),
-        num_processes=options["processes"],
+        processes=options["processes"],
         input_block_size=options["input_block_size"],
         output_block_size=options["output_block_size"],
         multi_proc=True,
@@ -520,7 +510,6 @@ def query_subscription_consumer(**options):
 @configuration
 @click.option(
     "--processes",
-    "num_processes",
     default=1,
     type=int,
 )
@@ -533,10 +522,6 @@ def ingest_consumer(consumer_type, **options):
     The "ingest consumer" tasks read events from a kafka topic (coming from Relay) and schedules
     process event celery tasks for them
     """
-    from sentry.consumers import print_deprecation_warning
-
-    print_deprecation_warning(ConsumerType.get_topic_name(consumer_type), options["group_id"])
-
     from arroyo import configure_metrics
 
     from sentry.ingest.consumer_v2.factory import get_ingest_consumer
@@ -561,16 +546,12 @@ def ingest_consumer(consumer_type, **options):
 @configuration
 @click.option(
     "--processes",
-    "num_processes",
     default=1,
     type=int,
 )
 @click.option("--input-block-size", type=int, default=DEFAULT_BLOCK_SIZE)
 @click.option("--output-block-size", type=int, default=DEFAULT_BLOCK_SIZE)
 def occurrences_ingest_consumer(**options):
-    from sentry.consumers import print_deprecation_warning
-
-    print_deprecation_warning("ingest-occurrences", options["group_id"])
     from django.conf import settings
 
     from sentry.utils import metrics
@@ -632,9 +613,6 @@ def metrics_parallel_consumer(**options):
 @strict_offset_reset_option()
 @configuration
 def metrics_billing_consumer(**options):
-    from sentry.consumers import print_deprecation_warning
-
-    print_deprecation_warning("billing-metrics-consumer", options["group_id"])
     from sentry.ingest.billing_metrics_consumer import get_metrics_billing_consumer
 
     consumer = get_metrics_billing_consumer(**options)
@@ -648,9 +626,6 @@ def metrics_billing_consumer(**options):
 @strict_offset_reset_option()
 @configuration
 def profiles_consumer(**options):
-    from sentry.consumers import print_deprecation_warning
-
-    print_deprecation_warning("ingest-profiles", options["group_id"])
     from sentry.profiles.consumers import get_profiles_process_consumer
 
     consumer = get_profiles_process_consumer(**options)
@@ -662,7 +637,6 @@ def profiles_consumer(**options):
 @click.argument(
     "consumer_name",
 )
-@click.argument("consumer_args", nargs=-1)
 @click.option(
     "--topic",
     type=str,
@@ -683,7 +657,7 @@ def profiles_consumer(**options):
 )
 @strict_offset_reset_option()
 @configuration
-def basic_consumer(consumer_name, consumer_args, topic, **options):
+def basic_consumer(consumer_name, topic, **options):
     """
     Launch a "new-style" consumer based on its "consumer name".
 
@@ -692,15 +666,6 @@ def basic_consumer(consumer_name, consumer_args, topic, **options):
         sentry run consumer ingest-profiles --consumer-group ingest-profiles
 
     runs the ingest-profiles consumer with the consumer group ingest-profiles.
-
-    Consumers are defined in 'sentry.consumers'. Each consumer can take
-    additional CLI options. Those can be passed after '--':
-
-        sentry run consumer ingest-occurrences --consumer-group occurrence-consumer -- --processes 1
-
-    Consumer-specific arguments can be viewed with:
-
-        sentry run consumer ingest-occurrences --consumer-group occurrence-consumer -- --help
     """
     from sentry.consumers import KAFKA_CONSUMERS
 
@@ -721,17 +686,11 @@ def basic_consumer(consumer_name, consumer_args, topic, **options):
             f"responsible for this consumer"
         )
 
-    cmd = click.Command(
-        name=consumer_name, params=list(consumer_definition.get("click_options") or ())
-    )
-    cmd_context = cmd.make_context(consumer_name, list(consumer_args))
-    strategy_factory = cmd_context.invoke(
-        strategy_factory_cls, **cmd_context.params, **consumer_definition.get("static_args") or {}
-    )
-
     from sentry.utils.arroyo import run_basic_consumer
 
-    run_basic_consumer(topic=topic or default_topic, **options, strategy_factory=strategy_factory)
+    run_basic_consumer(
+        topic=topic or default_topic, **options, strategy_factory_cls=strategy_factory_cls
+    )
 
 
 @run.command("ingest-replay-recordings")
@@ -742,9 +701,6 @@ def basic_consumer(consumer_name, consumer_args, topic, **options):
     "--topic", default="ingest-replay-recordings", help="Topic to get replay recording data from"
 )
 def replays_recordings_consumer(**options):
-    from sentry.consumers import print_deprecation_warning
-
-    print_deprecation_warning("ingest-replay-recordings", options["group_id"])
     from sentry.replays.consumers import get_replays_recordings_consumer
 
     consumer = get_replays_recordings_consumer(**options)
@@ -758,9 +714,6 @@ def replays_recordings_consumer(**options):
 @strict_offset_reset_option()
 @configuration
 def monitors_consumer(**options):
-    from sentry.consumers import print_deprecation_warning
-
-    print_deprecation_warning("ingest-monitors", options["group_id"])
     from sentry.monitors.consumers import get_monitor_check_ins_consumer
 
     consumer = get_monitor_check_ins_consumer(**options)
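
With the pass-through arguments gone, the generic command shrinks to a fixed option surface. A minimal click sketch of that reverted shape (options abridged, body hypothetical):

from __future__ import annotations

import click

@click.command("consumer")
@click.argument("consumer_name")
@click.option("--topic", type=str, default=None, help="Override the Kafka topic.")
@click.option("--consumer-group", "group_id", required=True)
def basic_consumer(consumer_name: str, topic: str | None, group_id: str) -> None:
    # A real implementation would look up consumer_name in KAFKA_CONSUMERS
    # and hand the factory class to run_basic_consumer.
    click.echo(f"running {consumer_name} in group {group_id} on {topic or 'default topic'}")

if __name__ == "__main__":
    basic_consumer()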

+ 4 - 4
src/sentry/snuba/query_subscriptions/run.py

@@ -32,7 +32,7 @@ class QuerySubscriptionStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         topic: str,
         max_batch_size: int,
         max_batch_time: int,
-        num_processes: int,
+        processes: int,
         input_block_size: int,
         output_block_size: int,
         multi_proc: bool = True,
@@ -42,7 +42,7 @@ class QuerySubscriptionStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
         self.logical_topic = dataset_to_logical_topic[self.dataset]
         self.max_batch_size = max_batch_size
         self.max_batch_time = max_batch_time
-        self.num_processes = num_processes
+        self.num_processes = processes
         self.input_block_size = input_block_size
         self.output_block_size = output_block_size
         self.multi_proc = multi_proc
@@ -115,7 +115,7 @@ def get_query_subscription_consumer(
     initial_offset_reset: str,
     max_batch_size: int,
     max_batch_time: int,
-    num_processes: int,
+    processes: int,
     input_block_size: int,
     output_block_size: int,
     multi_proc: bool = False,
@@ -144,7 +144,7 @@ def get_query_subscription_consumer(
             topic,
             max_batch_size,
             max_batch_time,
-            num_processes,
+            processes,
             input_block_size,
             output_block_size,
             multi_proc=multi_proc,

+ 3 - 3
src/sentry/utils/arroyo.py

@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 from functools import partial
-from typing import Any, Callable, Mapping, Optional, Union
+from typing import Any, Callable, Mapping, Optional, Type, Union
 
 from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
 from arroyo.backends.kafka.consumer import KafkaConsumer
@@ -129,7 +129,7 @@ def run_basic_consumer(
     group_id: str,
     auto_offset_reset: str,
     strict_offset_reset: bool,
-    strategy_factory: ProcessingStrategyFactory[Any],
+    strategy_factory_cls: Type[ProcessingStrategyFactory[Any]],
 ) -> None:
     from django.conf import settings
 
@@ -157,7 +157,7 @@ def run_basic_consumer(
     processor = StreamProcessor(
         consumer=consumer,
         topic=Topic(topic),
-        processor_factory=strategy_factory,
+        processor_factory=strategy_factory_cls(),
         commit_policy=ONCE_PER_SECOND,
     )
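
This signature change is the crux of the revert: run_basic_consumer now receives the factory class rather than a pre-built instance and instantiates it itself, which only works for factories with zero-argument constructors. A reduced sketch with a stand-in base class:

from typing import Type

class ProcessingStrategyFactory:  # stand-in for arroyo's generic ABC
    pass

def run_basic_consumer(strategy_factory_cls: Type[ProcessingStrategyFactory]) -> None:
    # Instantiated with *no* arguments, so any consumer registered for the
    # generic runner must not require per-consumer kwargs.
    processor_factory = strategy_factory_cls()
    print(f"built {processor_factory!r}")

run_basic_consumer(ProcessingStrategyFactory)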
 

+ 1 - 1
src/sentry/utils/pytest/kafka.py

@@ -157,7 +157,7 @@ def session_ingest_consumer(scope_consumers, kafka_admin, task_runner):
             strict_offset_reset=False,
             max_batch_size=1,
             max_batch_time=10,
-            num_processes=1,
+            processes=1,
             input_block_size=1,
             output_block_size=1,
             force_topic=topic_event_name,

+ 0 - 18
tests/sentry/consumers/test_run.py

@@ -1,18 +0,0 @@
-import pytest
-from arroyo.processing.strategies.abstract import ProcessingStrategyFactory
-
-from sentry import consumers
-from sentry.utils.imports import import_string
-
-
-@pytest.mark.parametrize("consumer_def", list(consumers.KAFKA_CONSUMERS.items()))
-def test_all_importable(consumer_def, settings):
-    name: str
-    defn: consumers.ConsumerDefinition
-    name, defn = consumer_def
-
-    factory = import_string(defn["strategy_factory"])
-    assert issubclass(factory, ProcessingStrategyFactory)
-
-    topic = defn["topic"]
-    assert topic is None or topic in settings.KAFKA_TOPICS
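
Note the coverage gap this deletion opens: the test verified both that each strategy_factory dotted path imports to a ProcessingStrategyFactory subclass and that each topic is configured, while the import-time loop in sentry.consumers keeps only the topic check. For reference, a sketch of the dotted-path import the test relied on (hypothetical re-implementation of sentry.utils.imports.import_string):

from importlib import import_module

def import_string(dotted_path: str):
    module_path, _, attr = dotted_path.rpartition(".")
    return getattr(import_module(module_path), attr)

print(import_string("os.path.join"))  # <function join ...>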

+ 2 - 2
tests/sentry/ingest/ingest_consumer/test_ingest_consumer_kafka.py

@@ -112,7 +112,7 @@ def test_ingest_consumer_reads_from_topic_and_calls_celery_task(
         strict_offset_reset=None,
         max_batch_size=2,
         max_batch_time=5,
-        num_processes=1,
+        processes=1,
         input_block_size=DEFAULT_BLOCK_SIZE,
         output_block_size=DEFAULT_BLOCK_SIZE,
         force_cluster=None,
@@ -173,7 +173,7 @@ def test_ingest_topic_can_be_overridden(
         strict_offset_reset=None,
         max_batch_size=2,
         max_batch_time=5,
-        num_processes=1,
+        processes=1,
         input_block_size=DEFAULT_BLOCK_SIZE,
         output_block_size=DEFAULT_BLOCK_SIZE,
         force_topic=new_event_topic,