# test_eventstream.py (4.0 KB)

  1. import logging
  2. import time
  3. from datetime import datetime, timedelta
  4. from unittest.mock import Mock, patch
  5. from sentry.event_manager import EventManager
  6. from sentry.eventstream.kafka import KafkaEventStream
  7. from sentry.eventstream.snuba import SnubaEventStream
  8. from sentry.testutils import SnubaTestCase, TestCase
  9. from sentry.utils import json, snuba
  10. from sentry.utils.samples import load_data
  11. class SnubaEventStreamTest(TestCase, SnubaTestCase):
  12. def setUp(self):
  13. super().setUp()
  14. self.kafka_eventstream = KafkaEventStream()
  15. self.kafka_eventstream.producer = Mock()
  16. def __build_event(self, timestamp):
  17. raw_event = {
  18. "event_id": "a" * 32,
  19. "message": "foo",
  20. "timestamp": time.mktime(timestamp.timetuple()),
  21. "level": logging.ERROR,
  22. "logger": "default",
  23. "tags": [],
  24. }
  25. manager = EventManager(raw_event)
  26. manager.normalize()
  27. return manager.save(self.project.id)
  28. def __build_transaction_event(self):
  29. manager = EventManager(load_data("transaction"))
  30. manager.normalize()
  31. return manager.save(self.project.id)
  32. def __produce_event(self, *insert_args, **insert_kwargs):
  33. # pass arguments on to Kafka EventManager
  34. self.kafka_eventstream.insert(*insert_args, **insert_kwargs)
  35. produce_args, produce_kwargs = list(self.kafka_eventstream.producer.produce.call_args)
  36. assert not produce_args
  37. assert produce_kwargs["topic"] == "events"
  38. assert produce_kwargs["key"] == str(self.project.id).encode("utf-8")
  39. version, type_, payload1, payload2 = json.loads(produce_kwargs["value"])
  40. assert version == 2
  41. assert type_ == "insert"
  42. # insert what would have been the Kafka payload directly
  43. # into Snuba, expect an HTTP 200 and for the event to now exist
  44. snuba_eventstream = SnubaEventStream()
  45. snuba_eventstream._send(self.project.id, "insert", (payload1, payload2))
  46. @patch("sentry.eventstream.insert")
  47. def test(self, mock_eventstream_insert):
  48. now = datetime.utcnow()
  49. event = self.__build_event(now)
  50. # verify eventstream was called by EventManager
  51. insert_args, insert_kwargs = list(mock_eventstream_insert.call_args)
  52. assert not insert_args
  53. assert insert_kwargs == {
  54. "event": event,
  55. "group": event.group,
  56. "is_new_group_environment": True,
  57. "is_new": True,
  58. "is_regression": False,
  59. "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
  60. "skip_consume": False,
  61. "received_timestamp": event.data["received"],
  62. }
  63. self.__produce_event(*insert_args, **insert_kwargs)
  64. assert (
  65. snuba.query(
  66. start=now - timedelta(days=1),
  67. end=now + timedelta(days=1),
  68. groupby=["project_id"],
  69. filter_keys={"project_id": [self.project.id]},
  70. ).get(self.project.id, 0)
  71. == 1
  72. )
  73. @patch("sentry.eventstream.insert")
  74. def test_issueless(self, mock_eventstream_insert):
  75. now = datetime.utcnow()
  76. event = self.__build_transaction_event()
  77. event.group_id = None
  78. insert_args = ()
  79. insert_kwargs = {
  80. "event": event,
  81. "group": None,
  82. "is_new_group_environment": True,
  83. "is_new": True,
  84. "is_regression": False,
  85. "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
  86. "skip_consume": False,
  87. "received_timestamp": event.data["received"],
  88. }
  89. self.__produce_event(*insert_args, **insert_kwargs)
  90. result = snuba.raw_query(
  91. dataset=snuba.Dataset.Transactions,
  92. start=now - timedelta(days=1),
  93. end=now + timedelta(days=1),
  94. selected_columns=["event_id"],
  95. groupby=None,
  96. filter_keys={"project_id": [self.project.id], "event_id": [event.event_id]},
  97. )
  98. assert len(result["data"]) == 1