|
@@ -3,6 +3,7 @@ from typing import Callable, Generic, Mapping, Sequence, TypeVar
|
|
|
|
|
|
from arroyo import Partition, Topic
|
|
|
from arroyo.backends.kafka import KafkaConsumer, KafkaPayload
|
|
|
+from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
|
|
|
from arroyo.processing import StreamProcessor
|
|
|
from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory
|
|
|
from arroyo.processing.strategies.batching import AbstractBatchWorker, BatchProcessingStrategy
|
|
@@ -25,14 +26,12 @@ def get_region_to_control_consumer(
|
|
|
) -> StreamProcessor[KafkaPayload]:
|
|
|
cluster_name = settings.KAFKA_TOPICS[settings.KAFKA_REGION_TO_CONTROL]["cluster"]
|
|
|
consumer = KafkaConsumer(
|
|
|
- get_kafka_consumer_cluster_options(
|
|
|
- cluster_name,
|
|
|
- override_params={
|
|
|
- "auto.offset.reset": auto_offset_reset,
|
|
|
- "enable.auto.commit": "false",
|
|
|
- "enable.auto.offset.store": "false",
|
|
|
- "group.id": group_id,
|
|
|
- },
|
|
|
+ build_kafka_consumer_configuration(
|
|
|
+ default_config=get_kafka_consumer_cluster_options(
|
|
|
+ cluster_name,
|
|
|
+ ),
|
|
|
+ auto_offset_reset=auto_offset_reset,
|
|
|
+ group_id=group_id,
|
|
|
)
|
|
|
)
|
|
|
|