  1. from datetime import datetime, timedelta, timezone
  2. from typing import Literal, Mapping
  3. import pytest
  4. from snuba_sdk import (
  5. Column,
  6. Condition,
  7. Direction,
  8. Metric,
  9. MetricsQuery,
  10. MetricsScope,
  11. Op,
  12. Request,
  13. Rollup,
  14. Timeseries,
  15. )
  16. from sentry.api.utils import InvalidParams
  17. from sentry.sentry_metrics.use_case_id_registry import UseCaseID
  18. from sentry.snuba.metrics.naming_layer import TransactionMRI
  19. from sentry.snuba.metrics_layer.query import run_query
  20. from sentry.testutils.cases import BaseMetricsTestCase, TestCase
  21. pytestmark = pytest.mark.sentry_metrics
  22. class SnQLTest(TestCase, BaseMetricsTestCase):
  23. def ts(self, dt: datetime) -> int:
  24. return int(dt.timestamp())
  25. def setUp(self) -> None:
  26. super().setUp()
  27. self.metrics: Mapping[str, Literal["counter", "set", "distribution"]] = {
  28. TransactionMRI.DURATION.value: "distribution",
  29. TransactionMRI.USER.value: "set",
  30. TransactionMRI.COUNT_PER_ROOT_PROJECT.value: "counter",
  31. }
  32. self.now = datetime.now(tz=timezone.utc).replace(microsecond=0)
  33. self.hour_ago = self.now - timedelta(hours=1)
  34. self.org_id = self.project.organization_id
  35. # Store a data point every 10 seconds for an hour
  36. for mri, metric_type in self.metrics.items():
  37. assert metric_type in {"counter", "distribution", "set"}
  38. for i in range(360):
  39. self.store_metric(
  40. self.org_id,
  41. self.project.id,
  42. metric_type,
  43. mri,
  44. {
  45. "transaction": f"transaction_{i % 2}",
  46. "status_code": "500" if i % 10 == 0 else "200",
  47. "device": "BlackBerry" if i % 3 == 0 else "Nokia",
  48. },
  49. self.ts(self.hour_ago + timedelta(minutes=1 * i)),
  50. i,
  51. UseCaseID.TRANSACTIONS,
  52. )
  53. def test_basic(self) -> None:
  54. query = MetricsQuery(
  55. query=Timeseries(
  56. metric=Metric(
  57. "transaction.duration",
  58. TransactionMRI.DURATION.value,
  59. ),
  60. aggregate="max",
  61. ),
  62. start=self.hour_ago,
  63. end=self.now,
  64. rollup=Rollup(interval=60, granularity=60),
  65. scope=MetricsScope(
  66. org_ids=[self.org_id],
  67. project_ids=[self.project.id],
  68. use_case_id=UseCaseID.TRANSACTIONS.value,
  69. ),
  70. )
  71. request = Request(
  72. dataset="generic_metrics",
  73. app_id="tests",
  74. query=query,
  75. tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
  76. )
  77. result = run_query(request)
  78. assert len(result["data"]) == 61
  79. rows = result["data"]
  80. for i in range(61):
  81. assert rows[i]["aggregate_value"] == i
  82. assert (
  83. rows[i]["time"]
  84. == (
  85. self.hour_ago.replace(second=0, microsecond=0) + timedelta(minutes=1 * i)
  86. ).isoformat()
  87. )
  88. def test_groupby(self) -> None:
  89. query = MetricsQuery(
  90. query=Timeseries(
  91. metric=Metric(
  92. "transaction.duration",
  93. TransactionMRI.DURATION.value,
  94. ),
  95. aggregate="quantiles",
  96. aggregate_params=[0.5, 0.99],
  97. groupby=[Column("transaction")],
  98. ),
  99. start=self.hour_ago,
  100. end=self.now,
  101. rollup=Rollup(interval=60, granularity=60),
  102. scope=MetricsScope(
  103. org_ids=[self.org_id],
  104. project_ids=[self.project.id],
  105. use_case_id=UseCaseID.TRANSACTIONS.value,
  106. ),
  107. )
  108. request = Request(
  109. dataset="generic_metrics",
  110. app_id="tests",
  111. query=query,
  112. tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
  113. )
  114. result = run_query(request)
  115. assert len(result["data"]) == 61
  116. rows = result["data"]
  117. for i in range(61):
  118. assert rows[i]["aggregate_value"] == [i, i]
  119. assert rows[i]["transaction"] == f"transaction_{i % 2}"
  120. assert (
  121. rows[i]["time"]
  122. == (
  123. self.hour_ago.replace(second=0, microsecond=0) + timedelta(minutes=1 * i)
  124. ).isoformat()
  125. )
  126. def test_filters(self) -> None:
  127. query = MetricsQuery(
  128. query=Timeseries(
  129. metric=Metric(
  130. "transaction.duration",
  131. TransactionMRI.DURATION.value,
  132. ),
  133. aggregate="quantiles",
  134. aggregate_params=[0.5],
  135. filters=[Condition(Column("status_code"), Op.EQ, "500")],
  136. ),
  137. filters=[Condition(Column("device"), Op.EQ, "BlackBerry")],
  138. start=self.hour_ago,
  139. end=self.now,
  140. rollup=Rollup(interval=60, granularity=60),
  141. scope=MetricsScope(
  142. org_ids=[self.org_id],
  143. project_ids=[self.project.id],
  144. use_case_id=UseCaseID.TRANSACTIONS.value,
  145. ),
  146. )
  147. request = Request(
  148. dataset="generic_metrics",
  149. app_id="tests",
  150. query=query,
  151. tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
  152. )
  153. result = run_query(request)
  154. assert len(result["data"]) == 3
  155. rows = result["data"]
  156. for i in range(3): # 500 status codes on Blackberry are sparse
  157. assert rows[i]["aggregate_value"] == [i * 30]
  158. assert (
  159. rows[i]["time"]
  160. == (
  161. self.hour_ago.replace(second=0, microsecond=0) + timedelta(minutes=30 * i)
  162. ).isoformat()
  163. )
  164. def test_complex(self) -> None:
  165. query = MetricsQuery(
  166. query=Timeseries(
  167. metric=Metric(
  168. "transaction.duration",
  169. TransactionMRI.DURATION.value,
  170. ),
  171. aggregate="quantiles",
  172. aggregate_params=[0.5],
  173. filters=[Condition(Column("status_code"), Op.EQ, "500")],
  174. groupby=[Column("transaction")],
  175. ),
  176. filters=[Condition(Column("device"), Op.EQ, "BlackBerry")],
  177. start=self.hour_ago,
  178. end=self.now,
  179. rollup=Rollup(interval=60, granularity=60),
  180. scope=MetricsScope(
  181. org_ids=[self.org_id],
  182. project_ids=[self.project.id],
  183. use_case_id=UseCaseID.TRANSACTIONS.value,
  184. ),
  185. )
  186. request = Request(
  187. dataset="generic_metrics",
  188. app_id="tests",
  189. query=query,
  190. tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
  191. )
  192. result = run_query(request)
  193. assert len(result["data"]) == 3
  194. rows = result["data"]
  195. for i in range(3): # 500 status codes on BB are sparse
  196. assert rows[i]["aggregate_value"] == [i * 30]
  197. assert rows[i]["transaction"] == "transaction_0"
  198. assert (
  199. rows[i]["time"]
  200. == (
  201. self.hour_ago.replace(second=0, microsecond=0) + timedelta(minutes=30 * i)
  202. ).isoformat()
  203. )
  204. def test_totals(self) -> None:
  205. query = MetricsQuery(
  206. query=Timeseries(
  207. metric=Metric(
  208. "transaction.duration",
  209. TransactionMRI.DURATION.value,
  210. ),
  211. aggregate="max",
  212. filters=[Condition(Column("status_code"), Op.EQ, "200")],
  213. groupby=[Column("transaction")],
  214. ),
  215. start=self.hour_ago,
  216. end=self.now,
  217. rollup=Rollup(totals=True, granularity=60, orderby=Direction.ASC),
  218. scope=MetricsScope(
  219. org_ids=[self.org_id],
  220. project_ids=[self.project.id],
  221. use_case_id=UseCaseID.TRANSACTIONS.value,
  222. ),
  223. )
  224. request = Request(
  225. dataset="generic_metrics",
  226. app_id="tests",
  227. query=query,
  228. tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
  229. )
  230. result = run_query(request)
  231. assert len(result["data"]) == 2
  232. rows = result["data"]
  233. assert rows[0]["aggregate_value"] == 58
  234. assert rows[0]["transaction"] == "transaction_0"
  235. assert rows[1]["aggregate_value"] == 59
  236. assert rows[1]["transaction"] == "transaction_1"
  237. def test_meta_data_in_response(self) -> None:
  238. query = MetricsQuery(
  239. query=Timeseries(
  240. metric=Metric(
  241. "transaction.duration",
  242. TransactionMRI.DURATION.value,
  243. ),
  244. aggregate="max",
  245. filters=[Condition(Column("status_code"), Op.EQ, "200")],
  246. groupby=[Column("transaction")],
  247. ),
  248. start=self.hour_ago.replace(minute=16, second=59),
  249. end=self.now.replace(minute=16, second=59),
  250. rollup=Rollup(interval=60, granularity=60),
  251. scope=MetricsScope(
  252. org_ids=[self.org_id],
  253. project_ids=[self.project.id],
  254. use_case_id=UseCaseID.TRANSACTIONS.value,
  255. ),
  256. )
  257. request = Request(
  258. dataset="generic_metrics",
  259. app_id="tests",
  260. query=query,
  261. tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
  262. )
  263. result = run_query(request)
  264. assert result["modified_start"] == self.hour_ago.replace(minute=16, second=0)
  265. assert result["modified_end"] == self.now.replace(minute=17, second=0)
  266. assert result["indexer_mappings"] == {
  267. "d:transactions/duration@millisecond": 9223372036854775909,
  268. "status_code": 10000,
  269. "transaction": 9223372036854776020,
  270. }
  271. def test_bad_query(self) -> None:
  272. query = MetricsQuery(
  273. query=Timeseries(
  274. metric=Metric(
  275. "transaction.duration",
  276. "not a real MRI",
  277. ),
  278. aggregate="max",
  279. ),
  280. start=self.hour_ago.replace(minute=16, second=59),
  281. end=self.now.replace(minute=16, second=59),
  282. rollup=Rollup(interval=60, granularity=60),
  283. scope=MetricsScope(
  284. org_ids=[self.org_id],
  285. project_ids=[self.project.id],
  286. use_case_id=UseCaseID.TRANSACTIONS.value,
  287. ),
  288. )
  289. request = Request(
  290. dataset="generic_metrics",
  291. app_id="tests",
  292. query=query,
  293. tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
  294. )
  295. with pytest.raises(InvalidParams):
  296. run_query(request)
  297. def test_interval_with_totals(self) -> None:
  298. query = MetricsQuery(
  299. query=Timeseries(
  300. metric=Metric(
  301. "transaction.duration",
  302. TransactionMRI.DURATION.value,
  303. ),
  304. aggregate="max",
  305. filters=[Condition(Column("status_code"), Op.EQ, "200")],
  306. groupby=[Column("transaction")],
  307. ),
  308. start=self.hour_ago,
  309. end=self.now,
  310. rollup=Rollup(interval=60, totals=True, granularity=60),
  311. scope=MetricsScope(
  312. org_ids=[self.org_id],
  313. project_ids=[self.project.id],
  314. use_case_id=UseCaseID.TRANSACTIONS.value,
  315. ),
  316. )
  317. request = Request(
  318. dataset="generic_metrics",
  319. app_id="tests",
  320. query=query,
  321. tenant_ids={"referrer": "metrics.testing.test", "organization_id": self.org_id},
  322. )
  323. result = run_query(request)
  324. assert len(result["data"]) == 54
  325. assert result["totals"]["aggregate_value"] == 59