@@ -1,7 +1,6 @@
 import logging
-import re
 from random import random
-from typing import Any, Callable, Dict, Mapping
+from typing import Callable, Dict, Mapping, cast
 
 import jsonschema
 import pytz
@@ -17,9 +16,16 @@ from arroyo.processing.strategies import (
     ProcessingStrategyFactory,
     RunTask,
 )
+from arroyo.processing.strategies.decoder.base import ValidationError
+from arroyo.processing.strategies.decoder.json import JsonCodec
 from arroyo.types import BrokerValue, Commit, Message, Partition
 from dateutil.parser import parse as parse_date
 from django.conf import settings
+from sentry_kafka_schemas import get_schema
+from sentry_kafka_schemas.schema_types.events_subscription_results_v1 import (
+    PayloadV3,
+    SubscriptionResult,
+)
 
 from sentry import options
 from sentry.snuba.dataset import Dataset, EntityKey
@@ -31,7 +37,7 @@ from sentry.utils.arroyo import MetricsWrapper
 
 logger = logging.getLogger(__name__)
 
-TQuerySubscriptionCallable = Callable[[Dict[str, Any], QuerySubscription], None]
+TQuerySubscriptionCallable = Callable[[PayloadV3, QuerySubscription], None]
 
 subscriber_registry: Dict[str, TQuerySubscriptionCallable] = {}
 
@@ -45,6 +51,16 @@ topic_to_dataset: Dict[str, Dataset] = {
     settings.KAFKA_METRICS_SUBSCRIPTIONS_RESULTS: Dataset.Metrics,
 }
 
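+# Maps each dataset to the logical topic name registered in sentry-kafka-schemas,
+# which is used below to look up the JSON schema for that result stream.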
+dataset_to_logical_topic = {
+    Dataset.Events: "events-subscription-results",
+    Dataset.Transactions: "transactions-subscription-results",
+    Dataset.PerformanceMetrics: "generic-metrics-subscription-results",
+    Dataset.Sessions: "sessions-subscription-results",
+    Dataset.Metrics: "metrics-subscription-results",
+}
+
 
 def register_subscriber(
     subscriber_key: str,
@@ -58,51 +74,64 @@ def register_subscriber(
     return inner
 
 
-def parse_message_value(value: str) -> Dict[str, Any]:
+def parse_message_value(value: bytes, jsoncodec: JsonCodec) -> PayloadV3:
     """
     Parses the value received via the Kafka consumer and verifies that it
     matches the expected schema.
-    :param value: A json formatted string
+    :param value: The raw message value from Kafka, as json-encoded bytes
+    :param jsoncodec: Codec used to decode and validate the value against its schema
     :return: A dict with the parsed message
     """
-    with metrics.timer("snuba_query_subscriber.parse_message_value.json_parse"):
-        wrapper: Dict[str, Any] = json.loads(value)
+    old_version = False
 
     with metrics.timer("snuba_query_subscriber.parse_message_value.json_validate_wrapper"):
         try:
-            jsonschema.validate(wrapper, SUBSCRIPTION_WRAPPER_SCHEMA)
-        except jsonschema.ValidationError:
-            metrics.incr("snuba_query_subscriber.message_wrapper_invalid")
-            raise InvalidSchemaError("Message wrapper does not match schema")
-
-    schema_version: int = wrapper["version"]
-    if schema_version not in SUBSCRIPTION_PAYLOAD_VERSIONS:
-        metrics.incr("snuba_query_subscriber.message_wrapper_invalid_version")
-        raise InvalidMessageError("Version specified in wrapper has no schema")
-
-    payload: Dict[str, Any] = wrapper["payload"]
-    with metrics.timer("snuba_query_subscriber.parse_message_value.json_validate_payload"):
-        try:
-            jsonschema.validate(payload, SUBSCRIPTION_PAYLOAD_VERSIONS[schema_version])
-        except jsonschema.ValidationError:
-            metrics.incr("snuba_query_subscriber.message_payload_invalid")
-            raise InvalidSchemaError("Message payload does not match schema")
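+            # Decode with the schema-aware codec first; messages that fail
+            # schema validation fall back to the legacy jsonschema path below.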
+            wrapper = cast(SubscriptionResult, jsoncodec.decode(value, validate=True))
+        except ValidationError:
+            old_version = True
+            metrics.incr("snuba_query_subscriber.message_wrapper.old_validation")
+            try:
+                wrapper = json.loads(value)
+                jsonschema.validate(wrapper, SUBSCRIPTION_WRAPPER_SCHEMA)
+            except jsonschema.ValidationError:
+                metrics.incr("snuba_query_subscriber.message_wrapper_invalid")
+                raise InvalidSchemaError("Message wrapper does not match schema")
+
+    payload: PayloadV3 = wrapper["payload"]
+
+    if old_version:
+        schema_version: int = wrapper["version"]
+        if schema_version not in SUBSCRIPTION_PAYLOAD_VERSIONS:
+            metrics.incr("snuba_query_subscriber.message_wrapper_invalid_version")
+            raise InvalidMessageError("Version specified in wrapper has no schema")
+
+        with metrics.timer("snuba_query_subscriber.parse_message_value.json_validate_payload"):
+            try:
+                jsonschema.validate(payload, SUBSCRIPTION_PAYLOAD_VERSIONS[schema_version])
+            except jsonschema.ValidationError:
+                metrics.incr("snuba_query_subscriber.message_payload_invalid")
+                raise InvalidSchemaError("Message payload does not match schema")
+
     # XXX: Since we just return the raw dict here, when the payload changes it'll
     # break things. This should convert the payload into a class rather than passing
     # the dict around, but until we get time to refactor we can keep things working
     # here.
-    payload.setdefault("values", payload.get("result"))
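+    # Mutating the typed payload in place needs the type: ignore hints below.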
+    payload.setdefault("values", payload.get("result"))  # type: ignore
 
-    payload["timestamp"] = parse_date(payload["timestamp"]).replace(tzinfo=pytz.utc)
+    payload["timestamp"] = parse_date(payload["timestamp"]).replace(tzinfo=pytz.utc)  # type: ignore
     return payload
 
 
 def handle_message(
-    message_value: str,
+    message_value: bytes,
     message_offset: int,
     message_partition: int,
     topic: str,
     dataset: str,
+    jsoncodec: JsonCodec,
 ) -> None:
     """
     Parses the value from Kafka, and if valid passes the payload to the callback defined by the
@@ -116,7 +145,7 @@ def handle_message(
             with metrics.timer(
                 "snuba_query_subscriber.parse_message_value", tags={"dataset": dataset}
             ):
-                contents = parse_message_value(message_value)
+                contents = parse_message_value(message_value, jsoncodec)
         except InvalidMessageError:
             # If the message is in an invalid format, just log the error
             # and continue
@@ -154,23 +183,11 @@ def handle_message(
                 },
             )
             try:
-                if "entity" in contents:
-                    entity_key = contents["entity"]
-                else:
-                    # XXX(ahmed): Remove this logic. This was kept here as backwards compat
-                    # for subscription updates with schema version `2`. However schema version 3
-                    # sends the "entity" in the payload
-                    metrics.incr("query_subscription_consumer.message_value.v2")
-                    entity_regex = r"^(MATCH|match)[ ]*\(([^)]+)\)"
-                    entity_match = re.match(entity_regex, contents["request"]["query"])
-                    if not entity_match:
-                        raise InvalidMessageError("Unable to fetch entity from query in message")
-                    entity_key = entity_match.group(2)
                 if topic in topic_to_dataset:
                     _delete_from_snuba(
                         topic_to_dataset[topic],
                         contents["subscription_id"],
-                        EntityKey(entity_key),
+                        EntityKey(contents["entity"]),
                     )
                 else:
                     logger.error(
@@ -232,6 +249,10 @@ class QuerySubscriptionStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
     def __init__(self, topic: str):
         self.topic = topic
         self.dataset = topic_to_dataset[self.topic]
+        self.logical_topic = dataset_to_logical_topic[self.dataset]
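+        # Build the codec once per consumer from the schema registered for this
+        # dataset's logical topic; parse_message_value uses it to validate messages.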
+        self.jsoncodec = JsonCodec(get_schema(self.logical_topic)["schema"])
 
     def create_with_partitions(
         self,
@@ -258,6 +279,7 @@ class QuerySubscriptionStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
                     partition,
                     self.topic,
                     self.dataset.value,
+                    self.jsoncodec,
                 )
             except Exception:
                 # This is a failsafe to make sure that no individual message will block this
|