|
@@ -13,7 +13,6 @@ from arroyo.processing.strategies.commit import CommitOffsets
|
|
|
from arroyo.processing.strategies.filter import FilterStep
|
|
|
from arroyo.processing.strategies.reduce import Reduce
|
|
|
from arroyo.processing.strategies.run_task import RunTask
|
|
|
-from arroyo.processing.strategies.transform import TransformStep
|
|
|
from arroyo.types import BaseValue, Commit, Message, Partition, Topic
|
|
|
from django.utils import timezone
|
|
|
|
|
@@ -147,7 +146,7 @@ class LastSeenUpdaterStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
|
|
|
RunTask(do_update, CommitOffsets(commit)),
|
|
|
)
|
|
|
|
|
|
- transform_step = TransformStep(retrieve_db_read_keys, collect_step)
|
|
|
+ transform_step = RunTask(retrieve_db_read_keys, collect_step)
|
|
|
return FilterStep(self.__should_accept, transform_step, commit_policy=ONCE_PER_SECOND)
|
|
|
|
|
|
|