from collections.abc import Mapping, MutableMapping
from datetime import datetime
from typing import Any
from unittest import mock

from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Message, Partition
from arroyo.types import Topic as ArroyoTopic
from django.db import close_old_connections

from sentry import features
from sentry.conf.types.kafka_definition import Topic
from sentry.issues.occurrence_consumer import _process_message, process_occurrence_group
from sentry.issues.producer import (
    PayloadType,
    _prepare_occurrence_message,
    _prepare_status_change_message,
)
from sentry.issues.run import OccurrenceStrategyFactory
from sentry.issues.status_change_message import StatusChangeMessage
from sentry.testutils.cases import TestCase
from sentry.testutils.helpers.datetime import before_now
from sentry.testutils.helpers.features import apply_feature_flag_on_cls, with_feature
from sentry.types.group import PriorityLevel
from sentry.utils import json
from sentry.utils.kafka_config import get_topic_definition
from tests.sentry.issues.test_utils import OccurrenceTestMixin, StatusChangeTestMixin


# need to shut down the connections in the thread for tests to pass
def process_occurrence_group_with_shutdown(items: list[Mapping[str, Any]]) -> None:
    process_occurrence_group(items)
    close_old_connections()


class TestOccurrenceConsumer(TestCase, OccurrenceTestMixin):
    def build_mock_message(
        self, data: MutableMapping[str, Any] | None, topic: ArroyoTopic | None = None
    ) -> mock.Mock:
        message = mock.Mock()
        message.value.return_value = json.dumps(data)
        if topic:
            message.topic.return_value = topic
        return message

    @with_feature("organizations:profile-file-io-main-thread-ingest")
    @mock.patch("sentry.issues.occurrence_consumer.save_issue_occurrence")
    def test_saves_issue_occurrence(self, mock_save_issue_occurrence: mock.MagicMock) -> None:
        topic = ArroyoTopic(get_topic_definition(Topic.INGEST_OCCURRENCES)["real_topic_name"])
        partition_1 = Partition(topic, 0)
        partition_2 = Partition(topic, 1)
        mock_commit = mock.Mock()
        strategy = OccurrenceStrategyFactory(
            num_processes=2,
            input_block_size=1,
            max_batch_size=2,
            max_batch_time=1,
            output_block_size=1,
        ).create_with_partitions(
            commit=mock_commit,
            partitions={},
        )
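
        # build a single occurrence payload and submit it on two different partitions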
        occurrence = self.build_occurrence(project_id=self.project.id)
        payload_data = _prepare_occurrence_message(
            occurrence,
            {
                "project_id": self.project.id,
                "event_id": occurrence.event_id,
                "platform": "python",
                "tags": {"my_tag": "2"},
                "timestamp": before_now(minutes=1).isoformat(),
                "received": before_now(minutes=1).isoformat(),
            },
        )
        message = self.build_mock_message(payload_data, topic)
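
        # each partition receives the message at offset 1, so each commit should
        # record offset 2 (the next offset to be read) for its partition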
        strategy.submit(
            Message(
                BrokerValue(
                    KafkaPayload(b"key", message.value().encode("utf-8"), []),
                    partition_1,
                    1,
                    datetime.now(),
                )
            )
        )
        strategy.submit(
            Message(
                BrokerValue(
                    KafkaPayload(b"key", message.value().encode("utf-8"), []),
                    partition_2,
                    1,
                    datetime.now(),
                )
            )
        )
        calls = [
            mock.call({partition_1: 2}),
            mock.call({partition_2: 2}),
        ]
        mock_commit.assert_has_calls(calls=calls, any_order=True)

        strategy.poll()
        strategy.join(1)
        strategy.terminate()

        assert mock_save_issue_occurrence.call_count == 2
        occurrence_data = occurrence.to_dict()
        # need to modify some fields because they get mutated
        occurrence_data["initial_issue_priority"] = PriorityLevel.LOW
        occurrence_data["fingerprint"] = ["cdfb5fbc0959e8e2f27a6e6027c6335b"]
        mock_save_issue_occurrence.assert_called_with(occurrence_data, mock.ANY)

    @with_feature("organizations:profile-file-io-main-thread-ingest")
    @mock.patch("sentry.issues.run.logger")
    @mock.patch("sentry.issues.occurrence_consumer.save_issue_occurrence")
    def test_malformed_json_payload(
        self, mock_save_issue_occurrence: mock.MagicMock, mock_logger: mock.MagicMock
    ) -> None:
        topic = ArroyoTopic(get_topic_definition(Topic.INGEST_OCCURRENCES)["real_topic_name"])
        partition = Partition(topic, 0)
        mock_commit = mock.Mock()
        strategy = OccurrenceStrategyFactory(
            num_processes=2,
            input_block_size=1,
            max_batch_size=1,
            max_batch_time=1,
            output_block_size=1,
        ).create_with_partitions(
            commit=mock_commit,
            partitions={},
        )
        message = mock.Mock()
        message.value.return_value = "malformed json"
        message.topic.return_value = topic
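
        # the malformed payload should be logged and dropped without saving an occurrence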
        strategy.submit(
            Message(
                BrokerValue(
                    KafkaPayload(b"key", message.value().encode("utf-8"), []),
                    partition,
                    1,
                    datetime.now(),
                )
            )
        )
        strategy.poll()
        strategy.join(1)
        strategy.terminate()

        assert mock_save_issue_occurrence.call_count == 0
        mock_logger.exception.assert_called_once_with("failed to process message payload")


class TestBatchedOccurrenceConsumer(TestCase, OccurrenceTestMixin, StatusChangeTestMixin):
    def build_mock_message(
        self, data: MutableMapping[str, Any] | None, topic: ArroyoTopic | None = None
    ) -> mock.Mock:
        message = mock.Mock()
        message.value.return_value = json.dumps(data)
        if topic:
            message.topic.return_value = topic
        return message

    # @mock.patch.object(cache, "get")
    @with_feature("organizations:profile-file-io-main-thread-ingest")
    @mock.patch(
        "sentry.issues.occurrence_consumer.process_occurrence_group",
        side_effect=process_occurrence_group_with_shutdown,
    )
    @mock.patch("sentry.issues.occurrence_consumer.save_issue_occurrence")
    def test_saves_issue_occurrence(
        self,
        mock_save_issue_occurrence: mock.MagicMock,
        mock_process_occurrence_group: mock.MagicMock,
    ) -> None:
        topic = ArroyoTopic(get_topic_definition(Topic.INGEST_OCCURRENCES)["real_topic_name"])
        partition_1 = Partition(topic, 0)
        partition_2 = Partition(topic, 1)
        mock_commit = mock.Mock()
        strategy = OccurrenceStrategyFactory(
            mode="batched-parallel",
            max_batch_size=3,
            max_batch_time=1,
        ).create_with_partitions(
            commit=mock_commit,
            partitions={},
        )
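
        # in batched-parallel mode, messages that share a Kafka key should be
        # handed to the same process_occurrence_group call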
        # create 3 occurrences with 2 having the same fingerprint
        occurrence1 = self.build_occurrence(project_id=self.project.id, fingerprint=["1"])
        occurrence2 = self.build_occurrence(project_id=self.project.id, fingerprint=["2"])
        occurrence3 = self.build_occurrence(project_id=self.project.id, fingerprint=["2"])
        payload_data1 = _prepare_occurrence_message(
            occurrence1,
            {
                "project_id": self.project.id,
                "event_id": occurrence1.event_id,
                "platform": "python",
                "tags": {"my_tag": "1"},
                "timestamp": before_now(minutes=1).isoformat(),
                "received": before_now(minutes=1).isoformat(),
                "environment": "production",
            },
        )
        payload_data2 = _prepare_occurrence_message(
            occurrence2,
            {
                "project_id": self.project.id,
                "event_id": occurrence2.event_id,
                "platform": "python",
                "tags": {"my_tag": "2"},
                "timestamp": before_now(minutes=1).isoformat(),
                "received": before_now(minutes=1).isoformat(),
                "environment": "production",
            },
        )
        payload_data3 = _prepare_occurrence_message(
            occurrence3,
            {
                "project_id": self.project.id,
                "event_id": occurrence3.event_id,
                "platform": "python",
                "tags": {"my_tag": "3"},
                "timestamp": before_now(minutes=1).isoformat(),
                "received": before_now(minutes=1).isoformat(),
                "environment": "production",
            },
        )
        message1 = self.build_mock_message(payload_data1, topic)
        message2 = self.build_mock_message(payload_data2, topic)
        message3 = self.build_mock_message(payload_data3, topic)
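
        # message2 and message3 are both keyed b"group-2", so they should be grouped together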
        with self.tasks():
            strategy.submit(
                Message(
                    BrokerValue(
                        KafkaPayload(b"group-1", message1.value().encode("utf-8"), []),
                        partition_1,
                        1,
                        datetime.now(),
                    )
                )
            )
            strategy.submit(
                Message(
                    BrokerValue(
                        KafkaPayload(b"group-2", message2.value().encode("utf-8"), []),
                        partition_2,
                        1,
                        datetime.now(),
                    )
                )
            )
            strategy.submit(
                Message(
                    BrokerValue(
                        KafkaPayload(b"group-2", message3.value().encode("utf-8"), []),
                        partition_2,
                        1,
                        datetime.now(),
                    )
                )
            )
            strategy.poll()
            strategy.join(1)
            strategy.terminate()
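
        # the three messages are expected to flush as a single batch, committing
        # both partitions in one call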
        calls = [mock.call({partition_1: 2, partition_2: 2})]
        mock_commit.assert_has_calls(calls=calls, any_order=True)

        assert mock_save_issue_occurrence.call_count == 3
        occurrence_data1 = occurrence1.to_dict()
        occurrence_data2 = occurrence2.to_dict()
        occurrence_data3 = occurrence3.to_dict()
        # need to modify some fields because they get mutated
        occurrence_data1["initial_issue_priority"] = PriorityLevel.LOW
        occurrence_data1["fingerprint"] = ["28c8edde3d61a0411511d3b1866f0636"]
        occurrence_data2["initial_issue_priority"] = PriorityLevel.LOW
        occurrence_data2["fingerprint"] = ["665f644e43731ff9db3d341da5c827e1"]
        occurrence_data3["initial_issue_priority"] = PriorityLevel.LOW
        occurrence_data3["fingerprint"] = ["665f644e43731ff9db3d341da5c827e1"]
        assert any(
            call.args[0] == occurrence_data1 for call in mock_save_issue_occurrence.mock_calls
        )
        assert any(
            call.args[0] == occurrence_data2 for call in mock_save_issue_occurrence.mock_calls
        )
        assert any(
            call.args[0] == occurrence_data3 for call in mock_save_issue_occurrence.mock_calls
        )

        # verify we group by the fingerprint
        assert mock_process_occurrence_group.call_count == 2
        item_list1 = mock_process_occurrence_group.mock_calls[0].args[0]
        item_list2 = mock_process_occurrence_group.mock_calls[1].args[0]
        assert len(item_list1) == 1
        assert item_list1[0]["event_id"] == occurrence1.event_id
        assert len(item_list2) == 2
        assert item_list2[0]["event_id"] == occurrence2.event_id
        assert item_list2[1]["event_id"] == occurrence3.event_id

    @with_feature("organizations:profile-file-io-main-thread-ingest")
    @mock.patch(
        "sentry.issues.occurrence_consumer.process_occurrence_group",
        side_effect=process_occurrence_group_with_shutdown,
    )
    @mock.patch("sentry.issues.occurrence_consumer.save_issue_occurrence")
    def test_issue_occurrence_status_change_mix(
        self,
        mock_save_issue_occurrence: mock.MagicMock,
        mock_process_occurrence_group: mock.MagicMock,
    ) -> None:
        topic = ArroyoTopic(get_topic_definition(Topic.INGEST_OCCURRENCES)["real_topic_name"])
        partition_1 = Partition(topic, 0)
        partition_2 = Partition(topic, 1)
        mock_commit = mock.Mock()
        strategy = OccurrenceStrategyFactory(
            mode="batched-parallel",
            max_batch_size=3,
            max_batch_time=1,
        ).create_with_partitions(
            commit=mock_commit,
            partitions={},
        )
        # create 2 occurrences and 1 status change, with the status change sharing
        # occurrence2's fingerprint
        occurrence1 = self.build_occurrence(project_id=self.project.id, fingerprint=["1"])
        occurrence2 = self.build_occurrence(project_id=self.project.id, fingerprint=["2"])
        status_change3 = self.build_statuschange(project_id=self.project.id, fingerprint=["2"])
        payload_data1 = _prepare_occurrence_message(
            occurrence1,
            {
                "project_id": self.project.id,
                "event_id": occurrence1.event_id,
                "platform": "python",
                "tags": {"my_tag": "1"},
                "timestamp": before_now(minutes=1).isoformat(),
                "received": before_now(minutes=1).isoformat(),
                "environment": "production",
            },
        )
        payload_data2 = _prepare_occurrence_message(
            occurrence2,
            {
                "project_id": self.project.id,
                "event_id": occurrence2.event_id,
                "platform": "python",
                "tags": {"my_tag": "2"},
                "timestamp": before_now(minutes=1).isoformat(),
                "received": before_now(minutes=1).isoformat(),
                "environment": "production",
            },
        )
        payload_data3 = _prepare_status_change_message(status_change3)
        message1 = self.build_mock_message(payload_data1, topic)
        message2 = self.build_mock_message(payload_data2, topic)
        message3 = self.build_mock_message(payload_data3, topic)
        with self.tasks():
            strategy.submit(
                Message(
                    BrokerValue(
                        KafkaPayload(b"group-1", message1.value().encode("utf-8"), []),
                        partition_1,
                        1,
                        datetime.now(),
                    )
                )
            )
            strategy.submit(
                Message(
                    BrokerValue(
                        KafkaPayload(b"group-2", message2.value().encode("utf-8"), []),
                        partition_2,
                        1,
                        datetime.now(),
                    )
                )
            )
            strategy.submit(
                Message(
                    BrokerValue(
                        KafkaPayload(b"group-2", message3.value().encode("utf-8"), []),
                        partition_2,
                        1,
                        datetime.now(),
                    )
                )
            )
            strategy.poll()
            strategy.join(1)
            strategy.terminate()
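
        # depending on how the batch is flushed, the commit may cover both partitions
        # in a single call or each partition separately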
        calls = [mock.call({partition_1: 2, partition_2: 2})]
        separate_calls = [mock.call({partition_1: 2}), mock.call({partition_2: 2})]
        try:
            mock_commit.assert_has_calls(calls=calls, any_order=True)
        except AssertionError:
            mock_commit.assert_has_calls(calls=separate_calls, any_order=True)

        assert mock_save_issue_occurrence.call_count == 2
        occurrence_data1 = occurrence1.to_dict()
        occurrence_data2 = occurrence2.to_dict()
        # need to modify some fields because they get mutated
        occurrence_data1["initial_issue_priority"] = PriorityLevel.LOW
        occurrence_data1["fingerprint"] = ["28c8edde3d61a0411511d3b1866f0636"]
        occurrence_data2["initial_issue_priority"] = PriorityLevel.LOW
        occurrence_data2["fingerprint"] = ["665f644e43731ff9db3d341da5c827e1"]
        assert any(
            call.args[0] == occurrence_data1 for call in mock_save_issue_occurrence.mock_calls
        )
        assert any(
            call.args[0] == occurrence_data2 for call in mock_save_issue_occurrence.mock_calls
        )

        # verify we group by the fingerprint
        assert mock_process_occurrence_group.call_count == 2
        item_list1 = mock_process_occurrence_group.mock_calls[0].args[0]
        item_list2 = mock_process_occurrence_group.mock_calls[1].args[0]
        assert len(item_list1) == 1
        assert item_list1[0]["event_id"] == occurrence1.event_id
        assert len(item_list2) == 2
        assert item_list2[0]["event_id"] == occurrence2.event_id
        assert item_list2[1]["payload_type"] == PayloadType.STATUS_CHANGE.value

    @with_feature("organizations:profile-file-io-main-thread-ingest")
    @mock.patch(
        "sentry.issues.occurrence_consumer._process_message",
        side_effect=_process_message,
    )
    @mock.patch(
        "sentry.issues.occurrence_consumer.process_occurrence_group",
        side_effect=process_occurrence_group_with_shutdown,
    )
    @mock.patch("sentry.issues.occurrence_consumer.save_issue_occurrence")
    def test_issue_multiple_status_changes(
        self,
        mock_save_issue_occurrence: mock.MagicMock,
        mock_process_occurrence_group: mock.MagicMock,
        mock__process_message: mock.MagicMock,
    ) -> None:
        topic = ArroyoTopic(get_topic_definition(Topic.INGEST_OCCURRENCES)["real_topic_name"])
        partition = Partition(topic, 0)
        mock_commit = mock.Mock()
        strategy = OccurrenceStrategyFactory(
            mode="batched-parallel",
            max_batch_size=6,
            max_batch_time=1,
        ).create_with_partitions(
            commit=mock_commit,
            partitions={},
        )
        messages = [
            self.build_statuschange(project_id=self.project.id, fingerprint=["1"]) for _ in range(3)
        ] + [self.build_occurrence(project_id=self.project.id, fingerprint=["1"]) for _ in range(3)]
        payloads = [
            (
                _prepare_status_change_message(m)
                if isinstance(m, StatusChangeMessage)
                else _prepare_occurrence_message(
                    m,
                    {
                        "project_id": self.project.id,
                        "event_id": m.event_id,
                        "platform": "python",
                        "tags": {"my_tag": "1"},
                        "timestamp": before_now(minutes=1).isoformat(),
                        "received": before_now(minutes=1).isoformat(),
                        "environment": "production",
                    },
                )
            )
            for m in messages
        ]
        mock_messages = [self.build_mock_message(payload, topic) for payload in payloads]
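
        # all six messages share the key b"group-1", so they should be handled as a
        # single occurrence group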
        with self.tasks():
            for message in mock_messages:
                strategy.submit(
                    Message(
                        BrokerValue(
                            KafkaPayload(b"group-1", message.value().encode("utf-8"), []),
                            partition,
                            1,
                            datetime.now(),
                        )
                    )
                )
            strategy.poll()
            strategy.join(1)
            strategy.terminate()
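
        # only the three occurrence payloads should result in saved occurrences;
        # status changes do not call save_issue_occurrence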
        assert mock_save_issue_occurrence.call_count == 3
        assert mock_process_occurrence_group.call_count == 1
        item_list = mock_process_occurrence_group.mock_calls[0].args[0]
        assert len(item_list) == 6

        # this behavior depends on the feature flag
        if features.has(
            "organizations:occurence-consumer-prune-status-changes", self.organization
        ):
            # two status change messages should be pruned
            assert len(mock__process_message.mock_calls) == 4
            # there should be only one status change message, and it should be the last message
            assert (
                mock__process_message.mock_calls[-1].args[0]["payload_type"]
                == PayloadType.STATUS_CHANGE.value
            )
            assert (
                len(
                    [
                        call
                        for call in mock__process_message.mock_calls
                        if call.args[0]["payload_type"] == PayloadType.STATUS_CHANGE.value
                    ]
                )
                == 1
            )
        else:
            assert len(mock__process_message.mock_calls) == 6
            assert (
                len(
                    [
                        call
                        for call in mock__process_message.mock_calls
                        if call.args[0]["payload_type"] == PayloadType.STATUS_CHANGE.value
                    ]
                )
                == 3
            )
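

# Re-run both consumer suites with the status-change pruning flag enabled on the organization.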
@apply_feature_flag_on_cls("organizations:occurence-consumer-prune-status-changes")
class TestOccurrenceConsumerWithFlags(TestOccurrenceConsumer):
    pass


# @override_options({"issues.occurrence_consumer.use_orjson": True})
@apply_feature_flag_on_cls("organizations:occurence-consumer-prune-status-changes")
class TestBatchedOccurrenceConsumerWithFlags(TestBatchedOccurrenceConsumer):
    pass