|
@@ -2,7 +2,7 @@ import datetime
|
|
|
import functools
|
|
|
from abc import abstractmethod
|
|
|
from datetime import timedelta
|
|
|
-from typing import Any, Callable, Mapping, Optional, Set, Union
|
|
|
+from typing import Any, Mapping, Optional, Set, Union
|
|
|
|
|
|
import rapidjson
|
|
|
from arroyo.backends.kafka import KafkaConsumer, KafkaPayload
|
|
@@ -10,9 +10,10 @@ from arroyo.commit import IMMEDIATE
|
|
|
from arroyo.processing import StreamProcessor
|
|
|
from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory
|
|
|
from arroyo.processing.strategies.collect import CollectStep
|
|
|
+from arroyo.processing.strategies.commit import CommitOffsets
|
|
|
from arroyo.processing.strategies.filter import FilterStep
|
|
|
from arroyo.processing.strategies.transform import TransformStep
|
|
|
-from arroyo.types import Message, Partition, Position, Topic
|
|
|
+from arroyo.types import Commit, Message, Partition, Topic
|
|
|
from django.utils import timezone
|
|
|
|
|
|
from sentry.sentry_metrics.configuration import MetricsIngestConfiguration, UseCaseKey
|
|
@@ -186,14 +187,14 @@ class LastSeenUpdaterStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
|
|
|
|
|
|
def create_with_partitions(
|
|
|
self,
|
|
|
- commit: Callable[[Mapping[Partition, Position]], None],
|
|
|
+ commit: Commit,
|
|
|
partitions: Mapping[Partition, int],
|
|
|
) -> ProcessingStrategy[KafkaPayload]:
|
|
|
collect_step = CollectStep(
|
|
|
lambda: LastSeenUpdaterCollector(
|
|
|
metrics=self.__metrics, table=TABLE_MAPPING[self.__use_case_id]
|
|
|
),
|
|
|
- commit,
|
|
|
+ CommitOffsets(commit),
|
|
|
self.__max_batch_size,
|
|
|
self.__max_batch_time,
|
|
|
)
|