|
@@ -268,12 +268,13 @@ class KafkaEventStream(SnubaProtocolEventStream):
|
|
|
|
|
|
def run_streaming_consumer(
|
|
def run_streaming_consumer(
|
|
self,
|
|
self,
|
|
- consumer_group,
|
|
|
|
- commit_log_topic,
|
|
|
|
- synchronize_commit_group,
|
|
|
|
- commit_batch_size=100,
|
|
|
|
- initial_offset_reset="latest",
|
|
|
|
- ):
|
|
|
|
|
|
+ entity: Union[Literal["all"], Literal["errors"], Literal["transactions"]],
|
|
|
|
+ consumer_group: str,
|
|
|
|
+ commit_log_topic: str,
|
|
|
|
+ synchronize_commit_group: str,
|
|
|
|
+ commit_batch_size: int = 100,
|
|
|
|
+ initial_offset_reset: Union[Literal["latest"], Literal["earliest"]] = "latest",
|
|
|
|
+ ) -> None:
|
|
cluster_name = settings.KAFKA_TOPICS[settings.KAFKA_EVENTS]["cluster"]
|
|
cluster_name = settings.KAFKA_TOPICS[settings.KAFKA_EVENTS]["cluster"]
|
|
|
|
|
|
consumer = SynchronizedConsumer(
|
|
consumer = SynchronizedConsumer(
|
|
@@ -356,7 +357,15 @@ class KafkaEventStream(SnubaProtocolEventStream):
|
|
)
|
|
)
|
|
commit(offsets_to_commit)
|
|
commit(offsets_to_commit)
|
|
|
|
|
|
- consumer.subscribe([self.topic], on_assign=on_assign, on_revoke=on_revoke)
|
|
|
|
|
|
+ if entity == "transactions":
|
|
|
|
+ topic = self.transactions_topic
|
|
|
|
+ elif entity == "errors":
|
|
|
|
+ topic = self.topic
|
|
|
|
+ else:
|
|
|
|
+ topic = self.topic
|
|
|
|
+ assert self.topic == self.transactions_topic
|
|
|
|
+
|
|
|
|
+ consumer.subscribe([topic], on_assign=on_assign, on_revoke=on_revoke)
|
|
|
|
|
|
def commit_offsets():
|
|
def commit_offsets():
|
|
offsets_to_commit = []
|
|
offsets_to_commit = []
|
|
@@ -487,6 +496,7 @@ class KafkaEventStream(SnubaProtocolEventStream):
|
|
else:
|
|
else:
|
|
logger.info("Starting streaming consumer")
|
|
logger.info("Starting streaming consumer")
|
|
self.run_streaming_consumer(
|
|
self.run_streaming_consumer(
|
|
|
|
+ entity,
|
|
consumer_group,
|
|
consumer_group,
|
|
commit_log_topic,
|
|
commit_log_topic,
|
|
synchronize_commit_group,
|
|
synchronize_commit_group,
|