@@ -1,9 +1,10 @@
 import logging
+from typing import Any, Callable, cast, Dict, Iterable, List, Optional
 
 import jsonschema
 import pytz
 import sentry_sdk
-from confluent_kafka import Consumer, KafkaException, OFFSET_INVALID, TopicPartition
+from confluent_kafka import Consumer, KafkaException, Message, OFFSET_INVALID, TopicPartition
 from confluent_kafka.admin import AdminClient
 from dateutil.parser import parse as parse_date
 from django.conf import settings
@@ -16,12 +17,15 @@ from sentry.utils.batching_kafka_consumer import wait_for_topics
 
 logger = logging.getLogger(__name__)
 
+TQuerySubscriptionCallable = Callable[[Dict[str, Any], QuerySubscription], None]
 
-subscriber_registry = {}
+subscriber_registry: Dict[str, TQuerySubscriptionCallable] = {}
 
 
-def register_subscriber(subscriber_key):
-    def inner(func):
+def register_subscriber(
+    subscriber_key: str,
+) -> Callable[[TQuerySubscriptionCallable], TQuerySubscriptionCallable]:
+    def inner(func: TQuerySubscriptionCallable) -> TQuerySubscriptionCallable:
         if subscriber_key in subscriber_registry:
             raise Exception("Handler already registered for %s" % subscriber_key)
         subscriber_registry[subscriber_key] = func
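
For illustration, a handler registered through this decorator has the (contents, subscription) shape captured by the TQuerySubscriptionCallable alias above. A minimal sketch, assuming this module's imports and names; the subscriber key and handler body are made up:

    @register_subscriber("example-subscriber-key")  # hypothetical key, not one used by Sentry
    def handle_example_update(contents: Dict[str, Any], subscription: QuerySubscription) -> None:
        # `contents` is the validated payload dict; `subscription` is the cached model row.
        logger.info(
            "example.subscription_update",
            extra={"subscription_id": contents["subscription_id"]},
        )
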
@@ -45,28 +49,30 @@ class QuerySubscriptionConsumer:
     These values are passed along to a callback associated with the subscription.
     """
 
-    topic_to_dataset = {
+    topic_to_dataset: Dict[str, QueryDatasets] = {
         settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS: QueryDatasets.EVENTS,
         settings.KAFKA_TRANSACTIONS_SUBSCRIPTIONS_RESULTS: QueryDatasets.TRANSACTIONS,
     }
 
     def __init__(
         self,
-        group_id,
-        topic=None,
-        commit_batch_size=100,
-        initial_offset_reset="earliest",
-        force_offset_reset=None,
+        group_id: str,
+        topic: Optional[str] = None,
+        commit_batch_size: int = 100,
+        initial_offset_reset: str = "earliest",
+        force_offset_reset: Optional[str] = None,
     ):
         self.group_id = group_id
         if not topic:
-            topic = settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS
+            # TODO(typing): Need a way to get the actual value of settings to avoid this
+            topic = cast(str, settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS)
+
         self.topic = topic
-        cluster_name = settings.KAFKA_TOPICS[topic]["cluster"]
+        cluster_name: str = settings.KAFKA_TOPICS[topic]["cluster"]
         self.commit_batch_size = commit_batch_size
         self.initial_offset_reset = initial_offset_reset
-        self.offsets = {}
-        self.consumer = None
+        self.offsets: Dict[int, Optional[int]] = {}
+        self.consumer: Consumer = None
         self.cluster_options = kafka_config.get_kafka_consumer_cluster_options(
             cluster_name,
             {
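
The typed constructor keeps the same defaults, so instantiation is unchanged. A hedged construction sketch; the group id is a placeholder, and leaving topic as None falls back to settings.KAFKA_EVENTS_SUBSCRIPTIONS_RESULTS via the cast above:

    consumer = QuerySubscriptionConsumer(
        group_id="example-query-subscription-consumer",  # placeholder group id
        topic=None,                      # defaults to the events subscription results topic
        commit_batch_size=100,
        initial_offset_reset="earliest",
        force_offset_reset=None,
    )
    consumer.run()  # blocks, polling Kafka and dispatching payloads to registered callbacks
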
@@ -84,26 +90,29 @@
         )
         self.resolve_partition_force_offset = self.offset_reset_name_to_func(force_offset_reset)
 
-    def offset_reset_name_to_func(self, offset_reset):
+    def offset_reset_name_to_func(
+        self, offset_reset: Optional[str]
+    ) -> Optional[Callable[[TopicPartition], TopicPartition]]:
         if offset_reset in {"smallest", "earliest", "beginning"}:
             return self.resolve_partition_offset_earliest
         elif offset_reset in {"largest", "latest", "end"}:
             return self.resolve_partition_offset_latest
+        return None
 
-    def resolve_partition_offset_earliest(self, partition):
+    def resolve_partition_offset_earliest(self, partition: TopicPartition) -> TopicPartition:
         low, high = self.consumer.get_watermark_offsets(partition)
         return TopicPartition(partition.topic, partition.partition, low)
 
-    def resolve_partition_offset_latest(self, partition):
+    def resolve_partition_offset_latest(self, partition: TopicPartition) -> TopicPartition:
         low, high = self.consumer.get_watermark_offsets(partition)
         return TopicPartition(partition.topic, partition.partition, high)
 
-    def run(self):
+    def run(self) -> None:
         logger.debug("Starting snuba query subscriber")
         self.offsets.clear()
 
-        def on_assign(consumer, partitions):
-            updated_partitions = []
+        def on_assign(consumer: Consumer, partitions: List[TopicPartition]) -> None:
+            updated_partitions: List[TopicPartition] = []
             for partition in partitions:
                 if self.resolve_partition_force_offset:
                     partition = self.resolve_partition_force_offset(partition)
@@ -124,7 +133,7 @@
                 },
             )
 
-        def on_revoke(consumer, partitions):
+        def on_revoke(consumer: Consumer, partitions: List[TopicPartition]) -> None:
             partition_numbers = [partition.partition for partition in partitions]
             self.commit_offsets(partition_numbers)
             for partition_number in partition_numbers:
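
For context, these nested on_assign/on_revoke functions are confluent_kafka rebalance callbacks; they only fire once passed to Consumer.subscribe(). A standalone sketch under assumed placeholder settings (the broker address, group id, and topic name are not taken from this change):

    from typing import List

    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "example-group"})

    def on_assign(consumer: Consumer, partitions: List[TopicPartition]) -> None:
        # Invoked on rebalance with the partitions this member now owns.
        pass

    def on_revoke(consumer: Consumer, partitions: List[TopicPartition]) -> None:
        # Invoked before partitions are reassigned; a natural place to commit offsets.
        pass

    consumer.subscribe(["example-topic"], on_assign=on_assign, on_revoke=on_revoke)
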
@@ -177,7 +186,7 @@
 
         self.shutdown()
 
-    def commit_offsets(self, partitions=None):
+    def commit_offsets(self, partitions: Optional[Iterable[int]] = None) -> None:
         logger.info(
             "query-subscription-consumer.commit_offsets",
             extra={"offsets": str(self.offsets), "partitions": str(partitions)},
@@ -196,12 +205,12 @@
 
         self.consumer.commit(offsets=to_commit)
 
-    def shutdown(self):
+    def shutdown(self) -> None:
         logger.debug("Committing offsets and closing consumer")
         self.commit_offsets()
         self.consumer.close()
 
-    def handle_message(self, message):
+    def handle_message(self, message: Message) -> None:
         """
         Parses the value from Kafka, and if valid passes the payload to the callback defined by the
         subscription. If the subscription has been removed, or no longer has a valid callback then
@@ -229,7 +238,7 @@
 
         try:
             with metrics.timer("snuba_query_subscriber.fetch_subscription"):
-                subscription = QuerySubscription.objects.get_from_cache(
+                subscription: QuerySubscription = QuerySubscription.objects.get_from_cache(
                     subscription_id=contents["subscription_id"]
                 )
             if subscription.status != QuerySubscription.Status.ACTIVE.value:
@@ -284,7 +293,7 @@
 
         callback(contents, subscription)
 
-    def parse_message_value(self, value):
+    def parse_message_value(self, value: str) -> Dict[str, Any]:
         """
         Parses the value received via the Kafka consumer and verifies that it
         matches the expected schema.
@@ -292,7 +301,7 @@
         :return: A dict with the parsed message
         """
         with metrics.timer("snuba_query_subscriber.parse_message_value.json_parse"):
-            wrapper = json.loads(value)
+            wrapper: Dict[str, Any] = json.loads(value)
 
         with metrics.timer("snuba_query_subscriber.parse_message_value.json_validate_wrapper"):
             try:
@@ -301,12 +310,12 @@
                 metrics.incr("snuba_query_subscriber.message_wrapper_invalid")
                 raise InvalidSchemaError("Message wrapper does not match schema")
 
-        schema_version = wrapper["version"]
+        schema_version: int = wrapper["version"]
         if schema_version not in SUBSCRIPTION_PAYLOAD_VERSIONS:
             metrics.incr("snuba_query_subscriber.message_wrapper_invalid_version")
             raise InvalidMessageError("Version specified in wrapper has no schema")
 
-        payload = wrapper["payload"]
+        payload: Dict[str, Any] = wrapper["payload"]
         with metrics.timer("snuba_query_subscriber.parse_message_value.json_validate_payload"):
             try:
                 jsonschema.validate(payload, SUBSCRIPTION_PAYLOAD_VERSIONS[schema_version])
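
The value this method validates is an envelope with "version" and "payload" keys, which is all the code above relies on. A rough, hypothetical example, given a QuerySubscriptionConsumer instance named consumer and the module's json import; the payload shown is a placeholder and would still need to satisfy the registered schema for its version:

    value = json.dumps(
        {
            "version": 2,  # must be a key of SUBSCRIPTION_PAYLOAD_VERSIONS
            "payload": {"subscription_id": "abc123"},  # placeholder; real payloads carry more fields
        }
    )
    # Raises InvalidMessageError for an unknown version, InvalidSchemaError for a bad wrapper.
    contents = consumer.parse_message_value(value)
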