|
@@ -8,7 +8,12 @@ from arroyo.backends.kafka.configuration import build_kafka_consumer_configurati
|
|
|
from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload
|
|
|
from arroyo.commit import ONCE_PER_SECOND
|
|
|
from arroyo.processing.processor import StreamProcessor
|
|
|
-from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory
|
|
|
+from arroyo.processing.strategies import (
|
|
|
+ CommitOffsets,
|
|
|
+ ProcessingStrategy,
|
|
|
+ ProcessingStrategyFactory,
|
|
|
+ RunTask,
|
|
|
+)
|
|
|
from arroyo.types import Commit, Message, Partition
|
|
|
from django.conf import settings
|
|
|
from django.utils import timezone
|
|
@@ -252,41 +257,22 @@ def _process_message(
|
|
|
raise InvalidEventPayloadError(e)
|
|
|
|
|
|
|
|
|
-class OccurrenceStrategy(ProcessingStrategy[KafkaPayload]):
|
|
|
- def __init__(
|
|
|
- self,
|
|
|
- committer: Commit,
|
|
|
- partitions: Mapping[Partition, int],
|
|
|
- ):
|
|
|
- pass
|
|
|
-
|
|
|
- def poll(self) -> None:
|
|
|
- pass
|
|
|
-
|
|
|
- def submit(self, message: Message[KafkaPayload]) -> None:
|
|
|
- try:
|
|
|
- payload = json.loads(message.payload.value, use_rapid_json=True)
|
|
|
- _process_message(payload)
|
|
|
- except (rapidjson.JSONDecodeError, InvalidEventPayloadError, EventLookupError, Exception):
|
|
|
- logger.exception("failed to process message payload")
|
|
|
-
|
|
|
- def close(self) -> None:
|
|
|
- pass
|
|
|
-
|
|
|
- def terminate(self) -> None:
|
|
|
- pass
|
|
|
-
|
|
|
- def join(self, timeout: Optional[float] = None) -> None:
|
|
|
- pass
|
|
|
-
|
|
|
-
|
|
|
class OccurrenceStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
|
|
|
- def __init__(self) -> None:
|
|
|
- pass
|
|
|
-
|
|
|
def create_with_partitions(
|
|
|
self,
|
|
|
commit: Commit,
|
|
|
partitions: Mapping[Partition, int],
|
|
|
) -> ProcessingStrategy[KafkaPayload]:
|
|
|
- return OccurrenceStrategy(commit, partitions)
|
|
|
+ def process_message(message: Message[KafkaPayload]) -> None:
|
|
|
+ try:
|
|
|
+ payload = json.loads(message.payload.value, use_rapid_json=True)
|
|
|
+ _process_message(payload)
|
|
|
+ except (
|
|
|
+ rapidjson.JSONDecodeError,
|
|
|
+ InvalidEventPayloadError,
|
|
|
+ EventLookupError,
|
|
|
+ Exception,
|
|
|
+ ):
|
|
|
+ logger.exception("failed to process message payload")
|
|
|
+
|
|
|
+ return RunTask(process_message, CommitOffsets(commit))
|