
ref: Add input schema validation and e2e types to metrics indexer (#46360)

Markus Unterwaditzer 1 year ago
Parent
Commit
d6bb0e355c

+ 2 - 1
requirements-base.txt

@@ -55,7 +55,8 @@ requests>=2.25.1
 rfc3339-validator>=0.1.2
 rfc3986-validator>=0.1.1
 # [end] jsonschema format validators
-sentry-arroyo>=2.7.1
+sentry-arroyo[json]>=2.7.1
+sentry-kafka-schemas>=0.0.12
 sentry-relay>=0.8.19
 sentry-sdk>=1.17.0
 snuba-sdk>=1.0.5
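
The `[json]` extra on sentry-arroyo pulls in JSON-schema support (hence fastjsonschema appearing in the frozen files below), and sentry-kafka-schemas ships the shared topic schemas plus their generated types. A quick smoke check, assuming the updated requirements are installed in the repo's virtualenv:

```python
# get_schema and JsonCodec are used exactly this way later in this diff;
# this just confirms both new dependencies import and compose.
import sentry_kafka_schemas
from arroyo.processing.strategies.decoder.json import JsonCodec

schema_data = sentry_kafka_schemas.get_schema("ingest-metrics")
codec = JsonCodec(schema_data["schema"])
```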

+ 2 - 0
requirements-dev-frozen.txt

@@ -48,6 +48,7 @@ drf-spectacular==0.22.1
 email-reply-parser==0.5.12
 exceptiongroup==1.0.0rc9
 execnet==1.9.0
+fastjsonschema==2.16.2
 fido2==0.9.2
 filelock==3.7.0
 flake8==6.0.0
@@ -163,6 +164,7 @@ s3transfer==0.5.2
 selenium==4.3.0
 sentry-arroyo==2.7.1
 sentry-cli==2.16.0
+sentry-kafka-schemas==0.0.12
 sentry-relay==0.8.19
 sentry-sdk==1.17.0
 simplejson==3.17.6

+ 2 - 0
requirements-frozen.txt

@@ -38,6 +38,7 @@ django-pg-zero-downtime-migrations==0.11
 djangorestframework==3.12.4
 drf-spectacular==0.22.1
 email-reply-parser==0.5.12
+fastjsonschema==2.16.2
 fido2==0.9.2
 frozenlist==1.3.3
 google-api-core==2.10.1
@@ -115,6 +116,7 @@ rsa==4.8
 s3transfer==0.5.2
 selenium==4.3.0
 sentry-arroyo==2.7.1
+sentry-kafka-schemas==0.0.12
 sentry-relay==0.8.19
 sentry-sdk==1.17.0
 simplejson==3.17.6

+ 5 - 0
src/sentry/options/defaults.py

@@ -602,6 +602,11 @@ register("sentry-metrics.cardinality-limiter.limits.releasehealth.per-org", defa
 register("sentry-metrics.cardinality-limiter.orgs-rollout-rate", default=0.0)
 register("sentry-metrics.cardinality-limiter-rh.orgs-rollout-rate", default=0.0)
 
+register("sentry-metrics.producer-schema-validation.release-health.rollout-rate", default=0.0)
+register("sentry-metrics.consumer-schema-validation.release-health.rollout-rate", default=0.0)
+register("sentry-metrics.producer-schema-validation.performance.rollout-rate", default=0.0)
+register("sentry-metrics.consumer-schema-validation.performance.rollout-rate", default=0.0)
+
 # Flag to determine whether abnormal_mechanism tag should be extracted
 register("sentry-metrics.releasehealth.abnormal-mechanism-extraction-rate", default=0.0)
 

+ 8 - 0
src/sentry/sentry_metrics/configuration.py

@@ -43,6 +43,9 @@ class MetricsIngestConfiguration:
     writes_limiter_namespace: str
     cardinality_limiter_cluster_options: Mapping[str, Any]
     cardinality_limiter_namespace: str
+
+    input_schema_validation_option_name: Optional[str] = None
+
     index_tag_values_option_name: Optional[str] = None
     is_output_sliced: Optional[bool] = False
 
@@ -74,6 +77,7 @@ def get_ingest_config(
                 writes_limiter_namespace=RELEASE_HEALTH_PG_NAMESPACE,
                 cardinality_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_CARDINALITY_LIMITER_OPTIONS,
                 cardinality_limiter_namespace=RELEASE_HEALTH_PG_NAMESPACE,
+                input_schema_validation_option_name="sentry-metrics.consumer-schema-validation.release-health.rollout-rate",
             )
         )
 
@@ -91,6 +95,7 @@ def get_ingest_config(
                 cardinality_limiter_namespace=PERFORMANCE_PG_NAMESPACE,
                 index_tag_values_option_name="sentry-metrics.performance.index-tag-values",
                 is_output_sliced=settings.SENTRY_METRICS_INDEXER_ENABLE_SLICED_PRODUCER,
+                input_schema_validation_option_name="sentry-metrics.consumer-schema-validation.performance.rollout-rate",
             )
         )
 
@@ -107,6 +112,7 @@ def get_ingest_config(
                 writes_limiter_namespace=RELEASE_HEALTH_CS_NAMESPACE,
                 cardinality_limiter_cluster_options={},
                 cardinality_limiter_namespace=RELEASE_HEALTH_PG_NAMESPACE,
+                input_schema_validation_option_name="sentry-metrics.consumer-schema-validation.release-health.rollout-rate",
             )
         )
 
@@ -124,6 +130,7 @@ def get_ingest_config(
                 cardinality_limiter_cluster_options=settings.SENTRY_METRICS_INDEXER_CARDINALITY_LIMITER_OPTIONS_PERFORMANCE,
                 cardinality_limiter_namespace=PERFORMANCE_PG_NAMESPACE,
                 is_output_sliced=settings.SENTRY_METRICS_INDEXER_ENABLE_SLICED_PRODUCER,
+                input_schema_validation_option_name="sentry-metrics.consumer-schema-validation.performance.rollout-rate",
             )
         )
 
@@ -140,6 +147,7 @@ def get_ingest_config(
                 writes_limiter_namespace="test-namespace",
                 cardinality_limiter_cluster_options={},
                 cardinality_limiter_namespace=RELEASE_HEALTH_PG_NAMESPACE,
+                input_schema_validation_option_name="sentry-metrics.consumer-schema-validation.release-health.rollout-rate",
             )
         )
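
Each ingest configuration now names the runtime option that governs its validation rollout; a config that leaves the field at None never validates. A trimmed, illustrative version of the dataclass (the real MetricsIngestConfiguration carries many more fields, and `frozen` is an assumption, not taken from the diff):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ExampleIngestConfiguration:
    cardinality_limiter_namespace: str
    # Runtime option holding the rollout rate; None disables validation.
    input_schema_validation_option_name: Optional[str] = None


config = ExampleIngestConfiguration(
    cardinality_limiter_namespace="releasehealth",  # illustrative value
    input_schema_validation_option_name=(
        "sentry-metrics.consumer-schema-validation.release-health.rollout-rate"
    ),
)
```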
 

+ 56 - 33
src/sentry/sentry_metrics/consumers/indexer/batch.py

@@ -3,6 +3,7 @@ import random
 from collections import defaultdict
 from typing import (
     Any,
+    Dict,
     List,
     Mapping,
     MutableMapping,
@@ -10,15 +11,19 @@ from typing import (
     Optional,
     Sequence,
     Set,
-    TypedDict,
+    Union,
     cast,
 )
 
 import rapidjson
 import sentry_sdk
 from arroyo.backends.kafka import KafkaPayload
+from arroyo.processing.strategies.decoder.json import JsonCodec
 from arroyo.types import BrokerValue, Message
 from django.conf import settings
+from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric
+from sentry_kafka_schemas.schema_types.snuba_generic_metrics_v1 import GenericMetric
+from sentry_kafka_schemas.schema_types.snuba_metrics_v1 import Metric
 
 from sentry.sentry_metrics.configuration import UseCaseKey
 from sentry.sentry_metrics.consumers.indexer.common import IndexerOutputMessageBatch, MessageBatch
@@ -65,14 +70,6 @@ def invalid_metric_tags(tags: Mapping[str, str]) -> Sequence[str]:
     return invalid_strs
 
 
-class InboundMessage(TypedDict):
-    # Note: This is only the subset of fields we access in this file.
-    org_id: int
-    name: str
-    type: str
-    tags: Mapping[str, str]
-
-
 class IndexerBatch:
     def __init__(
         self,
@@ -80,24 +77,28 @@ class IndexerBatch:
         outer_message: Message[MessageBatch],
         should_index_tag_values: bool,
         is_output_sliced: bool,
+        arroyo_input_codec: Optional[JsonCodec],
     ) -> None:
         self.use_case_id = use_case_id
         self.outer_message = outer_message
         self.__should_index_tag_values = should_index_tag_values
         self.is_output_sliced = is_output_sliced
+        self.__input_codec = arroyo_input_codec
 
         self._extract_messages()
 
     @metrics.wraps("process_messages.extract_messages")
     def _extract_messages(self) -> None:
         self.skipped_offsets: Set[PartitionIdxOffset] = set()
-        self.parsed_payloads_by_offset: MutableMapping[PartitionIdxOffset, InboundMessage] = {}
+        self.parsed_payloads_by_offset: MutableMapping[PartitionIdxOffset, IngestMetric] = {}
 
         for msg in self.outer_message.payload:
             assert isinstance(msg.value, BrokerValue)
             partition_offset = PartitionIdxOffset(msg.value.partition.index, msg.value.offset)
             try:
                 parsed_payload = json.loads(msg.payload.value.decode("utf-8"), use_rapid_json=True)
+                if self.__input_codec:
+                    self.__input_codec.validate(parsed_payload)
                 self.parsed_payloads_by_offset[partition_offset] = parsed_payload
             except rapidjson.JSONDecodeError:
                 self.skipped_offsets.add(partition_offset)
@@ -214,7 +215,7 @@ class IndexerBatch:
 
         for message in self.outer_message.payload:
             used_tags: Set[str] = set()
-            output_message_meta: Mapping[str, MutableMapping[str, str]] = defaultdict(dict)
+            output_message_meta: Dict[str, Dict[str, str]] = defaultdict(dict)
             assert isinstance(message.value, BrokerValue)
             partition_offset = PartitionIdxOffset(
                 message.value.partition.index, message.value.offset
@@ -228,18 +229,15 @@ class IndexerBatch:
                     },
                 )
                 continue
-            new_payload_value = cast(
-                MutableMapping[Any, Any],
-                self.parsed_payloads_by_offset.pop(partition_offset),
-            )
+            old_payload_value = self.parsed_payloads_by_offset.pop(partition_offset)
 
-            metric_name = new_payload_value["name"]
-            org_id = new_payload_value["org_id"]
+            metric_name = old_payload_value["name"]
+            org_id = old_payload_value["org_id"]
             sentry_sdk.set_tag("sentry_metrics.organization_id", org_id)
-            tags = new_payload_value.get("tags", {})
+            tags = old_payload_value.get("tags", {})
             used_tags.add(metric_name)
 
-            new_tags: MutableMapping[str, int] = {}
+            new_tags: Dict[str, Union[str, int]] = {}
             exceeded_global_quotas = 0
             exceeded_org_quotas = 0
 
@@ -259,7 +257,7 @@ class IndexerBatch:
                             exceeded_org_quotas += 1
                         continue
 
-                    value_to_write = v
+                    value_to_write: Union[int, str] = v
                     if self.__should_index_tag_values:
                         new_v = mapping[org_id][v]
                         if new_v is None:
@@ -313,13 +311,7 @@ class IndexerBatch:
                 "".join(sorted(t.value for t in fetch_types_encountered)), "utf-8"
             )
 
-            # When sending tag values as strings, set the version on the payload
-            # to 2. This is used by the consumer to determine how to decode the
-            # tag values.
-            if not self.__should_index_tag_values:
-                new_payload_value["version"] = 2
-            new_payload_value["tags"] = new_tags
-            new_payload_value["metric_id"] = numeric_metric_id = mapping[org_id][metric_name]
+            numeric_metric_id = mapping[org_id][metric_name]
             if numeric_metric_id is None:
                 metadata = bulk_record_meta[org_id].get(metric_name)
                 metrics.incr(
@@ -344,12 +336,42 @@ class IndexerBatch:
                     )
                 continue
 
-            new_payload_value["retention_days"] = new_payload_value.get("retention_days", 90)
+            new_payload_value: Mapping[str, Any]
 
-            new_payload_value["mapping_meta"] = output_message_meta
-            new_payload_value["use_case_id"] = self.use_case_id.value
-
-            del new_payload_value["name"]
+            if self.__should_index_tag_values:
+                new_payload_v1: Metric = {
+                    "tags": new_tags,
+                    # XXX: relay actually sends this value unconditionally
+                    "retention_days": old_payload_value.get("retention_days", 90),
+                    "mapping_meta": output_message_meta,
+                    "use_case_id": self.use_case_id.value,
+                    "metric_id": numeric_metric_id,
+                    "org_id": old_payload_value["org_id"],
+                    "timestamp": old_payload_value["timestamp"],
+                    "project_id": old_payload_value["project_id"],
+                    "type": old_payload_value["type"],
+                    "value": old_payload_value["value"],
+                }
+
+                new_payload_value = new_payload_v1
+            else:
+                # When sending tag values as strings, set the version on the payload
+                # to 2. This is used by the consumer to determine how to decode the
+                # tag values.
+                new_payload_v2: GenericMetric = {
+                    "tags": cast(Dict[str, str], new_tags),
+                    "version": 2,
+                    "retention_days": old_payload_value.get("retention_days", 90),
+                    "mapping_meta": output_message_meta,
+                    "use_case_id": self.use_case_id.value,
+                    "metric_id": numeric_metric_id,
+                    "org_id": old_payload_value["org_id"],
+                    "timestamp": old_payload_value["timestamp"],
+                    "project_id": old_payload_value["project_id"],
+                    "type": old_payload_value["type"],
+                    "value": old_payload_value["value"],
+                }
+                new_payload_value = new_payload_v2
 
             kafka_payload = KafkaPayload(
                 key=message.payload.key,
@@ -357,7 +379,8 @@ class IndexerBatch:
                 headers=[
                     *message.payload.headers,
                     ("mapping_sources", mapping_header_content),
-                    ("metric_type", new_payload_value["type"]),
+                    # XXX: type mismatch, but seems to work fine in prod
+                    ("metric_type", new_payload_value["type"]),  # type: ignore
                 ],
             )
             if self.is_output_sliced:
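
In short, `_extract_messages` now optionally validates each decoded payload against the ingest schema, and `reconstruct_messages` builds explicitly typed v1/v2 output payloads instead of mutating the inbound dict in place. A self-contained sketch of the validation step; the field values are made up, shaped after the test fixtures in this commit:

```python
import sentry_kafka_schemas
from arroyo.processing.strategies.decoder.json import JsonCodec

codec = JsonCodec(sentry_kafka_schemas.get_schema("ingest-metrics")["schema"])

payload = {
    "name": "c:sessions/session@none",  # illustrative metric name
    "type": "c",
    "value": 1,
    "timestamp": 1678886400,
    "tags": {"environment": "production"},
    "org_id": 1,
    "project_id": 3,
    "retention_days": 90,
}

codec.validate(payload)  # raises on schema violations; a no-op when valid
```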

+ 16 - 1
src/sentry/sentry_metrics/consumers/indexer/processing.py

@@ -2,7 +2,9 @@ import logging
 import random
 from typing import Callable, Mapping
 
+import sentry_kafka_schemas
 import sentry_sdk
+from arroyo.processing.strategies.decoder.json import JsonCodec
 from arroyo.types import Message
 from django.conf import settings
 
@@ -25,6 +27,8 @@ STORAGE_TO_INDEXER: Mapping[IndexerStorage, Callable[[], StringIndexer]] = {
     IndexerStorage.MOCK: MockIndexer,
 }
 
+_INGEST_SCHEMA = JsonCodec(sentry_kafka_schemas.get_schema("ingest-metrics")["schema"])
+
 
 class MessageProcessor:
     def __init__(self, config: MetricsIngestConfiguration):
@@ -81,8 +85,19 @@ class MessageProcessor:
         )
         is_output_sliced = self._config.is_output_sliced or False
 
+        arroyo_input_codec_should_sample = (
+            # validate with probability equal to the configured rollout rate
+            self._config.input_schema_validation_option_name is not None
+            and random.random()
+            < options.get(self._config.input_schema_validation_option_name)
+        )
+
         batch = IndexerBatch(
-            self._config.use_case_id, outer_message, should_index_tag_values, is_output_sliced
+            self._config.use_case_id,
+            outer_message,
+            should_index_tag_values=should_index_tag_values,
+            is_output_sliced=is_output_sliced,
+            arroyo_input_codec=_INGEST_SCHEMA if arroyo_input_codec_should_sample else None,
         )
 
         sdk.set_measurement("indexer_batch.payloads.len", len(batch.parsed_payloads_by_offset))
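
The draw happens once per batch rather than per message: a single uniform sample against the configured rate decides whether the whole batch gets the codec, so the overhead is zero while the gate stays closed. The same decision as a stand-alone helper (the helper name is illustrative, not from this commit):

```python
import random
from typing import Optional

from arroyo.processing.strategies.decoder.json import JsonCodec


def codec_for_batch(
    codec: JsonCodec, rollout_rate: Optional[float]
) -> Optional[JsonCodec]:
    # Validate with probability equal to the rollout rate;
    # None or 0.0 keeps validation off entirely.
    if not rollout_rate:
        return None
    return codec if random.random() < rollout_rate else None
```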

+ 2 - 2
src/sentry/sentry_metrics/indexer/limiters/cardinality.py

@@ -2,7 +2,7 @@ from __future__ import annotations
 
 import dataclasses
 from collections import defaultdict
-from typing import Mapping, MutableMapping, Optional, Sequence, TypedDict
+from typing import Dict, Mapping, MutableMapping, Optional, Sequence, TypedDict
 
 from sentry import options
 from sentry.ratelimits.cardinality import (
@@ -66,7 +66,7 @@ class InboundMessage(TypedDict):
     # Note: This is only the subset of fields we access in this file.
     org_id: int
     name: str
-    tags: Mapping[str, str]
+    tags: Dict[str, str]
 
 
 class TimeseriesCardinalityLimiter:
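
Tightening `tags` from Mapping to Dict here is presumably about TypedDict compatibility: mypy treats TypedDict fields as invariant, so a payload typed with `tags: Dict[str, str]` (as generated schema types such as IngestMetric typically declare) is not assignable to a TypedDict expecting `tags: Mapping[str, str]`. A hedged illustration with made-up names:

```python
from typing import Dict, Mapping, TypedDict


class LooseMessage(TypedDict):
    tags: Mapping[str, str]


class StrictMessage(TypedDict):
    tags: Dict[str, str]


def consume(msg: LooseMessage) -> None:
    print(sorted(msg["tags"]))


strict: StrictMessage = {"tags": {"environment": "production"}}
# Runs fine, but mypy rejects it: incompatible TypedDict item types.
consume(strict)
```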

+ 79 - 24
tests/sentry/sentry_metrics/test_batch.py

@@ -3,7 +3,9 @@ from collections.abc import MutableMapping
 from datetime import datetime, timezone
 
 import pytest
+import sentry_kafka_schemas
 from arroyo.backends.kafka import KafkaPayload
+from arroyo.processing.strategies.decoder.json import JsonCodec
 from arroyo.types import BrokerValue, Message, Partition, Topic, Value
 
 from sentry.sentry_metrics.configuration import UseCaseKey
@@ -23,8 +25,9 @@ counter_payload = {
     },
     "timestamp": ts,
     "type": "c",
-    "value": 1.0,
+    "value": 1,
     "org_id": 1,
+    "retention_days": 90,
     "project_id": 3,
 }
 
@@ -37,8 +40,8 @@ distribution_payload = {
     "timestamp": ts,
     "type": "d",
     "value": [4, 5, 6],
-    "unit": "seconds",
     "org_id": 1,
+    "retention_days": 90,
     "project_id": 3,
 }
 
@@ -52,6 +55,7 @@ set_payload = {
     "type": "s",
     "value": [3],
     "org_id": 1,
+    "retention_days": 90,
     "project_id": 3,
 }
 
@@ -69,6 +73,8 @@ extracted_string_output = {
     }
 }
 
+_INGEST_SCHEMA = JsonCodec(sentry_kafka_schemas.get_schema("ingest-metrics")["schema"])
+
 
 def _construct_messages(payloads):
     message_batch = []
@@ -96,7 +102,7 @@ def _construct_outer_message(payloads):
     return outer_message
 
 
-def _deconstruct_messages(snuba_messages):
+def _deconstruct_messages(snuba_messages, kafka_logical_topic="snuba-metrics"):
     """
     Convert a list of messages returned by `reconstruct_messages` into python
     primitives, to run assertions on:
@@ -109,10 +115,17 @@ def _deconstruct_messages(snuba_messages):
 
     ...because pytest's assertion diffs work better with python primitives.
     """
-    return [
-        (json.loads(msg.payload.value.decode("utf-8")), msg.payload.headers)
-        for msg in snuba_messages
-    ]
+
+    rv = []
+
+    codec = JsonCodec(sentry_kafka_schemas.get_schema(kafka_logical_topic)["schema"])
+
+    for msg in snuba_messages:
+        decoded = json.loads(msg.payload.value.decode("utf-8"))
+        codec.validate(decoded)
+        rv.append((decoded, msg.payload.headers))
+
+    return rv
 
 
 def _deconstruct_routing_messages(snuba_messages):
@@ -203,7 +216,13 @@ def test_extract_strings_with_rollout(should_index_tag_values, expected):
             (set_payload, []),
         ]
     )
-    batch = IndexerBatch(UseCaseKey.PERFORMANCE, outer_message, should_index_tag_values, False)
+    batch = IndexerBatch(
+        UseCaseKey.PERFORMANCE,
+        outer_message,
+        should_index_tag_values,
+        False,
+        arroyo_input_codec=_INGEST_SCHEMA,
+    )
 
     assert batch.extract_strings() == expected
 
@@ -218,7 +237,13 @@ def test_all_resolved(caplog, settings):
         ]
     )
 
-    batch = IndexerBatch(UseCaseKey.PERFORMANCE, outer_message, True, False)
+    batch = IndexerBatch(
+        UseCaseKey.PERFORMANCE,
+        outer_message,
+        True,
+        False,
+        arroyo_input_codec=_INGEST_SCHEMA,
+    )
     assert batch.extract_strings() == (
         {
             1: {
@@ -308,7 +333,6 @@ def test_all_resolved(caplog, settings):
                 "tags": {"3": 7, "9": 5},
                 "timestamp": ts,
                 "type": "d",
-                "unit": "seconds",
                 "use_case_id": "performance",
                 "value": [4, 5, 6],
             },
@@ -350,7 +374,13 @@ def test_all_resolved_with_routing_information(caplog, settings):
         ]
     )
 
-    batch = IndexerBatch(UseCaseKey.PERFORMANCE, outer_message, True, True)
+    batch = IndexerBatch(
+        UseCaseKey.PERFORMANCE,
+        outer_message,
+        True,
+        True,
+        arroyo_input_codec=_INGEST_SCHEMA,
+    )
     assert batch.extract_strings() == (
         {
             1: {
@@ -442,7 +472,6 @@ def test_all_resolved_with_routing_information(caplog, settings):
                 "tags": {"3": 7, "9": 5},
                 "timestamp": ts,
                 "type": "d",
-                "unit": "seconds",
                 "use_case_id": "performance",
                 "value": [4, 5, 6],
             },
@@ -493,7 +522,13 @@ def test_all_resolved_retention_days_honored(caplog, settings):
         ]
     )
 
-    batch = IndexerBatch(UseCaseKey.PERFORMANCE, outer_message, True, False)
+    batch = IndexerBatch(
+        UseCaseKey.PERFORMANCE,
+        outer_message,
+        True,
+        False,
+        arroyo_input_codec=_INGEST_SCHEMA,
+    )
     assert batch.extract_strings() == (
         {
             1: {
@@ -583,7 +618,6 @@ def test_all_resolved_retention_days_honored(caplog, settings):
                 "tags": {"3": 7, "9": 5},
                 "timestamp": ts,
                 "type": "d",
-                "unit": "seconds",
                 "use_case_id": "performance",
                 "value": [4, 5, 6],
             },
@@ -634,7 +668,13 @@ def test_batch_resolve_with_values_not_indexed(caplog, settings):
         ]
     )
 
-    batch = IndexerBatch(UseCaseKey.PERFORMANCE, outer_message, False, False)
+    batch = IndexerBatch(
+        UseCaseKey.PERFORMANCE,
+        outer_message,
+        False,
+        False,
+        arroyo_input_codec=_INGEST_SCHEMA,
+    )
     assert batch.extract_strings() == (
         {
             1: {
@@ -670,7 +710,7 @@ def test_batch_resolve_with_values_not_indexed(caplog, settings):
     )
 
     assert _get_string_indexer_log_records(caplog) == []
-    assert _deconstruct_messages(snuba_payloads) == [
+    assert _deconstruct_messages(snuba_payloads, kafka_logical_topic="snuba-generic-metrics") == [
         (
             {
                 "version": 2,
@@ -710,7 +750,6 @@ def test_batch_resolve_with_values_not_indexed(caplog, settings):
                 "tags": {"3": "production", "5": "healthy"},
                 "timestamp": ts,
                 "type": "d",
-                "unit": "seconds",
                 "use_case_id": "performance",
                 "value": [4, 5, 6],
             },
@@ -751,7 +790,9 @@ def test_metric_id_rate_limited(caplog, settings):
         ]
     )
 
-    batch = IndexerBatch(UseCaseKey.PERFORMANCE, outer_message, True, False)
+    batch = IndexerBatch(
+        UseCaseKey.PERFORMANCE, outer_message, True, False, arroyo_input_codec=_INGEST_SCHEMA
+    )
     assert batch.extract_strings() == (
         {
             1: {
@@ -847,7 +888,9 @@ def test_tag_key_rate_limited(caplog, settings):
         ]
     )
 
-    batch = IndexerBatch(UseCaseKey.PERFORMANCE, outer_message, True, False)
+    batch = IndexerBatch(
+        UseCaseKey.PERFORMANCE, outer_message, True, False, arroyo_input_codec=_INGEST_SCHEMA
+    )
     assert batch.extract_strings() == (
         {
             1: {
@@ -925,7 +968,9 @@ def test_tag_value_rate_limited(caplog, settings):
         ]
     )
 
-    batch = IndexerBatch(UseCaseKey.PERFORMANCE, outer_message, True, False)
+    batch = IndexerBatch(
+        UseCaseKey.PERFORMANCE, outer_message, True, False, arroyo_input_codec=_INGEST_SCHEMA
+    )
     assert batch.extract_strings() == (
         {
             1: {
@@ -1024,7 +1069,6 @@ def test_tag_value_rate_limited(caplog, settings):
                 "tags": {"3": 7, "9": 5},
                 "timestamp": ts,
                 "type": "d",
-                "unit": "seconds",
                 "use_case_id": "performance",
                 "value": [4, 5, 6],
             },
@@ -1042,7 +1086,13 @@ def test_one_org_limited(caplog, settings):
         ]
     )
 
-    batch = IndexerBatch(UseCaseKey.PERFORMANCE, outer_message, True, False)
+    batch = IndexerBatch(
+        UseCaseKey.PERFORMANCE,
+        outer_message,
+        True,
+        False,
+        arroyo_input_codec=_INGEST_SCHEMA,
+    )
     assert batch.extract_strings() == (
         {
             1: {
@@ -1128,7 +1178,6 @@ def test_one_org_limited(caplog, settings):
                 "tags": {"2": 4, "5": 3},
                 "timestamp": ts,
                 "type": "d",
-                "unit": "seconds",
                 "use_case_id": "performance",
                 "value": [4, 5, 6],
             },
@@ -1156,7 +1205,13 @@ def test_cardinality_limiter(caplog, settings):
         ]
     )
 
-    batch = IndexerBatch(UseCaseKey.PERFORMANCE, outer_message, True, False)
+    batch = IndexerBatch(
+        UseCaseKey.PERFORMANCE,
+        outer_message,
+        True,
+        False,
+        arroyo_input_codec=_INGEST_SCHEMA,
+    )
     keys_to_remove = list(batch.parsed_payloads_by_offset)[:2]
     # the messages come in a certain order, and Python dictionaries preserve
     # their insertion order. So we can hardcode offsets here.
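
With these changes the tests validate both directions: inputs are checked against the ingest-metrics schema via the codec passed to IndexerBatch, and `_deconstruct_messages` checks every produced payload against the logical output topic's schema. A sketch of the output-side check as a reusable helper, assuming sentry's json wrapper as used in the test file:

```python
import sentry_kafka_schemas
from arroyo.processing.strategies.decoder.json import JsonCodec

from sentry.utils import json


def decode_and_validate(raw: bytes, logical_topic: str = "snuba-metrics") -> dict:
    # Fails loudly if the indexer's output drifts from the published schema.
    codec = JsonCodec(sentry_kafka_schemas.get_schema(logical_topic)["schema"])
    decoded = json.loads(raw.decode("utf-8"))
    codec.validate(decoded)
    return decoded
```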

+ 3 - 1
tests/sentry/sentry_metrics/test_multiprocess_steps.py

@@ -277,11 +277,13 @@ def __translated_payload(
     payload["tags"] = new_tags
     payload["use_case_id"] = "release-health"
 
+    payload.pop("unit", None)
     del payload["name"]
     return payload
 
 
-def test_process_messages() -> None:
+def test_process_messages(set_sentry_option) -> None:
+    set_sentry_option("sentry-metrics.consumer-schema-validation.release-health.rollout-rate", 0.0)
     message_payloads = [counter_payload, distribution_payload, set_payload]
     message_batch = [
         Message(