test_batch.py 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064
  1. import logging
  2. from collections.abc import MutableMapping
  3. from datetime import datetime, timezone
  4. from unittest.mock import patch
  5. import pytest
  6. import sentry_kafka_schemas
  7. from arroyo.backends.kafka import KafkaPayload
  8. from arroyo.types import BrokerValue, Message, Partition, Topic, Value
  9. from sentry.sentry_metrics.aggregation_option_registry import AggregationOption
  10. from sentry.sentry_metrics.configuration import (
  11. GENERIC_METRICS_SCHEMA_VALIDATION_RULES_OPTION_NAME,
  12. RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME,
  13. )
  14. from sentry.sentry_metrics.consumers.indexer.batch import IndexerBatch
  15. from sentry.sentry_metrics.consumers.indexer.common import BrokerMeta
  16. from sentry.sentry_metrics.consumers.indexer.processing import INGEST_CODEC
  17. from sentry.sentry_metrics.consumers.indexer.schema_validator import MetricsSchemaValidator
  18. from sentry.sentry_metrics.consumers.indexer.tags_validator import (
  19. GenericMetricsTagsValidator,
  20. ReleaseHealthTagsValidator,
  21. )
  22. from sentry.sentry_metrics.indexer.base import FetchType, FetchTypeExt, Metadata
  23. from sentry.sentry_metrics.use_case_id_registry import UseCaseID
  24. from sentry.snuba.metrics.naming_layer.mri import SessionMRI, TransactionMRI
  25. from sentry.testutils.helpers.options import override_options
  26. from sentry.utils import json
# Replacement per-metric aggregation-option table; patched into
# sentry.sentry_metrics.aggregation_option_registry by
# test_resolved_with_aggregation_options.
MOCK_METRIC_ID_AGG_OPTION = {
    "d:transactions/measurements.fcp@millisecond": AggregationOption.HIST,
    "d:transactions/measurements.lcp@millisecond": AggregationOption.HIST,
    "d:transactions/alert@none": AggregationOption.TEN_SECOND,
}
# Replacement per-use-case aggregation-option table, patched in the same test.
MOCK_USE_CASE_AGG_OPTION = {UseCaseID.TRANSACTIONS: AggregationOption.TEN_SECOND}
# Applies the sentry_metrics marker to every test in this module.
pytestmark = pytest.mark.sentry_metrics
# Broker timestamp stamped onto every message built by _construct_messages;
# tests expect it back as "sentry_received_timestamp".
BROKER_TIMESTAMP = datetime.now(tz=timezone.utc)
# Integer-second timestamp used as the "timestamp" field of the test payloads.
ts = int(datetime.now(tz=timezone.utc).timestamp())
# Release-health (sessions) counter payload. NOTE: key insertion order here
# determines the exact JSON bytes produced by json.dumps in _construct_messages.
counter_payload = {
    "name": SessionMRI.RAW_SESSION.value,
    "tags": {
        "environment": "production",
        "session.status": "init",
    },
    "timestamp": ts,
    "type": "c",
    "value": 1,
    "org_id": 1,
    "retention_days": 90,
    "project_id": 3,
}
counter_headers = [("namespace", b"sessions")]
# Release-health distribution payload.
distribution_payload = {
    "name": SessionMRI.RAW_DURATION.value,
    "tags": {
        "environment": "production",
        "session.status": "healthy",
    },
    "timestamp": ts,
    "type": "d",
    "value": [4, 5, 6],
    "org_id": 1,
    "retention_days": 90,
    "project_id": 3,
}
distribution_headers = [("namespace", b"sessions")]
# Release-health set payload.
set_payload = {
    "name": SessionMRI.RAW_ERROR.value,
    "tags": {
        "environment": "production",
        "session.status": "errored",
    },
    "timestamp": ts,
    "type": "s",
    "value": [3],
    "org_id": 1,
    "retention_days": 90,
    "project_id": 3,
}
set_headers = [("namespace", b"sessions")]
# Strings expected from the three payloads above when tag values are indexed:
# metric names, tag keys and tag values, grouped by use case and org id.
extracted_string_output = {
    UseCaseID.SESSIONS: {
        1: {
            "c:sessions/session@none",
            "d:sessions/duration@second",
            "environment",
            "errored",
            "healthy",
            "init",
            "production",
            "s:sessions/error@none",
            "session.status",
        }
    }
}
  93. def _construct_messages(payloads):
  94. message_batch = []
  95. for i, (payload, headers) in enumerate(payloads):
  96. message_batch.append(
  97. Message(
  98. BrokerValue(
  99. KafkaPayload(None, json.dumps(payload).encode("utf-8"), headers or []),
  100. Partition(Topic("topic"), 0),
  101. i,
  102. BROKER_TIMESTAMP,
  103. )
  104. )
  105. )
  106. return message_batch
  107. def _construct_outer_message(payloads):
  108. message_batch = _construct_messages(payloads)
  109. # the outer message uses the last message's partition, offset, and timestamp
  110. last = message_batch[-1]
  111. outer_message = Message(Value(message_batch, last.committable))
  112. return outer_message
  113. def _deconstruct_messages(snuba_messages, kafka_logical_topic="snuba-metrics"):
  114. """
  115. Convert a list of messages returned by `reconstruct_messages` into python
  116. primitives, to run assertions on:
  117. assert _deconstruct_messages(batch.reconstruct_messages(...)) == [ ... ]
  118. This is slightly nicer to work with than:
  119. assert batch.reconstruct_messages(...) == _construct_messages([ ... ])
  120. ...because pytest's assertion diffs work better with python primitives.
  121. """
  122. rv = []
  123. codec = sentry_kafka_schemas.get_codec(kafka_logical_topic)
  124. for msg in snuba_messages:
  125. decoded = codec.decode(msg.payload.value, validate=True)
  126. rv.append((decoded, msg.payload.headers))
  127. return rv
  128. def _deconstruct_routing_messages(snuba_messages):
  129. """
  130. Similar to `_deconstruct_messages`, but for routing messages.
  131. """
  132. all_messages = []
  133. for msg in snuba_messages:
  134. headers: MutableMapping[str, str] = {}
  135. for key, value in msg.payload.routing_header.items():
  136. headers.update({key: value})
  137. payload = json.loads(msg.payload.routing_message.value.decode("utf-8"))
  138. all_messages.append((headers, payload, msg.payload.routing_message.headers))
  139. return all_messages
  140. def _get_string_indexer_log_records(caplog):
  141. """
  142. Get all log records and relevant extra arguments for easy snapshotting.
  143. """
  144. return [
  145. (
  146. rec.message,
  147. {
  148. k: v
  149. for k, v in rec.__dict__.items()
  150. if k
  151. in (
  152. "string_type",
  153. "is_global_quota",
  154. "num_global_quotas",
  155. "num_global_quotas",
  156. "org_batch_size",
  157. )
  158. },
  159. )
  160. for rec in caplog.records
  161. ]
  162. @pytest.mark.django_db
  163. @pytest.mark.parametrize(
  164. "should_index_tag_values, expected",
  165. [
  166. pytest.param(
  167. True,
  168. {
  169. UseCaseID.SESSIONS: {
  170. 1: {
  171. "c:sessions/session@none",
  172. "d:sessions/duration@second",
  173. "environment",
  174. "errored",
  175. "healthy",
  176. "init",
  177. "production",
  178. "s:sessions/error@none",
  179. "session.status",
  180. },
  181. }
  182. },
  183. id="index tag values true",
  184. ),
  185. pytest.param(
  186. False,
  187. {
  188. UseCaseID.SESSIONS: {
  189. 1: {
  190. "c:sessions/session@none",
  191. "d:sessions/duration@second",
  192. "environment",
  193. "s:sessions/error@none",
  194. "session.status",
  195. },
  196. }
  197. },
  198. id="index tag values false",
  199. ),
  200. ],
  201. )
  202. def test_extract_strings_with_rollout(should_index_tag_values, expected):
  203. """
  204. Test that the indexer batch extracts the correct strings from the messages
  205. based on whether tag values should be indexed or not.
  206. """
  207. outer_message = _construct_outer_message(
  208. [
  209. (counter_payload, counter_headers),
  210. (distribution_payload, distribution_headers),
  211. (set_payload, set_headers),
  212. ]
  213. )
  214. batch = IndexerBatch(
  215. outer_message,
  216. should_index_tag_values,
  217. False,
  218. tags_validator=ReleaseHealthTagsValidator().is_allowed,
  219. schema_validator=MetricsSchemaValidator(
  220. INGEST_CODEC, RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME
  221. ).validate,
  222. )
  223. assert batch.extract_strings() == expected
  224. assert not batch.invalid_msg_meta
  225. @pytest.mark.django_db
  226. def test_extract_strings_with_multiple_use_case_ids():
  227. """
  228. Verify that the extract string method can handle payloads that has multiple
  229. (generic) uses cases
  230. """
  231. counter_payload = {
  232. "name": "c:spans/session@none",
  233. "tags": {
  234. "environment": "production",
  235. "session.status": "init",
  236. },
  237. "timestamp": ts,
  238. "type": "c",
  239. "value": 1,
  240. "org_id": 1,
  241. "retention_days": 90,
  242. "project_id": 3,
  243. }
  244. distribution_payload = {
  245. "name": "d:escalating_issues/duration@second",
  246. "tags": {
  247. "environment": "production",
  248. "session.status": "healthy",
  249. },
  250. "timestamp": ts,
  251. "type": "d",
  252. "value": [4, 5, 6],
  253. "org_id": 1,
  254. "retention_days": 90,
  255. "project_id": 3,
  256. }
  257. set_payload = {
  258. "name": "s:escalating_issues/error@none",
  259. "tags": {
  260. "environment": "production",
  261. "session.status": "errored",
  262. },
  263. "timestamp": ts,
  264. "type": "s",
  265. "value": [3],
  266. "org_id": 1,
  267. "retention_days": 90,
  268. "project_id": 3,
  269. }
  270. outer_message = _construct_outer_message(
  271. [
  272. (counter_payload, [("namespace", b"spans")]),
  273. (distribution_payload, [("namespace", b"escalating_issues")]),
  274. (set_payload, [("namespace", b"escalating_issues")]),
  275. ]
  276. )
  277. batch = IndexerBatch(
  278. outer_message,
  279. True,
  280. False,
  281. tags_validator=GenericMetricsTagsValidator().is_allowed,
  282. schema_validator=MetricsSchemaValidator(
  283. INGEST_CODEC, GENERIC_METRICS_SCHEMA_VALIDATION_RULES_OPTION_NAME
  284. ).validate,
  285. )
  286. assert batch.extract_strings() == {
  287. UseCaseID.SPANS: {
  288. 1: {
  289. "c:spans/session@none",
  290. "environment",
  291. "production",
  292. "session.status",
  293. "init",
  294. }
  295. },
  296. UseCaseID.ESCALATING_ISSUES: {
  297. 1: {
  298. "d:escalating_issues/duration@second",
  299. "environment",
  300. "production",
  301. "session.status",
  302. "healthy",
  303. "s:escalating_issues/error@none",
  304. "environment",
  305. "production",
  306. "session.status",
  307. "errored",
  308. }
  309. },
  310. }
  311. @pytest.mark.django_db
  312. @override_options({"sentry-metrics.indexer.disabled-namespaces": ["escalating_issues"]})
  313. def test_extract_strings_with_single_use_case_ids_blocked():
  314. """
  315. Verify that the extract string method will work normally when a single use case ID is blocked
  316. """
  317. counter_payload = {
  318. "name": "c:spans/session@none",
  319. "tags": {
  320. "environment": "production",
  321. "session.status": "init",
  322. },
  323. "timestamp": ts,
  324. "type": "c",
  325. "value": 1,
  326. "org_id": 1,
  327. "retention_days": 90,
  328. "project_id": 3,
  329. }
  330. distribution_payload = {
  331. "name": "d:escalating_issues/duration@second",
  332. "tags": {
  333. "environment": "production",
  334. "session.status": "healthy",
  335. },
  336. "timestamp": ts,
  337. "type": "d",
  338. "value": [4, 5, 6],
  339. "org_id": 1,
  340. "retention_days": 90,
  341. "project_id": 3,
  342. }
  343. set_payload = {
  344. "name": "s:escalating_issues/error@none",
  345. "tags": {
  346. "environment": "production",
  347. "session.status": "errored",
  348. },
  349. "timestamp": ts,
  350. "type": "s",
  351. "value": [3],
  352. "org_id": 1,
  353. "retention_days": 90,
  354. "project_id": 3,
  355. }
  356. outer_message = _construct_outer_message(
  357. [
  358. (counter_payload, [("namespace", b"spans")]),
  359. (distribution_payload, [("namespace", b"escalating_issues")]),
  360. (set_payload, [("namespace", b"escalating_issues")]),
  361. ]
  362. )
  363. batch = IndexerBatch(
  364. outer_message,
  365. True,
  366. False,
  367. tags_validator=GenericMetricsTagsValidator().is_allowed,
  368. schema_validator=MetricsSchemaValidator(
  369. INGEST_CODEC, GENERIC_METRICS_SCHEMA_VALIDATION_RULES_OPTION_NAME
  370. ).validate,
  371. )
  372. assert batch.extract_strings() == {
  373. UseCaseID.SPANS: {
  374. 1: {
  375. "c:spans/session@none",
  376. "environment",
  377. "production",
  378. "session.status",
  379. "init",
  380. }
  381. }
  382. }
  383. assert not batch.invalid_msg_meta
  384. @pytest.mark.django_db
  385. @override_options({"sentry-metrics.indexer.disabled-namespaces": ["spans", "escalating_issues"]})
  386. def test_extract_strings_with_multiple_use_case_ids_blocked():
  387. """
  388. Verify that the extract string method will work normally when multiple use case IDs are blocked
  389. """
  390. custom_uc_counter_payload = {
  391. "name": "c:spans/session@none",
  392. "tags": {
  393. "environment": "production",
  394. "session.status": "init",
  395. },
  396. "timestamp": ts,
  397. "type": "c",
  398. "value": 1,
  399. "org_id": 1,
  400. "retention_days": 90,
  401. "project_id": 3,
  402. }
  403. perf_distribution_payload = {
  404. "name": TransactionMRI.MEASUREMENTS_FCP.value,
  405. "tags": {
  406. "environment": "production",
  407. "session.status": "healthy",
  408. },
  409. "timestamp": ts,
  410. "type": "d",
  411. "value": [4, 5, 6],
  412. "org_id": 1,
  413. "retention_days": 90,
  414. "project_id": 3,
  415. }
  416. custom_uc_set_payload = {
  417. "name": "s:escalating_issues/error@none",
  418. "tags": {
  419. "environment": "production",
  420. "session.status": "errored",
  421. },
  422. "timestamp": ts,
  423. "type": "s",
  424. "value": [3],
  425. "org_id": 2,
  426. "retention_days": 90,
  427. "project_id": 3,
  428. }
  429. outer_message = _construct_outer_message(
  430. [
  431. (custom_uc_counter_payload, [("namespace", b"spans")]),
  432. (perf_distribution_payload, [("namespace", b"transactions")]),
  433. (custom_uc_set_payload, [("namespace", b"escalating_issues")]),
  434. ]
  435. )
  436. batch = IndexerBatch(
  437. outer_message,
  438. True,
  439. False,
  440. tags_validator=GenericMetricsTagsValidator().is_allowed,
  441. schema_validator=MetricsSchemaValidator(
  442. INGEST_CODEC, GENERIC_METRICS_SCHEMA_VALIDATION_RULES_OPTION_NAME
  443. ).validate,
  444. )
  445. assert batch.extract_strings() == {
  446. UseCaseID.TRANSACTIONS: {
  447. 1: {
  448. TransactionMRI.MEASUREMENTS_FCP.value,
  449. "environment",
  450. "production",
  451. "session.status",
  452. "healthy",
  453. }
  454. },
  455. }
  456. assert not batch.invalid_msg_meta
  457. @pytest.mark.django_db
  458. def test_extract_strings_with_invalid_mri():
  459. """
  460. Verify that extract strings will drop payload that has invalid MRI in name field but continue processing the rest
  461. """
  462. bad_counter_payload = {
  463. "name": "invalid_MRI",
  464. "tags": {
  465. "environment": "production",
  466. "session.status": "init",
  467. },
  468. "timestamp": ts,
  469. "type": "c",
  470. "value": 1,
  471. "org_id": 100,
  472. "retention_days": 90,
  473. "project_id": 3,
  474. }
  475. counter_payload = {
  476. "name": "c:spans/session@none",
  477. "tags": {
  478. "environment": "production",
  479. "session.status": "init",
  480. },
  481. "timestamp": ts,
  482. "type": "c",
  483. "value": 1,
  484. "org_id": 1,
  485. "retention_days": 90,
  486. "project_id": 3,
  487. }
  488. distribution_payload = {
  489. "name": "d:escalating_issues/duration@second",
  490. "tags": {
  491. "environment": "production",
  492. "session.status": "healthy",
  493. },
  494. "timestamp": ts,
  495. "type": "d",
  496. "value": [4, 5, 6],
  497. "org_id": 1,
  498. "retention_days": 90,
  499. "project_id": 3,
  500. }
  501. set_payload = {
  502. "name": "s:escalating_issues/error@none",
  503. "tags": {
  504. "environment": "production",
  505. "session.status": "errored",
  506. },
  507. "timestamp": ts,
  508. "type": "s",
  509. "value": [3],
  510. "org_id": 1,
  511. "retention_days": 90,
  512. "project_id": 3,
  513. }
  514. outer_message = _construct_outer_message(
  515. [
  516. (bad_counter_payload, [("namespace", b"")]),
  517. (counter_payload, [("namespace", b"spans")]),
  518. (distribution_payload, [("namespace", b"escalating_issues")]),
  519. (set_payload, [("namespace", b"escalating_issues")]),
  520. ]
  521. )
  522. batch = IndexerBatch(
  523. outer_message,
  524. True,
  525. False,
  526. tags_validator=GenericMetricsTagsValidator().is_allowed,
  527. schema_validator=MetricsSchemaValidator(
  528. INGEST_CODEC, GENERIC_METRICS_SCHEMA_VALIDATION_RULES_OPTION_NAME
  529. ).validate,
  530. )
  531. assert batch.extract_strings() == {
  532. UseCaseID.SPANS: {
  533. 1: {
  534. "c:spans/session@none",
  535. "environment",
  536. "production",
  537. "session.status",
  538. "init",
  539. }
  540. },
  541. UseCaseID.ESCALATING_ISSUES: {
  542. 1: {
  543. "d:escalating_issues/duration@second",
  544. "environment",
  545. "production",
  546. "session.status",
  547. "healthy",
  548. "s:escalating_issues/error@none",
  549. "environment",
  550. "production",
  551. "session.status",
  552. "errored",
  553. }
  554. },
  555. }
  556. assert batch.invalid_msg_meta == {BrokerMeta(Partition(Topic("topic"), 0), 0)}
  557. @pytest.mark.django_db
  558. def test_extract_strings_with_multiple_use_case_ids_and_org_ids():
  559. """
  560. Verify that the extract string method can handle payloads that has multiple
  561. (generic) uses cases and from different orgs
  562. """
  563. custom_uc_counter_payload = {
  564. "name": "c:spans/session@none",
  565. "tags": {
  566. "environment": "production",
  567. "session.status": "init",
  568. },
  569. "timestamp": ts,
  570. "type": "c",
  571. "value": 1,
  572. "org_id": 1,
  573. "retention_days": 90,
  574. "project_id": 3,
  575. }
  576. perf_distribution_payload = {
  577. "name": TransactionMRI.MEASUREMENTS_FCP.value,
  578. "tags": {
  579. "environment": "production",
  580. "session.status": "healthy",
  581. },
  582. "timestamp": ts,
  583. "type": "d",
  584. "value": [4, 5, 6],
  585. "org_id": 1,
  586. "retention_days": 90,
  587. "project_id": 3,
  588. }
  589. custom_uc_set_payload = {
  590. "name": "s:spans/error@none",
  591. "tags": {
  592. "environment": "production",
  593. "session.status": "errored",
  594. },
  595. "timestamp": ts,
  596. "type": "s",
  597. "value": [3],
  598. "org_id": 2,
  599. "retention_days": 90,
  600. "project_id": 3,
  601. }
  602. outer_message = _construct_outer_message(
  603. [
  604. (custom_uc_counter_payload, [("namespace", b"spans")]),
  605. (perf_distribution_payload, [("namespace", b"transactions")]),
  606. (custom_uc_set_payload, [("namespace", b"spans")]),
  607. ]
  608. )
  609. batch = IndexerBatch(
  610. outer_message,
  611. True,
  612. False,
  613. tags_validator=GenericMetricsTagsValidator().is_allowed,
  614. schema_validator=MetricsSchemaValidator(
  615. INGEST_CODEC, GENERIC_METRICS_SCHEMA_VALIDATION_RULES_OPTION_NAME
  616. ).validate,
  617. )
  618. assert batch.extract_strings() == {
  619. UseCaseID.SPANS: {
  620. 1: {
  621. "c:spans/session@none",
  622. "environment",
  623. "production",
  624. "session.status",
  625. "init",
  626. },
  627. 2: {
  628. "s:spans/error@none",
  629. "environment",
  630. "production",
  631. "session.status",
  632. "errored",
  633. },
  634. },
  635. UseCaseID.TRANSACTIONS: {
  636. 1: {
  637. TransactionMRI.MEASUREMENTS_FCP.value,
  638. "environment",
  639. "production",
  640. "session.status",
  641. "healthy",
  642. }
  643. },
  644. }
  645. assert not batch.invalid_msg_meta
@pytest.mark.django_db
# Replace the registry's per-metric and per-use-case aggregation tables with the
# MOCK_* tables defined at the top of this module, for this test only.
@patch(
    "sentry.sentry_metrics.aggregation_option_registry.METRIC_ID_AGG_OPTION",
    MOCK_METRIC_ID_AGG_OPTION,
)
@patch(
    "sentry.sentry_metrics.aggregation_option_registry.USE_CASE_AGG_OPTION",
    MOCK_USE_CASE_AGG_OPTION,
)
@override_options({"sentry-metrics.10s-granularity": True})
def test_resolved_with_aggregation_options(caplog, settings):
    """
    Reconstructed generic-metrics messages carry the expected "aggregation_option".

    With the mocked tables, the fcp distribution is expected to get HIST. The
    counter and the set are expected to get TEN_SECOND, the value mapped to the
    transactions use case in MOCK_USE_CASE_AGG_OPTION. Neither of their names
    appears in MOCK_METRIC_ID_AGG_OPTION.
    """
    settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
    counter_metric_id = "c:transactions/alert@none"
    dist_metric_id = "d:transactions/measurements.fcp@millisecond"
    set_metric_id = "s:transactions/on_demand@none"
    # Reuse the module-level session payloads with transactions metric names.
    # The empty header lists mean no "namespace" header is sent, so the output
    # headers below contain only mapping_sources and metric_type.
    outer_message = _construct_outer_message(
        [
            (
                {
                    **counter_payload,
                    "name": counter_metric_id,
                },
                [],
            ),
            ({**distribution_payload, "name": dist_metric_id}, []),
            ({**set_payload, "name": set_metric_id}, []),
        ]
    )
    # Second positional argument False: tag values are not indexed, so only
    # metric names and tag keys are extracted.
    batch = IndexerBatch(
        outer_message,
        False,
        False,
        tags_validator=GenericMetricsTagsValidator().is_allowed,
        schema_validator=MetricsSchemaValidator(
            INGEST_CODEC, GENERIC_METRICS_SCHEMA_VALIDATION_RULES_OPTION_NAME
        ).validate,
    )
    assert batch.extract_strings() == (
        {
            UseCaseID.TRANSACTIONS: {
                1: {
                    counter_metric_id,
                    dist_metric_id,
                    "environment",
                    set_metric_id,
                    "session.status",
                }
            }
        }
    )
    assert not batch.invalid_msg_meta
    # Raise the log threshold only after extraction. The log-record assertion
    # below then covers what reconstruct_messages emits at ERROR or above.
    caplog.set_level(logging.ERROR)
    snuba_payloads = batch.reconstruct_messages(
        {
            UseCaseID.TRANSACTIONS: {
                1: {
                    counter_metric_id: 1,
                    dist_metric_id: 2,
                    "environment": 3,
                    set_metric_id: 8,
                    "session.status": 9,
                }
            }
        },
        {
            UseCaseID.TRANSACTIONS: {
                1: {
                    counter_metric_id: Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
                    dist_metric_id: Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
                    "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
                    set_metric_id: Metadata(id=8, fetch_type=FetchType.CACHE_HIT),
                    "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
                }
            },
        },
    ).data
    assert _get_string_indexer_log_records(caplog) == []
    # Tag values stay raw strings ("production", "init", ...) because they were
    # not indexed. Every resolved id came from the cache, hence mapping source "c".
    assert _deconstruct_messages(snuba_payloads, kafka_logical_topic="snuba-generic-metrics") == [
        (
            {
                "mapping_meta": {
                    "c": {
                        "1": counter_metric_id,
                        "3": "environment",
                        "9": "session.status",
                    },
                },
                "metric_id": 1,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": "production", "9": "init"},
                "timestamp": ts,
                "type": "c",
                "use_case_id": "transactions",
                "value": 1.0,
                "aggregation_option": AggregationOption.TEN_SECOND.value,
                "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
                "version": 2,
            },
            [("mapping_sources", b"c"), ("metric_type", "c")],
        ),
        (
            {
                "mapping_meta": {
                    "c": {
                        "2": dist_metric_id,
                        "3": "environment",
                        "9": "session.status",
                    },
                },
                "metric_id": 2,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": "production", "9": "healthy"},
                "timestamp": ts,
                "type": "d",
                "use_case_id": "transactions",
                "value": [4, 5, 6],
                "aggregation_option": AggregationOption.HIST.value,
                "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
                "version": 2,
            },
            [("mapping_sources", b"c"), ("metric_type", "d")],
        ),
        (
            {
                "mapping_meta": {
                    "c": {
                        "3": "environment",
                        "8": set_metric_id,
                        "9": "session.status",
                    },
                },
                "metric_id": 8,
                "org_id": 1,
                "project_id": 3,
                "retention_days": 90,
                "tags": {"3": "production", "9": "errored"},
                "timestamp": ts,
                "type": "s",
                "use_case_id": "transactions",
                "value": [3],
                "aggregation_option": AggregationOption.TEN_SECOND.value,
                "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
                "version": 2,
            },
            [("mapping_sources", b"c"), ("metric_type", "s")],
        ),
    ]
  797. @pytest.mark.django_db
  798. def test_all_resolved(caplog, settings):
  799. settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
  800. outer_message = _construct_outer_message(
  801. [
  802. (counter_payload, counter_headers),
  803. (distribution_payload, distribution_headers),
  804. (set_payload, set_headers),
  805. ]
  806. )
  807. batch = IndexerBatch(
  808. outer_message,
  809. True,
  810. False,
  811. tags_validator=ReleaseHealthTagsValidator().is_allowed,
  812. schema_validator=MetricsSchemaValidator(
  813. INGEST_CODEC, RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME
  814. ).validate,
  815. )
  816. assert batch.extract_strings() == (
  817. {
  818. UseCaseID.SESSIONS: {
  819. 1: {
  820. "c:sessions/session@none",
  821. "d:sessions/duration@second",
  822. "environment",
  823. "errored",
  824. "healthy",
  825. "init",
  826. "production",
  827. "s:sessions/error@none",
  828. "session.status",
  829. }
  830. }
  831. }
  832. )
  833. assert not batch.invalid_msg_meta
  834. caplog.set_level(logging.ERROR)
  835. snuba_payloads = batch.reconstruct_messages(
  836. {
  837. UseCaseID.SESSIONS: {
  838. 1: {
  839. "c:sessions/session@none": 1,
  840. "d:sessions/duration@second": 2,
  841. "environment": 3,
  842. "errored": 4,
  843. "healthy": 5,
  844. "init": 6,
  845. "production": 7,
  846. "s:sessions/error@none": 8,
  847. "session.status": 9,
  848. }
  849. }
  850. },
  851. {
  852. UseCaseID.SESSIONS: {
  853. 1: {
  854. "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
  855. "d:sessions/duration@second": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
  856. "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
  857. "errored": Metadata(id=4, fetch_type=FetchType.DB_READ),
  858. "healthy": Metadata(id=5, fetch_type=FetchType.HARDCODED),
  859. "init": Metadata(id=6, fetch_type=FetchType.HARDCODED),
  860. "production": Metadata(id=7, fetch_type=FetchType.CACHE_HIT),
  861. "s:sessions/error@none": Metadata(id=8, fetch_type=FetchType.CACHE_HIT),
  862. "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
  863. }
  864. },
  865. },
  866. ).data
  867. assert _get_string_indexer_log_records(caplog) == []
  868. assert _deconstruct_messages(snuba_payloads) == [
  869. (
  870. {
  871. "mapping_meta": {
  872. "c": {
  873. "1": "c:sessions/session@none",
  874. "3": "environment",
  875. "7": "production",
  876. "9": "session.status",
  877. },
  878. "h": {"6": "init"},
  879. },
  880. "metric_id": 1,
  881. "org_id": 1,
  882. "project_id": 3,
  883. "retention_days": 90,
  884. "tags": {"3": 7, "9": 6},
  885. "timestamp": ts,
  886. "type": "c",
  887. "use_case_id": "sessions",
  888. "value": 1.0,
  889. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  890. },
  891. [*counter_headers, ("mapping_sources", b"ch"), ("metric_type", "c")],
  892. ),
  893. (
  894. {
  895. "mapping_meta": {
  896. "c": {
  897. "2": "d:sessions/duration@second",
  898. "3": "environment",
  899. "7": "production",
  900. "9": "session.status",
  901. },
  902. "h": {"5": "healthy"},
  903. },
  904. "metric_id": 2,
  905. "org_id": 1,
  906. "project_id": 3,
  907. "retention_days": 90,
  908. "tags": {"3": 7, "9": 5},
  909. "timestamp": ts,
  910. "type": "d",
  911. "use_case_id": "sessions",
  912. "value": [4, 5, 6],
  913. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  914. },
  915. [*distribution_headers, ("mapping_sources", b"ch"), ("metric_type", "d")],
  916. ),
  917. (
  918. {
  919. "mapping_meta": {
  920. "c": {
  921. "3": "environment",
  922. "7": "production",
  923. "8": "s:sessions/error@none",
  924. "9": "session.status",
  925. },
  926. "d": {"4": "errored"},
  927. },
  928. "metric_id": 8,
  929. "org_id": 1,
  930. "project_id": 3,
  931. "retention_days": 90,
  932. "tags": {"3": 7, "9": 4},
  933. "timestamp": ts,
  934. "type": "s",
  935. "use_case_id": "sessions",
  936. "value": [3],
  937. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  938. },
  939. [*set_headers, ("mapping_sources", b"cd"), ("metric_type", "s")],
  940. ),
  941. ]
  942. @pytest.mark.django_db
  943. def test_all_resolved_with_routing_information(caplog, settings):
  944. settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
  945. outer_message = _construct_outer_message(
  946. [
  947. (counter_payload, counter_headers),
  948. (distribution_payload, distribution_headers),
  949. (set_payload, set_headers),
  950. ]
  951. )
  952. batch = IndexerBatch(
  953. outer_message,
  954. True,
  955. True,
  956. tags_validator=ReleaseHealthTagsValidator().is_allowed,
  957. schema_validator=MetricsSchemaValidator(
  958. INGEST_CODEC, RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME
  959. ).validate,
  960. )
  961. assert batch.extract_strings() == (
  962. {
  963. UseCaseID.SESSIONS: {
  964. 1: {
  965. "c:sessions/session@none",
  966. "d:sessions/duration@second",
  967. "environment",
  968. "errored",
  969. "healthy",
  970. "init",
  971. "production",
  972. "s:sessions/error@none",
  973. "session.status",
  974. }
  975. }
  976. }
  977. )
  978. caplog.set_level(logging.ERROR)
  979. snuba_payloads = batch.reconstruct_messages(
  980. {
  981. UseCaseID.SESSIONS: {
  982. 1: {
  983. "c:sessions/session@none": 1,
  984. "d:sessions/duration@second": 2,
  985. "environment": 3,
  986. "errored": 4,
  987. "healthy": 5,
  988. "init": 6,
  989. "production": 7,
  990. "s:sessions/error@none": 8,
  991. "session.status": 9,
  992. }
  993. }
  994. },
  995. {
  996. UseCaseID.SESSIONS: {
  997. 1: {
  998. "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
  999. "d:sessions/duration@second": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
  1000. "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
  1001. "errored": Metadata(id=4, fetch_type=FetchType.DB_READ),
  1002. "healthy": Metadata(id=5, fetch_type=FetchType.HARDCODED),
  1003. "init": Metadata(id=6, fetch_type=FetchType.HARDCODED),
  1004. "production": Metadata(id=7, fetch_type=FetchType.CACHE_HIT),
  1005. "s:sessions/error@none": Metadata(id=8, fetch_type=FetchType.CACHE_HIT),
  1006. "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
  1007. }
  1008. }
  1009. },
  1010. ).data
  1011. assert _get_string_indexer_log_records(caplog) == []
  1012. assert _deconstruct_routing_messages(snuba_payloads) == [
  1013. (
  1014. {"org_id": 1},
  1015. {
  1016. "mapping_meta": {
  1017. "c": {
  1018. "1": "c:sessions/session@none",
  1019. "3": "environment",
  1020. "7": "production",
  1021. "9": "session.status",
  1022. },
  1023. "h": {"6": "init"},
  1024. },
  1025. "metric_id": 1,
  1026. "org_id": 1,
  1027. "project_id": 3,
  1028. "retention_days": 90,
  1029. "tags": {"3": 7, "9": 6},
  1030. "timestamp": ts,
  1031. "type": "c",
  1032. "use_case_id": "sessions",
  1033. "value": 1.0,
  1034. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1035. },
  1036. [*counter_headers, ("mapping_sources", b"ch"), ("metric_type", "c")],
  1037. ),
  1038. (
  1039. {"org_id": 1},
  1040. {
  1041. "mapping_meta": {
  1042. "c": {
  1043. "2": "d:sessions/duration@second",
  1044. "3": "environment",
  1045. "7": "production",
  1046. "9": "session.status",
  1047. },
  1048. "h": {"5": "healthy"},
  1049. },
  1050. "metric_id": 2,
  1051. "org_id": 1,
  1052. "project_id": 3,
  1053. "retention_days": 90,
  1054. "tags": {"3": 7, "9": 5},
  1055. "timestamp": ts,
  1056. "type": "d",
  1057. "use_case_id": "sessions",
  1058. "value": [4, 5, 6],
  1059. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1060. },
  1061. [
  1062. *distribution_headers,
  1063. ("mapping_sources", b"ch"),
  1064. ("metric_type", "d"),
  1065. ],
  1066. ),
  1067. (
  1068. {"org_id": 1},
  1069. {
  1070. "mapping_meta": {
  1071. "c": {
  1072. "3": "environment",
  1073. "7": "production",
  1074. "8": "s:sessions/error@none",
  1075. "9": "session.status",
  1076. },
  1077. "d": {"4": "errored"},
  1078. },
  1079. "metric_id": 8,
  1080. "org_id": 1,
  1081. "project_id": 3,
  1082. "retention_days": 90,
  1083. "tags": {"3": 7, "9": 4},
  1084. "timestamp": ts,
  1085. "type": "s",
  1086. "use_case_id": "sessions",
  1087. "value": [3],
  1088. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1089. },
  1090. [*set_headers, ("mapping_sources", b"cd"), ("metric_type", "s")],
  1091. ),
  1092. ]
  1093. @pytest.mark.django_db
  1094. def test_all_resolved_retention_days_honored(caplog, settings):
  1095. """
  1096. Tests that the indexer batch honors the incoming retention_days values
  1097. from Relay or falls back to 90.
  1098. """
  1099. distribution_payload_modified = distribution_payload.copy()
  1100. distribution_payload_modified["retention_days"] = 30
  1101. settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
  1102. outer_message = _construct_outer_message(
  1103. [
  1104. (counter_payload, counter_headers),
  1105. (distribution_payload_modified, distribution_headers),
  1106. (set_payload, set_headers),
  1107. ]
  1108. )
  1109. batch = IndexerBatch(
  1110. outer_message,
  1111. True,
  1112. False,
  1113. tags_validator=ReleaseHealthTagsValidator().is_allowed,
  1114. schema_validator=MetricsSchemaValidator(
  1115. INGEST_CODEC, RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME
  1116. ).validate,
  1117. )
  1118. assert batch.extract_strings() == (
  1119. {
  1120. UseCaseID.SESSIONS: {
  1121. 1: {
  1122. "c:sessions/session@none",
  1123. "d:sessions/duration@second",
  1124. "environment",
  1125. "errored",
  1126. "healthy",
  1127. "init",
  1128. "production",
  1129. "s:sessions/error@none",
  1130. "session.status",
  1131. }
  1132. }
  1133. }
  1134. )
  1135. assert not batch.invalid_msg_meta
  1136. caplog.set_level(logging.ERROR)
  1137. snuba_payloads = batch.reconstruct_messages(
  1138. {
  1139. UseCaseID.SESSIONS: {
  1140. 1: {
  1141. "c:sessions/session@none": 1,
  1142. "d:sessions/duration@second": 2,
  1143. "environment": 3,
  1144. "errored": 4,
  1145. "healthy": 5,
  1146. "init": 6,
  1147. "production": 7,
  1148. "s:sessions/error@none": 8,
  1149. "session.status": 9,
  1150. }
  1151. }
  1152. },
  1153. {
  1154. UseCaseID.SESSIONS: {
  1155. 1: {
  1156. "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
  1157. "d:sessions/duration@second": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
  1158. "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
  1159. "errored": Metadata(id=4, fetch_type=FetchType.DB_READ),
  1160. "healthy": Metadata(id=5, fetch_type=FetchType.HARDCODED),
  1161. "init": Metadata(id=6, fetch_type=FetchType.HARDCODED),
  1162. "production": Metadata(id=7, fetch_type=FetchType.CACHE_HIT),
  1163. "s:sessions/error@none": Metadata(id=8, fetch_type=FetchType.CACHE_HIT),
  1164. "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
  1165. }
  1166. }
  1167. },
  1168. ).data
  1169. assert _get_string_indexer_log_records(caplog) == []
  1170. assert _deconstruct_messages(snuba_payloads) == [
  1171. (
  1172. {
  1173. "mapping_meta": {
  1174. "c": {
  1175. "1": "c:sessions/session@none",
  1176. "3": "environment",
  1177. "7": "production",
  1178. "9": "session.status",
  1179. },
  1180. "h": {"6": "init"},
  1181. },
  1182. "metric_id": 1,
  1183. "org_id": 1,
  1184. "project_id": 3,
  1185. "retention_days": 90,
  1186. "tags": {"3": 7, "9": 6},
  1187. "timestamp": ts,
  1188. "type": "c",
  1189. "use_case_id": "sessions",
  1190. "value": 1.0,
  1191. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1192. },
  1193. [*counter_headers, ("mapping_sources", b"ch"), ("metric_type", "c")],
  1194. ),
  1195. (
  1196. {
  1197. "mapping_meta": {
  1198. "c": {
  1199. "2": "d:sessions/duration@second",
  1200. "3": "environment",
  1201. "7": "production",
  1202. "9": "session.status",
  1203. },
  1204. "h": {"5": "healthy"},
  1205. },
  1206. "metric_id": 2,
  1207. "org_id": 1,
  1208. "project_id": 3,
  1209. "retention_days": 30,
  1210. "tags": {"3": 7, "9": 5},
  1211. "timestamp": ts,
  1212. "type": "d",
  1213. "use_case_id": "sessions",
  1214. "value": [4, 5, 6],
  1215. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1216. },
  1217. [*distribution_headers, ("mapping_sources", b"ch"), ("metric_type", "d")],
  1218. ),
  1219. (
  1220. {
  1221. "mapping_meta": {
  1222. "c": {
  1223. "3": "environment",
  1224. "7": "production",
  1225. "8": "s:sessions/error@none",
  1226. "9": "session.status",
  1227. },
  1228. "d": {"4": "errored"},
  1229. },
  1230. "metric_id": 8,
  1231. "org_id": 1,
  1232. "project_id": 3,
  1233. "retention_days": 90,
  1234. "tags": {"3": 7, "9": 4},
  1235. "timestamp": ts,
  1236. "type": "s",
  1237. "use_case_id": "sessions",
  1238. "value": [3],
  1239. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1240. },
  1241. [*set_headers, ("mapping_sources", b"cd"), ("metric_type", "s")],
  1242. ),
  1243. ]
  1244. @pytest.mark.django_db
  1245. def test_batch_resolve_with_values_not_indexed(caplog, settings):
  1246. """
  1247. Tests that the indexer batch skips resolving tag values for indexing and
  1248. sends the raw tag value to Snuba.
  1249. The difference between this test and test_all_resolved is that the tag values are
  1250. strings instead of integers. Because of that indexed tag keys are
  1251. different and mapping_meta is smaller. The payload also contains the
  1252. version field to specify that the tag values are not indexed.
  1253. """
  1254. settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
  1255. outer_message = _construct_outer_message(
  1256. [
  1257. (counter_payload, counter_headers),
  1258. (distribution_payload, distribution_headers),
  1259. (set_payload, set_headers),
  1260. ]
  1261. )
  1262. batch = IndexerBatch(
  1263. outer_message,
  1264. False,
  1265. False,
  1266. tags_validator=ReleaseHealthTagsValidator().is_allowed,
  1267. schema_validator=MetricsSchemaValidator(
  1268. INGEST_CODEC, RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME
  1269. ).validate,
  1270. )
  1271. assert batch.extract_strings() == (
  1272. {
  1273. UseCaseID.SESSIONS: {
  1274. 1: {
  1275. "c:sessions/session@none",
  1276. "d:sessions/duration@second",
  1277. "environment",
  1278. "s:sessions/error@none",
  1279. "session.status",
  1280. }
  1281. }
  1282. }
  1283. )
  1284. assert not batch.invalid_msg_meta
  1285. caplog.set_level(logging.ERROR)
  1286. snuba_payloads = batch.reconstruct_messages(
  1287. {
  1288. UseCaseID.SESSIONS: {
  1289. 1: {
  1290. "c:sessions/session@none": 1,
  1291. "d:sessions/duration@second": 2,
  1292. "environment": 3,
  1293. "s:sessions/error@none": 4,
  1294. "session.status": 5,
  1295. }
  1296. }
  1297. },
  1298. {
  1299. UseCaseID.SESSIONS: {
  1300. 1: {
  1301. "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
  1302. "d:sessions/duration@second": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
  1303. "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
  1304. "s:sessions/error@none": Metadata(id=4, fetch_type=FetchType.CACHE_HIT),
  1305. "session.status": Metadata(id=5, fetch_type=FetchType.CACHE_HIT),
  1306. }
  1307. }
  1308. },
  1309. ).data
  1310. assert _get_string_indexer_log_records(caplog) == []
  1311. assert _deconstruct_messages(snuba_payloads, kafka_logical_topic="snuba-generic-metrics") == [
  1312. (
  1313. {
  1314. "version": 2,
  1315. "mapping_meta": {
  1316. "c": {
  1317. "1": "c:sessions/session@none",
  1318. "3": "environment",
  1319. "5": "session.status",
  1320. },
  1321. },
  1322. "metric_id": 1,
  1323. "org_id": 1,
  1324. "project_id": 3,
  1325. "retention_days": 90,
  1326. "tags": {"3": "production", "5": "init"},
  1327. "timestamp": ts,
  1328. "type": "c",
  1329. "use_case_id": "sessions",
  1330. "value": 1.0,
  1331. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1332. },
  1333. [*counter_headers, ("mapping_sources", b"c"), ("metric_type", "c")],
  1334. ),
  1335. (
  1336. {
  1337. "version": 2,
  1338. "mapping_meta": {
  1339. "c": {
  1340. "2": "d:sessions/duration@second",
  1341. "3": "environment",
  1342. "5": "session.status",
  1343. },
  1344. },
  1345. "metric_id": 2,
  1346. "org_id": 1,
  1347. "project_id": 3,
  1348. "retention_days": 90,
  1349. "tags": {"3": "production", "5": "healthy"},
  1350. "timestamp": ts,
  1351. "type": "d",
  1352. "use_case_id": "sessions",
  1353. "value": [4, 5, 6],
  1354. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1355. },
  1356. [
  1357. *distribution_headers,
  1358. ("mapping_sources", b"c"),
  1359. ("metric_type", "d"),
  1360. ],
  1361. ),
  1362. (
  1363. {
  1364. "version": 2,
  1365. "mapping_meta": {
  1366. "c": {
  1367. "3": "environment",
  1368. "4": "s:sessions/error@none",
  1369. "5": "session.status",
  1370. },
  1371. },
  1372. "metric_id": 4,
  1373. "org_id": 1,
  1374. "project_id": 3,
  1375. "retention_days": 90,
  1376. "tags": {"3": "production", "5": "errored"},
  1377. "timestamp": ts,
  1378. "type": "s",
  1379. "use_case_id": "sessions",
  1380. "value": [3],
  1381. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1382. },
  1383. [
  1384. *set_headers,
  1385. ("mapping_sources", b"c"),
  1386. ("metric_type", "s"),
  1387. ],
  1388. ),
  1389. ]
  1390. @pytest.mark.django_db
  1391. def test_metric_id_rate_limited(caplog, settings):
  1392. settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
  1393. outer_message = _construct_outer_message(
  1394. [
  1395. (counter_payload, counter_headers),
  1396. (distribution_payload, distribution_headers),
  1397. (set_payload, set_headers),
  1398. ]
  1399. )
  1400. batch = IndexerBatch(
  1401. outer_message,
  1402. True,
  1403. False,
  1404. tags_validator=ReleaseHealthTagsValidator().is_allowed,
  1405. schema_validator=MetricsSchemaValidator(
  1406. INGEST_CODEC, RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME
  1407. ).validate,
  1408. )
  1409. assert batch.extract_strings() == (
  1410. {
  1411. UseCaseID.SESSIONS: {
  1412. 1: {
  1413. "c:sessions/session@none",
  1414. "d:sessions/duration@second",
  1415. "environment",
  1416. "errored",
  1417. "healthy",
  1418. "init",
  1419. "production",
  1420. "s:sessions/error@none",
  1421. "session.status",
  1422. }
  1423. }
  1424. }
  1425. )
  1426. assert not batch.invalid_msg_meta
  1427. caplog.set_level(logging.ERROR)
  1428. snuba_payloads = batch.reconstruct_messages(
  1429. {
  1430. UseCaseID.SESSIONS: {
  1431. 1: {
  1432. "c:sessions/session@none": None,
  1433. "d:sessions/duration@second": None,
  1434. "environment": 3,
  1435. "errored": 4,
  1436. "healthy": 5,
  1437. "init": 6,
  1438. "production": 7,
  1439. "s:sessions/error@none": 8,
  1440. "session.status": 9,
  1441. }
  1442. }
  1443. },
  1444. {
  1445. UseCaseID.SESSIONS: {
  1446. 1: {
  1447. "c:sessions/session@none": Metadata(
  1448. id=None,
  1449. fetch_type=FetchType.RATE_LIMITED,
  1450. fetch_type_ext=FetchTypeExt(is_global=False),
  1451. ),
  1452. "d:sessions/duration@second": Metadata(
  1453. id=None, fetch_type=FetchType.RATE_LIMITED, fetch_type_ext=None
  1454. ),
  1455. "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
  1456. "errored": Metadata(id=4, fetch_type=FetchType.DB_READ),
  1457. "healthy": Metadata(id=5, fetch_type=FetchType.HARDCODED),
  1458. "init": Metadata(id=6, fetch_type=FetchType.HARDCODED),
  1459. "production": Metadata(id=7, fetch_type=FetchType.CACHE_HIT),
  1460. "s:sessions/error@none": Metadata(id=None, fetch_type=FetchType.DB_READ),
  1461. "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
  1462. }
  1463. }
  1464. },
  1465. ).data
  1466. assert _deconstruct_messages(snuba_payloads) == [
  1467. (
  1468. {
  1469. "mapping_meta": {
  1470. "c": {"3": "environment", "7": "production", "9": "session.status"},
  1471. "d": {"4": "errored", "None": "s:sessions/error@none"},
  1472. },
  1473. "metric_id": 8,
  1474. "org_id": 1,
  1475. "project_id": 3,
  1476. "retention_days": 90,
  1477. "tags": {"3": 7, "9": 4},
  1478. "timestamp": ts,
  1479. "type": "s",
  1480. "use_case_id": "sessions",
  1481. "value": [3],
  1482. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1483. },
  1484. [
  1485. *set_headers,
  1486. ("mapping_sources", b"cd"),
  1487. ("metric_type", "s"),
  1488. ],
  1489. ),
  1490. ]
  1491. assert _get_string_indexer_log_records(caplog) == [
  1492. (
  1493. "process_messages.dropped_message",
  1494. {"org_batch_size": 9, "is_global_quota": False, "string_type": "metric_id"},
  1495. ),
  1496. (
  1497. "process_messages.dropped_message",
  1498. {"org_batch_size": 9, "is_global_quota": False, "string_type": "metric_id"},
  1499. ),
  1500. ]
  1501. @pytest.mark.django_db
  1502. def test_tag_key_rate_limited(caplog, settings):
  1503. settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
  1504. outer_message = _construct_outer_message(
  1505. [
  1506. (counter_payload, counter_headers),
  1507. (distribution_payload, distribution_headers),
  1508. (set_payload, set_headers),
  1509. ]
  1510. )
  1511. batch = IndexerBatch(
  1512. outer_message,
  1513. True,
  1514. False,
  1515. tags_validator=ReleaseHealthTagsValidator().is_allowed,
  1516. schema_validator=MetricsSchemaValidator(
  1517. INGEST_CODEC, RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME
  1518. ).validate,
  1519. )
  1520. assert batch.extract_strings() == (
  1521. {
  1522. UseCaseID.SESSIONS: {
  1523. 1: {
  1524. "c:sessions/session@none",
  1525. "d:sessions/duration@second",
  1526. "environment",
  1527. "errored",
  1528. "healthy",
  1529. "init",
  1530. "production",
  1531. "s:sessions/error@none",
  1532. "session.status",
  1533. }
  1534. }
  1535. }
  1536. )
  1537. assert not batch.invalid_msg_meta
  1538. caplog.set_level(logging.ERROR)
  1539. snuba_payloads = batch.reconstruct_messages(
  1540. {
  1541. UseCaseID.SESSIONS: {
  1542. 1: {
  1543. "c:sessions/session@none": 1,
  1544. "d:sessions/duration@second": 2,
  1545. "environment": None,
  1546. "errored": 4,
  1547. "healthy": 5,
  1548. "init": 6,
  1549. "production": 7,
  1550. "s:sessions/error@none": 8,
  1551. "session.status": 9,
  1552. }
  1553. }
  1554. },
  1555. {
  1556. UseCaseID.SESSIONS: {
  1557. 1: {
  1558. "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
  1559. "d:sessions/duration@second": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
  1560. "environment": Metadata(
  1561. id=None,
  1562. fetch_type=FetchType.RATE_LIMITED,
  1563. fetch_type_ext=FetchTypeExt(is_global=False),
  1564. ),
  1565. "errored": Metadata(id=4, fetch_type=FetchType.DB_READ),
  1566. "healthy": Metadata(id=5, fetch_type=FetchType.HARDCODED),
  1567. "init": Metadata(id=6, fetch_type=FetchType.HARDCODED),
  1568. "production": Metadata(id=7, fetch_type=FetchType.CACHE_HIT),
  1569. "s:sessions/error@none": Metadata(id=8, fetch_type=FetchType.CACHE_HIT),
  1570. "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
  1571. }
  1572. }
  1573. },
  1574. ).data
  1575. assert _get_string_indexer_log_records(caplog) == [
  1576. (
  1577. "process_messages.dropped_message",
  1578. {"num_global_quotas": 0, "org_batch_size": 9, "string_type": "tags"},
  1579. ),
  1580. (
  1581. "process_messages.dropped_message",
  1582. {"num_global_quotas": 0, "org_batch_size": 9, "string_type": "tags"},
  1583. ),
  1584. (
  1585. "process_messages.dropped_message",
  1586. {"num_global_quotas": 0, "org_batch_size": 9, "string_type": "tags"},
  1587. ),
  1588. ]
  1589. assert _deconstruct_messages(snuba_payloads) == []
  1590. @pytest.mark.django_db
  1591. def test_tag_value_rate_limited(caplog, settings):
  1592. settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
  1593. outer_message = _construct_outer_message(
  1594. [
  1595. (counter_payload, counter_headers),
  1596. (distribution_payload, distribution_headers),
  1597. (set_payload, set_headers),
  1598. ]
  1599. )
  1600. batch = IndexerBatch(
  1601. outer_message,
  1602. True,
  1603. False,
  1604. tags_validator=ReleaseHealthTagsValidator().is_allowed,
  1605. schema_validator=MetricsSchemaValidator(
  1606. INGEST_CODEC, RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME
  1607. ).validate,
  1608. )
  1609. assert batch.extract_strings() == (
  1610. {
  1611. UseCaseID.SESSIONS: {
  1612. 1: {
  1613. "c:sessions/session@none",
  1614. "d:sessions/duration@second",
  1615. "environment",
  1616. "errored",
  1617. "healthy",
  1618. "init",
  1619. "production",
  1620. "s:sessions/error@none",
  1621. "session.status",
  1622. }
  1623. }
  1624. }
  1625. )
  1626. assert not batch.invalid_msg_meta
  1627. caplog.set_level(logging.ERROR)
  1628. snuba_payloads = batch.reconstruct_messages(
  1629. {
  1630. UseCaseID.SESSIONS: {
  1631. 1: {
  1632. "c:sessions/session@none": 1,
  1633. "d:sessions/duration@second": 2,
  1634. "environment": 3,
  1635. "errored": None,
  1636. "healthy": 5,
  1637. "init": 6,
  1638. "production": 7,
  1639. "s:sessions/error@none": 8,
  1640. "session.status": 9,
  1641. }
  1642. }
  1643. },
  1644. {
  1645. UseCaseID.SESSIONS: {
  1646. 1: {
  1647. "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
  1648. "d:sessions/duration@second": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
  1649. "environment": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
  1650. "errored": Metadata(
  1651. id=None,
  1652. fetch_type=FetchType.RATE_LIMITED,
  1653. fetch_type_ext=FetchTypeExt(is_global=False),
  1654. ),
  1655. "healthy": Metadata(id=5, fetch_type=FetchType.HARDCODED),
  1656. "init": Metadata(id=6, fetch_type=FetchType.HARDCODED),
  1657. "production": Metadata(id=7, fetch_type=FetchType.CACHE_HIT),
  1658. "s:sessions/error@none": Metadata(id=8, fetch_type=FetchType.CACHE_HIT),
  1659. "session.status": Metadata(id=9, fetch_type=FetchType.CACHE_HIT),
  1660. }
  1661. }
  1662. },
  1663. ).data
  1664. assert _get_string_indexer_log_records(caplog) == [
  1665. (
  1666. "process_messages.dropped_message",
  1667. {"num_global_quotas": 0, "org_batch_size": 9, "string_type": "tags"},
  1668. ),
  1669. ]
  1670. assert _deconstruct_messages(snuba_payloads) == [
  1671. (
  1672. {
  1673. "mapping_meta": {
  1674. "c": {
  1675. "1": "c:sessions/session@none",
  1676. "3": "environment",
  1677. "7": "production",
  1678. "9": "session.status",
  1679. },
  1680. "h": {"6": "init"},
  1681. },
  1682. "metric_id": 1,
  1683. "org_id": 1,
  1684. "project_id": 3,
  1685. "retention_days": 90,
  1686. "tags": {"3": 7, "9": 6},
  1687. "timestamp": ts,
  1688. "type": "c",
  1689. "use_case_id": "sessions",
  1690. "value": 1.0,
  1691. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1692. },
  1693. [
  1694. *counter_headers,
  1695. ("mapping_sources", b"ch"),
  1696. ("metric_type", "c"),
  1697. ],
  1698. ),
  1699. (
  1700. {
  1701. "mapping_meta": {
  1702. "c": {
  1703. "2": "d:sessions/duration@second",
  1704. "3": "environment",
  1705. "7": "production",
  1706. "9": "session.status",
  1707. },
  1708. "h": {"5": "healthy"},
  1709. },
  1710. "metric_id": 2,
  1711. "org_id": 1,
  1712. "project_id": 3,
  1713. "retention_days": 90,
  1714. "tags": {"3": 7, "9": 5},
  1715. "timestamp": ts,
  1716. "type": "d",
  1717. "use_case_id": "sessions",
  1718. "value": [4, 5, 6],
  1719. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1720. },
  1721. [
  1722. *distribution_headers,
  1723. ("mapping_sources", b"ch"),
  1724. ("metric_type", "d"),
  1725. ],
  1726. ),
  1727. ]
  1728. @pytest.mark.django_db
  1729. def test_one_org_limited(caplog, settings):
  1730. settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
  1731. outer_message = _construct_outer_message(
  1732. [
  1733. (counter_payload, counter_headers),
  1734. ({**distribution_payload, "org_id": 2}, distribution_headers),
  1735. ]
  1736. )
  1737. batch = IndexerBatch(
  1738. outer_message,
  1739. True,
  1740. False,
  1741. tags_validator=ReleaseHealthTagsValidator().is_allowed,
  1742. schema_validator=MetricsSchemaValidator(
  1743. INGEST_CODEC, RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME
  1744. ).validate,
  1745. )
  1746. assert batch.extract_strings() == (
  1747. {
  1748. UseCaseID.SESSIONS: {
  1749. 1: {
  1750. "c:sessions/session@none",
  1751. "environment",
  1752. "init",
  1753. "production",
  1754. "session.status",
  1755. },
  1756. 2: {
  1757. "d:sessions/duration@second",
  1758. "environment",
  1759. "healthy",
  1760. "production",
  1761. "session.status",
  1762. },
  1763. }
  1764. }
  1765. )
  1766. assert not batch.invalid_msg_meta
  1767. caplog.set_level(logging.ERROR)
  1768. snuba_payloads = batch.reconstruct_messages(
  1769. {
  1770. UseCaseID.SESSIONS: {
  1771. 1: {
  1772. "c:sessions/session@none": 1,
  1773. "environment": None,
  1774. "init": 3,
  1775. "production": 4,
  1776. "session.status": 5,
  1777. },
  1778. 2: {
  1779. "d:sessions/duration@second": 1,
  1780. "environment": 2,
  1781. "healthy": 3,
  1782. "production": 4,
  1783. "session.status": 5,
  1784. },
  1785. }
  1786. },
  1787. {
  1788. UseCaseID.SESSIONS: {
  1789. 1: {
  1790. "c:sessions/session@none": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
  1791. "environment": Metadata(
  1792. id=None,
  1793. fetch_type=FetchType.RATE_LIMITED,
  1794. fetch_type_ext=FetchTypeExt(is_global=False),
  1795. ),
  1796. "init": Metadata(id=3, fetch_type=FetchType.HARDCODED),
  1797. "production": Metadata(id=4, fetch_type=FetchType.CACHE_HIT),
  1798. "session.status": Metadata(id=5, fetch_type=FetchType.CACHE_HIT),
  1799. },
  1800. 2: {
  1801. "d:sessions/duration@second": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
  1802. "environment": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
  1803. "healthy": Metadata(id=3, fetch_type=FetchType.HARDCODED),
  1804. "production": Metadata(id=4, fetch_type=FetchType.CACHE_HIT),
  1805. "session.status": Metadata(id=5, fetch_type=FetchType.CACHE_HIT),
  1806. },
  1807. }
  1808. },
  1809. ).data
  1810. assert _get_string_indexer_log_records(caplog) == [
  1811. (
  1812. "process_messages.dropped_message",
  1813. {"num_global_quotas": 0, "org_batch_size": 5, "string_type": "tags"},
  1814. ),
  1815. ]
  1816. assert _deconstruct_messages(snuba_payloads) == [
  1817. (
  1818. {
  1819. "mapping_meta": {
  1820. "c": {
  1821. "1": "d:sessions/duration@second",
  1822. "2": "environment",
  1823. "4": "production",
  1824. "5": "session.status",
  1825. },
  1826. "h": {"3": "healthy"},
  1827. },
  1828. "metric_id": 1,
  1829. "org_id": 2,
  1830. "project_id": 3,
  1831. "retention_days": 90,
  1832. "tags": {"2": 4, "5": 3},
  1833. "timestamp": ts,
  1834. "type": "d",
  1835. "use_case_id": "sessions",
  1836. "value": [4, 5, 6],
  1837. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1838. },
  1839. [
  1840. *distribution_headers,
  1841. ("mapping_sources", b"ch"),
  1842. ("metric_type", "d"),
  1843. ],
  1844. ),
  1845. ]
  1846. @pytest.mark.django_db
  1847. def test_cardinality_limiter(caplog, settings):
  1848. """
  1849. Test functionality of the indexer batch related to cardinality-limiting. More concretely, assert that `IndexerBatch.filter_messages`:
  1850. 1. removes the messages from the outgoing batch
  1851. 2. prevents strings from filtered messages from being extracted & indexed
  1852. 3. does not crash when strings from filtered messages are not passed into reconstruct_messages
  1853. 4. still extracts strings that exist both in filtered and unfiltered messages (eg "environment")
  1854. """
  1855. settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
  1856. outer_message = _construct_outer_message(
  1857. [
  1858. (counter_payload, counter_headers),
  1859. (distribution_payload, distribution_headers),
  1860. (set_payload, set_headers),
  1861. ]
  1862. )
  1863. batch = IndexerBatch(
  1864. outer_message,
  1865. True,
  1866. False,
  1867. tags_validator=ReleaseHealthTagsValidator().is_allowed,
  1868. schema_validator=MetricsSchemaValidator(
  1869. INGEST_CODEC, RELEASE_HEALTH_SCHEMA_VALIDATION_RULES_OPTION_NAME
  1870. ).validate,
  1871. )
  1872. keys_to_remove = list(batch.parsed_payloads_by_meta)[:2]
  1873. # the messages come in a certain order, and Python dictionaries preserve
  1874. # their insertion order. So we can hardcode offsets here.
  1875. assert keys_to_remove == [
  1876. BrokerMeta(partition=Partition(Topic("topic"), 0), offset=0),
  1877. BrokerMeta(partition=Partition(Topic("topic"), 0), offset=1),
  1878. ]
  1879. batch.filter_messages(keys_to_remove)
  1880. assert batch.extract_strings() == {
  1881. UseCaseID.SESSIONS: {
  1882. 1: {
  1883. "environment",
  1884. "errored",
  1885. "production",
  1886. # Note, we only extracted one MRI, of the one metric that we didn't
  1887. # drop
  1888. "s:sessions/error@none",
  1889. "session.status",
  1890. },
  1891. }
  1892. }
  1893. assert not batch.invalid_msg_meta
  1894. snuba_payloads = batch.reconstruct_messages(
  1895. {
  1896. UseCaseID.SESSIONS: {
  1897. 1: {
  1898. "environment": 1,
  1899. "errored": 2,
  1900. "production": 3,
  1901. "s:sessions/error@none": 4,
  1902. "session.status": 5,
  1903. },
  1904. }
  1905. },
  1906. {
  1907. UseCaseID.SESSIONS: {
  1908. 1: {
  1909. "environment": Metadata(id=1, fetch_type=FetchType.CACHE_HIT),
  1910. "errored": Metadata(id=2, fetch_type=FetchType.CACHE_HIT),
  1911. "production": Metadata(id=3, fetch_type=FetchType.CACHE_HIT),
  1912. "s:sessions/error@none": Metadata(id=4, fetch_type=FetchType.CACHE_HIT),
  1913. "session.status": Metadata(id=5, fetch_type=FetchType.CACHE_HIT),
  1914. }
  1915. }
  1916. },
  1917. ).data
  1918. assert _deconstruct_messages(snuba_payloads) == [
  1919. (
  1920. {
  1921. "mapping_meta": {
  1922. "c": {
  1923. "1": "environment",
  1924. "2": "errored",
  1925. "3": "production",
  1926. "4": "s:sessions/error@none",
  1927. "5": "session.status",
  1928. },
  1929. },
  1930. "metric_id": 4,
  1931. "org_id": 1,
  1932. "project_id": 3,
  1933. "retention_days": 90,
  1934. "tags": {"1": 3, "5": 2},
  1935. "timestamp": ts,
  1936. "type": "s",
  1937. "use_case_id": "sessions",
  1938. "value": [3],
  1939. "sentry_received_timestamp": BROKER_TIMESTAMP.timestamp(),
  1940. },
  1941. [
  1942. *set_headers,
  1943. ("mapping_sources", b"c"),
  1944. ("metric_type", "s"),
  1945. ],
  1946. )
  1947. ]