# test_eventstream.py
import logging
import time
from datetime import datetime, timedelta
from unittest.mock import Mock, patch

from django.conf import settings

from sentry.event_manager import EventManager
from sentry.eventstream.kafka import KafkaEventStream
from sentry.eventstream.snuba import SnubaEventStream
from sentry.testutils import SnubaTestCase, TestCase
from sentry.testutils.silo import region_silo_test
from sentry.utils import json, snuba
from sentry.utils.samples import load_data
  13. @region_silo_test
  14. class SnubaEventStreamTest(TestCase, SnubaTestCase):
  15. def setUp(self):
  16. super().setUp()
  17. self.kafka_eventstream = KafkaEventStream()
  18. self.producer_mock = Mock()
  19. self.kafka_eventstream.get_producer = Mock(return_value=self.producer_mock)
  20. def __build_event(self, timestamp):
  21. raw_event = {
  22. "event_id": "a" * 32,
  23. "message": "foo",
  24. "timestamp": time.mktime(timestamp.timetuple()),
  25. "level": logging.ERROR,
  26. "logger": "default",
  27. "tags": [],
  28. }
  29. manager = EventManager(raw_event)
  30. manager.normalize()
  31. return manager.save(self.project.id)
  32. def __build_transaction_event(self):
  33. manager = EventManager(load_data("transaction"))
  34. manager.normalize()
  35. return manager.save(self.project.id)
  36. def __produce_event(self, *insert_args, **insert_kwargs):
  37. is_transaction_event = insert_kwargs["event"].get_event_type() == "transaction"
  38. # pass arguments on to Kafka EventManager
  39. self.kafka_eventstream.insert(*insert_args, **insert_kwargs)
  40. producer = self.producer_mock
  41. produce_args, produce_kwargs = list(producer.produce.call_args)
  42. assert not produce_args
  43. assert produce_kwargs["topic"] == settings.KAFKA_EVENTS
  44. assert produce_kwargs["key"] == str(self.project.id).encode("utf-8")
  45. version, type_, payload1, payload2 = json.loads(produce_kwargs["value"])
  46. assert version == 2
  47. assert type_ == "insert"
  48. # insert what would have been the Kafka payload directly
  49. # into Snuba, expect an HTTP 200 and for the event to now exist
  50. snuba_eventstream = SnubaEventStream()
  51. snuba_eventstream._send(
  52. self.project.id,
  53. "insert",
  54. (payload1, payload2),
  55. is_transaction_event=is_transaction_event,
  56. )
  57. @patch("sentry.eventstream.insert")
  58. def test(self, mock_eventstream_insert):
  59. now = datetime.utcnow()
  60. event = self.__build_event(now)
  61. # verify eventstream was called by EventManager
  62. insert_args, insert_kwargs = list(mock_eventstream_insert.call_args)
  63. assert not insert_args
  64. assert insert_kwargs == {
  65. "event": event,
  66. "is_new_group_environment": True,
  67. "is_new": True,
  68. "is_regression": False,
  69. "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
  70. "skip_consume": False,
  71. "received_timestamp": event.data["received"],
  72. }
  73. self.__produce_event(*insert_args, **insert_kwargs)
  74. assert (
  75. snuba.query(
  76. start=now - timedelta(days=1),
  77. end=now + timedelta(days=1),
  78. groupby=["project_id"],
  79. filter_keys={"project_id": [self.project.id]},
  80. ).get(self.project.id, 0)
  81. == 1
  82. )
  83. @patch("sentry.eventstream.insert")
  84. def test_issueless(self, mock_eventstream_insert):
  85. now = datetime.utcnow()
  86. event = self.__build_transaction_event()
  87. event.group_id = None
  88. insert_args = ()
  89. insert_kwargs = {
  90. "event": event,
  91. "is_new_group_environment": True,
  92. "is_new": True,
  93. "is_regression": False,
  94. "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
  95. "skip_consume": False,
  96. "received_timestamp": event.data["received"],
  97. }
  98. self.__produce_event(*insert_args, **insert_kwargs)
  99. result = snuba.raw_query(
  100. dataset=snuba.Dataset.Transactions,
  101. start=now - timedelta(days=1),
  102. end=now + timedelta(days=1),
  103. selected_columns=["event_id"],
  104. groupby=None,
  105. filter_keys={"project_id": [self.project.id], "event_id": [event.event_id]},
  106. )
  107. assert len(result["data"]) == 1
  108. @patch("sentry.eventstream.insert")
  109. def test_multiple_groups(self, mock_eventstream_insert):
  110. now = datetime.utcnow()
  111. event = self.__build_transaction_event()
  112. event.group_id = None
  113. event.groups = [self.group]
  114. insert_args = ()
  115. insert_kwargs = {
  116. "event": event,
  117. "is_new_group_environment": True,
  118. "is_new": True,
  119. "is_regression": False,
  120. "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
  121. "skip_consume": False,
  122. "received_timestamp": event.data["received"],
  123. }
  124. self.__produce_event(*insert_args, **insert_kwargs)
  125. result = snuba.raw_query(
  126. dataset=snuba.Dataset.Transactions,
  127. start=now - timedelta(days=1),
  128. end=now + timedelta(days=1),
  129. selected_columns=["event_id", "group_ids"],
  130. groupby=None,
  131. filter_keys={"project_id": [self.project.id], "event_id": [event.event_id]},
  132. )
  133. assert len(result["data"]) == 1
  134. assert result["data"][0]["group_ids"] == [self.group.id]