|
@@ -308,7 +308,19 @@ def run_taskworker(
|
|
|
"--bootstrap-servers",
|
|
|
type=str,
|
|
|
help="The bootstrap servers to use for the kafka topic",
|
|
|
- required=True,
|
|
|
+ default="127.0.0.1:9092",
|
|
|
+)
|
|
|
+@click.option(
|
|
|
+ "--kafka-topic",
|
|
|
+ type=str,
|
|
|
+ help="The kafka topic to use for the task",
|
|
|
+ default=None,
|
|
|
+)
|
|
|
+@click.option(
|
|
|
+ "--namespace",
|
|
|
+ type=str,
|
|
|
+ help="The namespace that the task is registered in",
|
|
|
+ default=None,
|
|
|
)
|
|
|
def taskbroker_send_tasks(
|
|
|
task_function_path: str,
|
|
@@ -316,11 +328,16 @@ def taskbroker_send_tasks(
|
|
|
kwargs: str,
|
|
|
repeat: int,
|
|
|
bootstrap_servers: str,
|
|
|
+ kafka_topic: str,
|
|
|
+ namespace: str,
|
|
|
) -> None:
|
|
|
- from sentry.conf.server import KAFKA_CLUSTERS
|
|
|
+ from sentry.conf.server import KAFKA_CLUSTERS, TASKWORKER_ROUTES
|
|
|
from sentry.utils.imports import import_string
|
|
|
|
|
|
KAFKA_CLUSTERS["default"]["common"]["bootstrap.servers"] = bootstrap_servers
|
|
|
+ if kafka_topic and namespace:
|
|
|
+ TASKWORKER_ROUTES[namespace] = kafka_topic
|
|
|
+
|
|
|
try:
|
|
|
func = import_string(task_function_path)
|
|
|
except Exception as e:
|