test_unmerge.py 21 KB


  1. from __future__ import annotations
  2. import functools
  3. import hashlib
  4. import itertools
  5. import logging
  6. import uuid
  7. from datetime import timedelta
  8. from unittest import mock
  9. from unittest.mock import patch
  10. from django.utils import timezone
  11. from sentry import eventstream, tsdb
  12. from sentry.eventstore.models import Event
  13. from sentry.models.environment import Environment
  14. from sentry.models.group import Group
  15. from sentry.models.grouphash import GroupHash
  16. from sentry.models.grouprelease import GroupRelease
  17. from sentry.models.release import Release
  18. from sentry.models.userreport import UserReport
  19. from sentry.similarity import _make_index_backend, features
  20. from sentry.tasks.merge import merge_groups
  21. from sentry.tasks.unmerge import (
  22. get_caches,
  23. get_event_user_from_interface,
  24. get_fingerprint,
  25. get_group_backfill_attributes,
  26. get_group_creation_attributes,
  27. unmerge,
  28. )
  29. from sentry.testutils.cases import SnubaTestCase, TestCase
  30. from sentry.testutils.helpers.datetime import before_now
  31. from sentry.testutils.helpers.features import with_feature
  32. from sentry.tsdb.base import TSDBModel
  33. from sentry.utils import redis
# Similarity index backend shared by the tests in this module. It is built
# from local client 0 of the "default" redis cluster, i.e. the default redis
# client is used as a cluster client in the similarity index. The
# `patch.object` decorator on `UnmergeTestCase` swaps it in for
# `features.index`.
index = _make_index_backend(redis.clusters.get("default").get_local_client(0))
  36. @patch.object(features, "index", new=index)
  37. class UnmergeTestCase(TestCase, SnubaTestCase):
  38. def test_get_fingerprint(self):
  39. assert (
  40. get_fingerprint(
  41. self.store_event(data={"message": "Hello world"}, project_id=self.project.id)
  42. )
  43. == hashlib.md5(b"Hello world").hexdigest()
  44. )
  45. assert (
  46. get_fingerprint(
  47. self.store_event(
  48. data={"message": "Hello world", "fingerprint": ["Not hello world"]},
  49. project_id=self.project.id,
  50. )
  51. )
  52. == hashlib.md5(b"Not hello world").hexdigest()
  53. )
  54. def test_get_group_creation_attributes(self):
  55. now = timezone.now().replace(microsecond=0)
  56. e1 = self.store_event(
  57. data={
  58. "fingerprint": ["group1"],
  59. "platform": "javascript",
  60. "message": "Hello from JavaScript",
  61. "type": "default",
  62. "level": "info",
  63. "tags": {"logger": "javascript"},
  64. "timestamp": now.isoformat(),
  65. },
  66. project_id=self.project.id,
  67. )
  68. e2 = self.store_event(
  69. data={
  70. "fingerprint": ["group1"],
  71. "platform": "python",
  72. "message": "Hello from Python",
  73. "type": "default",
  74. "level": "error",
  75. "tags": {"logger": "python"},
  76. "timestamp": now.isoformat(),
  77. },
  78. project_id=self.project.id,
  79. )
  80. e3 = self.store_event(
  81. data={
  82. "fingerprint": ["group1"],
  83. "platform": "java",
  84. "message": "Hello from Java",
  85. "type": "default",
  86. "level": "debug",
  87. "tags": {"logger": "java"},
  88. "timestamp": now.isoformat(),
  89. },
  90. project_id=self.project.id,
  91. )
  92. events = [e1, e2, e3]
  93. assert get_group_creation_attributes(get_caches(), e1.group, events) == {
  94. "active_at": now,
  95. "first_seen": now,
  96. "last_seen": now,
  97. "platform": "java",
  98. "message": "Hello from JavaScript",
  99. "level": logging.INFO,
  100. "logger": "java",
  101. "times_seen": 3,
  102. "first_release": None,
  103. "culprit": "",
  104. "data": {
  105. "type": "default",
  106. "last_received": e1.data["received"],
  107. "metadata": {"title": "Hello from JavaScript"},
  108. },
  109. "status": e1.group.status,
  110. "substatus": e1.group.substatus,
  111. }
  112. def test_get_group_backfill_attributes(self):
  113. now = timezone.now().replace(microsecond=0)
  114. assert get_group_backfill_attributes(
  115. get_caches(),
  116. Group(
  117. active_at=now,
  118. first_seen=now,
  119. last_seen=now,
  120. platform="javascript",
  121. message="Hello from JavaScript",
  122. level=logging.INFO,
  123. logger="javascript",
  124. times_seen=1,
  125. first_release=None,
  126. culprit="",
  127. data={"type": "default", "last_received": now.timestamp(), "metadata": {}},
  128. ),
  129. [
  130. self.store_event(
  131. data={
  132. "platform": "python",
  133. "message": "Hello from Python",
  134. "timestamp": (now - timedelta(hours=1)).isoformat(),
  135. "type": "default",
  136. "level": "debug",
  137. "tags": {"logger": "java"},
  138. },
  139. project_id=self.project.id,
  140. ),
  141. self.store_event(
  142. data={
  143. "platform": "java",
  144. "message": "Hello from Java",
  145. "timestamp": (now - timedelta(hours=2)).isoformat(),
  146. "type": "default",
  147. "level": "debug",
  148. "tags": {"logger": "java"},
  149. },
  150. project_id=self.project.id,
  151. ),
  152. ],
  153. ) == {
  154. "active_at": now - timedelta(hours=2),
  155. "first_seen": now - timedelta(hours=2),
  156. "platform": "java",
  157. "logger": "java",
  158. "times_seen": 3,
  159. "first_release": None,
  160. }
  161. @with_feature("projects:similarity-indexing")
  162. @mock.patch("sentry.analytics.record")
  163. def test_unmerge(self, mock_record):
  164. now = before_now(minutes=5).replace(microsecond=0)
  165. def time_from_now(offset=0):
  166. return now + timedelta(seconds=offset)
  167. project = self.create_project()
  168. project.date_added = timezone.now() - timedelta(minutes=10)
  169. project.save()
  170. sequence = itertools.count(0)
  171. tag_values = itertools.cycle(["red", "green", "blue"])
  172. user_values = itertools.cycle([{"id": 1}, {"id": 2}])
  173. def create_message_event(
  174. template, parameters, environment, release, fingerprint="group1"
  175. ) -> Event:
  176. i = next(sequence)
  177. event_id = uuid.UUID(fields=(i, 0x0, 0x1000, 0x80, 0x80, 0x808080808080)).hex
  178. tags = [["color", next(tag_values)]]
  179. if release:
  180. tags.append(["sentry:release", release])
  181. event = self.store_event(
  182. data={
  183. "event_id": event_id,
  184. "message": template % parameters,
  185. "type": "default",
  186. "user": next(user_values),
  187. "tags": tags,
  188. "fingerprint": [fingerprint],
  189. "timestamp": (now + timedelta(seconds=i)).isoformat(),
  190. "environment": environment,
  191. "release": release,
  192. },
  193. project_id=project.id,
  194. )
  195. UserReport.objects.create(
  196. project_id=project.id,
  197. group_id=event.group.id,
  198. event_id=event_id,
  199. name="Log Hat",
  200. email="ceo@corptron.com",
  201. comments="Quack",
  202. )
  203. features.record([event])
  204. return event
  205. events: dict[str | None, list[Event]] = {}
  206. for event in (
  207. create_message_event(
  208. "This is message #%s.", i, environment="production", release="version"
  209. )
  210. for i in range(10)
  211. ):
  212. events.setdefault(get_fingerprint(event), []).append(event)
  213. for event in (
  214. create_message_event(
  215. "This is message #%s!",
  216. i,
  217. environment="production",
  218. release="version2",
  219. fingerprint="group2",
  220. )
  221. for i in range(10, 16)
  222. ):
  223. events.setdefault(get_fingerprint(event), []).append(event)
  224. event = create_message_event(
  225. "This is message #%s!",
  226. 17,
  227. environment="staging",
  228. release="version3",
  229. fingerprint="group3",
  230. )
  231. events.setdefault(get_fingerprint(event), []).append(event)
  232. merge_source, source, destination = list(Group.objects.all())
  233. assert len(events) == 3
  234. assert sum(len(x) for x in events.values()) == 17
  235. production_environment = Environment.objects.get(
  236. organization_id=project.organization_id, name="production"
  237. )
  238. with self.tasks():
  239. eventstream_state = eventstream.backend.start_merge(
  240. project.id, [merge_source.id], source.id
  241. )
  242. merge_groups.delay([merge_source.id], source.id)
  243. eventstream.backend.end_merge(eventstream_state)
  244. similar_items = features.compare(source)
  245. assert len(similar_items) == 2
  246. assert similar_items[0][0] == source.id
  247. assert similar_items[0][1]["message:message:character-shingles"] == 1.0
  248. assert similar_items[1][0] == destination.id
  249. assert similar_items[1][1]["message:message:character-shingles"] < 1.0
  250. with self.tasks():
  251. unmerge.delay(
  252. project.id, source.id, destination.id, [list(events.keys())[0]], None, batch_size=5
  253. )
  254. assert (
  255. list(
  256. Group.objects.filter(id=merge_source.id).values_list(
  257. "times_seen", "first_seen", "last_seen"
  258. )
  259. )
  260. == []
  261. )
  262. assert list(
  263. Group.objects.filter(id=source.id).values_list("times_seen", "first_seen", "last_seen")
  264. ) == [(6, time_from_now(10), time_from_now(15))]
  265. assert list(
  266. Group.objects.filter(id=destination.id).values_list(
  267. "times_seen", "first_seen", "last_seen"
  268. )
  269. ) == [(11, time_from_now(0), time_from_now(16))]
  270. assert source.id != destination.id
  271. assert source.project == destination.project
  272. destination_event_ids = set(map(lambda event: event.event_id, list(events.values())[1]))
  273. assert destination_event_ids == set(
  274. UserReport.objects.filter(group_id=source.id).values_list("event_id", flat=True)
  275. )
  276. assert list(
  277. GroupHash.objects.filter(group_id=source.id).values_list("hash", flat=True)
  278. ) == [list(events.keys())[1]]
  279. assert set(
  280. GroupRelease.objects.filter(group_id=source.id).values_list(
  281. "environment", "first_seen", "last_seen"
  282. )
  283. ) == {("production", time_from_now(10), time_from_now(15))}
  284. destination_event_ids = set(
  285. map(lambda event: event.event_id, list(events.values())[0] + list(events.values())[2])
  286. )
  287. assert destination_event_ids == set(
  288. UserReport.objects.filter(group_id=destination.id).values_list("event_id", flat=True)
  289. )
  290. assert set(
  291. GroupHash.objects.filter(group_id=destination.id).values_list("hash", flat=True)
  292. ) == {list(events.keys())[0], list(events.keys())[2]}
  293. assert set(
  294. GroupRelease.objects.filter(group_id=destination.id).values_list(
  295. "environment", "first_seen", "last_seen"
  296. )
  297. ) == {
  298. ("production", time_from_now(0), time_from_now(9)),
  299. ("staging", time_from_now(16), time_from_now(16)),
  300. }
  301. rollup_duration = 3600
  302. time_series = tsdb.backend.get_range(
  303. TSDBModel.group,
  304. [source.id, destination.id],
  305. now - timedelta(seconds=rollup_duration),
  306. time_from_now(17),
  307. rollup_duration,
  308. tenant_ids={"referrer": "get_range", "organization_id": 1},
  309. )
  310. environment_time_series = tsdb.backend.get_range(
  311. TSDBModel.group,
  312. [source.id, destination.id],
  313. now - timedelta(seconds=rollup_duration),
  314. time_from_now(17),
  315. rollup_duration,
  316. environment_ids=[production_environment.id],
  317. tenant_ids={"referrer": "get_range", "organization_id": 1},
  318. )
  319. def get_expected_series_values(rollup, events, function=None):
  320. if function is None:
  321. def function(aggregate, event):
  322. return (aggregate if aggregate is not None else 0) + 1
  323. expected: dict[float, float] = {}
  324. for event in events:
  325. k = float((event.datetime.timestamp() // rollup_duration) * rollup_duration)
  326. expected[k] = function(expected.get(k), event)
  327. return expected
  328. def assert_series_contains(expected, actual, default=0):
  329. actual = dict(actual)
  330. for key, value in expected.items():
  331. assert actual.get(key, 0) == value
  332. for key in set(actual.keys()) - set(expected.keys()):
  333. assert actual.get(key, 0) == default
  334. assert_series_contains(
  335. get_expected_series_values(rollup_duration, list(events.values())[1]),
  336. time_series[source.id],
  337. 0,
  338. )
  339. assert_series_contains(
  340. get_expected_series_values(
  341. rollup_duration, list(events.values())[0] + list(events.values())[2]
  342. ),
  343. time_series[destination.id],
  344. 0,
  345. )
  346. assert_series_contains(
  347. get_expected_series_values(rollup_duration, list(events.values())[1]),
  348. environment_time_series[source.id],
  349. 0,
  350. )
  351. assert_series_contains(
  352. get_expected_series_values(
  353. rollup_duration, list(events.values())[0][:-1] + list(events.values())[2]
  354. ),
  355. environment_time_series[destination.id],
  356. 0,
  357. )
  358. time_series = tsdb.backend.get_distinct_counts_series(
  359. TSDBModel.users_affected_by_group,
  360. [source.id, destination.id],
  361. now - timedelta(seconds=rollup_duration),
  362. time_from_now(17),
  363. rollup_duration,
  364. tenant_ids={"referrer": "r", "organization_id": 1234},
  365. )
  366. environment_time_series = tsdb.backend.get_distinct_counts_series(
  367. TSDBModel.users_affected_by_group,
  368. [source.id, destination.id],
  369. now - timedelta(seconds=rollup_duration),
  370. time_from_now(17),
  371. rollup_duration,
  372. environment_id=production_environment.id,
  373. tenant_ids={"referrer": "r", "organization_id": 1234},
  374. )
  375. def collect_by_user_tag(aggregate, event):
  376. aggregate = aggregate if aggregate is not None else set()
  377. aggregate.add(
  378. get_event_user_from_interface(event.data["user"], event.group.project).tag_value
  379. )
  380. mock_record.assert_called_with(
  381. "eventuser_endpoint.request",
  382. project_id=event.group.project.id,
  383. endpoint="sentry.tasks.unmerge.get_event_user_from_interface",
  384. )
  385. return aggregate
  386. for series in [time_series, environment_time_series]:
  387. assert_series_contains(
  388. {
  389. timestamp: len(values)
  390. for timestamp, values in get_expected_series_values(
  391. rollup_duration, list(events.values())[1], collect_by_user_tag
  392. ).items()
  393. },
  394. series[source.id],
  395. )
  396. assert_series_contains(
  397. {
  398. timestamp: len(values)
  399. for timestamp, values in get_expected_series_values(
  400. rollup_duration,
  401. list(events.values())[0] + list(events.values())[2],
  402. collect_by_user_tag,
  403. ).items()
  404. },
  405. time_series[destination.id],
  406. )
  407. def strip_zeroes(data):
  408. for group_id, series in data.items():
  409. for _, values in series:
  410. for key, val in list(values.items()):
  411. if val == 0:
  412. values.pop(key)
  413. return data
  414. def collect_by_release(group, aggregate, event):
  415. aggregate = aggregate if aggregate is not None else {}
  416. release = event.get_tag("sentry:release")
  417. assert release
  418. release = GroupRelease.objects.get(
  419. group_id=group.id,
  420. environment=event.data["environment"],
  421. release_id=Release.objects.get(
  422. organization_id=project.organization_id, version=release
  423. ).id,
  424. ).id
  425. aggregate[release] = aggregate.get(release, 0) + 1
  426. return aggregate
  427. items = {}
  428. for i in [source.id, destination.id]:
  429. items[i] = list(GroupRelease.objects.filter(group_id=i).values_list("id", flat=True))
  430. time_series = strip_zeroes(
  431. tsdb.backend.get_frequency_series(
  432. TSDBModel.frequent_releases_by_group,
  433. items,
  434. now - timedelta(seconds=rollup_duration),
  435. time_from_now(17),
  436. rollup_duration,
  437. tenant_ids={"referrer": "r", "organization_id": 1234},
  438. )
  439. )
  440. assert_series_contains(
  441. get_expected_series_values(
  442. rollup_duration,
  443. list(events.values())[1],
  444. functools.partial(collect_by_release, source),
  445. ),
  446. time_series[source.id],
  447. {},
  448. )
  449. assert_series_contains(
  450. get_expected_series_values(
  451. rollup_duration,
  452. list(events.values())[0] + list(events.values())[2],
  453. functools.partial(collect_by_release, destination),
  454. ),
  455. time_series[destination.id],
  456. {},
  457. )
  458. items = {}
  459. for i in [source.id, destination.id]:
  460. items[i] = list(Environment.objects.all().values_list("id", flat=True))
  461. time_series = strip_zeroes(
  462. tsdb.backend.get_frequency_series(
  463. TSDBModel.frequent_environments_by_group,
  464. items,
  465. now - timedelta(seconds=rollup_duration),
  466. time_from_now(17),
  467. rollup_duration,
  468. tenant_ids={"referrer": "r", "organization_id": 1234},
  469. )
  470. )
  471. def collect_by_environment(aggregate, event):
  472. aggregate = aggregate if aggregate is not None else {}
  473. environment = Environment.objects.get(
  474. organization_id=project.organization_id, name=event.data["environment"]
  475. ).id
  476. aggregate[environment] = aggregate.get(environment, 0) + 1
  477. return aggregate
  478. assert_series_contains(
  479. get_expected_series_values(
  480. rollup_duration, list(events.values())[1], collect_by_environment
  481. ),
  482. time_series[source.id],
  483. {},
  484. )
  485. assert_series_contains(
  486. get_expected_series_values(
  487. rollup_duration,
  488. list(events.values())[0] + list(events.values())[2],
  489. collect_by_environment,
  490. ),
  491. time_series[destination.id],
  492. {},
  493. )
  494. source_similar_items = features.compare(source)
  495. assert source_similar_items[0] == (
  496. source.id,
  497. {
  498. "exception:message:character-shingles": None,
  499. "exception:stacktrace:application-chunks": None,
  500. "exception:stacktrace:pairs": None,
  501. "message:message:character-shingles": 1.0,
  502. },
  503. )
  504. assert source_similar_items[1][0] == destination.id
  505. assert source_similar_items[1][1]["message:message:character-shingles"] < 1.0
  506. destination_similar_items = features.compare(destination)
  507. assert destination_similar_items[0] == (
  508. destination.id,
  509. {
  510. "exception:message:character-shingles": None,
  511. "exception:stacktrace:application-chunks": None,
  512. "exception:stacktrace:pairs": None,
  513. "message:message:character-shingles": 1.0,
  514. },
  515. )
  516. assert destination_similar_items[1][0] == source.id
  517. assert destination_similar_items[1][1]["message:message:character-shingles"] < 1.0