test_batch.py 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283
  1. import logging
  2. from collections.abc import MutableMapping
  3. from datetime import datetime, timezone
  4. import pytest
  5. import sentry_kafka_schemas
  6. from arroyo.backends.kafka import KafkaPayload
  7. from arroyo.processing.strategies.decoder.json import JsonCodec
  8. from arroyo.types import BrokerValue, Message, Partition, Topic, Value
  9. from sentry.sentry_metrics.configuration import UseCaseKey
  10. from sentry.sentry_metrics.consumers.indexer.batch import IndexerBatch, PartitionIdxOffset
  11. from sentry.sentry_metrics.indexer.base import FetchType, FetchTypeExt, Metadata
  12. from sentry.snuba.metrics.naming_layer.mri import SessionMRI
  13. from sentry.utils import json
# Apply the `sentry_metrics` pytest marker to every test in this module.
pytestmark = pytest.mark.sentry_metrics

# A single UTC epoch-seconds timestamp computed once at import time. All
# payloads below share it, and the expected outputs in the tests compare
# against the same value.
ts = int(datetime.now(tz=timezone.utc).timestamp())

# Input payload of type "c" with a scalar value, for org 1 / project 3.
counter_payload = {
    "name": SessionMRI.SESSION.value,
    "tags": {
        "environment": "production",
        "session.status": "init",
    },
    "timestamp": ts,
    "type": "c",
    "value": 1,
    "org_id": 1,
    "retention_days": 90,
    "project_id": 3,
}

# Input payload of type "d" with a list of values, for org 1 / project 3.
distribution_payload = {
    "name": SessionMRI.RAW_DURATION.value,
    "tags": {
        "environment": "production",
        "session.status": "healthy",
    },
    "timestamp": ts,
    "type": "d",
    "value": [4, 5, 6],
    "org_id": 1,
    "retention_days": 90,
    "project_id": 3,
}

# Input payload of type "s" with a list of values, for org 1 / project 3.
set_payload = {
    "name": SessionMRI.ERROR.value,
    "tags": {
        "environment": "production",
        "session.status": "errored",
    },
    "timestamp": ts,
    "type": "s",
    "value": [3],
    "org_id": 1,
    "retention_days": 90,
    "project_id": 3,
}

# The same set of strings the tests below expect from `extract_strings()` when
# all three payloads are batched with tag-value indexing on. The outer key 1
# matches the payloads' "org_id".
extracted_string_output = {
    1: {
        "c:sessions/session@none",
        "d:sessions/duration@second",
        "environment",
        "errored",
        "healthy",
        "init",
        "production",
        "s:sessions/error@none",
        "session.status",
    }
}

# Codec built from the "ingest-metrics" schema; passed to IndexerBatch as
# `arroyo_input_codec` in every test.
_INGEST_SCHEMA = JsonCodec(sentry_kafka_schemas.get_schema("ingest-metrics")["schema"])
  69. def _construct_messages(payloads):
  70. message_batch = []
  71. for i, (payload, headers) in enumerate(payloads):
  72. message_batch.append(
  73. Message(
  74. BrokerValue(
  75. KafkaPayload(None, json.dumps(payload).encode("utf-8"), headers or []),
  76. Partition(Topic("topic"), 0),
  77. i,
  78. datetime.now(),
  79. )
  80. )
  81. )
  82. return message_batch
  83. def _construct_outer_message(payloads):
  84. message_batch = _construct_messages(payloads)
  85. # the outer message uses the last message's partition, offset, and timestamp
  86. last = message_batch[-1]
  87. outer_message = Message(Value(message_batch, last.committable))
  88. return outer_message
  89. def _deconstruct_messages(snuba_messages, kafka_logical_topic="snuba-metrics"):
  90. """
  91. Convert a list of messages returned by `reconstruct_messages` into python
  92. primitives, to run assertions on:
  93. assert _deconstruct_messages(batch.reconstruct_messages(...)) == [ ... ]
  94. This is slightly nicer to work with than:
  95. assert batch.reconstruct_messages(...) == _construct_messages([ ... ])
  96. ...because pytest's assertion diffs work better with python primitives.
  97. """
  98. rv = []
  99. codec = JsonCodec(sentry_kafka_schemas.get_schema(kafka_logical_topic)["schema"])
  100. for msg in snuba_messages:
  101. decoded = json.loads(msg.payload.value.decode("utf-8"))
  102. codec.validate(decoded)
  103. rv.append((decoded, msg.payload.headers))
  104. return rv
  105. def _deconstruct_routing_messages(snuba_messages):
  106. """
  107. Similar to `_deconstruct_messages`, but for routing messages.
  108. """
  109. all_messages = []
  110. for msg in snuba_messages:
  111. headers: MutableMapping[str, str] = {}
  112. for key, value in msg.payload.routing_header.items():
  113. headers.update({key: value})
  114. payload = json.loads(msg.payload.routing_message.value.decode("utf-8"))
  115. all_messages.append((headers, payload, msg.payload.routing_message.headers))
  116. return all_messages
  117. def _get_string_indexer_log_records(caplog):
  118. """
  119. Get all log records and relevant extra arguments for easy snapshotting.
  120. """
  121. return [
  122. (
  123. rec.message,
  124. {
  125. k: v
  126. for k, v in rec.__dict__.items()
  127. if k
  128. in (
  129. "string_type",
  130. "is_global_quota",
  131. "num_global_quotas",
  132. "num_global_quotas",
  133. "org_batch_size",
  134. )
  135. },
  136. )
  137. for rec in caplog.records
  138. ]
@pytest.mark.parametrize(
    "should_index_tag_values, expected",
    [
        # Tag values indexed: metric names, tag keys and tag values are all
        # expected.
        pytest.param(
            True,
            {
                1: {
                    "c:sessions/session@none",
                    "d:sessions/duration@second",
                    "environment",
                    "errored",
                    "healthy",
                    "init",
                    "production",
                    "s:sessions/error@none",
                    "session.status",
                },
            },
            id="index tag values true",
        ),
        # Tag values not indexed: only metric names and tag keys are expected.
        pytest.param(
            False,
            {
                1: {
                    "c:sessions/session@none",
                    "d:sessions/duration@second",
                    "environment",
                    "s:sessions/error@none",
                    "session.status",
                },
            },
            id="index tag values false",
        ),
    ],
)
def test_extract_strings_with_rollout(should_index_tag_values, expected):
    """
    Test that the indexer batch extracts the correct strings from the messages
    based on whether tag values should be indexed or not.

    The batch is built from the counter, distribution and set payloads (all
    org 1), and `expected` is compared against `batch.extract_strings()`.
    """
    outer_message = _construct_outer_message(
        [
            (counter_payload, []),
            (distribution_payload, []),
            (set_payload, []),
        ]
    )
    batch = IndexerBatch(
        UseCaseKey.PERFORMANCE,
        outer_message,
        should_index_tag_values,
        False,
        arroyo_input_codec=_INGEST_SCHEMA,
    )
    assert batch.extract_strings() == expected
def test_all_resolved(caplog, settings):
    """
    Every extracted string gets an id, so no message is dropped.

    Asserts that no snapshot log records are captured and that the three
    reconstructed messages validate against the default "snuba-metrics"
    schema, with names and tags replaced by the supplied ids.
    """
    settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
    outer_message = _construct_outer_message(
        [
            (counter_payload, []),
            (distribution_payload, []),
            (set_payload, []),
        ]
    )
    batch = IndexerBatch(
        UseCaseKey.PERFORMANCE,
        outer_message,
        True,
        False,
        arroyo_input_codec=_INGEST_SCHEMA,
    )
    assert batch.extract_strings() == (
        {
            1: {
                "c:sessions/session@none",
                "d:sessions/duration@second",
                "environment",
                "errored",
                "healthy",
                "init",
                "production",
                "s:sessions/error@none",
                "session.status",
            }
        }
    )
    # Capture level is set before reconstruct_messages so its log output is
    # what the assertion below inspects.
    caplog.set_level(logging.ERROR)
    # First argument: org id -> string -> resolved id.
    # Second argument: org id -> string -> Metadata (id plus how it was fetched).
    snuba_payloads = batch.reconstruct_messages(
        {
            1: {
                "c:sessions/session@none": 1,
                "d:sessions/duration@second": 2,
                "environment": 3,
                "errored": 4,
                "healthy": 5,
                "init": 6,
                "production": 7,
                "s:sessions/error@none": 8,
                "session.status": 9,
            }
        },
        {
            1: {
                "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
                "d:sessions/duration@second": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
                "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
                "errored": Metadata(id=4, fetch_type=FetchType.DB_READ),
                "healthy": Metadata(id=5, fetch_type=FetchType.HARDCODED),
                "init": Metadata(id=6, fetch_type=FetchType.HARDCODED),
                "production": Metadata(id=7, fetch_type=FetchType.CACHE_HIT),
                "s:sessions/error@none": Metadata(id=8, fetch_type=FetchType.CACHE_HIT),
                "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
            }
        },
    )
    assert _get_string_indexer_log_records(caplog) == []
    # In the expected output, "mapping_meta" groups the id -> string pairs
    # used by each message: CACHE_HIT entries appear under "c", HARDCODED
    # under "h" and DB_READ under "d". The "mapping_sources" header holds the
    # same letters.
    assert _deconstruct_messages(snuba_payloads) == [
        (
            {
                "mapping_meta": {
                    "c": {
                        "1": "c:sessions/session@none",
                        "3": "environment",
                        "7": "production",
                        "9": "session.status",
                    },
                    "h": {"6": "init"},
                },
                "metric_id": 1,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": 7, "9": 6},
                "timestamp": ts,
                "type": "c",
                "use_case_id": "performance",
                "value": 1.0,
            },
            [("mapping_sources", b"ch"), ("metric_type", "c")],
        ),
        (
            {
                "mapping_meta": {
                    "c": {
                        "2": "d:sessions/duration@second",
                        "3": "environment",
                        "7": "production",
                        "9": "session.status",
                    },
                    "h": {"5": "healthy"},
                },
                "metric_id": 2,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": 7, "9": 5},
                "timestamp": ts,
                "type": "d",
                "use_case_id": "performance",
                "value": [4, 5, 6],
            },
            [("mapping_sources", b"ch"), ("metric_type", "d")],
        ),
        (
            {
                "mapping_meta": {
                    "c": {
                        "3": "environment",
                        "7": "production",
                        "8": "s:sessions/error@none",
                        "9": "session.status",
                    },
                    "d": {"4": "errored"},
                },
                "metric_id": 8,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": 7, "9": 4},
                "timestamp": ts,
                "type": "s",
                "use_case_id": "performance",
                "value": [3],
            },
            [("mapping_sources", b"cd"), ("metric_type", "s")],
        ),
    ]
def test_all_resolved_with_routing_information(caplog, settings):
    """
    Same inputs as `test_all_resolved`, but the fourth positional
    `IndexerBatch` argument is True.

    The reconstructed messages are then read via
    `_deconstruct_routing_messages`, and each one is expected to carry a
    routing header of `{"org_id": 1}` next to the same payload and headers as
    in `test_all_resolved`.
    """
    settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
    outer_message = _construct_outer_message(
        [
            (counter_payload, []),
            (distribution_payload, []),
            (set_payload, []),
        ]
    )
    # NOTE(review): the second True presumably switches the batch to emit
    # routing messages - confirm against the IndexerBatch signature.
    batch = IndexerBatch(
        UseCaseKey.PERFORMANCE,
        outer_message,
        True,
        True,
        arroyo_input_codec=_INGEST_SCHEMA,
    )
    assert batch.extract_strings() == (
        {
            1: {
                "c:sessions/session@none",
                "d:sessions/duration@second",
                "environment",
                "errored",
                "healthy",
                "init",
                "production",
                "s:sessions/error@none",
                "session.status",
            }
        }
    )
    # Capture level is set before reconstruct_messages so its log output is
    # what the assertion below inspects.
    caplog.set_level(logging.ERROR)
    snuba_payloads = batch.reconstruct_messages(
        {
            1: {
                "c:sessions/session@none": 1,
                "d:sessions/duration@second": 2,
                "environment": 3,
                "errored": 4,
                "healthy": 5,
                "init": 6,
                "production": 7,
                "s:sessions/error@none": 8,
                "session.status": 9,
            }
        },
        {
            1: {
                "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
                "d:sessions/duration@second": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
                "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
                "errored": Metadata(id=4, fetch_type=FetchType.DB_READ),
                "healthy": Metadata(id=5, fetch_type=FetchType.HARDCODED),
                "init": Metadata(id=6, fetch_type=FetchType.HARDCODED),
                "production": Metadata(id=7, fetch_type=FetchType.CACHE_HIT),
                "s:sessions/error@none": Metadata(id=8, fetch_type=FetchType.CACHE_HIT),
                "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
            }
        },
    )
    assert _get_string_indexer_log_records(caplog) == []
    # Each expected entry is (routing header, decoded payload, kafka headers).
    assert _deconstruct_routing_messages(snuba_payloads) == [
        (
            {"org_id": 1},
            {
                "mapping_meta": {
                    "c": {
                        "1": "c:sessions/session@none",
                        "3": "environment",
                        "7": "production",
                        "9": "session.status",
                    },
                    "h": {"6": "init"},
                },
                "metric_id": 1,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": 7, "9": 6},
                "timestamp": ts,
                "type": "c",
                "use_case_id": "performance",
                "value": 1.0,
            },
            [("mapping_sources", b"ch"), ("metric_type", "c")],
        ),
        (
            {"org_id": 1},
            {
                "mapping_meta": {
                    "c": {
                        "2": "d:sessions/duration@second",
                        "3": "environment",
                        "7": "production",
                        "9": "session.status",
                    },
                    "h": {"5": "healthy"},
                },
                "metric_id": 2,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": 7, "9": 5},
                "timestamp": ts,
                "type": "d",
                "use_case_id": "performance",
                "value": [4, 5, 6],
            },
            [("mapping_sources", b"ch"), ("metric_type", "d")],
        ),
        (
            {"org_id": 1},
            {
                "mapping_meta": {
                    "c": {
                        "3": "environment",
                        "7": "production",
                        "8": "s:sessions/error@none",
                        "9": "session.status",
                    },
                    "d": {"4": "errored"},
                },
                "metric_id": 8,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": 7, "9": 4},
                "timestamp": ts,
                "type": "s",
                "use_case_id": "performance",
                "value": [3],
            },
            [("mapping_sources", b"cd"), ("metric_type", "s")],
        ),
    ]
def test_all_resolved_retention_days_honored(caplog, settings):
    """
    Tests that the indexer batch honors the incoming retention_days values
    from Relay.

    The distribution payload is sent with retention_days=30 and must come out
    with 30, while the counter and set payloads keep their 90. Every payload
    here carries an explicit retention_days, so no default/fallback path is
    exercised by this test.
    """
    # Shallow copy is enough: only the top-level "retention_days" key is
    # replaced, and the shared module-level payload stays untouched.
    distribution_payload_modified = distribution_payload.copy()
    distribution_payload_modified["retention_days"] = 30
    settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
    outer_message = _construct_outer_message(
        [
            (counter_payload, []),
            (distribution_payload_modified, []),
            (set_payload, []),
        ]
    )
    batch = IndexerBatch(
        UseCaseKey.PERFORMANCE,
        outer_message,
        True,
        False,
        arroyo_input_codec=_INGEST_SCHEMA,
    )
    assert batch.extract_strings() == (
        {
            1: {
                "c:sessions/session@none",
                "d:sessions/duration@second",
                "environment",
                "errored",
                "healthy",
                "init",
                "production",
                "s:sessions/error@none",
                "session.status",
            }
        }
    )
    # Capture level is set before reconstruct_messages so its log output is
    # what the assertion below inspects.
    caplog.set_level(logging.ERROR)
    snuba_payloads = batch.reconstruct_messages(
        {
            1: {
                "c:sessions/session@none": 1,
                "d:sessions/duration@second": 2,
                "environment": 3,
                "errored": 4,
                "healthy": 5,
                "init": 6,
                "production": 7,
                "s:sessions/error@none": 8,
                "session.status": 9,
            }
        },
        {
            1: {
                "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
                "d:sessions/duration@second": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
                "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
                "errored": Metadata(id=4, fetch_type=FetchType.DB_READ),
                "healthy": Metadata(id=5, fetch_type=FetchType.HARDCODED),
                "init": Metadata(id=6, fetch_type=FetchType.HARDCODED),
                "production": Metadata(id=7, fetch_type=FetchType.CACHE_HIT),
                "s:sessions/error@none": Metadata(id=8, fetch_type=FetchType.CACHE_HIT),
                "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
            }
        },
    )
    assert _get_string_indexer_log_records(caplog) == []
    assert _deconstruct_messages(snuba_payloads) == [
        (
            {
                "mapping_meta": {
                    "c": {
                        "1": "c:sessions/session@none",
                        "3": "environment",
                        "7": "production",
                        "9": "session.status",
                    },
                    "h": {"6": "init"},
                },
                "metric_id": 1,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": 7, "9": 6},
                "timestamp": ts,
                "type": "c",
                "use_case_id": "performance",
                "value": 1.0,
            },
            [("mapping_sources", b"ch"), ("metric_type", "c")],
        ),
        (
            {
                "mapping_meta": {
                    "c": {
                        "2": "d:sessions/duration@second",
                        "3": "environment",
                        "7": "production",
                        "9": "session.status",
                    },
                    "h": {"5": "healthy"},
                },
                "metric_id": 2,
                "org_id": 1,
                "project_id": 3,
                # The modified incoming value is passed through unchanged.
                "retention_days": 30,
                "tags": {"3": 7, "9": 5},
                "timestamp": ts,
                "type": "d",
                "use_case_id": "performance",
                "value": [4, 5, 6],
            },
            [("mapping_sources", b"ch"), ("metric_type", "d")],
        ),
        (
            {
                "mapping_meta": {
                    "c": {
                        "3": "environment",
                        "7": "production",
                        "8": "s:sessions/error@none",
                        "9": "session.status",
                    },
                    "d": {"4": "errored"},
                },
                "metric_id": 8,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": 7, "9": 4},
                "timestamp": ts,
                "type": "s",
                "use_case_id": "performance",
                "value": [3],
            },
            [("mapping_sources", b"cd"), ("metric_type", "s")],
        ),
    ]
def test_batch_resolve_with_values_not_indexed(caplog, settings):
    """
    Tests that the indexer batch skips resolving tag values for indexing and
    sends the raw tag value to Snuba.

    The difference between this test and test_all_resolved is that the tag
    values in the output are strings instead of integers. Because of that,
    fewer strings are extracted and indexed, and mapping_meta is smaller. The
    payload also contains the version field to specify that the tag values
    are not indexed, and the output is validated against the
    "snuba-generic-metrics" schema.
    """
    settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
    outer_message = _construct_outer_message(
        [
            (counter_payload, []),
            (distribution_payload, []),
            (set_payload, []),
        ]
    )
    # Third positional argument False: tag values are not indexed (compare
    # the parametrized argument in test_extract_strings_with_rollout).
    batch = IndexerBatch(
        UseCaseKey.PERFORMANCE,
        outer_message,
        False,
        False,
        arroyo_input_codec=_INGEST_SCHEMA,
    )
    # Only metric names and tag keys are extracted; tag values are absent.
    assert batch.extract_strings() == (
        {
            1: {
                "c:sessions/session@none",
                "d:sessions/duration@second",
                "environment",
                "s:sessions/error@none",
                "session.status",
            }
        }
    )
    # Capture level is set before reconstruct_messages so its log output is
    # what the assertion below inspects.
    caplog.set_level(logging.ERROR)
    snuba_payloads = batch.reconstruct_messages(
        {
            1: {
                "c:sessions/session@none": 1,
                "d:sessions/duration@second": 2,
                "environment": 3,
                "s:sessions/error@none": 4,
                "session.status": 5,
            }
        },
        {
            1: {
                "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
                "d:sessions/duration@second": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
                "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
                "s:sessions/error@none": Metadata(id=4, fetch_type=FetchType.CACHE_HIT),
                "session.status": Metadata(id=5, fetch_type=FetchType.CACHE_HIT),
            }
        },
    )
    assert _get_string_indexer_log_records(caplog) == []
    # Tag keys are ids, tag values stay as the raw strings, and each payload
    # carries "version": 2.
    assert _deconstruct_messages(snuba_payloads, kafka_logical_topic="snuba-generic-metrics") == [
        (
            {
                "version": 2,
                "mapping_meta": {
                    "c": {
                        "1": "c:sessions/session@none",
                        "3": "environment",
                        "5": "session.status",
                    },
                },
                "metric_id": 1,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": "production", "5": "init"},
                "timestamp": ts,
                "type": "c",
                "use_case_id": "performance",
                "value": 1.0,
            },
            [("mapping_sources", b"c"), ("metric_type", "c")],
        ),
        (
            {
                "version": 2,
                "mapping_meta": {
                    "c": {
                        "2": "d:sessions/duration@second",
                        "3": "environment",
                        "5": "session.status",
                    },
                },
                "metric_id": 2,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": "production", "5": "healthy"},
                "timestamp": ts,
                "type": "d",
                "use_case_id": "performance",
                "value": [4, 5, 6],
            },
            [("mapping_sources", b"c"), ("metric_type", "d")],
        ),
        (
            {
                "version": 2,
                "mapping_meta": {
                    "c": {
                        "3": "environment",
                        "4": "s:sessions/error@none",
                        "5": "session.status",
                    },
                },
                "metric_id": 4,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": "production", "5": "errored"},
                "timestamp": ts,
                "type": "s",
                "use_case_id": "performance",
                "value": [3],
            },
            [("mapping_sources", b"c"), ("metric_type", "s")],
        ),
    ]
def test_metric_id_rate_limited(caplog, settings):
    """
    Messages whose metric name could not be resolved are dropped.

    The counter and distribution metric names map to None (with RATE_LIMITED
    metadata), so only the set message is expected in the output, and two
    "process_messages.dropped_message" records with string_type "metric_id"
    are expected in the log.
    """
    settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
    outer_message = _construct_outer_message(
        [
            (counter_payload, []),
            (distribution_payload, []),
            (set_payload, []),
        ]
    )
    batch = IndexerBatch(
        UseCaseKey.PERFORMANCE, outer_message, True, False, arroyo_input_codec=_INGEST_SCHEMA
    )
    assert batch.extract_strings() == (
        {
            1: {
                "c:sessions/session@none",
                "d:sessions/duration@second",
                "environment",
                "errored",
                "healthy",
                "init",
                "production",
                "s:sessions/error@none",
                "session.status",
            }
        }
    )
    # Capture level is set before reconstruct_messages so the dropped-message
    # records it logs are what the final assertion inspects.
    caplog.set_level(logging.ERROR)
    snuba_payloads = batch.reconstruct_messages(
        {
            1: {
                # Unresolved metric names for the counter and distribution.
                "c:sessions/session@none": None,
                "d:sessions/duration@second": None,
                "environment": 3,
                "errored": 4,
                "healthy": 5,
                "init": 6,
                "production": 7,
                "s:sessions/error@none": 8,
                "session.status": 9,
            }
        },
        {
            1: {
                "c:sessions/session@none": Metadata(
                    id=None,
                    fetch_type=FetchType.RATE_LIMITED,
                    fetch_type_ext=FetchTypeExt(is_global=False),
                ),
                # Rate limited without a fetch_type_ext; the expected log
                # record below still reports is_global_quota False.
                "d:sessions/duration@second": Metadata(
                    id=None, fetch_type=FetchType.RATE_LIMITED, fetch_type_ext=None
                ),
                "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
                "errored": Metadata(id=4, fetch_type=FetchType.DB_READ),
                "healthy": Metadata(id=5, fetch_type=FetchType.HARDCODED),
                "init": Metadata(id=6, fetch_type=FetchType.HARDCODED),
                "production": Metadata(id=7, fetch_type=FetchType.CACHE_HIT),
                # Metadata id is None although the mapping above resolves this
                # string to 8; the expected mapping_meta therefore lists it
                # under the key "None" while metric_id is still 8.
                "s:sessions/error@none": Metadata(id=None, fetch_type=FetchType.DB_READ),
                "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
            }
        },
    )
    assert _deconstruct_messages(snuba_payloads) == [
        (
            {
                "mapping_meta": {
                    "c": {"3": "environment", "7": "production", "9": "session.status"},
                    "d": {"4": "errored", "None": "s:sessions/error@none"},
                },
                "metric_id": 8,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": 7, "9": 4},
                "timestamp": ts,
                "type": "s",
                "use_case_id": "performance",
                "value": [3],
            },
            [("mapping_sources", b"cd"), ("metric_type", "s")],
        ),
    ]
    # One record per dropped message (counter and distribution).
    assert _get_string_indexer_log_records(caplog) == [
        (
            "process_messages.dropped_message",
            {"org_batch_size": 9, "is_global_quota": False, "string_type": "metric_id"},
        ),
        (
            "process_messages.dropped_message",
            {"org_batch_size": 9, "is_global_quota": False, "string_type": "metric_id"},
        ),
    ]
def test_tag_key_rate_limited(caplog, settings):
    """
    Messages with an unresolved tag key are dropped.

    The tag key "environment" maps to None (RATE_LIMITED). All three payloads
    carry that tag, so three "process_messages.dropped_message" records with
    string_type "tags" are expected and no message is emitted.
    """
    settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
    outer_message = _construct_outer_message(
        [
            (counter_payload, []),
            (distribution_payload, []),
            (set_payload, []),
        ]
    )
    batch = IndexerBatch(
        UseCaseKey.PERFORMANCE, outer_message, True, False, arroyo_input_codec=_INGEST_SCHEMA
    )
    assert batch.extract_strings() == (
        {
            1: {
                "c:sessions/session@none",
                "d:sessions/duration@second",
                "environment",
                "errored",
                "healthy",
                "init",
                "production",
                "s:sessions/error@none",
                "session.status",
            }
        }
    )
    # Capture level is set before reconstruct_messages so the dropped-message
    # records it logs are what the assertion below inspects.
    caplog.set_level(logging.ERROR)
    snuba_payloads = batch.reconstruct_messages(
        {
            1: {
                "c:sessions/session@none": 1,
                "d:sessions/duration@second": 2,
                # Unresolved tag key shared by every payload.
                "environment": None,
                "errored": 4,
                "healthy": 5,
                "init": 6,
                "production": 7,
                "s:sessions/error@none": 8,
                "session.status": 9,
            }
        },
        {
            1: {
                "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
                "d:sessions/duration@second": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
                "environment": Metadata(
                    id=None,
                    fetch_type=FetchType.RATE_LIMITED,
                    fetch_type_ext=FetchTypeExt(is_global=False),
                ),
                "errored": Metadata(id=4, fetch_type=FetchType.DB_READ),
                "healthy": Metadata(id=5, fetch_type=FetchType.HARDCODED),
                "init": Metadata(id=6, fetch_type=FetchType.HARDCODED),
                "production": Metadata(id=7, fetch_type=FetchType.CACHE_HIT),
                "s:sessions/error@none": Metadata(id=8, fetch_type=FetchType.CACHE_HIT),
                "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
            }
        },
    )
    # One record per dropped message.
    assert _get_string_indexer_log_records(caplog) == [
        (
            "process_messages.dropped_message",
            {"num_global_quotas": 0, "org_batch_size": 9, "string_type": "tags"},
        ),
        (
            "process_messages.dropped_message",
            {"num_global_quotas": 0, "org_batch_size": 9, "string_type": "tags"},
        ),
        (
            "process_messages.dropped_message",
            {"num_global_quotas": 0, "org_batch_size": 9, "string_type": "tags"},
        ),
    ]
    assert _deconstruct_messages(snuba_payloads) == []
def test_tag_value_rate_limited(caplog, settings):
    """
    Messages with an unresolved tag value are dropped.

    The tag value "errored" maps to None (RATE_LIMITED). Only the set payload
    uses that value, so one "process_messages.dropped_message" record with
    string_type "tags" is expected, and the counter and distribution messages
    are still emitted.
    """
    settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
    outer_message = _construct_outer_message(
        [
            (counter_payload, []),
            (distribution_payload, []),
            (set_payload, []),
        ]
    )
    batch = IndexerBatch(
        UseCaseKey.PERFORMANCE, outer_message, True, False, arroyo_input_codec=_INGEST_SCHEMA
    )
    assert batch.extract_strings() == (
        {
            1: {
                "c:sessions/session@none",
                "d:sessions/duration@second",
                "environment",
                "errored",
                "healthy",
                "init",
                "production",
                "s:sessions/error@none",
                "session.status",
            }
        }
    )
    # Capture level is set before reconstruct_messages so the dropped-message
    # record it logs is what the assertion below inspects.
    caplog.set_level(logging.ERROR)
    snuba_payloads = batch.reconstruct_messages(
        {
            1: {
                "c:sessions/session@none": 1,
                "d:sessions/duration@second": 2,
                "environment": 3,
                # Unresolved tag value, used only by set_payload.
                "errored": None,
                "healthy": 5,
                "init": 6,
                "production": 7,
                "s:sessions/error@none": 8,
                "session.status": 9,
            }
        },
        {
            1: {
                "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
                "d:sessions/duration@second": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
                "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
                "errored": Metadata(
                    id=None,
                    fetch_type=FetchType.RATE_LIMITED,
                    fetch_type_ext=FetchTypeExt(is_global=False),
                ),
                "healthy": Metadata(id=5, fetch_type=FetchType.HARDCODED),
                "init": Metadata(id=6, fetch_type=FetchType.HARDCODED),
                "production": Metadata(id=7, fetch_type=FetchType.CACHE_HIT),
                "s:sessions/error@none": Metadata(id=8, fetch_type=FetchType.CACHE_HIT),
                "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
            }
        },
    )
    assert _get_string_indexer_log_records(caplog) == [
        (
            "process_messages.dropped_message",
            {"num_global_quotas": 0, "org_batch_size": 9, "string_type": "tags"},
        ),
    ]
    # The set message is missing; the other two are emitted as usual.
    assert _deconstruct_messages(snuba_payloads) == [
        (
            {
                "mapping_meta": {
                    "c": {
                        "1": "c:sessions/session@none",
                        "3": "environment",
                        "7": "production",
                        "9": "session.status",
                    },
                    "h": {"6": "init"},
                },
                "metric_id": 1,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": 7, "9": 6},
                "timestamp": ts,
                "type": "c",
                "use_case_id": "performance",
                "value": 1.0,
            },
            [("mapping_sources", b"ch"), ("metric_type", "c")],
        ),
        (
            {
                "mapping_meta": {
                    "c": {
                        "2": "d:sessions/duration@second",
                        "3": "environment",
                        "7": "production",
                        "9": "session.status",
                    },
                    "h": {"5": "healthy"},
                },
                "metric_id": 2,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": 7, "9": 5},
                "timestamp": ts,
                "type": "d",
                "use_case_id": "performance",
                "value": [4, 5, 6],
            },
            [("mapping_sources", b"ch"), ("metric_type", "d")],
        ),
    ]
  1005. def test_one_org_limited(caplog, settings):
  1006. settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
  1007. outer_message = _construct_outer_message(
  1008. [
  1009. (counter_payload, []),
  1010. ({**distribution_payload, "org_id": 2}, []),
  1011. ]
  1012. )
  1013. batch = IndexerBatch(
  1014. UseCaseKey.PERFORMANCE,
  1015. outer_message,
  1016. True,
  1017. False,
  1018. arroyo_input_codec=_INGEST_SCHEMA,
  1019. )
  1020. assert batch.extract_strings() == (
  1021. {
  1022. 1: {
  1023. "c:sessions/session@none",
  1024. "environment",
  1025. "init",
  1026. "production",
  1027. "session.status",
  1028. },
  1029. 2: {
  1030. "d:sessions/duration@second",
  1031. "environment",
  1032. "healthy",
  1033. "production",
  1034. "session.status",
  1035. },
  1036. }
  1037. )
  1038. caplog.set_level(logging.ERROR)
  1039. snuba_payloads = batch.reconstruct_messages(
  1040. {
  1041. 1: {
  1042. "c:sessions/session@none": 1,
  1043. "environment": None,
  1044. "init": 3,
  1045. "production": 4,
  1046. "session.status": 5,
  1047. },
  1048. 2: {
  1049. "d:sessions/duration@second": 1,
  1050. "environment": 2,
  1051. "healthy": 3,
  1052. "production": 4,
  1053. "session.status": 5,
  1054. },
  1055. },
  1056. {
  1057. 1: {
  1058. "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
  1059. "environment": Metadata(
  1060. id=None,
  1061. fetch_type=FetchType.RATE_LIMITED,
  1062. fetch_type_ext=FetchTypeExt(is_global=False),
  1063. ),
  1064. "init": Metadata(id=3, fetch_type=FetchType.HARDCODED),
  1065. "production": Metadata(id=4, fetch_type=FetchType.CACHE_HIT),
  1066. "session.status": Metadata(id=5, fetch_type=FetchType.CACHE_HIT),
  1067. },
  1068. 2: {
  1069. "d:sessions/duration@second": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
  1070. "environment": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
  1071. "healthy": Metadata(id=3, fetch_type=FetchType.HARDCODED),
  1072. "production": Metadata(id=4, fetch_type=FetchType.CACHE_HIT),
  1073. "session.status": Metadata(id=5, fetch_type=FetchType.CACHE_HIT),
  1074. },
  1075. },
  1076. )
  1077. assert _get_string_indexer_log_records(caplog) == [
  1078. (
  1079. "process_messages.dropped_message",
  1080. {"num_global_quotas": 0, "org_batch_size": 5, "string_type": "tags"},
  1081. ),
  1082. ]
  1083. assert _deconstruct_messages(snuba_payloads) == [
  1084. (
  1085. {
  1086. "mapping_meta": {
  1087. "c": {
  1088. "1": "d:sessions/duration@second",
  1089. "2": "environment",
  1090. "4": "production",
  1091. "5": "session.status",
  1092. },
  1093. "h": {"3": "healthy"},
  1094. },
  1095. "metric_id": 1,
  1096. "org_id": 2,
  1097. "project_id": 3,
  1098. "retention_days": 90,
  1099. "tags": {"2": 4, "5": 3},
  1100. "timestamp": ts,
  1101. "type": "d",
  1102. "use_case_id": "performance",
  1103. "value": [4, 5, 6],
  1104. },
  1105. [("mapping_sources", b"ch"), ("metric_type", "d")],
  1106. ),
  1107. ]
  1108. def test_cardinality_limiter(caplog, settings):
  1109. """
  1110. Test functionality of the indexer batch related to cardinality-limiting. More concretely, assert that `IndexerBatch.filter_messages`:
  1111. 1. removes the messages from the outgoing batch
  1112. 2. prevents strings from filtered messages from being extracted & indexed
  1113. 3. does not crash when strings from filtered messages are not passed into reconstruct_messages
  1114. 4. still extracts strings that exist both in filtered and unfiltered messages (eg "environment")
  1115. """
  1116. settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
  1117. outer_message = _construct_outer_message(
  1118. [
  1119. (counter_payload, []),
  1120. (distribution_payload, []),
  1121. (set_payload, []),
  1122. ]
  1123. )
  1124. batch = IndexerBatch(
  1125. UseCaseKey.PERFORMANCE,
  1126. outer_message,
  1127. True,
  1128. False,
  1129. arroyo_input_codec=_INGEST_SCHEMA,
  1130. )
  1131. keys_to_remove = list(batch.parsed_payloads_by_offset)[:2]
  1132. # the messages come in a certain order, and Python dictionaries preserve
  1133. # their insertion order. So we can hardcode offsets here.
  1134. assert keys_to_remove == [
  1135. PartitionIdxOffset(partition_idx=0, offset=0),
  1136. PartitionIdxOffset(partition_idx=0, offset=1),
  1137. ]
  1138. batch.filter_messages(keys_to_remove)
  1139. assert batch.extract_strings() == {
  1140. 1: {
  1141. "environment",
  1142. "errored",
  1143. "production",
  1144. # Note, we only extracted one MRI, of the one metric that we didn't
  1145. # drop
  1146. "s:sessions/error@none",
  1147. "session.status",
  1148. },
  1149. }
  1150. snuba_payloads = batch.reconstruct_messages(
  1151. {
  1152. 1: {
  1153. "environment": 1,
  1154. "errored": 2,
  1155. "production": 3,
  1156. "s:sessions/error@none": 4,
  1157. "session.status": 5,
  1158. },
  1159. },
  1160. {
  1161. 1: {
  1162. "environment": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
  1163. "errored": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
  1164. "production": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
  1165. "s:sessions/error@none": Metadata(id=4, fetch_type=FetchType.CACHE_HIT),
  1166. "session.status": Metadata(id=5, fetch_type=FetchType.CACHE_HIT),
  1167. }
  1168. },
  1169. )
  1170. assert _deconstruct_messages(snuba_payloads) == [
  1171. (
  1172. {
  1173. "mapping_meta": {
  1174. "c": {
  1175. "1": "environment",
  1176. "2": "errored",
  1177. "3": "production",
  1178. "4": "s:sessions/error@none",
  1179. "5": "session.status",
  1180. },
  1181. },
  1182. "metric_id": 4,
  1183. "org_id": 1,
  1184. "project_id": 3,
  1185. "retention_days": 90,
  1186. "tags": {"1": 3, "5": 2},
  1187. "timestamp": ts,
  1188. "type": "s",
  1189. "use_case_id": "performance",
  1190. "value": [3],
  1191. },
  1192. [
  1193. ("mapping_sources", b"c"),
  1194. ("metric_type", "s"),
  1195. ],
  1196. )
  1197. ]