test_eventstream.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. import logging
  2. import time
  3. from datetime import datetime, timedelta
  4. from sentry.event_manager import EventManager
  5. from sentry.eventstream.kafka import KafkaEventStream
  6. from sentry.eventstream.snuba import SnubaEventStream
  7. from sentry.testutils import SnubaTestCase, TestCase
  8. from sentry.utils import json, snuba
  9. from sentry.utils.compat.mock import Mock, patch
  10. from sentry.utils.samples import load_data
  11. class SnubaEventStreamTest(TestCase, SnubaTestCase):
  12. def setUp(self):
  13. super().setUp()
  14. self.kafka_eventstream = KafkaEventStream()
  15. self.kafka_eventstream.producer = Mock()
  16. def __build_event(self, timestamp):
  17. raw_event = {
  18. "event_id": "a" * 32,
  19. "message": "foo",
  20. "timestamp": time.mktime(timestamp.timetuple()),
  21. "level": logging.ERROR,
  22. "logger": "default",
  23. "tags": [],
  24. }
  25. manager = EventManager(raw_event)
  26. manager.normalize()
  27. return manager.save(self.project.id)
  28. def __build_transaction_event(self):
  29. manager = EventManager(load_data("transaction"))
  30. manager.normalize()
  31. return manager.save(self.project.id)
  32. def __produce_event(self, *insert_args, **insert_kwargs):
  33. # pass arguments on to Kafka EventManager
  34. self.kafka_eventstream.insert(*insert_args, **insert_kwargs)
  35. produce_args, produce_kwargs = list(self.kafka_eventstream.producer.produce.call_args)
  36. assert not produce_args
  37. assert produce_kwargs["topic"] == "events"
  38. assert produce_kwargs["key"] == str(self.project.id).encode("utf-8")
  39. version, type_, payload1, payload2 = json.loads(produce_kwargs["value"])
  40. assert version == 2
  41. assert type_ == "insert"
  42. # insert what would have been the Kafka payload directly
  43. # into Snuba, expect an HTTP 200 and for the event to now exist
  44. snuba_eventstream = SnubaEventStream()
  45. snuba_eventstream._send(self.project.id, "insert", (payload1, payload2))
  46. @patch("sentry.eventstream.insert")
  47. def test(self, mock_eventstream_insert):
  48. now = datetime.utcnow()
  49. event = self.__build_event(now)
  50. # verify eventstream was called by EventManager
  51. insert_args, insert_kwargs = list(mock_eventstream_insert.call_args)
  52. assert not insert_args
  53. assert insert_kwargs == {
  54. "event": event,
  55. "group": event.group,
  56. "is_new_group_environment": True,
  57. "is_new": True,
  58. "is_regression": False,
  59. "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
  60. "skip_consume": False,
  61. "received_timestamp": event.data["received"],
  62. }
  63. self.__produce_event(*insert_args, **insert_kwargs)
  64. assert (
  65. snuba.query(
  66. start=now - timedelta(days=1),
  67. end=now + timedelta(days=1),
  68. groupby=["project_id"],
  69. filter_keys={"project_id": [self.project.id]},
  70. ).get(self.project.id, 0)
  71. == 1
  72. )
  73. @patch("sentry.eventstream.insert")
  74. def test_issueless(self, mock_eventstream_insert):
  75. now = datetime.utcnow()
  76. event = self.__build_transaction_event()
  77. event.group_id = None
  78. insert_args = ()
  79. insert_kwargs = {
  80. "event": event,
  81. "group": None,
  82. "is_new_group_environment": True,
  83. "is_new": True,
  84. "is_regression": False,
  85. "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
  86. "skip_consume": False,
  87. "received_timestamp": event.data["received"],
  88. }
  89. self.__produce_event(*insert_args, **insert_kwargs)
  90. result = snuba.raw_query(
  91. dataset=snuba.Dataset.Transactions,
  92. start=now - timedelta(days=1),
  93. end=now + timedelta(days=1),
  94. selected_columns=["event_id"],
  95. groupby=None,
  96. filter_keys={"project_id": [self.project.id], "event_id": [event.event_id]},
  97. )
  98. assert len(result["data"]) == 1