import logging
import time
from datetime import datetime, timedelta
from unittest.mock import Mock, patch

from django.conf import settings

from sentry.event_manager import EventManager
from sentry.eventstream.kafka import KafkaEventStream
from sentry.eventstream.snuba import SnubaEventStream
from sentry.testutils import SnubaTestCase, TestCase
from sentry.utils import json, snuba
from sentry.utils.samples import load_data
  12. class SnubaEventStreamTest(TestCase, SnubaTestCase):
  13. def setUp(self):
  14. super().setUp()
  15. self.kafka_eventstream = KafkaEventStream()
  16. self.kafka_eventstream.producer = Mock()
  17. def __build_event(self, timestamp):
  18. raw_event = {
  19. "event_id": "a" * 32,
  20. "message": "foo",
  21. "timestamp": time.mktime(timestamp.timetuple()),
  22. "level": logging.ERROR,
  23. "logger": "default",
  24. "tags": [],
  25. }
  26. manager = EventManager(raw_event)
  27. manager.normalize()
  28. return manager.save(self.project.id)
  29. def __build_transaction_event(self):
  30. manager = EventManager(load_data("transaction"))
  31. manager.normalize()
  32. return manager.save(self.project.id)
  33. def __produce_event(self, *insert_args, **insert_kwargs):
  34. # pass arguments on to Kafka EventManager
  35. self.kafka_eventstream.insert(*insert_args, **insert_kwargs)
  36. produce_args, produce_kwargs = list(self.kafka_eventstream.producer.produce.call_args)
  37. assert not produce_args
  38. assert produce_kwargs["topic"] == settings.KAFKA_EVENTS
  39. assert produce_kwargs["key"] == str(self.project.id).encode("utf-8")
  40. version, type_, payload1, payload2 = json.loads(produce_kwargs["value"])
  41. assert version == 2
  42. assert type_ == "insert"
  43. # insert what would have been the Kafka payload directly
  44. # into Snuba, expect an HTTP 200 and for the event to now exist
  45. snuba_eventstream = SnubaEventStream()
  46. snuba_eventstream._send(
  47. self.project.id,
  48. "insert",
  49. (payload1, payload2),
  50. is_transaction_event=insert_kwargs["event"].get_event_type() == "transaction",
  51. )
  52. @patch("sentry.eventstream.insert")
  53. def test(self, mock_eventstream_insert):
  54. now = datetime.utcnow()
  55. event = self.__build_event(now)
  56. # verify eventstream was called by EventManager
  57. insert_args, insert_kwargs = list(mock_eventstream_insert.call_args)
  58. assert not insert_args
  59. assert insert_kwargs == {
  60. "event": event,
  61. "is_new_group_environment": True,
  62. "is_new": True,
  63. "is_regression": False,
  64. "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
  65. "skip_consume": False,
  66. "received_timestamp": event.data["received"],
  67. }
  68. self.__produce_event(*insert_args, **insert_kwargs)
  69. assert (
  70. snuba.query(
  71. start=now - timedelta(days=1),
  72. end=now + timedelta(days=1),
  73. groupby=["project_id"],
  74. filter_keys={"project_id": [self.project.id]},
  75. ).get(self.project.id, 0)
  76. == 1
  77. )
  78. @patch("sentry.eventstream.insert")
  79. def test_issueless(self, mock_eventstream_insert):
  80. now = datetime.utcnow()
  81. event = self.__build_transaction_event()
  82. event.group_id = None
  83. insert_args = ()
  84. insert_kwargs = {
  85. "event": event,
  86. "is_new_group_environment": True,
  87. "is_new": True,
  88. "is_regression": False,
  89. "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
  90. "skip_consume": False,
  91. "received_timestamp": event.data["received"],
  92. }
  93. self.__produce_event(*insert_args, **insert_kwargs)
  94. result = snuba.raw_query(
  95. dataset=snuba.Dataset.Transactions,
  96. start=now - timedelta(days=1),
  97. end=now + timedelta(days=1),
  98. selected_columns=["event_id"],
  99. groupby=None,
  100. filter_keys={"project_id": [self.project.id], "event_id": [event.event_id]},
  101. )
  102. assert len(result["data"]) == 1
  103. @patch("sentry.eventstream.insert")
  104. def test_multiple_groups(self, mock_eventstream_insert):
  105. now = datetime.utcnow()
  106. event = self.__build_transaction_event()
  107. event.group_id = None
  108. event.group_ids = [self.group.id]
  109. insert_args = ()
  110. insert_kwargs = {
  111. "event": event,
  112. "is_new_group_environment": True,
  113. "is_new": True,
  114. "is_regression": False,
  115. "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
  116. "skip_consume": False,
  117. "received_timestamp": event.data["received"],
  118. }
  119. self.__produce_event(*insert_args, **insert_kwargs)
  120. result = snuba.raw_query(
  121. dataset=snuba.Dataset.Transactions,
  122. start=now - timedelta(days=1),
  123. end=now + timedelta(days=1),
  124. selected_columns=["event_id", "group_ids"],
  125. groupby=None,
  126. filter_keys={"project_id": [self.project.id], "event_id": [event.event_id]},
  127. )
  128. assert len(result["data"]) == 1
  129. assert result["data"][0]["group_ids"] == [self.group.id]