
feat(metrics): Emit invalid messages encountered during batching into DLQ (#58840)

Part of my effort to add a DLQ to the Sentry metrics consumers; see the
final change here:
https://github.com/getsentry/sentry/pull/57998

Previous PR in the stack:
https://github.com/getsentry/sentry/pull/58677

### Overview
- Add an `invalid_msg_meta` field to `IndexerOutputMessageBatch`
- Maintain a sorted deque of the offsets of invalid messages seen during
batching
- During the `poll` step of the `Unbatcher`, raise an `InvalidMessage`
exception so the invalid messages end up in the DLQ while the regular
messages are processed normally (see the sketch after this list)
- Update the batching tests with assertions that the `invalid_msg_meta`
field of the `IndexerOutputMessageBatch` is empty
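For orientation, a minimal sketch of the flow. `BrokerMeta` is assumed here to
be a `(partition, offset)` named tuple (its definition is not part of this
diff; the shape is inferred from how the tests construct it):

```python
# Minimal sketch of the DLQ flow; names are simplified from the real classes.
from collections import deque
from typing import NamedTuple

from arroyo.dlq import InvalidMessage
from arroyo.types import Partition, Topic


class BrokerMeta(NamedTuple):  # assumed shape, not taken verbatim from the repo
    partition: Partition
    offset: int


partition = Partition(Topic("topic"), 0)

# 1. Batching collects the offsets of messages that fail parsing/validation.
invalid_msg_meta = {BrokerMeta(partition, 3), BrokerMeta(partition, 1)}

# 2. The batch output carries them as a sorted deque.
pending = deque(sorted(invalid_msg_meta))

# 3. The Unbatcher raises one InvalidMessage per poll(); arroyo's
#    StreamProcessor catches it and routes the raw message to the DLQ.
def poll() -> None:
    if pending:
        bad_partition, bad_offset = pending.popleft()
        raise InvalidMessage(bad_partition, bad_offset)
```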
John committed 1 year ago (a412429aac)

+ 12 - 2
bin/send_metrics.py

@@ -149,7 +149,13 @@ def produce_msgs(messages, is_generic, host, dryrun):
     show_default=True,
     help="Specify which org id(s) to send",
 )
-def main(use_cases, rand_str, host, dryrun, org_id):
+@click.option(
+    "--num-bad-msg",
+    default=0,
+    show_default=True,
+    help="Number of additional badly formatted metric messages to send",
+)
+def main(use_cases, rand_str, host, dryrun, org_id, num_bad_msg):
     if UseCaseID.SESSIONS.value in use_cases and len(use_cases) > 1:
         click.secho(
             "ERROR: UseCaseID.SESSIONS is in use_cases and there are more than 1 use cases",
@@ -158,9 +164,10 @@ def main(use_cases, rand_str, host, dryrun, org_id):
         )
         exit(1)
 
+    rand_str = rand_str or "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
+
     is_generic = UseCaseID.SESSIONS.value not in use_cases
 
-    rand_str = rand_str or "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
     messages = list(
         itertools.chain.from_iterable(
             (
@@ -172,6 +179,9 @@ def main(use_cases, rand_str, host, dryrun, org_id):
             for org in org_id
         )
     )
+
+    messages.extend([{"BAD_VALUE": rand_str, "idx": i} for i in range(num_bad_msg)])
+
     random.shuffle(messages)
 
     produce_msgs(messages, is_generic, host, dryrun)
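To make the new option concrete, this reproduces the payloads `--num-bad-msg`
appends (taken from the hunk above). They carry none of the fields the indexer
expects, so they reach the broker fine and only fail later, inside the
indexer's parsing/validation step:

```python
import random
import string

# Recreate the bad payloads that --num-bad-msg adds; "BAD_VALUE" and "idx"
# are arbitrary keys that no metrics schema recognizes.
rand_str = "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
bad_messages = [{"BAD_VALUE": rand_str, "idx": i} for i in range(3)]
print(bad_messages)  # e.g. [{'BAD_VALUE': 'X7K2P9QD', 'idx': 0}, ...]
```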

+ 9 - 13
src/sentry/sentry_metrics/consumers/indexer/batch.py

@@ -1,6 +1,6 @@
 import logging
 import random
-from collections import defaultdict
+from collections import defaultdict, deque
 from typing import (
     Any,
     Callable,
@@ -126,8 +126,13 @@ class IndexerBatch:
                 parsed_payload = self._extract_message(msg)
                 self._validate_message(parsed_payload)
                 self.parsed_payloads_by_meta[broker_meta] = parsed_payload
-            except Exception:
+            except Exception as e:
                 self.invalid_msg_meta.add(broker_meta)
+                logger.error(
+                    e,
+                    extra={"payload_value": str(msg.payload.value)},
+                    exc_info=True,
+                )
 
         for namespace, cnt in skipped_msgs_cnt.items():
             metrics.incr(
@@ -163,17 +168,7 @@ class IndexerBatch:
                 extra={"payload_value": str(msg.payload.value)},
                 exc_info=True,
             )
-        try:
-            parsed_payload["use_case_id"] = use_case_id = extract_use_case_id(
-                parsed_payload["name"]
-            )
-        except ValidationError:
-            logger.error(
-                "process_messages.invalid_metric_resource_identifier",
-                extra={"payload_value": str(msg.payload.value)},
-                exc_info=True,
-            )
-            raise
+        parsed_payload["use_case_id"] = use_case_id = extract_use_case_id(parsed_payload["name"])
 
         self.__message_count[use_case_id] += 1
         self.__message_size_max[use_case_id] = max(
@@ -501,5 +496,6 @@ class IndexerBatch:
             )
         return IndexerOutputMessageBatch(
             new_messages,
+            deque(sorted(self.invalid_msg_meta)),
             cogs_usage,
         )
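`invalid_msg_meta` is collected as a set during batching, so it is ordered once
at output time; since `BrokerMeta` compares like a tuple, `sorted` yields
ascending offsets within a partition, which is the order the `Unbatcher` drains
them in. A small illustration, again assuming the `(partition, offset)` shape
from the sketch above:

```python
from collections import deque
from typing import NamedTuple

from arroyo.types import Partition, Topic


class BrokerMeta(NamedTuple):  # assumed shape, mirroring the earlier sketch
    partition: Partition
    offset: int


partition = Partition(Topic("snuba-metrics"), 0)
seen = {BrokerMeta(partition, 7), BrokerMeta(partition, 2), BrokerMeta(partition, 5)}

# Tuple comparison falls through to the offset when partitions are equal,
# so the deque is drained in ascending offset order.
ordered = deque(sorted(seen))
assert [m.offset for m in ordered] == [2, 5, 7]
```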

+ 12 - 1
src/sentry/sentry_metrics/consumers/indexer/common.py

@@ -1,7 +1,17 @@
 import logging
 import time
 from dataclasses import dataclass
-from typing import Any, List, Mapping, MutableMapping, MutableSequence, NamedTuple, Optional, Union
+from typing import (
+    Any,
+    Deque,
+    List,
+    Mapping,
+    MutableMapping,
+    MutableSequence,
+    NamedTuple,
+    Optional,
+    Union,
+)
 
 from arroyo import Partition
 from arroyo.backends.kafka import KafkaPayload
@@ -32,6 +42,7 @@ DEFAULT_QUEUED_MIN_MESSAGES = 100000
 @dataclass(frozen=True)
 class IndexerOutputMessageBatch:
     data: MutableSequence[Message[Union[RoutingPayload, KafkaPayload]]]
+    invalid_msg_meta: Deque[BrokerMeta]
     cogs_data: Mapping[UseCaseID, int]
 
 

+ 10 - 3
src/sentry/sentry_metrics/consumers/indexer/multiprocess.py

@@ -1,12 +1,12 @@
 import logging
 import time
 from functools import partial
-from typing import Any, Mapping, MutableMapping, Optional
+from typing import Any, Mapping, MutableMapping, Optional, Union
 
 from arroyo.backends.abstract import Producer as AbstractProducer
 from arroyo.backends.kafka import KafkaPayload
 from arroyo.processing.strategies import ProcessingStrategy as ProcessingStep
-from arroyo.types import Commit, Message, Partition
+from arroyo.types import Commit, FilteredPayload, Message, Partition
 from confluent_kafka import Producer
 
 from sentry.utils import kafka_config, metrics
@@ -68,7 +68,14 @@ class SimpleProduceStep(ProcessingStep[KafkaPayload]):
             self.__commit_function(self.__produced_message_offsets)
             self.__produced_message_offsets = {}
 
-    def submit(self, message: Message[KafkaPayload]) -> None:
+    def submit(self, message: Message[Union[KafkaPayload, FilteredPayload]]) -> None:
+        if isinstance(message.payload, FilteredPayload):
+            # FilteredPayload will not be committed. This may cause the indexer to
+            # consume and produce invalid messages to the DLQ twice if the last
+            # messages it consumes are invalid and it is then shut down, but it
+            # will never produce valid messages twice to Snuba.
+            # TODO: Use the arroyo producer which handles FilteredPayload elegantly
+            return
         self.__producer.produce(
             topic=self.__producer_topic,
             key=None,
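The TODO presumably points at arroyo's built-in `Produce` strategy, which
forwards `FilteredPayload` to its next step instead of producing it. The
signature below is from memory and should be treated as an assumption, not a
drop-in replacement:

```python
from arroyo.backends.kafka import KafkaProducer
from arroyo.processing.strategies import CommitOffsets, Produce
from arroyo.types import Commit, Topic

# Sketch only: a Produce step passes FilteredPayload through to CommitOffsets
# instead of producing it, avoiding the double-DLQ caveat described above.
def build_produce_step(producer: KafkaProducer, commit: Commit) -> Produce:
    return Produce(producer, Topic("snuba-metrics"), CommitOffsets(commit))
```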

+ 18 - 4
src/sentry/sentry_metrics/consumers/indexer/parallel.py

@@ -2,10 +2,12 @@ from __future__ import annotations
 
 import functools
 import logging
-from typing import Any, Mapping, Optional, Union, cast
+from collections import deque
+from typing import Any, Deque, Mapping, NamedTuple, Optional, Union, cast
 
 from arroyo.backends.kafka import KafkaConsumer, KafkaPayload
 from arroyo.commit import ONCE_PER_SECOND
+from arroyo.dlq import InvalidMessage
 from arroyo.processing import StreamProcessor
 from arroyo.processing.strategies import ProcessingStrategy
 from arroyo.processing.strategies import ProcessingStrategy as ProcessingStep
@@ -39,19 +41,31 @@ logger = logging.getLogger(__name__)
 class Unbatcher(ProcessingStep[Union[FilteredPayload, IndexerOutputMessageBatch]]):
     def __init__(
         self,
-        next_step: ProcessingStep[Union[KafkaPayload, RoutingPayload]],
+        next_step: ProcessingStep[Union[FilteredPayload, KafkaPayload, RoutingPayload]],
     ) -> None:
         self.__next_step = next_step
         self.__closed = False
+        self._invalid_msg_meta: Deque[NamedTuple] = deque()
 
     def poll(self) -> None:
+        if self._invalid_msg_meta:
+            partition, offset = self._invalid_msg_meta.popleft()
+            raise InvalidMessage(partition, offset)
+
         self.__next_step.poll()
 
     def submit(self, message: Message[Union[FilteredPayload, IndexerOutputMessageBatch]]) -> None:
         assert not self.__closed
 
-        # FilteredPayloads are not handled in the indexer
-        for transformed_message in cast(IndexerOutputMessageBatch, message.payload).data:
+        if isinstance(message.payload, FilteredPayload):
+            self.__next_step.submit(cast(Message[KafkaPayload], message))
+            return
+
+        self._invalid_msg_meta.extend(message.payload.invalid_msg_meta)
+
+        _ = message.payload.cogs_data
+
+        for transformed_message in message.payload.data:
             self.__next_step.submit(transformed_message)
 
     def close(self) -> None:
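The contract this creates: once a batch with invalid offsets has been
submitted, each subsequent `poll()` surfaces one `InvalidMessage` until the
deque is drained. A hypothetical check of that behavior (not a test from this
PR):

```python
from collections import deque
from unittest.mock import Mock

import pytest
from arroyo.dlq import InvalidMessage
from arroyo.types import Message, Partition, Topic, Value

from sentry.sentry_metrics.consumers.indexer.common import IndexerOutputMessageBatch
from sentry.sentry_metrics.consumers.indexer.parallel import Unbatcher


def test_poll_raises_for_invalid_offsets() -> None:
    partition = Partition(Topic("topic"), 0)
    unbatcher = Unbatcher(next_step=Mock())
    batch = IndexerOutputMessageBatch(
        data=[],
        invalid_msg_meta=deque([(partition, 1)]),  # any (partition, offset) pair unpacks in poll()
        cogs_data={},
    )
    unbatcher.submit(Message(Value(batch, {partition: 2})))

    # arroyo's StreamProcessor catches this and hands the original raw
    # message to the configured DLQ producer.
    with pytest.raises(InvalidMessage):
        unbatcher.poll()
```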

+ 14 - 0
tests/sentry/sentry_metrics/test_batch.py

@@ -250,6 +250,7 @@ def test_extract_strings_with_rollout(should_index_tag_values, expected):
     )
 
     assert batch.extract_strings() == expected
+    assert not batch.invalid_msg_meta
 
 
 @pytest.mark.django_db
@@ -413,6 +414,7 @@ def test_extract_strings_with_single_use_case_ids_blocked():
             }
         }
     }
+    assert not batch.invalid_msg_meta
 
 
 @override_options({"sentry-metrics.indexer.disabled-namespaces": ["spans", "escalating_issues"]})
@@ -485,6 +487,7 @@ def test_extract_strings_with_multiple_use_case_ids_blocked():
             }
         },
     }
+    assert not batch.invalid_msg_meta
 
 
 @pytest.mark.django_db
@@ -587,6 +590,7 @@ def test_extract_strings_with_invalid_mri():
             }
         },
     }
+    assert batch.invalid_msg_meta == {BrokerMeta(Partition(Topic("topic"), 0), 0)}
 
 
 @pytest.mark.django_db
@@ -677,6 +681,7 @@ def test_extract_strings_with_multiple_use_case_ids_and_org_ids():
             }
         },
     }
+    assert not batch.invalid_msg_meta
 
 
 @pytest.mark.django_db
@@ -729,6 +734,7 @@ def test_resolved_with_aggregation_options(caplog, settings):
             }
         }
     )
+    assert not batch.invalid_msg_meta
 
     caplog.set_level(logging.ERROR)
     snuba_payloads = batch.reconstruct_messages(
@@ -869,6 +875,7 @@ def test_all_resolved(caplog, settings):
             }
         }
     )
+    assert not batch.invalid_msg_meta
 
     caplog.set_level(logging.ERROR)
     snuba_payloads = batch.reconstruct_messages(
@@ -1179,6 +1186,7 @@ def test_all_resolved_retention_days_honored(caplog, settings):
             }
         }
     )
+    assert not batch.invalid_msg_meta
 
     caplog.set_level(logging.ERROR)
     snuba_payloads = batch.reconstruct_messages(
@@ -1331,6 +1339,7 @@ def test_batch_resolve_with_values_not_indexed(caplog, settings):
             }
         }
     )
+    assert not batch.invalid_msg_meta
 
     caplog.set_level(logging.ERROR)
     snuba_payloads = batch.reconstruct_messages(
@@ -1475,6 +1484,7 @@ def test_metric_id_rate_limited(caplog, settings):
             }
         }
     )
+    assert not batch.invalid_msg_meta
 
     caplog.set_level(logging.ERROR)
     snuba_payloads = batch.reconstruct_messages(
@@ -1589,6 +1599,7 @@ def test_tag_key_rate_limited(caplog, settings):
             }
         }
     )
+    assert not batch.invalid_msg_meta
 
     caplog.set_level(logging.ERROR)
     snuba_payloads = batch.reconstruct_messages(
@@ -1680,6 +1691,7 @@ def test_tag_value_rate_limited(caplog, settings):
             }
         }
     )
+    assert not batch.invalid_msg_meta
 
     caplog.set_level(logging.ERROR)
     snuba_payloads = batch.reconstruct_messages(
@@ -1822,6 +1834,7 @@ def test_one_org_limited(caplog, settings):
             }
         }
     )
+    assert not batch.invalid_msg_meta
 
     caplog.set_level(logging.ERROR)
     snuba_payloads = batch.reconstruct_messages(
@@ -1954,6 +1967,7 @@ def test_cardinality_limiter(caplog, settings):
             },
         }
     }
+    assert not batch.invalid_msg_meta
 
     snuba_payloads = batch.reconstruct_messages(
         {

+ 10 - 1
tests/sentry/sentry_metrics/test_gen_metrics_multiprocess_steps.py

@@ -4,6 +4,7 @@ import logging
 import pickle
 import re
 import time
+from collections import deque
 from copy import deepcopy
 from datetime import datetime, timezone
 from typing import Any, Dict, List, MutableMapping, Sequence, Union
@@ -20,6 +21,7 @@ from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_
 from sentry.sentry_metrics.consumers.indexer.batch import valid_metric_name
 from sentry.sentry_metrics.consumers.indexer.common import (
     BatchMessages,
+    BrokerMeta,
     IndexerOutputMessageBatch,
     MetricsBatchBuilder,
 )
@@ -365,6 +367,7 @@ def test_process_messages() -> None:
         )
 
     compare_message_batches_ignoring_metadata(new_batch, expected_new_batch)
+    assert not new_batch.invalid_msg_meta
 
 
 @pytest.mark.django_db
@@ -391,7 +394,9 @@ def test_process_messages_default_card_rollout(set_sentry_option) -> None:
         1.0,
     ):
         new_batch = MESSAGE_PROCESSOR.process_messages(outer_message=outer_message)
-        assert len(new_batch.data) == len(message_batch)
+
+    assert len(new_batch.data) == len(message_batch)
+    assert not new_batch.invalid_msg_meta
 
 
 invalid_payloads = [
@@ -501,6 +506,7 @@ def test_process_messages_invalid_messages(
     ]
     compare_message_batches_ignoring_metadata(new_batch, expected_new_batch)
     assert error_text in caplog.text
+    assert new_batch.invalid_msg_meta == deque([BrokerMeta(Partition(Topic("topic"), 0), 1)])
 
 
 @pytest.mark.django_db
@@ -572,6 +578,7 @@ def test_process_messages_rate_limited(caplog, settings) -> None:
     ]
     compare_message_batches_ignoring_metadata(new_batch, expected_new_batch)
     assert "dropped_message" in caplog.text
+    assert not new_batch.invalid_msg_meta
 
 
 @pytest.mark.django_db
@@ -624,6 +631,8 @@ def test_process_messages_cardinality_limited(
 
         compare_message_batches_ignoring_metadata(new_batch, [])
 
+    assert not new_batch.invalid_msg_meta
+
 
 def test_valid_metric_name() -> None:
     assert valid_metric_name("") is True

+ 6 - 0
tests/sentry/sentry_metrics/test_rh_metrics_multiprocess_steps.py

@@ -3,6 +3,7 @@ from __future__ import annotations
 import logging
 import pickle
 import time
+from collections import deque
 from copy import deepcopy
 from datetime import datetime, timezone
 from typing import Any, Dict, List, MutableMapping, Sequence, Union
@@ -18,6 +19,7 @@ from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_
 from sentry.sentry_metrics.consumers.indexer.batch import valid_metric_name
 from sentry.sentry_metrics.consumers.indexer.common import (
     BatchMessages,
+    BrokerMeta,
     IndexerOutputMessageBatch,
     MetricsBatchBuilder,
 )
@@ -340,6 +342,7 @@ def test_process_messages() -> None:
             )
         )
     compare_message_batches_ignoring_metadata(new_batch, expected_new_batch)
+    assert not new_batch.invalid_msg_meta
 
 
 invalid_payloads = [
@@ -449,6 +452,7 @@ def test_process_messages_invalid_messages(
     ]
     compare_message_batches_ignoring_metadata(new_batch, expected_new_batch)
     assert error_text in caplog.text
+    assert new_batch.invalid_msg_meta == deque([BrokerMeta(Partition(Topic("topic"), 0), 1)])
 
 
 @pytest.mark.django_db
@@ -516,6 +520,7 @@ def test_process_messages_rate_limited(caplog, settings) -> None:
     ]
     compare_message_batches_ignoring_metadata(new_batch, expected_new_batch)
     assert "dropped_message" in caplog.text
+    assert not new_batch.invalid_msg_meta
 
 
 @pytest.mark.django_db
@@ -567,6 +572,7 @@ def test_process_messages_cardinality_limited(
             new_batch = MESSAGE_PROCESSOR.process_messages(outer_message=outer_message)
 
         compare_message_batches_ignoring_metadata(new_batch, [])
+        assert not new_batch.invalid_msg_meta
 
 
 def test_valid_metric_name() -> None: