|
@@ -9,7 +9,6 @@ from datetime import datetime, timedelta
|
|
|
from functools import partial
|
|
|
from typing import Literal
|
|
|
|
|
|
-import msgpack
|
|
|
import sentry_sdk
|
|
|
from arroyo.backends.kafka.consumer import KafkaPayload
|
|
|
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
|
|
@@ -19,7 +18,6 @@ from arroyo.processing.strategies.run_task import RunTask
|
|
|
from arroyo.types import BrokerValue, Commit, Message, Partition
|
|
|
from django.db import router, transaction
|
|
|
from sentry_kafka_schemas import get_codec
|
|
|
-from sentry_kafka_schemas.codecs import ValidationError
|
|
|
from sentry_kafka_schemas.schema_types.ingest_monitors_v1 import IngestMonitorMessage
|
|
|
from sentry_sdk.tracing import Span, Transaction
|
|
|
|
|
@@ -864,11 +862,7 @@ def process_batch(executor: ThreadPoolExecutor, message: Message[ValuesBatch[Kaf
|
|
|
assert isinstance(item, BrokerValue)
|
|
|
|
|
|
try:
|
|
|
- try:
|
|
|
- wrapper: IngestMonitorMessage = MONITOR_CODEC.decode(item.payload.value)
|
|
|
- except ValidationError:
|
|
|
- wrapper = msgpack.unpackb(item.payload.value)
|
|
|
- logger.exception("Failed to unpack message payload via sentry_kafka_schemas")
|
|
|
+ wrapper: IngestMonitorMessage = MONITOR_CODEC.decode(item.payload.value)
|
|
|
except Exception:
|
|
|
logger.exception("Failed to unpack message payload")
|
|
|
continue
|
|
@@ -913,12 +907,7 @@ def process_batch(executor: ThreadPoolExecutor, message: Message[ValuesBatch[Kaf
|
|
|
def process_single(message: Message[KafkaPayload]):
|
|
|
assert isinstance(message.value, BrokerValue)
|
|
|
try:
|
|
|
- try:
|
|
|
- wrapper: IngestMonitorMessage = MONITOR_CODEC.decode(message.payload.value)
|
|
|
- except ValidationError:
|
|
|
- logger.exception("Failed to unpack message payload via sentry_kafka_schemas")
|
|
|
- wrapper = msgpack.unpackb(message.payload.value)
|
|
|
-
|
|
|
+ wrapper: IngestMonitorMessage = MONITOR_CODEC.decode(message.payload.value)
|
|
|
ts = message.value.timestamp
|
|
|
partition = message.value.partition.index
|
|
|
|