from __future__ import annotations

import logging
import pickle
import re
import time
from collections.abc import Callable, MutableMapping, Sequence
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any
from unittest.mock import Mock, call

import pytest
from arroyo.backends.kafka import KafkaPayload
from arroyo.dlq import InvalidMessage
from arroyo.processing.strategies import MessageRejected
from arroyo.types import BrokerValue, Message, Partition, Topic, Value

from sentry.sentry_metrics.aggregation_option_registry import get_aggregation_options
from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_ingest_config
from sentry.sentry_metrics.consumers.indexer.batch import valid_metric_name
from sentry.sentry_metrics.consumers.indexer.common import (
    BatchMessages,
    IndexerOutputMessageBatch,
    MetricsBatchBuilder,
)
from sentry.sentry_metrics.consumers.indexer.processing import MessageProcessor
from sentry.sentry_metrics.indexer.mock import MockIndexer, RawSimpleIndexer
from sentry.sentry_metrics.use_case_id_registry import UseCaseID
from sentry.testutils.helpers.options import override_options
from sentry.utils import json

logger = logging.getLogger(__name__)

pytestmark = pytest.mark.sentry_metrics

MESSAGE_PROCESSOR = MessageProcessor(
    get_ingest_config(UseCaseKey.PERFORMANCE, IndexerStorage.POSTGRES)
)

BROKER_TIMESTAMP = datetime.now(tz=timezone.utc)
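
# NOTE: MESSAGE_PROCESSOR and BROKER_TIMESTAMP are shared at module level: the
# processor is reused across tests (and pickled in
# test_process_messages_is_pickleable), and reusing a single fixed broker
# timestamp for every test message lets __translated_payload embed a matching
# sentry_received_timestamp in the expected payloads.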


@pytest.fixture(autouse=True)
def update_sentry_settings(settings: Any) -> None:
    settings.SENTRY_METRICS_INDEXER_RAISE_VALIDATION_ERRORS = True


def compare_messages_ignoring_mapping_metadata(actual: Message, expected: Message) -> None:
    assert actual.committable == expected.committable

    actual_payload = actual.payload
    expected_payload = expected.payload

    if isinstance(actual_payload, InvalidMessage):
        assert actual_payload == expected_payload
        return

    assert actual_payload.key == expected_payload.key

    actual_headers_without_mapping_sources = [
        (k, v.encode()) for k, v in actual_payload.headers if k != "mapping_sources"
    ]
    assert actual_headers_without_mapping_sources == expected_payload.headers

    actual_deserialized = json.loads(actual_payload.value)
    expected_deserialized = json.loads(expected_payload.value)
    del actual_deserialized["mapping_meta"]
    assert actual_deserialized == expected_deserialized


def compare_message_batches_ignoring_metadata(
    actual: IndexerOutputMessageBatch, expected: Sequence[Message]
) -> None:
    assert len(actual.data) == len(expected)
    for a, e in zip(actual.data, expected):
        compare_messages_ignoring_mapping_metadata(a, e)
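
# The two helpers above compare processed messages while ignoring indexer
# mapping metadata ("mapping_meta" in the payload body and the
# "mapping_sources" header), presumably because those values depend on the ids
# the backing indexer happens to assign at runtime.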


def _batch_message_set_up(
    next_step: Mock, max_batch_time: float = 100.0, max_batch_size: int = 2
) -> tuple[Any, Any, Any]:
    # batch time is in seconds
    batch_messages_step = BatchMessages(
        next_step=next_step, max_batch_time=max_batch_time, max_batch_size=max_batch_size
    )

    message1 = Message(
        BrokerValue(
            KafkaPayload(None, b"some value", []),
            Partition(Topic("topic"), 0),
            1,
            BROKER_TIMESTAMP,
        )
    )
    message2 = Message(
        BrokerValue(
            KafkaPayload(None, b"another value", []),
            Partition(Topic("topic"), 0),
            2,
            BROKER_TIMESTAMP,
        )
    )

    return (batch_messages_step, message1, message2)
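
# _batch_message_set_up wires a BatchMessages step to a mocked next step and
# returns it together with two messages on the same partition (offsets 1 and
# 2), so the default max_batch_size of 2 is saturated by submitting both.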


def test_batch_messages() -> None:
    next_step = Mock()

    batch_messages_step, message1, message2 = _batch_message_set_up(next_step)

    # Submit the first message: the batch builder should be created
    # and the message added to the batch.
    batch_messages_step.submit(message=message1)

    assert len(batch_messages_step._BatchMessages__batch) == 1

    # Neither batch_size nor batch_time has been met, so poll shouldn't
    # do anything yet (i.e. it shouldn't flush and call next_step.submit).
    batch_messages_step.poll()

    assert len(batch_messages_step._BatchMessages__batch) == 1
    assert not next_step.submit.called

    # Submit the second message: it is added to the batch, which now
    # saturates batch_size (2). This triggers __flush, which in turn calls
    # next_step.submit and resets the batch to None.
    batch_messages_step.submit(message=message2)

    assert next_step.submit.call_args == call(
        Message(Value([message1, message2], message2.committable)),
    )

    assert batch_messages_step._BatchMessages__batch is None


def test_batch_messages_rejected_message() -> None:
    next_step = Mock()
    next_step.submit.side_effect = MessageRejected()

    batch_messages_step, message1, message2 = _batch_message_set_up(next_step)

    batch_messages_step.poll()
    batch_messages_step.submit(message=message1)

    # If we try to submit a batch while the next step is not ready to accept
    # more messages, we get a MessageRejected error. It is re-raised to the
    # stream processor on the subsequent call to submit.
    batch_messages_step.submit(message=message2)

    with pytest.raises(MessageRejected):
        batch_messages_step.submit(message=message2)

    # When poll is called we still try to flush the batch because it is full,
    # but the MessageRejected error is handled.
    batch_messages_step.poll()
    assert next_step.submit.called


def test_batch_messages_join() -> None:
    next_step = Mock()

    batch_messages_step, message1, _ = _batch_message_set_up(next_step)

    batch_messages_step.poll()
    batch_messages_step.submit(message=message1)
    # A rebalance, restart, scale up or any other event
    # that causes partitions to be revoked will call join
    batch_messages_step.join(timeout=3)

    # we don't flush the batch
    assert not next_step.submit.called


def test_metrics_batch_builder() -> None:
    max_batch_time = 3.0  # seconds
    max_batch_size = 2

    # 1. Ready when max_batch_size is reached
    batch_builder_size = MetricsBatchBuilder(
        max_batch_size=max_batch_size, max_batch_time=max_batch_time
    )
    assert not batch_builder_size.ready()

    message1 = Message(
        BrokerValue(
            KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
        )
    )
    batch_builder_size.append(message1)
    assert not batch_builder_size.ready()

    message2 = Message(
        BrokerValue(
            KafkaPayload(None, b"another value", []),
            Partition(Topic("topic"), 0),
            2,
            datetime.now(),
        )
    )
    batch_builder_size.append(message2)
    assert batch_builder_size.ready()

    # 2. Ready when max_batch_time is reached
    batch_builder_time = MetricsBatchBuilder(
        max_batch_size=max_batch_size, max_batch_time=max_batch_time
    )
    assert not batch_builder_time.ready()

    message1 = Message(
        BrokerValue(
            KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
        )
    )
    batch_builder_time.append(message1)
    assert not batch_builder_time.ready()

    time.sleep(3)
    assert batch_builder_time.ready()

    # 3. Adding the same message twice to the same batch
    batch_builder_time = MetricsBatchBuilder(
        max_batch_size=max_batch_size, max_batch_time=max_batch_time
    )
    message1 = Message(
        BrokerValue(
            KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
        )
    )
    batch_builder_time.append(message1)


ts = int(datetime.now(tz=timezone.utc).timestamp())
counter_payloads: list[dict[str, Any]] = [
    {
        "name": f"c:{use_case.value}/alert@none",
        "tags": {
            "environment": "production",
            "session.status": "init",
        },
        "timestamp": ts,
        "type": "c",
        "value": 1.0,
        "org_id": 1,
        "project_id": 3,
        "retention_days": 90,
    }
    for use_case in UseCaseID
    if use_case is not UseCaseID.SESSIONS
]

distribution_payloads: list[dict[str, Any]] = [
    {
        "name": f"d:{use_case.value}/alert@none",
        "tags": {
            "environment": "production",
            "session.status": "healthy",
        },
        "timestamp": ts,
        "type": "d",
        "value": [4, 5, 6],
        "org_id": 1,
        "project_id": 3,
        "retention_days": 90,
    }
    for use_case in UseCaseID
    if use_case is not UseCaseID.SESSIONS
]

set_payloads: list[dict[str, Any]] = [
    {
        "name": f"s:{use_case.value}/alert@none",
        "tags": {
            "environment": "production",
            "session.status": "errored",
        },
        "timestamp": ts,
        "type": "s",
        "value": [3],
        "org_id": 1,
        "project_id": 3,
        "retention_days": 90,
    }
    for use_case in UseCaseID
    if use_case is not UseCaseID.SESSIONS
]
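
# The three lists above generate one counter ("c"), distribution ("d"), and
# set ("s") payload per use case (except SESSIONS), using MRI-style metric
# names such as "c:transactions/alert@none".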


def __translated_payload(
    payload: dict[str, Any],
    indexer=None,
) -> dict[str, str | int | list[int] | MutableMapping[int, int]]:
    """
    Translates strings to ints using the MockIndexer,
    in addition to adding the retention_days.
    """
    indexer = indexer or MESSAGE_PROCESSOR._indexer
    MRI_RE_PATTERN = re.compile("^([c|s|d|g|e]):([a-zA-Z0-9_]+)/.*$")
    payload = deepcopy(payload)
    org_id = payload["org_id"]

    matched_mri = MRI_RE_PATTERN.match(payload["name"])
    assert matched_mri is not None
    use_case_id = UseCaseID(matched_mri.group(2))

    new_tags = {
        indexer.resolve(use_case_id=use_case_id, org_id=org_id, string=k): v
        for k, v in payload["tags"].items()
    }

    agg_options = get_aggregation_options(payload["name"])
    if agg_options:
        # Keep this assert for now to indicate that there is at most one
        # aggregation option per metric bucket, regardless of the use case
        # in the bucket.
        assert len(agg_options) == 1
        agg_option = agg_options.popitem()[0]
        payload["aggregation_option"] = agg_option.value

    payload["metric_id"] = indexer.resolve(
        use_case_id=use_case_id, org_id=org_id, string=payload["name"]
    )
    payload["retention_days"] = 90
    payload["tags"] = new_tags
    payload["use_case_id"] = use_case_id.value
    payload["sentry_received_timestamp"] = BROKER_TIMESTAMP.timestamp()
    payload["version"] = 2

    payload.pop("unit", None)
    del payload["name"]
    return payload
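
# Roughly, __translated_payload turns an ingest payload such as
#     {"name": "c:transactions/alert@none", "tags": {"environment": ...}, ...}
# into a version-2 snuba payload with an integer "metric_id" and integer tag
# keys (as resolved by the indexer), plus "use_case_id", "retention_days" and
# "sentry_received_timestamp"; the exact integers depend on the indexer state.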


@pytest.mark.django_db
def test_process_messages() -> None:
    message_payloads = counter_payloads + distribution_payloads + set_payloads
    message_batch = [
        Message(
            BrokerValue(
                KafkaPayload(None, json.dumps(payload).encode("utf-8"), []),
                Partition(Topic("topic"), 0),
                i + 1,
                BROKER_TIMESTAMP,
            )
        )
        for i, payload in enumerate(message_payloads)
    ]
    # the outer message uses the last message's partition, offset, and timestamp
    last = message_batch[-1]
    outer_message = Message(Value(message_batch, last.committable))

    # Enable a bunch of special aggregation options. The expectation is that
    # there is still only one aggregation option per bucket/payload.
    with override_options(
        {
            "sentry-metrics.10s-granularity": True,
            "sentry-metrics.drop-percentiles.per-use-case": {"spans", "transactions"},
        }
    ):
        new_batch = MESSAGE_PROCESSOR.process_messages(outer_message=outer_message)

    expected_new_batch = []
    for i, m in enumerate(message_batch):
        assert isinstance(m.value, BrokerValue)
        expected_new_batch.append(
            Message(
                BrokerValue(
                    KafkaPayload(
                        None,
                        json.dumps(__translated_payload(message_payloads[i])).encode("utf-8"),
                        [
                            ("metric_type", message_payloads[i]["type"].encode()),
                        ],
                    ),
                    m.value.partition,
                    m.value.offset,
                    m.value.timestamp,
                )
            )
        )
    compare_message_batches_ignoring_metadata(new_batch, expected_new_batch)


@pytest.mark.django_db
def test_process_messages_default_card_rollout(set_sentry_option: Callable[..., Any]) -> None:
    message_payloads = counter_payloads + distribution_payloads + set_payloads
    message_batch = [
        Message(
            BrokerValue(
                KafkaPayload(None, json.dumps(payload).encode("utf-8"), []),
                Partition(Topic("topic"), 0),
                i + 1,
                BROKER_TIMESTAMP,
            )
        )
        for i, payload in enumerate(message_payloads)
    ]
    # the outer message uses the last message's partition, offset, and timestamp
    last = message_batch[-1]
    outer_message = Message(Value(message_batch, last.committable))

    new_batch = MESSAGE_PROCESSOR.process_messages(outer_message=outer_message)

    assert len(new_batch.data) == len(message_batch)


invalid_payloads = [
    (
        {
            "name": "c:transactions/alert@none",
            "tags": {
                "environment" * 21: "production",
                "session.status": "errored",
            },
            "timestamp": ts,
            "type": "s",
            "value": [3],
            "org_id": 1,
            "project_id": 3,
            "retention_days": 90,
        },
        "invalid_tags",
        True,
    ),
    (
        {
            "name": "c:transactions/alert@" + ("none" * 50),
            "tags": {
                "environment": "production",
                "session.status": "errored",
            },
            "timestamp": ts,
            "type": "s",
            "value": [3],
            "org_id": 1,
            "project_id": 3,
            "retention_days": 90,
        },
        "invalid_metric_name",
        True,
    ),
    (
        b"invalid_json_payload",
        "invalid_json",
        False,
    ),
]
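
# Each tuple above is (invalid_payload, error_text expected in the logs,
# format_payload flag controlling whether the payload is JSON-encoded before
# being wrapped in a KafkaPayload); see the parametrized test below.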


@pytest.mark.django_db
@pytest.mark.parametrize("invalid_payload, error_text, format_payload", invalid_payloads)
def test_process_messages_invalid_messages(
    invalid_payload: dict[str, Any], error_text: str, format_payload: bool, caplog: Any
) -> None:
    """
    Test the following kinds of invalid payloads:
        * tag key > 200 chars
        * metric name > 200 chars
        * invalid json

    Each outer_message passed into process_messages is a batch of messages. If
    one of those messages has an invalid payload, we drop just that message,
    not the entire batch.

    The `counter_payloads[0]` message in these tests is always valid, and the
    test arg `invalid_payload` fits one of the scenarios outlined above.
    """
    formatted_payload = (
        json.dumps(invalid_payload).encode("utf-8") if format_payload else invalid_payload
    )

    message_batch = [
        Message(
            BrokerValue(
                KafkaPayload(None, json.dumps(counter_payloads[0]).encode("utf-8"), []),
                Partition(Topic("topic"), 0),
                0,
                BROKER_TIMESTAMP,
            )
        ),
        Message(
            BrokerValue(
                KafkaPayload(None, formatted_payload, []),  # type: ignore[arg-type]
                Partition(Topic("topic"), 0),
                1,
                BROKER_TIMESTAMP,
            )
        ),
    ]
    # the outer message uses the last message's partition, offset, and timestamp
    last = message_batch[-1]
    outer_message = Message(Value(message_batch, last.committable))

    with caplog.at_level(logging.ERROR):
        new_batch = MESSAGE_PROCESSOR.process_messages(outer_message=outer_message)

    # we expect the valid counter payload to be translated, and the invalid
    # message to be replaced by an InvalidMessage
    expected_msg = message_batch[0]
    expected_new_batch = [
        Message(
            Value(
                KafkaPayload(
                    None,
                    json.dumps(__translated_payload(counter_payloads[0])).encode("utf-8"),
                    [("metric_type", b"c")],
                ),
                expected_msg.committable,
            )
        ),
        Message(
            Value(
                InvalidMessage(Partition(Topic("topic"), 0), 1),
                message_batch[1].committable,
            )
        ),
    ]
    compare_message_batches_ignoring_metadata(new_batch, expected_new_batch)
    assert error_text in caplog.text


@pytest.mark.django_db
def test_process_messages_rate_limited(caplog: Any, settings: Any) -> None:
    """
    Test handling of `None`-values coming from the indexer service, which
    happens when postgres writes are being rate-limited.
    """
    settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0

    rate_limited_payload = deepcopy(distribution_payloads[0])
    rate_limited_payload["tags"]["rate_limited_test"] = "true"

    message_batch = [
        Message(
            BrokerValue(
                KafkaPayload(None, json.dumps(counter_payloads[0]).encode("utf-8"), []),
                Partition(Topic("topic"), 0),
                0,
                BROKER_TIMESTAMP,
            )
        ),
        Message(
            BrokerValue(
                KafkaPayload(None, json.dumps(rate_limited_payload).encode("utf-8"), []),
                Partition(Topic("topic"), 0),
                1,
                BROKER_TIMESTAMP,
            )
        ),
    ]
    # the outer message uses the last message's partition, offset, and timestamp
    last = message_batch[-1]
    outer_message = Message(Value(message_batch, last.committable))

    message_processor = MessageProcessor(
        get_ingest_config(UseCaseKey.PERFORMANCE, IndexerStorage.MOCK)
    )
    # Insert a None-value into the mock-indexer to simulate a rate-limit.
    mock_indexer = message_processor._indexer
    assert isinstance(mock_indexer, MockIndexer)
    raw_simple_string_indexer = mock_indexer.indexer
    assert isinstance(raw_simple_string_indexer, RawSimpleIndexer)
    rgx = re.compile("^([c|s|d|g|e]):([a-zA-Z0-9_]+)/.*$").match(distribution_payloads[0]["name"])
    assert rgx is not None
    raw_simple_string_indexer._strings[UseCaseID(rgx.group(2))][1]["rate_limited_test"] = None

    with caplog.at_level(logging.ERROR):
        new_batch = message_processor.process_messages(outer_message=outer_message)

    # we expect just the counter_payload msg to be left, as that one didn't
    # cause/depend on string writes that have been rate limited
    expected_msg = message_batch[0]
    assert isinstance(expected_msg.value, BrokerValue)
    expected_new_batch = [
        Message(
            BrokerValue(
                KafkaPayload(
                    None,
                    json.dumps(
                        __translated_payload(counter_payloads[0], message_processor._indexer)
                    ).encode("utf-8"),
                    [("metric_type", b"c")],
                ),
                expected_msg.value.partition,
                expected_msg.value.offset,
                expected_msg.value.timestamp,
            )
        )
    ]
    compare_message_batches_ignoring_metadata(new_batch, expected_new_batch)
    assert "dropped_message" in caplog.text


def test_valid_metric_name() -> None:
    assert valid_metric_name("") is True
    assert valid_metric_name("blah") is True
    assert valid_metric_name("invalid" * 200) is False


def test_process_messages_is_pickleable():
    # needed so that the parallel transform step starts up properly
    pickle.dumps(MESSAGE_PROCESSOR.process_messages)