|
@@ -49,11 +49,13 @@ def wait_for_topics(admin_client: AdminClient, topics: List[str], timeout: int =
|
|
|
|
|
|
|
|
|
def create_topics(cluster_name: str, topics: List[str], force: bool = False) -> None:
|
|
|
- """If configured to do so, create topics and make sure that they exist
|
|
|
+ """
|
|
|
+ If configured to do so, create topics and make sure that they exist
|
|
|
|
|
|
topics must be from the same cluster.
|
|
|
"""
|
|
|
if settings.KAFKA_CONSUMER_AUTO_CREATE_TOPICS or force:
|
|
|
conf = kafka_config.get_kafka_admin_cluster_options(cluster_name)
|
|
|
+ # Topics are implicitly created here
|
|
|
admin_client = AdminClient(conf)
|
|
|
wait_for_topics(admin_client, topics)
|