build: sentry-kafka-schemas 0.1.4 (#48392)

Lyn Nagara · 1 year ago · commit 923184cf95

+ 2 - 2
requirements-base.txt

@@ -57,8 +57,8 @@ requests>=2.25.1
 rfc3339-validator>=0.1.2
 rfc3986-validator>=0.1.1
 # [end] jsonschema format validators
-sentry-arroyo[json]>=2.9.1
-sentry-kafka-schemas>=0.0.31
+sentry-arroyo>=2.9.1
+sentry-kafka-schemas>=0.1.4
 sentry-redis-tools>=0.1.5
 sentry-relay>=0.8.21
 sentry-sdk>=1.21.1
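
The two frozen lockfiles below pick up the same pins. A quick post-install check, as a minimal sketch using only the standard library (nothing this commit adds):

    # Sanity check after: pip install -r requirements-base.txt
    # The [json] extra on sentry-arroyo is dropped above, consistent with the
    # JSON codec imports moving to sentry_kafka_schemas elsewhere in this commit.
    from importlib.metadata import version

    print(version("sentry-kafka-schemas"))  # expect 0.1.4 or newer
    print(version("sentry-arroyo"))         # expect 2.9.1 or newer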

+ 1 - 1
requirements-dev-frozen.txt

@@ -165,7 +165,7 @@ s3transfer==0.5.2
 selenium==4.3.0
 sentry-arroyo==2.9.1
 sentry-cli==2.16.0
-sentry-kafka-schemas==0.0.31
+sentry-kafka-schemas==0.1.4
 sentry-redis-tools==0.1.5
 sentry-relay==0.8.21
 sentry-sdk==1.21.1

+ 1 - 1
requirements-frozen.txt

@@ -116,7 +116,7 @@ rsa==4.8
 s3transfer==0.5.2
 selenium==4.3.0
 sentry-arroyo==2.9.1
-sentry-kafka-schemas==0.0.31
+sentry-kafka-schemas==0.1.4
 sentry-redis-tools==0.1.5
 sentry-relay==0.8.21
 sentry-sdk==1.21.1

+ 3 - 4
src/sentry/sentry_metrics/consumers/indexer/batch.py

@@ -19,10 +19,9 @@ from typing import (
 import rapidjson
 import sentry_sdk
 from arroyo.backends.kafka import KafkaPayload
-from arroyo.codecs import ValidationError
-from arroyo.codecs.json import JsonCodec
 from arroyo.types import BrokerValue, Message
 from django.conf import settings
+from sentry_kafka_schemas.codecs import Codec, ValidationError
 from sentry_kafka_schemas.schema_types.snuba_generic_metrics_v1 import GenericMetric
 from sentry_kafka_schemas.schema_types.snuba_metrics_v1 import Metric
 
@@ -93,12 +92,12 @@ class IndexerBatch:
         outer_message: Message[MessageBatch],
         should_index_tag_values: bool,
         is_output_sliced: bool,
-        arroyo_input_codec: Optional[JsonCodec[Any]],
+        input_codec: Optional[Codec[Any]],
     ) -> None:
         self.outer_message = outer_message
         self.__should_index_tag_values = should_index_tag_values
         self.is_output_sliced = is_output_sliced
-        self.__input_codec = arroyo_input_codec
+        self.__input_codec = input_codec
 
         self._extract_messages()
 
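The renamed keyword as a caller sees it; a minimal sketch that assumes an outer_message (a Message[MessageBatch]) built elsewhere, with the keyword usage matching the tests further down:

    from typing import Any, Optional

    from sentry_kafka_schemas import get_codec
    from sentry_kafka_schemas.codecs import Codec
    from sentry.sentry_metrics.consumers.indexer.batch import IndexerBatch

    input_codec: Optional[Codec[Any]] = get_codec("ingest-metrics")
    batch = IndexerBatch(
        outer_message,                 # Message[MessageBatch], built by the consumer
        should_index_tag_values=True,
        is_output_sliced=False,
        input_codec=input_codec,       # Optional in the signature; None is allowed
    )
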

+ 3 - 4
src/sentry/sentry_metrics/consumers/indexer/processing.py

@@ -4,7 +4,6 @@ from typing import Any, Callable, Mapping
 
 import sentry_kafka_schemas
 import sentry_sdk
-from arroyo.codecs.json import JsonCodec
 from arroyo.types import Message
 from django.conf import settings
 
@@ -25,8 +24,8 @@ STORAGE_TO_INDEXER: Mapping[IndexerStorage, Callable[[], StringIndexer]] = {
     IndexerStorage.MOCK: MockIndexer,
 }
 
-_INGEST_SCHEMA: JsonCodec[Any] = JsonCodec(
-    schema=sentry_kafka_schemas.get_schema("ingest-metrics")["schema"]
+_INGEST_CODEC: sentry_kafka_schemas.codecs.Codec[Any] = sentry_kafka_schemas.get_codec(
+    "ingest-metrics"
 )
 
 
@@ -89,7 +88,7 @@ class MessageProcessor:
             outer_message,
             should_index_tag_values=should_index_tag_values,
             is_output_sliced=is_output_sliced,
-            arroyo_input_codec=_INGEST_SCHEMA,
+            input_codec=_INGEST_CODEC,
         )
 
         sdk.set_measurement("indexer_batch.payloads.len", len(batch.parsed_payloads_by_offset))
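
Isolated from the hunk above, the construction change amounts to one call replacing two; both forms are taken verbatim from this diff:

    # Old (arroyo's JsonCodec wrapping the raw schema dict):
    #   _INGEST_SCHEMA = JsonCodec(schema=sentry_kafka_schemas.get_schema("ingest-metrics")["schema"])
    # New (sentry-kafka-schemas 0.1.x builds the validating codec itself):
    import sentry_kafka_schemas

    _INGEST_CODEC = sentry_kafka_schemas.get_codec("ingest-metrics")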

+ 3 - 6
src/sentry/snuba/query_subscriptions/consumer.py

@@ -3,9 +3,8 @@ from typing import Callable, Dict
 
 import pytz
 import sentry_sdk
-from arroyo.codecs import ValidationError
-from arroyo.codecs.json import JsonCodec
 from dateutil.parser import parse as parse_date
+from sentry_kafka_schemas.codecs import Codec, ValidationError
 from sentry_kafka_schemas.schema_types.events_subscription_results_v1 import (
     PayloadV3,
     SubscriptionResult,
@@ -36,9 +35,7 @@ def register_subscriber(
     return inner
 
 
-def parse_message_value(
-    value: bytes, jsoncodec: JsonCodec[SubscriptionResult]
-) -> SubscriptionUpdate:
+def parse_message_value(value: bytes, jsoncodec: Codec[SubscriptionResult]) -> SubscriptionUpdate:
     """
     Parses the value received via the Kafka consumer and verifies that it
     matches the expected schema.
@@ -71,7 +68,7 @@ def handle_message(
     message_partition: int,
     topic: str,
     dataset: str,
-    jsoncodec: JsonCodec[SubscriptionResult],
+    jsoncodec: Codec[SubscriptionResult],
 ) -> None:
     """
     Parses the value from Kafka, and if valid passes the payload to the callback defined by the
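
Codec is generic over the generated schema type, which is why parse_message_value keeps a precise signature. A sketch of decoding with validation; the topic string is inferred from the generated module name (events_subscription_results_v1) and is an assumption here:

    from sentry_kafka_schemas import get_codec
    from sentry_kafka_schemas.codecs import Codec, ValidationError
    from sentry_kafka_schemas.schema_types.events_subscription_results_v1 import SubscriptionResult

    codec: Codec[SubscriptionResult] = get_codec("events-subscription-results")  # assumed topic name
    try:
        result = codec.decode(b"{}", validate=True)  # parse and schema-check in one call
    except ValidationError:
        result = None  # invalid payloads now raise sentry_kafka_schemas' ValidationError, not arroyo's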

+ 2 - 3
src/sentry/snuba/query_subscriptions/run.py

@@ -7,7 +7,6 @@ import sentry_sdk
 from arroyo import Topic, configure_metrics
 from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
 from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload
-from arroyo.codecs.json import JsonCodec
 from arroyo.commit import ONCE_PER_SECOND
 from arroyo.processing.processor import StreamProcessor
 from arroyo.processing.strategies import (
@@ -18,7 +17,7 @@ from arroyo.processing.strategies import (
     RunTaskWithMultiprocessing,
 )
 from arroyo.types import BrokerValue, Commit, Message, Partition
-from sentry_kafka_schemas import get_schema
+from sentry_kafka_schemas import get_codec
 
 from sentry.snuba.dataset import Dataset
 from sentry.snuba.query_subscriptions.constants import dataset_to_logical_topic, topic_to_dataset
@@ -93,7 +92,7 @@ def process_message(
                 partition,
                 topic,
                 dataset.value,
-                JsonCodec(schema=get_schema(logical_topic)["schema"]),
+                get_codec(logical_topic),
             )
         except Exception:
             # This is a failsafe to make sure that no individual message will block this
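
get_codec bundles what the old two-step did (fetch the schema for a logical topic, then wrap it in a codec), so the per-dataset JsonCodec construction disappears:

    from sentry_kafka_schemas import get_codec

    # Logical topic name taken from the test helper below; this replaces
    # JsonCodec(schema=get_schema(logical_topic)["schema"]) one-for-one.
    codec = get_codec("snuba-metrics")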

+ 19 - 18
tests/sentry/sentry_metrics/test_batch.py

@@ -2,12 +2,12 @@ import logging
 from collections.abc import MutableMapping
 from datetime import datetime, timezone
 from enum import Enum
+from typing import Any
 from unittest.mock import patch
 
 import pytest
 import sentry_kafka_schemas
 from arroyo.backends.kafka import KafkaPayload
-from arroyo.codecs.json import JsonCodec
 from arroyo.types import BrokerValue, Message, Partition, Topic, Value
 
 from sentry.sentry_metrics.consumers.indexer.batch import IndexerBatch, PartitionIdxOffset
@@ -84,7 +84,9 @@ extracted_string_output = {
     }
 }
 
-_INGEST_SCHEMA = JsonCodec(schema=sentry_kafka_schemas.get_schema("ingest-metrics")["schema"])
+_INGEST_CODEC: sentry_kafka_schemas.codecs.Codec[Any] = sentry_kafka_schemas.get_codec(
+    "ingest-metrics"
+)
 
 
 def _construct_messages(payloads):
@@ -129,11 +131,10 @@ def _deconstruct_messages(snuba_messages, kafka_logical_topic="snuba-metrics"):
 
     rv = []
 
-    codec = JsonCodec(schema=sentry_kafka_schemas.get_schema(kafka_logical_topic)["schema"])
+    codec = sentry_kafka_schemas.get_codec(kafka_logical_topic)
 
     for msg in snuba_messages:
-        decoded = json.loads(msg.payload.value.decode("utf-8"))
-        codec.validate(decoded)
+        decoded = codec.decode(msg.payload.value, validate=True)
         rv.append((decoded, msg.payload.headers))
 
     return rv
@@ -236,7 +237,7 @@ def test_extract_strings_with_rollout(should_index_tag_values, expected):
         outer_message,
         should_index_tag_values,
         False,
-        arroyo_input_codec=_INGEST_SCHEMA,
+        input_codec=_INGEST_CODEC,
     )
 
     assert batch.extract_strings() == expected
@@ -301,7 +302,7 @@ def test_extract_strings_with_multiple_use_case_ids():
         outer_message,
         True,
         False,
-        arroyo_input_codec=_INGEST_SCHEMA,
+        input_codec=_INGEST_CODEC,
     )
     assert batch.extract_strings() == {
         MockUseCaseID.USE_CASE_1: {
@@ -402,7 +403,7 @@ def test_extract_strings_with_invalid_mri():
         outer_message,
         True,
         False,
-        arroyo_input_codec=_INGEST_SCHEMA,
+        input_codec=_INGEST_CODEC,
     )
     assert batch.extract_strings() == {
         MockUseCaseID.USE_CASE_1: {
@@ -488,7 +489,7 @@ def test_extract_strings_with_multiple_use_case_ids_and_org_ids():
         outer_message,
         True,
         False,
-        arroyo_input_codec=_INGEST_SCHEMA,
+        input_codec=_INGEST_CODEC,
     )
     assert batch.extract_strings() == {
         MockUseCaseID.USE_CASE_1: {
@@ -534,7 +535,7 @@ def test_all_resolved(caplog, settings):
         outer_message,
         True,
         False,
-        arroyo_input_codec=_INGEST_SCHEMA,
+        input_codec=_INGEST_CODEC,
     )
     assert batch.extract_strings() == (
         {
@@ -677,7 +678,7 @@ def test_all_resolved_with_routing_information(caplog, settings):
         outer_message,
         True,
         True,
-        arroyo_input_codec=_INGEST_SCHEMA,
+        input_codec=_INGEST_CODEC,
     )
     assert batch.extract_strings() == (
         {
@@ -833,7 +834,7 @@ def test_all_resolved_retention_days_honored(caplog, settings):
         outer_message,
         True,
         False,
-        arroyo_input_codec=_INGEST_SCHEMA,
+        input_codec=_INGEST_CODEC,
     )
     assert batch.extract_strings() == (
         {
@@ -984,7 +985,7 @@ def test_batch_resolve_with_values_not_indexed(caplog, settings):
         outer_message,
         False,
         False,
-        arroyo_input_codec=_INGEST_SCHEMA,
+        input_codec=_INGEST_CODEC,
     )
     assert batch.extract_strings() == (
         {
@@ -1113,7 +1114,7 @@ def test_metric_id_rate_limited(caplog, settings):
         ]
     )
 
-    batch = IndexerBatch(outer_message, True, False, arroyo_input_codec=_INGEST_SCHEMA)
+    batch = IndexerBatch(outer_message, True, False, input_codec=_INGEST_CODEC)
     assert batch.extract_strings() == (
         {
             MockUseCaseID.SESSIONS: {
@@ -1216,7 +1217,7 @@ def test_tag_key_rate_limited(caplog, settings):
         ]
     )
 
-    batch = IndexerBatch(outer_message, True, False, arroyo_input_codec=_INGEST_SCHEMA)
+    batch = IndexerBatch(outer_message, True, False, input_codec=_INGEST_CODEC)
     assert batch.extract_strings() == (
         {
             MockUseCaseID.SESSIONS: {
@@ -1297,7 +1298,7 @@ def test_tag_value_rate_limited(caplog, settings):
         ]
     )
 
-    batch = IndexerBatch(outer_message, True, False, arroyo_input_codec=_INGEST_SCHEMA)
+    batch = IndexerBatch(outer_message, True, False, input_codec=_INGEST_CODEC)
     assert batch.extract_strings() == (
         {
             MockUseCaseID.SESSIONS: {
@@ -1428,7 +1429,7 @@ def test_one_org_limited(caplog, settings):
         outer_message,
         True,
         False,
-        arroyo_input_codec=_INGEST_SCHEMA,
+        input_codec=_INGEST_CODEC,
     )
     assert batch.extract_strings() == (
         {
@@ -1553,7 +1554,7 @@ def test_cardinality_limiter(caplog, settings):
         outer_message,
         True,
         False,
-        arroyo_input_codec=_INGEST_SCHEMA,
+        input_codec=_INGEST_CODEC,
     )
     keys_to_remove = list(batch.parsed_payloads_by_offset)[:2]
     # the messages come in a certain order, and Python dictionaries preserve
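
The helper change in _deconstruct_messages is the most instructive line of the commit: json.loads plus a separate codec.validate collapse into a single decode(..., validate=True). A standalone sketch with a made-up payload:

    from sentry_kafka_schemas import get_codec
    from sentry_kafka_schemas.codecs import ValidationError

    codec = get_codec("snuba-metrics")
    raw = b'{"hello": "world"}'  # hypothetical bytes; real values come off Kafka
    try:
        decoded = codec.decode(raw, validate=True)  # parse and schema-check in one step
    except ValidationError:
        decoded = None  # this toy payload will not satisfy the snuba-metrics schema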

+ 2 - 3
tests/sentry/snuba/test_query_subscription_consumer.py

@@ -7,11 +7,10 @@ from unittest import mock
 import pytest
 import pytz
 from arroyo.backends.kafka import KafkaPayload
-from arroyo.codecs.json import JsonCodec
 from arroyo.types import BrokerValue, Message, Partition, Topic
 from dateutil.parser import parse as parse_date
 from django.conf import settings
-from sentry_kafka_schemas import get_schema
+from sentry_kafka_schemas import get_codec
 
 from sentry.runner.commands.run import DEFAULT_BLOCK_SIZE
 from sentry.snuba.dataset import Dataset
@@ -35,7 +34,7 @@ class BaseQuerySubscriptionTest:
 
     @cached_property
     def jsoncodec(self):
-        return JsonCodec(schema=get_schema(self.topic)["schema"])
+        return get_codec(self.topic)
 
     @cached_property
     def valid_wrapper(self):
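
For completeness, the fixture pattern above as a standalone sketch; the class name and topic value are hypothetical stand-ins for the test base class:

    from functools import cached_property

    from sentry_kafka_schemas import get_codec

    class QuerySubscriptionFixture:  # hypothetical stand-in for BaseQuerySubscriptionTest
        topic = "events-subscription-results"  # assumed logical topic name

        @cached_property
        def jsoncodec(self):
            # cached_property memoizes the registry lookup: one codec per instance
            return get_codec(self.topic)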