# test_producer.py
  1. import uuid
  2. from datetime import datetime
  3. from unittest.mock import MagicMock, patch
  4. import pytest
  5. from arroyo import Topic as ArroyoTopic
  6. from arroyo.backends.kafka import KafkaPayload
  7. from django.test import override_settings
  8. from sentry.issues.ingest import process_occurrence_data
  9. from sentry.issues.issue_occurrence import IssueOccurrence
  10. from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
  11. from sentry.issues.status_change_message import StatusChangeMessage
  12. from sentry.models.activity import Activity
  13. from sentry.models.group import GroupStatus
  14. from sentry.models.grouphistory import STRING_TO_STATUS_LOOKUP, GroupHistory, GroupHistoryStatus
  15. from sentry.testutils.cases import TestCase
  16. from sentry.testutils.helpers.datetime import before_now
  17. from sentry.testutils.helpers.features import apply_feature_flag_on_cls
  18. from sentry.testutils.skips import requires_snuba
  19. from sentry.types.activity import ActivityType
  20. from sentry.types.group import GROUP_SUBSTATUS_TO_GROUP_HISTORY_STATUS, GroupSubStatus
  21. from sentry.utils import json
  22. from sentry.utils.samples import load_data
  23. from tests.sentry.issues.test_utils import OccurrenceTestMixin
  24. pytestmark = [requires_snuba]
  25. @apply_feature_flag_on_cls("organizations:profile-file-io-main-thread-ingest")
  26. class TestProduceOccurrenceToKafka(TestCase, OccurrenceTestMixin):
  27. def test_event_id_mismatch(self) -> None:
  28. with self.assertRaisesMessage(
  29. ValueError, "Event id on occurrence and event_data must be the same"
  30. ):
  31. produce_occurrence_to_kafka(
  32. payload_type=PayloadType.OCCURRENCE,
  33. occurrence=self.build_occurrence(),
  34. event_data={"event_id": uuid.uuid4().hex},
  35. )
  36. def test_with_event(self) -> None:
  37. occurrence = self.build_occurrence(project_id=self.project.id)
  38. produce_occurrence_to_kafka(
  39. payload_type=PayloadType.OCCURRENCE,
  40. occurrence=occurrence,
  41. event_data={
  42. "event_id": occurrence.event_id,
  43. "project_id": self.project.id,
  44. "title": "some problem",
  45. "platform": "python",
  46. "tags": {"my_tag": "2"},
  47. "timestamp": before_now(minutes=1).isoformat(),
  48. "received": before_now(minutes=1).isoformat(),
  49. },
  50. )
  51. stored_occurrence = IssueOccurrence.fetch(occurrence.id, occurrence.project_id)
  52. assert stored_occurrence
  53. assert occurrence.event_id == stored_occurrence.event_id
  54. def test_without_payload_type(self) -> None:
  55. # Ensure the occurrence is processes without a payload_type too.
  56. occurrence = self.build_occurrence(project_id=self.project.id)
  57. produce_occurrence_to_kafka(
  58. occurrence=occurrence,
  59. event_data={
  60. "event_id": occurrence.event_id,
  61. "project_id": self.project.id,
  62. "title": "some problem",
  63. "platform": "python",
  64. "tags": {"my_tag": "2"},
  65. "timestamp": before_now(minutes=1).isoformat(),
  66. "received": before_now(minutes=1).isoformat(),
  67. },
  68. )
  69. stored_occurrence = IssueOccurrence.fetch(occurrence.id, occurrence.project_id)
  70. assert stored_occurrence
  71. assert occurrence.event_id == stored_occurrence.event_id
  72. def test_with_only_occurrence(self) -> None:
  73. event = self.store_event(data=load_data("transaction"), project_id=self.project.id)
  74. occurrence = self.build_occurrence(event_id=event.event_id, project_id=self.project.id)
  75. produce_occurrence_to_kafka(
  76. payload_type=PayloadType.OCCURRENCE,
  77. occurrence=occurrence,
  78. )
  79. stored_occurrence = IssueOccurrence.fetch(occurrence.id, occurrence.project_id)
  80. assert stored_occurrence
  81. assert occurrence.event_id == stored_occurrence.event_id
  82. @patch(
  83. "sentry.issues.producer._prepare_occurrence_message", return_value={"mock_data": "great"}
  84. )
  85. @patch("sentry.issues.producer._occurrence_producer.produce")
  86. @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
  87. def test_payload_sent_to_kafka_with_partition_key(
  88. self, mock_produce: MagicMock, mock_prepare_occurrence_message: MagicMock
  89. ) -> None:
  90. occurrence = self.build_occurrence(project_id=self.project.id, fingerprint=["group-1"])
  91. produce_occurrence_to_kafka(
  92. payload_type=PayloadType.OCCURRENCE,
  93. occurrence=occurrence,
  94. event_data={},
  95. )
  96. mock_produce.assert_called_once_with(
  97. ArroyoTopic(name="ingest-occurrences"),
  98. KafkaPayload(
  99. f"{occurrence.fingerprint[0]}-{occurrence.project_id}".encode(),
  100. json.dumps({"mock_data": "great"}).encode("utf-8"),
  101. [],
  102. ),
  103. )
  104. @patch(
  105. "sentry.issues.producer._prepare_occurrence_message", return_value={"mock_data": "great"}
  106. )
  107. @patch("sentry.issues.producer._occurrence_producer.produce")
  108. @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
  109. def test_payload_sent_to_kafka_with_partition_key_no_fingerprint(
  110. self, mock_produce: MagicMock, mock_prepare_occurrence_message: MagicMock
  111. ) -> None:
  112. occurrence = self.build_occurrence(project_id=self.project.id, fingerprint=[])
  113. produce_occurrence_to_kafka(
  114. payload_type=PayloadType.OCCURRENCE,
  115. occurrence=occurrence,
  116. event_data={},
  117. )
  118. mock_produce.assert_called_once_with(
  119. ArroyoTopic(name="ingest-occurrences"),
  120. KafkaPayload(None, json.dumps({"mock_data": "great"}).encode("utf-8"), []),
  121. )
  122. @patch(
  123. "sentry.issues.producer._prepare_occurrence_message", return_value={"mock_data": "great"}
  124. )
  125. @patch("sentry.issues.producer._occurrence_producer.produce")
  126. @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
  127. def test_payload_sent_to_kafka_with_partition_key_no_occurrence(
  128. self, mock_produce: MagicMock, mock_prepare_occurrence_message: MagicMock
  129. ) -> None:
  130. produce_occurrence_to_kafka(
  131. payload_type=PayloadType.OCCURRENCE,
  132. occurrence=None,
  133. event_data={},
  134. )
  135. mock_produce.assert_called_once_with(
  136. ArroyoTopic(name="ingest-occurrences"),
  137. KafkaPayload(None, json.dumps({"mock_data": "great"}).encode("utf-8"), []),
  138. )
  139. class TestProduceOccurrenceForStatusChange(TestCase, OccurrenceTestMixin):
  140. def setUp(self) -> None:
  141. self.fingerprint = ["group-1"]
  142. self.event = self.store_event(
  143. data={
  144. "event_id": "a" * 32,
  145. "message": "oh no",
  146. "timestamp": datetime.now().isoformat(),
  147. "fingerprint": self.fingerprint,
  148. },
  149. project_id=self.project.id,
  150. )
  151. self.group = self.event.group
  152. assert self.group
  153. self.group.substatus = GroupSubStatus.ONGOING
  154. self.group.save()
  155. self.initial_status = self.group.status
  156. self.initial_substatus = self.group.substatus
  157. def test_with_invalid_payloads(self) -> None:
  158. with pytest.raises(ValueError, match="occurrence must be provided"):
  159. # Should raise an error because the occurrence is not provided for the OCCURRENCE payload type.
  160. produce_occurrence_to_kafka(
  161. payload_type=PayloadType.OCCURRENCE,
  162. )
  163. with pytest.raises(ValueError, match="status_change must be provided"):
  164. # Should raise an error because the status_change object is not provided for the STATUS_CHANGE payload type.
  165. produce_occurrence_to_kafka(
  166. payload_type=PayloadType.STATUS_CHANGE,
  167. )
  168. with pytest.raises(NotImplementedError, match="Unknown payload type: invalid"):
  169. # Should raise an error because the payload type is not supported.
  170. produce_occurrence_to_kafka(payload_type="invalid") # type: ignore[arg-type]
  171. def test_with_no_status_change(self) -> None:
  172. status_change = StatusChangeMessage(
  173. fingerprint=self.fingerprint,
  174. project_id=self.group.project_id,
  175. new_status=self.initial_status,
  176. new_substatus=self.initial_substatus,
  177. )
  178. produce_occurrence_to_kafka(
  179. payload_type=PayloadType.STATUS_CHANGE,
  180. status_change=status_change,
  181. )
  182. self.group.refresh_from_db()
  183. assert self.group.status == self.initial_status
  184. assert self.group.substatus == self.initial_substatus
  185. assert not Activity.objects.filter(group=self.group).exists()
  186. assert not GroupHistory.objects.filter(group=self.group).exists()
  187. def test_with_status_change_resolved(self) -> None:
  188. status_change = StatusChangeMessage(
  189. fingerprint=self.fingerprint,
  190. project_id=self.group.project_id,
  191. new_status=GroupStatus.RESOLVED,
  192. new_substatus=None,
  193. )
  194. produce_occurrence_to_kafka(
  195. payload_type=PayloadType.STATUS_CHANGE,
  196. status_change=status_change,
  197. )
  198. self.group.refresh_from_db()
  199. assert self.group.status == GroupStatus.RESOLVED
  200. assert self.group.substatus is None
  201. assert Activity.objects.filter(
  202. group=self.group, type=ActivityType.SET_RESOLVED.value
  203. ).exists()
  204. assert GroupHistory.objects.filter(
  205. group=self.group, status=GroupHistoryStatus.RESOLVED
  206. ).exists()
  207. def test_with_status_change_archived(self) -> None:
  208. for substatus in [
  209. GroupSubStatus.UNTIL_ESCALATING,
  210. GroupSubStatus.UNTIL_CONDITION_MET,
  211. GroupSubStatus.FOREVER,
  212. ]:
  213. status_change = StatusChangeMessage(
  214. fingerprint=self.fingerprint,
  215. project_id=self.group.project_id,
  216. new_status=GroupStatus.IGNORED,
  217. new_substatus=substatus,
  218. )
  219. produce_occurrence_to_kafka(
  220. payload_type=PayloadType.STATUS_CHANGE,
  221. status_change=status_change,
  222. )
  223. self.group.refresh_from_db()
  224. assert self.group.status == GroupStatus.IGNORED
  225. assert self.group.substatus == substatus
  226. assert Activity.objects.filter(
  227. group=self.group, type=ActivityType.SET_IGNORED.value
  228. ).exists()
  229. gh_status = GROUP_SUBSTATUS_TO_GROUP_HISTORY_STATUS[substatus]
  230. assert GroupHistory.objects.filter(
  231. group=self.group,
  232. status=STRING_TO_STATUS_LOOKUP[gh_status],
  233. ).exists()
  234. def test_with_status_change_unresolved(self) -> None:
  235. # We modify a single group through different substatuses that are supported in the UI
  236. # to ensure the status change is processed correctly.
  237. self.group.update(status=GroupStatus.IGNORED, substatus=GroupSubStatus.UNTIL_ESCALATING)
  238. for substatus, activity_type in [
  239. (GroupSubStatus.ESCALATING, ActivityType.SET_ESCALATING),
  240. (GroupSubStatus.ONGOING, ActivityType.AUTO_SET_ONGOING),
  241. (GroupSubStatus.REGRESSED, ActivityType.SET_REGRESSION),
  242. (GroupSubStatus.ONGOING, ActivityType.SET_UNRESOLVED),
  243. ]:
  244. # Produce the status change message and process it
  245. status_change = StatusChangeMessage(
  246. fingerprint=self.fingerprint,
  247. project_id=self.group.project_id,
  248. new_status=GroupStatus.UNRESOLVED,
  249. new_substatus=substatus,
  250. )
  251. produce_occurrence_to_kafka(
  252. payload_type=PayloadType.STATUS_CHANGE,
  253. status_change=status_change,
  254. )
  255. self.group.refresh_from_db()
  256. assert self.group.status == GroupStatus.UNRESOLVED
  257. assert self.group.substatus == substatus
  258. assert Activity.objects.filter(group=self.group, type=activity_type.value).exists()
  259. gh_status = GROUP_SUBSTATUS_TO_GROUP_HISTORY_STATUS[substatus]
  260. assert GroupHistory.objects.filter(
  261. group=self.group,
  262. status=STRING_TO_STATUS_LOOKUP[gh_status],
  263. ).exists()
  264. @patch("sentry.issues.status_change_consumer.logger.error")
  265. def test_with_invalid_status_change(self, mock_logger_error: MagicMock) -> None:
  266. for status, substatus, error_msg in [
  267. (
  268. GroupStatus.RESOLVED,
  269. GroupSubStatus.FOREVER,
  270. "group.update_status.unexpected_substatus",
  271. ),
  272. (GroupStatus.IGNORED, None, "group.update_status.missing_substatus"),
  273. (
  274. GroupStatus.IGNORED,
  275. GroupSubStatus.REGRESSED,
  276. "group.update_status.invalid_substatus",
  277. ),
  278. (GroupStatus.UNRESOLVED, GroupSubStatus.NEW, "group.update_status.invalid_substatus"),
  279. ]:
  280. bad_status_change = StatusChangeMessage(
  281. fingerprint=self.fingerprint,
  282. project_id=self.group.project_id,
  283. new_status=status,
  284. new_substatus=substatus,
  285. )
  286. produce_occurrence_to_kafka(
  287. payload_type=PayloadType.STATUS_CHANGE,
  288. status_change=bad_status_change,
  289. )
  290. processed_fingerprint = {"fingerprint": ["group-1"]}
  291. process_occurrence_data(processed_fingerprint)
  292. self.group.refresh_from_db()
  293. mock_logger_error.assert_called_with(
  294. error_msg,
  295. extra={
  296. "project_id": self.group.project_id,
  297. "fingerprint": processed_fingerprint["fingerprint"],
  298. "new_status": status,
  299. "new_substatus": substatus,
  300. },
  301. )
  302. assert self.group.status == self.initial_status
  303. assert self.group.substatus == self.initial_substatus
  304. @patch("sentry.issues.status_change_consumer.metrics.incr")
  305. def test_invalid_hashes(self, mock_metrics_incr: MagicMock) -> None:
  306. event = self.store_event(
  307. data={
  308. "event_id": "a" * 32,
  309. "message": "oh no",
  310. "timestamp": datetime.now().isoformat(),
  311. "fingerprint": ["group-2"],
  312. },
  313. project_id=self.project.id,
  314. )
  315. group = event.group
  316. assert group
  317. initial_status = group.status
  318. initial_substatus = group.substatus
  319. wrong_fingerprint = {"fingerprint": ["wronghash"]}
  320. process_occurrence_data(wrong_fingerprint)
  321. bad_status_change_resolve = StatusChangeMessage(
  322. fingerprint=["wronghash"],
  323. project_id=group.project_id,
  324. new_status=GroupStatus.RESOLVED,
  325. new_substatus=GroupSubStatus.FOREVER,
  326. )
  327. produce_occurrence_to_kafka(
  328. payload_type=PayloadType.STATUS_CHANGE,
  329. status_change=bad_status_change_resolve,
  330. )
  331. group.refresh_from_db()
  332. mock_metrics_incr.assert_any_call("occurrence_ingest.grouphash.not_found")
  333. assert group.status == initial_status
  334. assert group.substatus == initial_substatus
  335. def test_generate_status_changes_id(self) -> None:
  336. status_change_1 = StatusChangeMessage(
  337. fingerprint=["status-change-1"],
  338. project_id=self.project.id,
  339. new_status=GroupStatus.RESOLVED,
  340. new_substatus=GroupSubStatus.FOREVER,
  341. )
  342. status_change_2 = StatusChangeMessage(
  343. fingerprint=["status-change-2"],
  344. project_id=self.project.id,
  345. new_status=GroupStatus.RESOLVED,
  346. new_substatus=GroupSubStatus.FOREVER,
  347. )
  348. assert status_change_1.id
  349. assert status_change_1.id != status_change_2.id
  350. @patch(
  351. "sentry.issues.producer._prepare_status_change_message", return_value={"mock_data": "great"}
  352. )
  353. @patch("sentry.issues.producer._occurrence_producer.produce")
  354. @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
  355. def test_payload_sent_to_kafka_with_partition_key(
  356. self, mock_produce: MagicMock, mock_prepare_status_change_message: MagicMock
  357. ) -> None:
  358. status_change = StatusChangeMessage(
  359. project_id=self.project.id,
  360. fingerprint=["group-1"],
  361. new_status=GroupStatus.RESOLVED,
  362. new_substatus=GroupSubStatus.FOREVER,
  363. )
  364. produce_occurrence_to_kafka(
  365. payload_type=PayloadType.STATUS_CHANGE,
  366. status_change=status_change,
  367. event_data={},
  368. )
  369. mock_produce.assert_called_once_with(
  370. ArroyoTopic(name="ingest-occurrences"),
  371. KafkaPayload(
  372. f"{status_change.fingerprint[0]}-{status_change.project_id}".encode(),
  373. json.dumps({"mock_data": "great"}).encode("utf-8"),
  374. [],
  375. ),
  376. )
  377. @patch(
  378. "sentry.issues.producer._prepare_status_change_message", return_value={"mock_data": "great"}
  379. )
  380. @patch("sentry.issues.producer._occurrence_producer.produce")
  381. @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
  382. def test_payload_sent_to_kafka_with_partition_key_no_fingerprint(
  383. self, mock_produce: MagicMock, mock_prepare_status_change_message: MagicMock
  384. ) -> None:
  385. status_change = StatusChangeMessage(
  386. project_id=self.project.id,
  387. fingerprint=[],
  388. new_status=GroupStatus.RESOLVED,
  389. new_substatus=GroupSubStatus.FOREVER,
  390. )
  391. produce_occurrence_to_kafka(
  392. payload_type=PayloadType.STATUS_CHANGE,
  393. status_change=status_change,
  394. event_data={},
  395. )
  396. mock_produce.assert_called_once_with(
  397. ArroyoTopic(name="ingest-occurrences"),
  398. KafkaPayload(
  399. None,
  400. json.dumps({"mock_data": "great"}).encode("utf-8"),
  401. [],
  402. ),
  403. )
  404. @patch(
  405. "sentry.issues.producer._prepare_status_change_message", return_value={"mock_data": "great"}
  406. )
  407. @patch("sentry.issues.producer._occurrence_producer.produce")
  408. @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
  409. def test_payload_sent_to_kafka_with_partition_key_no_status_change(
  410. self, mock_produce: MagicMock, mock_prepare_status_change_message: MagicMock
  411. ) -> None:
  412. produce_occurrence_to_kafka(
  413. payload_type=PayloadType.STATUS_CHANGE,
  414. status_change=None,
  415. event_data={},
  416. )
  417. mock_produce.assert_called_once_with(
  418. ArroyoTopic(name="ingest-occurrences"),
  419. KafkaPayload(
  420. None,
  421. json.dumps({"mock_data": "great"}).encode("utf-8"),
  422. [],
  423. ),
  424. )