@@ -1,6 +1,6 @@
 import logging
 import signal
-from typing import Any, Mapping, Optional, Tuple
+from typing import Any, Literal, Mapping, Optional, Tuple, Union

 from confluent_kafka import OFFSET_INVALID, TopicPartition
 from django.conf import settings
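Since the same Union-of-Literal annotations recur on the three signatures changed below, they could be pulled into shared aliases. A minimal sketch, assuming nothing beyond the typing module; the alias names are hypothetical and not part of this change:

    from typing import Literal, Union

    # Hypothetical aliases, not introduced by this diff. Literal["all", "errors", "transactions"]
    # is equivalent to the Union-of-Literals spelling used in the signatures below.
    PostProcessForwarderEntity = Union[Literal["all"], Literal["errors"], Literal["transactions"]]
    InitialOffsetReset = Union[Literal["latest"], Literal["earliest"]]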
@@ -191,23 +191,24 @@ class KafkaEventStream(SnubaProtocolEventStream):

     def _build_consumer(
         self,
-        entity,
-        consumer_group,
-        commit_log_topic,
-        synchronize_commit_group,
-        commit_batch_size=100,
-        commit_batch_timeout_ms=5000,
-        initial_offset_reset="latest",
+        entity: Union[Literal["all"], Literal["errors"], Literal["transactions"]],
+        consumer_group: str,
+        commit_log_topic: str,
+        synchronize_commit_group: str,
+        commit_batch_size: int = 100,
+        commit_batch_timeout_ms: int = 5000,
+        initial_offset_reset: Union[Literal["latest"], Literal["earliest"]] = "latest",
     ):
         concurrency = options.get(_CONCURRENCY_OPTION)
         logger.info(f"Starting post-process forwarder to consume {entity} messages")
         if entity == PostProcessForwarderType.TRANSACTIONS:
             cluster_name = settings.KAFKA_TOPICS[settings.KAFKA_TRANSACTIONS]["cluster"]
             worker = TransactionsPostProcessForwarderWorker(concurrency=concurrency)
+            topic = self.transactions_topic
         elif entity == PostProcessForwarderType.ERRORS:
             cluster_name = settings.KAFKA_TOPICS[settings.KAFKA_EVENTS]["cluster"]
-
             worker = ErrorsPostProcessForwarderWorker(concurrency=concurrency)
+            topic = self.topic
         else:
             # Default implementation which processes both errors and transactions
             # irrespective of values in the header. This would most likely be the case
@@ -216,6 +217,8 @@ class KafkaEventStream(SnubaProtocolEventStream):
             cluster_name = settings.KAFKA_TOPICS[settings.KAFKA_EVENTS]["cluster"]
             assert cluster_name == settings.KAFKA_TOPICS[settings.KAFKA_TRANSACTIONS]["cluster"]
             worker = PostProcessForwarderWorker(concurrency=concurrency)
+            topic = self.topic
+            assert self.topic == self.transactions_topic

         synchronized_consumer = SynchronizedConsumer(
             cluster_name=cluster_name,
@@ -226,7 +229,7 @@ class KafkaEventStream(SnubaProtocolEventStream):
         )

         consumer = BatchingKafkaConsumer(
-            topics=self.topic,
+            topics=topic,
             worker=worker,
             max_batch_size=commit_batch_size,
             max_batch_time=commit_batch_timeout_ms,
@@ -237,13 +240,13 @@ class KafkaEventStream(SnubaProtocolEventStream):

     def run_batched_consumer(
         self,
-        entity,
-        consumer_group,
-        commit_log_topic,
-        synchronize_commit_group,
-        commit_batch_size=100,
-        commit_batch_timeout_ms=5000,
-        initial_offset_reset="latest",
+        entity: Union[Literal["all"], Literal["errors"], Literal["transactions"]],
+        consumer_group: str,
+        commit_log_topic: str,
+        synchronize_commit_group: str,
+        commit_batch_size: int = 100,
+        commit_batch_timeout_ms: int = 5000,
+        initial_offset_reset: Union[Literal["latest"], Literal["earliest"]] = "latest",
     ):
         consumer = self._build_consumer(
             entity,
@@ -460,13 +463,13 @@ class KafkaEventStream(SnubaProtocolEventStream):

     def run_post_process_forwarder(
         self,
-        entity,
-        consumer_group,
-        commit_log_topic,
-        synchronize_commit_group,
-        commit_batch_size=100,
-        commit_batch_timeout_ms=5000,
-        initial_offset_reset="latest",
+        entity: Union[Literal["all"], Literal["errors"], Literal["transactions"]],
+        consumer_group: str,
+        commit_log_topic: str,
+        synchronize_commit_group: str,
+        commit_batch_size: int = 100,
+        commit_batch_timeout_ms: int = 5000,
+        initial_offset_reset: Union[Literal["latest"], Literal["earliest"]] = "latest",
     ):
         logger.debug("Starting post-process forwarder...")
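For reference, a minimal sketch of how the typed entry point above might be invoked. The import path and no-argument constructor are assumed from the class name in the hunk headers, and the consumer group and topic names are placeholders rather than values taken from this change:

    from sentry.eventstream.kafka import KafkaEventStream  # import path assumed

    eventstream = KafkaEventStream()  # assumes no required constructor arguments
    eventstream.run_post_process_forwarder(
        entity="errors",  # one of "all", "errors", "transactions"
        consumer_group="post-process-forwarder",  # placeholder group name
        commit_log_topic="snuba-commit-log",  # placeholder topic name
        synchronize_commit_group="snuba-consumers",  # placeholder group name
        commit_batch_size=100,
        commit_batch_timeout_ms=5000,
        initial_offset_reset="latest",  # or "earliest"
    )

With the Literal annotation, a misspelled entity value is flagged by the type checker instead of silently falling through to the default branch that consumes both errors and transactions.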