from __future__ import annotations

import logging
import pickle
import re
import time
from collections.abc import Callable, MutableMapping, Sequence
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any
from unittest.mock import Mock, call

import pytest
from arroyo.backends.kafka import KafkaPayload
from arroyo.dlq import InvalidMessage
from arroyo.processing.strategies import MessageRejected
from arroyo.types import BrokerValue, Message, Partition, Topic, Value

from sentry.sentry_metrics.aggregation_option_registry import get_aggregation_options
from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_ingest_config
from sentry.sentry_metrics.consumers.indexer.batch import valid_metric_name
from sentry.sentry_metrics.consumers.indexer.common import (
    BatchMessages,
    IndexerOutputMessageBatch,
    MetricsBatchBuilder,
)
from sentry.sentry_metrics.consumers.indexer.processing import MessageProcessor
from sentry.sentry_metrics.indexer.mock import MockIndexer, RawSimpleIndexer
from sentry.sentry_metrics.use_case_id_registry import UseCaseID
from sentry.testutils.helpers.options import override_options
from sentry.utils import json

logger = logging.getLogger(__name__)

pytestmark = pytest.mark.sentry_metrics

MESSAGE_PROCESSOR = MessageProcessor(
    get_ingest_config(UseCaseKey.PERFORMANCE, IndexerStorage.POSTGRES)
)

BROKER_TIMESTAMP = datetime.now(tz=timezone.utc)


@pytest.fixture(autouse=True)
def update_sentry_settings(settings: Any) -> None:
    settings.SENTRY_METRICS_INDEXER_RAISE_VALIDATION_ERRORS = True


def compare_messages_ignoring_mapping_metadata(actual: Message, expected: Message) -> None:
    assert actual.committable == expected.committable

    actual_payload = actual.payload
    expected_payload = expected.payload

    if isinstance(actual_payload, InvalidMessage):
        assert actual_payload == expected_payload
        return

    assert actual_payload.key == expected_payload.key

    actual_headers_without_mapping_sources = [
        (k, v.encode()) for k, v in actual_payload.headers if k != "mapping_sources"
    ]
    assert actual_headers_without_mapping_sources == expected_payload.headers

    actual_deserialized = json.loads(actual_payload.value)
    expected_deserialized = json.loads(expected_payload.value)
    del actual_deserialized["mapping_meta"]
    assert actual_deserialized == expected_deserialized


def compare_message_batches_ignoring_metadata(
    actual: IndexerOutputMessageBatch, expected: Sequence[Message]
) -> None:
    assert len(actual.data) == len(expected)
    for a, e in zip(actual.data, expected):
        compare_messages_ignoring_mapping_metadata(a, e)


def _batch_message_set_up(
    next_step: Mock, max_batch_time: float = 100.0, max_batch_size: int = 2
) -> tuple[Any, Any, Any]:
    # batch time is in seconds
    batch_messages_step = BatchMessages(
        next_step=next_step, max_batch_time=max_batch_time, max_batch_size=max_batch_size
    )

    message1 = Message(
        BrokerValue(
            KafkaPayload(None, b"some value", []),
            Partition(Topic("topic"), 0),
            1,
            BROKER_TIMESTAMP,
        )
    )
    message2 = Message(
        BrokerValue(
            KafkaPayload(None, b"another value", []),
            Partition(Topic("topic"), 0),
            2,
            BROKER_TIMESTAMP,
        )
    )

    return (batch_messages_step, message1, message2)


def test_batch_messages() -> None:
    next_step = Mock()

    batch_messages_step, message1, message2 = _batch_message_set_up(next_step)

    # Submit the first message: the batch builder should be created
    # and the message added to the batch.
    batch_messages_step.submit(message=message1)

    assert len(batch_messages_step._BatchMessages__batch) == 1

    # Neither batch_size nor batch_time has been met, so poll shouldn't
    # do anything yet (i.e. it shouldn't flush and call next_step.submit).
    batch_messages_step.poll()

    assert len(batch_messages_step._BatchMessages__batch) == 1
    assert not next_step.submit.called

    # Submit the second message: it is added to the batch, which now
    # saturates batch_size (2). This triggers __flush, which in turn calls
    # next_step.submit and resets the batch to None.
    batch_messages_step.submit(message=message2)

    assert next_step.submit.call_args == call(
        Message(Value([message1, message2], message2.committable)),
    )

    assert batch_messages_step._BatchMessages__batch is None
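

# The test above exercises the size-triggered flush. Below is a minimal,
# illustrative sketch (not part of the original suite) of the time-triggered
# path. It assumes that poll() flushes a batch once max_batch_time has elapsed
# and that max_batch_time is expressed in seconds, as the helper's comment
# above suggests.
def test_batch_messages_time_flush_sketch() -> None:
    next_step = Mock()
    batch_messages_step, message1, _ = _batch_message_set_up(next_step, max_batch_time=0.1)

    batch_messages_step.submit(message=message1)
    assert not next_step.submit.called

    # Wait past the (assumed) time window, then poll to trigger the flush.
    time.sleep(0.2)
    batch_messages_step.poll()

    assert next_step.submit.called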


def test_batch_messages_rejected_message() -> None:
    next_step = Mock()
    next_step.submit.side_effect = MessageRejected()

    batch_messages_step, message1, message2 = _batch_message_set_up(next_step)

    batch_messages_step.poll()
    batch_messages_step.submit(message=message1)

    # If we try to submit a batch when the next step is not ready to accept
    # more messages, we'll get a MessageRejected error. This is re-raised to
    # the stream processor on the subsequent call to submit.
    batch_messages_step.submit(message=message2)

    with pytest.raises(MessageRejected):
        batch_messages_step.submit(message=message2)

    # When poll is called, we still try to flush the batch because it's full,
    # but we handle the MessageRejected error.
    batch_messages_step.poll()
    assert next_step.submit.called


def test_batch_messages_join() -> None:
    next_step = Mock()

    batch_messages_step, message1, _ = _batch_message_set_up(next_step)

    batch_messages_step.poll()
    batch_messages_step.submit(message=message1)

    # A rebalance, restart, scale up or any other event
    # that causes partitions to be revoked will call join
    batch_messages_step.join(timeout=3)

    # we don't flush the batch
    assert not next_step.submit.called


def test_metrics_batch_builder() -> None:
    max_batch_time = 3.0  # seconds
    max_batch_size = 2

    # 1. Ready when max_batch_size is reached
    batch_builder_size = MetricsBatchBuilder(
        max_batch_size=max_batch_size, max_batch_time=max_batch_time
    )
    assert not batch_builder_size.ready()

    message1 = Message(
        BrokerValue(
            KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
        )
    )
    batch_builder_size.append(message1)
    assert not batch_builder_size.ready()

    message2 = Message(
        BrokerValue(
            KafkaPayload(None, b"another value", []),
            Partition(Topic("topic"), 0),
            2,
            datetime.now(),
        )
    )
    batch_builder_size.append(message2)
    assert batch_builder_size.ready()

    # 2. Ready when max_batch_time is reached
    batch_builder_time = MetricsBatchBuilder(
        max_batch_size=max_batch_size, max_batch_time=max_batch_time
    )
    assert not batch_builder_time.ready()

    message1 = Message(
        BrokerValue(
            KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
        )
    )
    batch_builder_time.append(message1)
    assert not batch_builder_time.ready()

    time.sleep(3)
    assert batch_builder_time.ready()

    # 3. Adding the same message twice to the same batch
    batch_builder_time = MetricsBatchBuilder(
        max_batch_size=max_batch_size, max_batch_time=max_batch_time
    )
    message1 = Message(
        BrokerValue(
            KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
        )
    )
    batch_builder_time.append(message1)


ts = int(datetime.now(tz=timezone.utc).timestamp())

counter_payloads: list[dict[str, Any]] = [
    {
        "name": f"c:{use_case.value}/alert@none",
        "tags": {
            "environment": "production",
            "session.status": "init",
        },
        "timestamp": ts,
        "type": "c",
        "value": 1.0,
        "org_id": 1,
        "project_id": 3,
        "retention_days": 90,
    }
    for use_case in UseCaseID
    if use_case is not UseCaseID.SESSIONS
]

distribution_payloads: list[dict[str, Any]] = [
    {
        "name": f"d:{use_case.value}/alert@none",
        "tags": {
            "environment": "production",
            "session.status": "healthy",
        },
        "timestamp": ts,
        "type": "d",
        "value": [4, 5, 6],
        "org_id": 1,
        "project_id": 3,
        "retention_days": 90,
    }
    for use_case in UseCaseID
    if use_case is not UseCaseID.SESSIONS
]

set_payloads: list[dict[str, Any]] = [
    {
        "name": f"s:{use_case.value}/alert@none",
        "tags": {
            "environment": "production",
            "session.status": "errored",
        },
        "timestamp": ts,
        "type": "s",
        "value": [3],
        "org_id": 1,
        "project_id": 3,
        "retention_days": 90,
    }
    for use_case in UseCaseID
    if use_case is not UseCaseID.SESSIONS
]
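
# The three lists above build one counter ("c"), one distribution ("d") and one
# set ("s") bucket per use case. UseCaseID.SESSIONS is skipped, presumably because
# sessions belong to the release-health pipeline rather than the
# UseCaseKey.PERFORMANCE config that MESSAGE_PROCESSOR is built with.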


def __translated_payload(
    payload: dict[str, Any],
    indexer=None,
) -> dict[str, str | int | list[int] | MutableMapping[int, int]]:
    """
    Translates strings to ints using the provided indexer (defaulting to
    MESSAGE_PROCESSOR's indexer), in addition to adding the fields the
    indexer consumer is expected to set (retention_days, version, etc.).
    """
    indexer = indexer or MESSAGE_PROCESSOR._indexer
    MRI_RE_PATTERN = re.compile("^([c|s|d|g|e]):([a-zA-Z0-9_]+)/.*$")
    payload = deepcopy(payload)
    org_id = payload["org_id"]

    matched_mri = MRI_RE_PATTERN.match(payload["name"])
    assert matched_mri is not None
    use_case_id = UseCaseID(matched_mri.group(2))

    new_tags = {
        indexer.resolve(use_case_id=use_case_id, org_id=org_id, string=k): v
        for k, v in payload["tags"].items()
    }

    agg_options = get_aggregation_options(payload["name"])
    if agg_options:
        # Keep this assert for now to indicate that we only have at most one
        # aggregation option per metric bucket, regardless of the use case in
        # the bucket.
        assert len(agg_options) == 1
        agg_option = agg_options.popitem()[0]
        payload["aggregation_option"] = agg_option.value

    payload["metric_id"] = indexer.resolve(
        use_case_id=use_case_id, org_id=org_id, string=payload["name"]
    )
    payload["retention_days"] = 90
    payload["tags"] = new_tags
    payload["use_case_id"] = use_case_id.value
    payload["sentry_received_timestamp"] = BROKER_TIMESTAMP.timestamp()
    payload["version"] = 2

    payload.pop("unit", None)
    del payload["name"]
    return payload
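

# For orientation, a translated counter payload roughly takes the shape below.
# The integer ids are whatever the indexer resolved for the metric name and tag
# keys at runtime; the placeholders shown here are illustrative, not real ids.
#
#     {
#         "tags": {<id of "environment">: "production", <id of "session.status">: "init"},
#         "timestamp": ts,
#         "type": "c",
#         "value": 1.0,
#         "org_id": 1,
#         "project_id": 3,
#         "retention_days": 90,
#         "metric_id": <id of "c:transactions/alert@none">,
#         "use_case_id": "transactions",
#         "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
#         "version": 2,
#     }
#
# plus "aggregation_option" when get_aggregation_options returns one for the MRI.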


@pytest.mark.django_db
def test_process_messages() -> None:
    message_payloads = counter_payloads + distribution_payloads + set_payloads
    message_batch = [
        Message(
            BrokerValue(
                KafkaPayload(None, json.dumps(payload).encode("utf-8"), []),
                Partition(Topic("topic"), 0),
                i + 1,
                BROKER_TIMESTAMP,
            )
        )
        for i, payload in enumerate(message_payloads)
    ]
    # the outer message uses the last message's partition, offset, and timestamp
    last = message_batch[-1]
    outer_message = Message(Value(message_batch, last.committable))

    # Add a bunch of special aggregation options. The expectation with this is
    # that there is always only one aggregation option per bucket/payload.
    with override_options(
        {
            "sentry-metrics.10s-granularity": True,
            "sentry-metrics.drop-percentiles.per-use-case": {"spans", "transactions"},
        }
    ):
        new_batch = MESSAGE_PROCESSOR.process_messages(outer_message=outer_message)

        expected_new_batch = []
        for i, m in enumerate(message_batch):
            assert isinstance(m.value, BrokerValue)
            expected_new_batch.append(
                Message(
                    BrokerValue(
                        KafkaPayload(
                            None,
                            json.dumps(__translated_payload(message_payloads[i])).encode("utf-8"),
                            [
                                ("metric_type", message_payloads[i]["type"].encode()),
                            ],
                        ),
                        m.value.partition,
                        m.value.offset,
                        m.value.timestamp,
                    )
                )
            )

        compare_message_batches_ignoring_metadata(new_batch, expected_new_batch)


@pytest.mark.django_db
def test_process_messages_default_card_rollout(set_sentry_option: Callable[..., Any]) -> None:
    message_payloads = counter_payloads + distribution_payloads + set_payloads
    message_batch = [
        Message(
            BrokerValue(
                KafkaPayload(None, json.dumps(payload).encode("utf-8"), []),
                Partition(Topic("topic"), 0),
                i + 1,
                BROKER_TIMESTAMP,
            )
        )
        for i, payload in enumerate(message_payloads)
    ]
    # the outer message uses the last message's partition, offset, and timestamp
    last = message_batch[-1]
    outer_message = Message(Value(message_batch, last.committable))

    new_batch = MESSAGE_PROCESSOR.process_messages(outer_message=outer_message)

    assert len(new_batch.data) == len(message_batch)


invalid_payloads = [
    (
        {
            "name": "c:transactions/alert@none",
            "tags": {
                "environment" * 21: "production",
                "session.status": "errored",
            },
            "timestamp": ts,
            "type": "s",
            "value": [3],
            "org_id": 1,
            "project_id": 3,
            "retention_days": 90,
        },
        "invalid_tags",
        True,
    ),
    (
        {
            "name": "c:transactions/alert@" + ("none" * 50),
            "tags": {
                "environment": "production",
                "session.status": "errored",
            },
            "timestamp": ts,
            "type": "s",
            "value": [3],
            "org_id": 1,
            "project_id": 3,
            "retention_days": 90,
        },
        "invalid_metric_name",
        True,
    ),
    (
        b"invalid_json_payload",
        "invalid_json",
        False,
    ),
]
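
# Each tuple above is (payload, substring expected in the error logs, whether the
# payload should be JSON-encoded before being wrapped in a KafkaPayload).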


@pytest.mark.django_db
@pytest.mark.parametrize("invalid_payload, error_text, format_payload", invalid_payloads)
def test_process_messages_invalid_messages(
    invalid_payload: dict[str, Any], error_text: str, format_payload: bool, caplog: Any
) -> None:
    """
    Test the following kinds of invalid payloads:
        * tag key > 200 chars
        * metric name > 200 chars
        * invalid json

    Each outer_message passed into process_messages is a batch of messages. If one
    of the messages has an invalid payload, only that message is rejected (it shows
    up as an arroyo InvalidMessage), not the entire batch.

    The first message (`counter_payloads[0]`) in these tests is always a valid
    payload, and the test arg `invalid_payload` carries a payload that fits one of
    the scenarios outlined above.
    """
    formatted_payload = (
        json.dumps(invalid_payload).encode("utf-8") if format_payload else invalid_payload
    )

    message_batch = [
        Message(
            BrokerValue(
                KafkaPayload(None, json.dumps(counter_payloads[0]).encode("utf-8"), []),
                Partition(Topic("topic"), 0),
                0,
                BROKER_TIMESTAMP,
            )
        ),
        Message(
            BrokerValue(
                KafkaPayload(None, formatted_payload, []),  # type: ignore[arg-type]
                Partition(Topic("topic"), 0),
                1,
                BROKER_TIMESTAMP,
            )
        ),
    ]
    # the outer message uses the last message's partition, offset, and timestamp
    last = message_batch[-1]
    outer_message = Message(Value(message_batch, last.committable))

    with caplog.at_level(logging.ERROR):
        new_batch = MESSAGE_PROCESSOR.process_messages(outer_message=outer_message)

    # we expect the valid counter payload to be translated and the invalid
    # message to be replaced by an InvalidMessage
    expected_msg = message_batch[0]
    expected_new_batch = [
        Message(
            Value(
                KafkaPayload(
                    None,
                    json.dumps(__translated_payload(counter_payloads[0])).encode("utf-8"),
                    [("metric_type", b"c")],
                ),
                expected_msg.committable,
            )
        ),
        Message(
            Value(
                InvalidMessage(Partition(Topic("topic"), 0), 1),
                message_batch[1].committable,
            )
        ),
    ]
    compare_message_batches_ignoring_metadata(new_batch, expected_new_batch)
    assert error_text in caplog.text


@pytest.mark.django_db
def test_process_messages_rate_limited(caplog: Any, settings: Any) -> None:
    """
    Test handling of `None` values coming from the indexer service, which
    happens when Postgres writes are being rate-limited.
    """
    settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0

    rate_limited_payload = deepcopy(distribution_payloads[0])
    rate_limited_payload["tags"]["rate_limited_test"] = "true"

    message_batch = [
        Message(
            BrokerValue(
                KafkaPayload(None, json.dumps(counter_payloads[0]).encode("utf-8"), []),
                Partition(Topic("topic"), 0),
                0,
                BROKER_TIMESTAMP,
            )
        ),
        Message(
            BrokerValue(
                KafkaPayload(None, json.dumps(rate_limited_payload).encode("utf-8"), []),
                Partition(Topic("topic"), 0),
                1,
                BROKER_TIMESTAMP,
            )
        ),
    ]
    # the outer message uses the last message's partition, offset, and timestamp
    last = message_batch[-1]
    outer_message = Message(Value(message_batch, last.committable))

    message_processor = MessageProcessor(
        get_ingest_config(UseCaseKey.PERFORMANCE, IndexerStorage.MOCK)
    )

    # Insert a None value into the mock indexer to simulate a rate limit.
    mock_indexer = message_processor._indexer
    assert isinstance(mock_indexer, MockIndexer)
    raw_simple_string_indexer = mock_indexer.indexer
    assert isinstance(raw_simple_string_indexer, RawSimpleIndexer)

    rgx = re.compile("^([c|s|d|g|e]):([a-zA-Z0-9_]+)/.*$").match(distribution_payloads[0]["name"])
    assert rgx is not None
    raw_simple_string_indexer._strings[UseCaseID(rgx.group(2))][1]["rate_limited_test"] = None

    with caplog.at_level(logging.ERROR):
        new_batch = message_processor.process_messages(outer_message=outer_message)

    # We expect only the counter payload message to be left, as that one didn't
    # cause or depend on string writes that have been rate-limited.
    expected_msg = message_batch[0]
    assert isinstance(expected_msg.value, BrokerValue)
    expected_new_batch = [
        Message(
            BrokerValue(
                KafkaPayload(
                    None,
                    json.dumps(
                        __translated_payload(counter_payloads[0], message_processor._indexer)
                    ).encode("utf-8"),
                    [("metric_type", b"c")],
                ),
                expected_msg.value.partition,
                expected_msg.value.offset,
                expected_msg.value.timestamp,
            )
        )
    ]
    compare_message_batches_ignoring_metadata(new_batch, expected_new_batch)
    assert "dropped_message" in caplog.text


def test_valid_metric_name() -> None:
    assert valid_metric_name("") is True
    assert valid_metric_name("blah") is True
    assert valid_metric_name("invalid" * 200) is False


def test_process_messages_is_pickleable():
    # needed so that the parallel transform step starts up properly
    pickle.dumps(MESSAGE_PROCESSOR.process_messages)
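

# A slightly stronger, illustrative variant (not part of the original suite):
# round-trip the callable through pickle and check it is still callable, which
# is roughly what handing it to a multiprocessing-based transform step relies on.
def test_process_messages_pickle_roundtrip_sketch() -> None:
    restored = pickle.loads(pickle.dumps(MESSAGE_PROCESSOR.process_messages))
    assert callable(restored)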