
feat(generic-metrics): Implement namespace killswitch (#53369)

### Overview

Based on work done in https://github.com/getsentry/relay/pull/2281.

- Implements a kill switch that runs during batching, before the message
body is parsed. The consumer reads the `namespace` header and skips the
offset if that namespace is listed in the Sentry option
`sentry-metrics.indexer.disabled-namespaces` (see the sketch below)
- Emits the namespace as a message header in the Kafka backend

Note: messages with no `namespace` header are still processed, and a
metric is emitted when that happens. This should never occur, but I would
like to keep this behaviour for at least a few days in case we are
producing metrics without a `namespace` header somewhere.
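
For illustration, here is a minimal, self-contained sketch of the check
described above (the `should_skip` helper and the namespace values are
hypothetical; the real consumer reads the list from the
`sentry-metrics.indexer.disabled-namespaces` option and counts missing
headers with `metrics.incr`):

```python
from typing import Optional, Sequence, Tuple

Headers = Sequence[Tuple[str, bytes]]


def extract_namespace(headers: Headers) -> Optional[str]:
    """Return the decoded "namespace" header value, or None if it is absent."""
    for key, value in headers:
        if key == "namespace":
            return value.decode("utf-8")
    return None


def should_skip(headers: Headers, disabled_namespaces: Sequence[str]) -> bool:
    """Drop the message only when its namespace is explicitly disabled.

    Messages without a namespace header fall through and are processed.
    """
    namespace = extract_namespace(headers)
    return namespace is not None and namespace in disabled_namespaces


# Hypothetical values: "use_case_2" stands in for a disabled namespace.
assert should_skip([("namespace", b"use_case_2")], ["use_case_2"])
assert not should_skip([("namespace", b"use_case_1")], ["use_case_2"])
assert not should_skip([], ["use_case_2"])  # no header: processed, not dropped
```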

Closes [SNS-2247](https://getsentry.atlassian.net/browse/SNS-2247)

[SNS-2247]:
https://getsentry.atlassian.net/browse/SNS-2247?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
John · 1 year ago · commit 5485769757

+ 17 - 0
src/sentry/sentry_metrics/consumers/indexer/batch.py

@@ -8,10 +8,12 @@ from typing import (
     List,
     Mapping,
     MutableMapping,
+    MutableSequence,
     NamedTuple,
     Optional,
     Sequence,
     Set,
+    Tuple,
     Union,
     cast,
 )
@@ -26,6 +28,7 @@ from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric
 from sentry_kafka_schemas.schema_types.snuba_generic_metrics_v1 import GenericMetric
 from sentry_kafka_schemas.schema_types.snuba_metrics_v1 import Metric

+from sentry import options
 from sentry.sentry_metrics.consumers.indexer.common import IndexerOutputMessageBatch, MessageBatch
 from sentry.sentry_metrics.consumers.indexer.parsed_message import ParsedMessage
 from sentry.sentry_metrics.consumers.indexer.routing_producer import RoutingPayload
@@ -43,6 +46,7 @@ ACCEPTED_METRIC_TYPES = {"s", "c", "d"}  # set, counter, distribution
 MRI_RE_PATTERN = re.compile("^([c|s|d|g|e]):([a-zA-Z0-9_]+)/.*$")

 OrgId = int
+Headers = MutableSequence[Tuple[str, bytes]]


 class PartitionIdxOffset(NamedTuple):
@@ -106,6 +110,13 @@ class IndexerBatch:

         self._extract_messages()

+    def _extract_namespace(self, headers: Headers) -> Optional[str]:
+        for string, encoded in headers:
+            if string == "namespace":
+                return encoded.decode("utf-8")
+        metrics.incr("sentry-metrics.indexer.killswitch.no-namespace-in-header")
+        return None
+
     @metrics.wraps("process_messages.extract_messages")
     def _extract_messages(self) -> None:
         self.skipped_offsets: Set[PartitionIdxOffset] = set()
@@ -115,6 +126,12 @@ class IndexerBatch:
             assert isinstance(msg.value, BrokerValue)
             partition_offset = PartitionIdxOffset(msg.value.partition.index, msg.value.offset)

+            if (namespace := self._extract_namespace(msg.payload.headers)) in options.get(
+                "sentry-metrics.indexer.disabled-namespaces"
+            ):
+                self.skipped_offsets.add(partition_offset)
+                metrics.incr("process_messages.namespace_disabled", tags={"namespace": namespace})
+                continue
             try:
                 parsed_payload: ParsedMessage = json.loads(
                     msg.payload.value.decode("utf-8"), use_rapid_json=True

+ 11 - 5
src/sentry/sentry_metrics/kafka.py

@@ -89,7 +89,7 @@ class KafkaMetricsBackend(GenericMetricsBackend):
             "type": "c",
         }

-        self.__produce(counter_metric)
+        self.__produce(counter_metric, use_case_id)

     def set(
         self,
@@ -120,7 +120,7 @@ class KafkaMetricsBackend(GenericMetricsBackend):
             "type": "s",
         }

-        self.__produce(set_metric)
+        self.__produce(set_metric, use_case_id)

     def distribution(
         self,
@@ -150,11 +150,17 @@ class KafkaMetricsBackend(GenericMetricsBackend):
             "type": "d",
         }

-        self.__produce(dist_metric)
+        self.__produce(dist_metric, use_case_id)

-    def __produce(self, metric: Mapping[str, Any]):
+    def __produce(self, metric: Mapping[str, Any], use_case_id: UseCaseID):
         ingest_codec.validate(metric)
-        payload = KafkaPayload(None, json.dumps(metric).encode("utf-8"), [])
+        payload = KafkaPayload(
+            None,
+            json.dumps(metric).encode("utf-8"),
+            [
+                ("namespace", use_case_id.value.encode()),
+            ],
+        )
         self.producer.produce(self.kafka_topic, payload)

     def close(self):

+ 207 - 48
tests/sentry/sentry_metrics/test_batch.py

@@ -13,6 +13,7 @@ from arroyo.types import BrokerValue, Message, Partition, Topic, Value
 from sentry.sentry_metrics.consumers.indexer.batch import IndexerBatch, PartitionIdxOffset
 from sentry.sentry_metrics.indexer.base import FetchType, FetchTypeExt, Metadata
 from sentry.snuba.metrics.naming_layer.mri import SessionMRI, TransactionMRI
+from sentry.testutils.helpers.options import override_options
 from sentry.utils import json
 
 
 
 
@@ -39,6 +40,7 @@ counter_payload = {
     "retention_days": 90,
     "project_id": 3,
 }
+counter_headers = [("namespace", b"sessions")]

 distribution_payload = {
     "name": SessionMRI.RAW_DURATION.value,
@@ -53,6 +55,7 @@ distribution_payload = {
     "retention_days": 90,
     "project_id": 3,
 }
+distribution_headers = [("namespace", b"sessions")]

 set_payload = {
     "name": SessionMRI.ERROR.value,
@@ -67,6 +70,7 @@ set_payload = {
     "retention_days": 90,
     "project_id": 3,
 }
+set_headers = [("namespace", b"sessions")]

 extracted_string_output = {
     MockUseCaseID.SESSIONS: {
@@ -228,9 +232,9 @@ def test_extract_strings_with_rollout(should_index_tag_values, expected):
     """
     outer_message = _construct_outer_message(
         [
-            (counter_payload, []),
-            (distribution_payload, []),
-            (set_payload, []),
+            (counter_payload, counter_headers),
+            (distribution_payload, distribution_headers),
+            (set_payload, set_headers),
         ]
     )
     batch = IndexerBatch(
@@ -293,9 +297,9 @@ def test_extract_strings_with_multiple_use_case_ids():
 
 
     outer_message = _construct_outer_message(
         [
-            (counter_payload, []),
-            (distribution_payload, []),
-            (set_payload, []),
+            (counter_payload, [("namespace", b"use_case_1")]),
+            (distribution_payload, [("namespace", b"use_case_2")]),
+            (set_payload, [("namespace", b"use_case_2")]),
         ]
     )
     batch = IndexerBatch(
@@ -331,6 +335,152 @@ def test_extract_strings_with_multiple_use_case_ids():
     }
 
 
 
 
+@override_options({"sentry-metrics.indexer.disabled-namespaces": ["use_case_2"]})
+@patch("sentry.sentry_metrics.consumers.indexer.batch.UseCaseID", MockUseCaseID)
+def test_extract_strings_with_single_use_case_ids_blocked():
+    """
+    Verify that the extract string method will work normally when a single use case ID is blocked
+    """
+    counter_payload = {
+        "name": "c:use_case_1/session@none",
+        "tags": {
+            "environment": "production",
+            "session.status": "init",
+        },
+        "timestamp": ts,
+        "type": "c",
+        "value": 1,
+        "org_id": 1,
+        "retention_days": 90,
+        "project_id": 3,
+    }
+
+    distribution_payload = {
+        "name": "d:use_case_2/duration@second",
+        "tags": {
+            "environment": "production",
+            "session.status": "healthy",
+        },
+        "timestamp": ts,
+        "type": "d",
+        "value": [4, 5, 6],
+        "org_id": 1,
+        "retention_days": 90,
+        "project_id": 3,
+    }
+
+    set_payload = {
+        "name": "s:use_case_2/error@none",
+        "tags": {
+            "environment": "production",
+            "session.status": "errored",
+        },
+        "timestamp": ts,
+        "type": "s",
+        "value": [3],
+        "org_id": 1,
+        "retention_days": 90,
+        "project_id": 3,
+    }
+
+    outer_message = _construct_outer_message(
+        [
+            (counter_payload, [("namespace", b"use_case_1")]),
+            (distribution_payload, [("namespace", b"use_case_2")]),
+            (set_payload, [("namespace", b"use_case_2")]),
+        ]
+    )
+    batch = IndexerBatch(
+        outer_message,
+        True,
+        False,
+        input_codec=_INGEST_CODEC,
+    )
+    assert batch.extract_strings() == {
+        MockUseCaseID.USE_CASE_1: {
+            1: {
+                "c:use_case_1/session@none",
+                "environment",
+                "production",
+                "session.status",
+                "init",
+            }
+        }
+    }
+
+
+@override_options({"sentry-metrics.indexer.disabled-namespaces": ["use_case_1", "use_case_2"]})
+@patch("sentry.sentry_metrics.consumers.indexer.batch.UseCaseID", MockUseCaseID)
+def test_extract_strings_with_multiple_use_case_ids_blocked():
+    """
+    Verify that the extract string method will work normally when multiple use case IDs are blocked
+    """
+    custom_uc_counter_payload = {
+        "name": "c:use_case_1/session@none",
+        "tags": {
+            "environment": "production",
+            "session.status": "init",
+        },
+        "timestamp": ts,
+        "type": "c",
+        "value": 1,
+        "org_id": 1,
+        "retention_days": 90,
+        "project_id": 3,
+    }
+    perf_distribution_payload = {
+        "name": TransactionMRI.MEASUREMENTS_FCP.value,
+        "tags": {
+            "environment": "production",
+            "session.status": "healthy",
+        },
+        "timestamp": ts,
+        "type": "d",
+        "value": [4, 5, 6],
+        "org_id": 1,
+        "retention_days": 90,
+        "project_id": 3,
+    }
+    custom_uc_set_payload = {
+        "name": "s:use_case_2/error@none",
+        "tags": {
+            "environment": "production",
+            "session.status": "errored",
+        },
+        "timestamp": ts,
+        "type": "s",
+        "value": [3],
+        "org_id": 2,
+        "retention_days": 90,
+        "project_id": 3,
+    }
+
+    outer_message = _construct_outer_message(
+        [
+            (custom_uc_counter_payload, [("namespace", b"use_case_1")]),
+            (perf_distribution_payload, [("namespace", b"transactions")]),
+            (custom_uc_set_payload, [("namespace", b"use_case_2")]),
+        ]
+    )
+    batch = IndexerBatch(
+        outer_message,
+        True,
+        False,
+        input_codec=_INGEST_CODEC,
+    )
+    assert batch.extract_strings() == {
+        MockUseCaseID.TRANSACTIONS: {
+            1: {
+                TransactionMRI.MEASUREMENTS_FCP.value,
+                "environment",
+                "production",
+                "session.status",
+                "healthy",
+            }
+        },
+    }
+
+
 @patch("sentry.sentry_metrics.consumers.indexer.batch.UseCaseID", MockUseCaseID)
 def test_extract_strings_with_invalid_mri():
     """
@@ -393,10 +543,10 @@ def test_extract_strings_with_invalid_mri():
 
 
     outer_message = _construct_outer_message(
         [
-            (bad_counter_payload, []),
-            (counter_payload, []),
-            (distribution_payload, []),
-            (set_payload, []),
+            (bad_counter_payload, [("namespace", b"")]),
+            (counter_payload, [("namespace", b"use_case_1")]),
+            (distribution_payload, [("namespace", b"use_case_2")]),
+            (set_payload, [("namespace", b"use_case_2")]),
         ]
     )
     batch = IndexerBatch(
@@ -438,6 +588,7 @@ def test_extract_strings_with_multiple_use_case_ids_and_org_ids():
     Verify that the extract string method can handle payloads that has multiple
     (generic) uses cases and from different orgs
     """
+
     custom_uc_counter_payload = {
         "name": "c:use_case_1/session@none",
         "tags": {
@@ -480,9 +631,9 @@ def test_extract_strings_with_multiple_use_case_ids_and_org_ids():
 
 
     outer_message = _construct_outer_message(
         [
-            (custom_uc_counter_payload, []),
-            (perf_distribution_payload, []),
-            (custom_uc_set_payload, []),
+            (custom_uc_counter_payload, [("namespace", b"use_case_1")]),
+            (perf_distribution_payload, [("namespace", b"transactions")]),
+            (custom_uc_set_payload, [("namespace", b"use_case_1")]),
         ]
     )
     batch = IndexerBatch(
@@ -525,9 +676,9 @@ def test_all_resolved(caplog, settings):
     settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
     outer_message = _construct_outer_message(
         [
-            (counter_payload, []),
-            (distribution_payload, []),
-            (set_payload, []),
+            (counter_payload, counter_headers),
+            (distribution_payload, distribution_headers),
+            (set_payload, set_headers),
         ]
     )
 
 
@@ -614,7 +765,7 @@ def test_all_resolved(caplog, settings):
                 "value": 1.0,
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
-            [("mapping_sources", b"ch"), ("metric_type", "c")],
+            [*counter_headers, ("mapping_sources", b"ch"), ("metric_type", "c")],
         ),
         (
             {
@@ -638,7 +789,7 @@ def test_all_resolved(caplog, settings):
                 "value": [4, 5, 6],
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
-            [("mapping_sources", b"ch"), ("metric_type", "d")],
+            [*distribution_headers, ("mapping_sources", b"ch"), ("metric_type", "d")],
         ),
         (
             {
@@ -662,7 +813,7 @@ def test_all_resolved(caplog, settings):
                 "value": [3],
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
-            [("mapping_sources", b"cd"), ("metric_type", "s")],
+            [*set_headers, ("mapping_sources", b"cd"), ("metric_type", "s")],
         ),
     ]
 
 
@@ -672,9 +823,9 @@ def test_all_resolved_with_routing_information(caplog, settings):
     settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
     outer_message = _construct_outer_message(
         [
-            (counter_payload, []),
-            (distribution_payload, []),
-            (set_payload, []),
+            (counter_payload, counter_headers),
+            (distribution_payload, distribution_headers),
+            (set_payload, set_headers),
         ]
     )
 
 
@@ -761,7 +912,7 @@ def test_all_resolved_with_routing_information(caplog, settings):
                 "value": 1.0,
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
-            [("mapping_sources", b"ch"), ("metric_type", "c")],
+            [*counter_headers, ("mapping_sources", b"ch"), ("metric_type", "c")],
         ),
         (
             {"org_id": 1},
@@ -787,6 +938,7 @@ def test_all_resolved_with_routing_information(caplog, settings):
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
             [
+                *distribution_headers,
                 ("mapping_sources", b"ch"),
                 ("metric_type", "d"),
             ],
@@ -814,7 +966,7 @@ def test_all_resolved_with_routing_information(caplog, settings):
                 "value": [3],
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
-            [("mapping_sources", b"cd"), ("metric_type", "s")],
+            [*set_headers, ("mapping_sources", b"cd"), ("metric_type", "s")],
         ),
     ]
 
 
@@ -832,9 +984,9 @@ def test_all_resolved_retention_days_honored(caplog, settings):
     settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
     outer_message = _construct_outer_message(
         [
-            (counter_payload, []),
-            (distribution_payload_modified, []),
-            (set_payload, []),
+            (counter_payload, counter_headers),
+            (distribution_payload_modified, distribution_headers),
+            (set_payload, set_headers),
         ]
     )
 
 
@@ -920,7 +1072,7 @@ def test_all_resolved_retention_days_honored(caplog, settings):
                 "value": 1.0,
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
-            [("mapping_sources", b"ch"), ("metric_type", "c")],
+            [*counter_headers, ("mapping_sources", b"ch"), ("metric_type", "c")],
         ),
         (
             {
@@ -944,7 +1096,7 @@ def test_all_resolved_retention_days_honored(caplog, settings):
                 "value": [4, 5, 6],
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
-            [("mapping_sources", b"ch"), ("metric_type", "d")],
+            [*distribution_headers, ("mapping_sources", b"ch"), ("metric_type", "d")],
         ),
         (
             {
@@ -968,7 +1120,7 @@ def test_all_resolved_retention_days_honored(caplog, settings):
                 "value": [3],
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
-            [("mapping_sources", b"cd"), ("metric_type", "s")],
+            [*set_headers, ("mapping_sources", b"cd"), ("metric_type", "s")],
        ),
     ]
 
 
@@ -987,9 +1139,9 @@ def test_batch_resolve_with_values_not_indexed(caplog, settings):
     settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
     outer_message = _construct_outer_message(
         [
-            (counter_payload, []),
-            (distribution_payload, []),
-            (set_payload, []),
+            (counter_payload, counter_headers),
+            (distribution_payload, distribution_headers),
+            (set_payload, set_headers),
         ]
     )
 
 
@@ -1062,7 +1214,7 @@ def test_batch_resolve_with_values_not_indexed(caplog, settings):
                 "value": 1.0,
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
-            [("mapping_sources", b"c"), ("metric_type", "c")],
+            [*counter_headers, ("mapping_sources", b"c"), ("metric_type", "c")],
         ),
         (
             {
@@ -1086,6 +1238,7 @@ def test_batch_resolve_with_values_not_indexed(caplog, settings):
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
             [
+                *distribution_headers,
                 ("mapping_sources", b"c"),
                 ("metric_type", "d"),
             ],
@@ -1112,6 +1265,7 @@ def test_batch_resolve_with_values_not_indexed(caplog, settings):
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
             [
+                *set_headers,
                 ("mapping_sources", b"c"),
                 ("metric_type", "s"),
             ],
@@ -1124,9 +1278,9 @@ def test_metric_id_rate_limited(caplog, settings):
     settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
     outer_message = _construct_outer_message(
         [
-            (counter_payload, []),
-            (distribution_payload, []),
-            (set_payload, []),
+            (counter_payload, counter_headers),
+            (distribution_payload, distribution_headers),
+            (set_payload, set_headers),
         ]
     )
 
 
@@ -1208,6 +1362,7 @@ def test_metric_id_rate_limited(caplog, settings):
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
             [
+                *set_headers,
                 ("mapping_sources", b"cd"),
                 ("metric_type", "s"),
             ],
@@ -1231,9 +1386,9 @@ def test_tag_key_rate_limited(caplog, settings):
     settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
     outer_message = _construct_outer_message(
         [
-            (counter_payload, []),
-            (distribution_payload, []),
-            (set_payload, []),
+            (counter_payload, counter_headers),
+            (distribution_payload, distribution_headers),
+            (set_payload, set_headers),
         ]
     )
 
 
@@ -1316,9 +1471,9 @@ def test_tag_value_rate_limited(caplog, settings):
     settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
     outer_message = _construct_outer_message(
         [
-            (counter_payload, []),
-            (distribution_payload, []),
-            (set_payload, []),
+            (counter_payload, counter_headers),
+            (distribution_payload, distribution_headers),
+            (set_payload, set_headers),
         ]
     )
 
 
@@ -1409,6 +1564,7 @@ def test_tag_value_rate_limited(caplog, settings):
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
             [
+                *counter_headers,
                 ("mapping_sources", b"ch"),
                 ("metric_type", "c"),
             ],
@@ -1436,6 +1592,7 @@ def test_tag_value_rate_limited(caplog, settings):
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
             [
+                *distribution_headers,
                 ("mapping_sources", b"ch"),
                 ("metric_type", "d"),
             ],
@@ -1448,8 +1605,8 @@ def test_one_org_limited(caplog, settings):
     settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
     outer_message = _construct_outer_message(
         [
-            (counter_payload, []),
-            ({**distribution_payload, "org_id": 2}, []),
+            (counter_payload, counter_headers),
+            ({**distribution_payload, "org_id": 2}, distribution_headers),
         ]
     )
 
 
@@ -1555,6 +1712,7 @@ def test_one_org_limited(caplog, settings):
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
             [
+                *distribution_headers,
                 ("mapping_sources", b"ch"),
                 ("metric_type", "d"),
             ],
@@ -1576,9 +1734,9 @@ def test_cardinality_limiter(caplog, settings):
 
 
     outer_message = _construct_outer_message(
         [
-            (counter_payload, []),
-            (distribution_payload, []),
-            (set_payload, []),
+            (counter_payload, counter_headers),
+            (distribution_payload, distribution_headers),
+            (set_payload, set_headers),
         ]
     )
 
 
@@ -1659,6 +1817,7 @@ def test_cardinality_limiter(caplog, settings):
                 "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
             },
             [
+                *set_headers,
                 ("mapping_sources", b"c"),
                 ("metric_type", "s"),
             ],