# test_eventstream_api.py

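"""
Tests for the errors and transactions eventstream backends.

Each test drives the Kafka-backed eventstream API with a mocked producer,
asserts on the message that would have been produced, then replays that
payload through the Snuba-backed eventstream API and queries Snuba to
confirm the event was actually written.
"""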

import logging
import time
from datetime import datetime, timedelta
from unittest.mock import Mock, patch

from django.conf import settings

from sentry.event_manager import EventManager
from sentry.eventstream.errors.backend import (
    ErrorsInsertData,
    KafkaErrorsEventStreamAPI,
    SnubaErrorsEventStreamAPI,
)
from sentry.eventstream.transactions.backend import (
    KafkaTransactionsEventStreamAPI,
    SnubaTransactionsEventStreamAPI,
    TransactionsInsertData,
)
from sentry.testutils import SnubaTestCase, TestCase
from sentry.utils import json, snuba
from sentry.utils.samples import load_data


class TransactionsEventStreamTest(TestCase, SnubaTestCase):
    def setUp(self):
        super().setUp()
        self.kafka_eventstream = KafkaTransactionsEventStreamAPI()
        # replace the real producer with a mock so that produce() calls are
        # captured for inspection instead of being sent to Kafka
        self.kafka_eventstream._backend.producer = Mock()

    def __build_transaction_event(self):
        manager = EventManager(load_data("transaction"))
        manager.normalize()
        return manager.save(self.project.id)

    def __produce_event(self, insert_data: TransactionsInsertData):
        self.kafka_eventstream.insert(insert_data)

        produce_args, produce_kwargs = list(
            self.kafka_eventstream._backend.producer.produce.call_args
        )
        assert not produce_args
        assert produce_kwargs["topic"] == settings.KAFKA_TRANSACTIONS
        # we don't semantically partition on transactions
        assert produce_kwargs["key"] is None
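
        # eventstream messages are serialized as a 4-tuple of
        # (version, operation, payload1, payload2)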
        version, type_, payload1, payload2 = json.loads(produce_kwargs["value"])
        assert version == 2
        assert type_ == "insert"

        # insert what would have been the Kafka payload directly
        # into Snuba, expect an HTTP 200 and for the event to now exist
        snuba_eventstream = SnubaTransactionsEventStreamAPI()
        snuba_eventstream._backend.send(
            self.project.id,
            "insert",
            (payload1, payload2),
        )

    # patch the eventstream insert so that saving the event inside
    # __build_transaction_event does not go through the real eventstream
    @patch("sentry.eventstream.transactions.insert")
    def test_insert(self, mock_eventstream_insert):
        now = datetime.utcnow()

        event = self.__build_transaction_event()
        insert_data = TransactionsInsertData(
            event=event, received_timestamp=event.data["received"], skip_consume=False
        )
        self.__produce_event(insert_data)

        result = snuba.raw_query(
            dataset=snuba.Dataset.Transactions,
            start=now - timedelta(days=1),
            end=now + timedelta(days=1),
            selected_columns=["event_id"],
            groupby=None,
            filter_keys={"project_id": [self.project.id], "event_id": [event.event_id]},
        )
        assert len(result["data"]) == 1


class ErrorsEventStreamTest(TestCase, SnubaTestCase):
    def setUp(self):
        super().setUp()
        self.kafka_eventstream = KafkaErrorsEventStreamAPI()
        # as above, mock the producer so produce() calls are captured
        self.kafka_eventstream._backend.producer = Mock()

    def __build_event(self, timestamp):
        raw_event = {
            "event_id": "a" * 32,
            "message": "foo",
            "timestamp": time.mktime(timestamp.timetuple()),
            "level": logging.ERROR,
            "logger": "default",
            "tags": [],
        }
        manager = EventManager(raw_event)
        manager.normalize()
        return manager.save(self.project.id)

    def __produce_event(self, insert_data: ErrorsInsertData):
        # pass arguments on to the Kafka eventstream
        self.kafka_eventstream.insert(insert_data)

        produce_args, produce_kwargs = list(
            self.kafka_eventstream._backend.producer.produce.call_args
        )
        assert not produce_args
        assert produce_kwargs["topic"] == settings.KAFKA_EVENTS
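        # unlike transactions, error events are semantically partitioned,
        # keyed by project id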
        assert produce_kwargs["key"] == str(self.project.id).encode("utf-8")

        version, type_, payload1, payload2 = json.loads(produce_kwargs["value"])
        assert version == 2
        assert type_ == "insert"

        # insert what would have been the Kafka payload directly
        # into Snuba, expect an HTTP 200 and for the event to now exist
        snuba_eventstream = SnubaErrorsEventStreamAPI()
        snuba_eventstream._backend.send(
            self.project.id,
            "insert",
            (payload1, payload2),
        )

    @patch("sentry.eventstream.errors.insert")
    def test_insert(self, mock_eventstream_insert):
        now = datetime.utcnow()

        event = self.__build_event(now)
        insert_data = ErrorsInsertData(
            event=event,
            group=event.group,
            is_new=True,
            is_new_group_environment=True,
            is_regression=False,
            primary_hash="acbd18db4cc2f85cedef654fccc4a4d8",
            received_timestamp=event.data["received"],
            skip_consume=False,
        )

        # verify eventstream was called by EventManager
        insert_args, insert_kwargs = list(mock_eventstream_insert.call_args)
        assert not insert_args
        assert insert_kwargs == {"data": insert_data}

        self.__produce_event(insert_data)

        assert (
            snuba.query(
                start=now - timedelta(days=1),
                end=now + timedelta(days=1),
                groupby=["project_id"],
                filter_keys={"project_id": [self.project.id]},
            ).get(self.project.id, 0)
            == 1
        )