
ref(post-process-forwarder): Align CLI options to match other consumers (#46349)

Lyn Nagara · 1 year ago
commit f74e57524c
1 changed file with 71 additions and 93 deletions

src/sentry/runner/commands/run.py: +71 -93

@@ -67,6 +67,73 @@ class QueueSetType(click.ParamType):
 QueueSet = QueueSetType()
 
 
+def kafka_options(
+    consumer_group: str,
+    allow_force_cluster: bool = True,
+    include_batching_options: bool = False,
+    default_max_batch_size: Optional[int] = None,
+    default_max_batch_time_ms: Optional[int] = 1000,
+):
+
+    """
+    Basic set of Kafka options for a consumer.
+    """
+
+    def inner(f):
+        f = click.option(
+            "--consumer-group",
+            "group_id",
+            default=consumer_group,
+            help="Kafka consumer group for the consumer.",
+        )(f)
+
+        f = click.option(
+            "--auto-offset-reset",
+            "auto_offset_reset",
+            default="latest",
+            type=click.Choice(["earliest", "latest", "error"]),
+            help="Position in the commit log topic to begin reading from when no prior offset has been recorded.",
+        )(f)
+
+        if include_batching_options:
+            f = click.option(
+                "--max-batch-size",
+                "max_batch_size",
+                default=default_max_batch_size,
+                type=int,
+                help="Maximum number of messages to batch before flushing.",
+            )(f)
+
+            f = click.option(
+                "--max-batch-time-ms",
+                "max_batch_time",
+                default=default_max_batch_time_ms,
+                type=int,
+                help="Maximum time (in milliseconds) to wait before flushing a batch.",
+            )(f)
+
+        if allow_force_cluster:
+            f = click.option(
+                "--force-topic",
+                "force_topic",
+                default=None,
+                type=str,
+                help="Override the Kafka topic the consumer will read from.",
+            )(f)
+
+            f = click.option(
+                "--force-cluster",
+                "force_cluster",
+                default=None,
+                type=str,
+                help="Kafka cluster ID of the overridden topic. Configure clusters via KAFKA_CLUSTERS in server settings.",
+            )(f)
+
+        return f
+
+    return inner
+
+
 def strict_offset_reset_option():
     return click.option(
         "--strict-offset-reset/--no-strict-offset-reset",
@@ -320,11 +387,8 @@ def cron(**options):
 
 
 @run.command("post-process-forwarder")
-@click.option(
-    "--consumer-group",
-    default="snuba-post-processor",
-    help="Consumer group used to track event offsets that have been enqueued for post-processing.",
-)
+@kafka_options("snuba-post-processor")
+@strict_offset_reset_option()
 @click.option(
     "--topic",
     type=str,
@@ -340,31 +404,12 @@ def cron(**options):
     default="snuba-consumers",
     help="Consumer group that the Snuba writer is committing its offset as.",
 )
-@click.option(
-    "--commit-batch-size",
-    default=1000,
-    type=int,
-    help="Deprecated. Remove once no longer passed in production.",
-)
-@click.option(
-    "--commit-batch-timeout-ms",
-    default=5000,
-    type=int,
-    help="Deprecated. Remove once no longer passed in production.",
-)
 @click.option(
     "--concurrency",
     default=5,
     type=int,
     help="Thread pool size for post process worker.",
 )
-@click.option(
-    "--initial-offset-reset",
-    default="latest",
-    type=click.Choice(["earliest", "latest"]),
-    help="Position in the commit log topic to begin reading from when no prior offset has been recorded.",
-)
-@strict_offset_reset_option()
 @click.option(
     "--entity",
     type=click.Choice(["errors", "transactions", "search_issues"]),
@@ -380,12 +425,12 @@ def post_process_forwarder(**options):
         # TODO(markus): convert to use run_processor_with_signals -- can't yet because there's a custom shutdown handler
         eventstream.run_post_process_forwarder(
             entity=options["entity"],
-            consumer_group=options["consumer_group"],
+            consumer_group=options["group_id"],
             topic=options["topic"],
             commit_log_topic=options["commit_log_topic"],
             synchronize_commit_group=options["synchronize_commit_group"],
             concurrency=options["concurrency"],
-            initial_offset_reset=options["initial_offset_reset"],
+            initial_offset_reset=options["auto_offset_reset"],
             strict_offset_reset=options["strict_offset_reset"],
         )
     except ForwarderNotRequired:
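
The handler key changes above follow from click's destination-name rule: when an option declares an explicit destination (as kafka_options does with "group_id" and "auto_offset_reset"), that name is the key the callback receives. A self-contained sketch of the behavior (the command and default value are illustrative only):

    import click

    @click.command()
    @click.option("--consumer-group", "group_id", default="demo-group")
    def demo(**options):
        # Prints "demo-group": the value arrives under the declared
        # destination name "group_id", not "consumer_group".
        click.echo(options["group_id"])

    if __name__ == "__main__":
        demo()
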
@@ -448,73 +493,6 @@ def query_subscription_consumer(**options):
     run_processor_with_signals(subscriber)
 
 
-def kafka_options(
-    consumer_group: str,
-    allow_force_cluster: bool = True,
-    include_batching_options: bool = False,
-    default_max_batch_size: Optional[int] = None,
-    default_max_batch_time_ms: Optional[int] = 1000,
-):
-
-    """
-    Basic set of Kafka options for a consumer.
-    """
-
-    def inner(f):
-        f = click.option(
-            "--consumer-group",
-            "group_id",
-            default=consumer_group,
-            help="Kafka consumer group for the consumer.",
-        )(f)
-
-        f = click.option(
-            "--auto-offset-reset",
-            "auto_offset_reset",
-            default="latest",
-            type=click.Choice(["earliest", "latest", "error"]),
-            help="Position in the commit log topic to begin reading from when no prior offset has been recorded.",
-        )(f)
-
-        if include_batching_options:
-            f = click.option(
-                "--max-batch-size",
-                "max_batch_size",
-                default=default_max_batch_size,
-                type=int,
-                help="Maximum number of messages to batch before flushing.",
-            )(f)
-
-            f = click.option(
-                "--max-batch-time-ms",
-                "max_batch_time",
-                default=default_max_batch_time_ms,
-                type=int,
-                help="Maximum time (in milliseconds) to wait before flushing a batch.",
-            )(f)
-
-        if allow_force_cluster:
-            f = click.option(
-                "--force-topic",
-                "force_topic",
-                default=None,
-                type=str,
-                help="Override the Kafka topic the consumer will read from.",
-            )(f)
-
-            f = click.option(
-                "--force-cluster",
-                "force_cluster",
-                default=None,
-                type=str,
-                help="Kafka cluster ID of the overridden topic. Configure clusters via KAFKA_CLUSTERS in server settings.",
-            )(f)
-
-        return f
-
-    return inner
-
-
 @run.command("ingest-consumer")
 @log_options()
 @click.option(