|
@@ -402,11 +402,14 @@ def get_stream_processor(
|
|
|
|
|
|
topic_defn = get_topic_definition(consumer_topic)
|
|
|
real_topic = topic_defn["real_topic_name"]
|
|
|
- cluster = topic_defn["cluster"]
|
|
|
+ cluster_from_config = topic_defn["cluster"]
|
|
|
|
|
|
if topic is None:
|
|
|
topic = real_topic
|
|
|
|
|
|
+ if cluster is None:
|
|
|
+ cluster = cluster_from_config
|
|
|
+
|
|
|
cmd = click.Command(
|
|
|
name=consumer_name, params=list(consumer_definition.get("click_options") or ())
|
|
|
)
|