# test_eventstream.py
from __future__ import absolute_import

import logging
import time
from datetime import datetime, timedelta

import six
from mock import patch, Mock

from sentry.event_manager import EventManager
from sentry.eventstream.kafka import KafkaEventStream
from sentry.eventstream.snuba import SnubaEventStream
from sentry.testutils import SnubaTestCase, TestCase
from sentry.utils import snuba, json
  12. class SnubaEventStreamTest(TestCase, SnubaTestCase):
  13. def setUp(self):
  14. super(SnubaEventStreamTest, self).setUp()
  15. self.kafka_eventstream = KafkaEventStream()
  16. self.kafka_eventstream.producer = Mock()
  17. def __build_event(self, timestamp):
  18. raw_event = {
  19. "event_id": "a" * 32,
  20. "message": "foo",
  21. "timestamp": time.mktime(timestamp.timetuple()),
  22. "level": logging.ERROR,
  23. "logger": "default",
  24. "tags": [],
  25. }
  26. manager = EventManager(raw_event)
  27. manager.normalize()
  28. return manager.save(self.project.id)
  29. def __produce_event(self, *insert_args, **insert_kwargs):
  30. # pass arguments on to Kafka EventManager
  31. self.kafka_eventstream.insert(*insert_args, **insert_kwargs)
  32. produce_args, produce_kwargs = list(self.kafka_eventstream.producer.produce.call_args)
  33. assert not produce_args
  34. assert produce_kwargs["topic"] == "events"
  35. assert produce_kwargs["key"] == six.text_type(self.project.id)
  36. version, type_, payload1, payload2 = json.loads(produce_kwargs["value"])
  37. assert version == 2
  38. assert type_ == "insert"
  39. # insert what would have been the Kafka payload directly
  40. # into Snuba, expect an HTTP 200 and for the event to now exist
  41. snuba_eventstream = SnubaEventStream()
  42. snuba_eventstream._send(self.project.id, "insert", (payload1, payload2))
  43. @patch("sentry.eventstream.insert")
  44. def test(self, mock_eventstream_insert):
  45. now = datetime.utcnow()
  46. def _get_event_count():
  47. return snuba.query(
  48. start=now - timedelta(days=1),
  49. end=now + timedelta(days=1),
  50. groupby=["project_id"],
  51. filter_keys={"project_id": [self.project.id]},
  52. ).get(self.project.id, 0)
  53. assert _get_event_count() == 0
  54. event = self.__build_event(now)
  55. # verify eventstream was called by EventManager
  56. insert_args, insert_kwargs = list(mock_eventstream_insert.call_args)
  57. assert not insert_args
  58. assert insert_kwargs == {
  59. "event": event,
  60. "group": event.group,
  61. "is_new_group_environment": True,
  62. "is_new": True,
  63. "is_regression": False,
  64. "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
  65. "skip_consume": False,
  66. }
  67. self.__produce_event(*insert_args, **insert_kwargs)
  68. assert _get_event_count() == 1
  69. @patch("sentry.eventstream.insert")
  70. def test_issueless(self, mock_eventstream_insert):
  71. now = datetime.utcnow()
  72. event = self.__build_event(now)
  73. event.group_id = None
  74. insert_args = ()
  75. insert_kwargs = {
  76. "event": event,
  77. "group": None,
  78. "is_new_group_environment": True,
  79. "is_new": True,
  80. "is_regression": False,
  81. "primary_hash": "acbd18db4cc2f85cedef654fccc4a4d8",
  82. "skip_consume": False,
  83. }
  84. self.__produce_event(*insert_args, **insert_kwargs)
  85. result = snuba.raw_query(
  86. start=now - timedelta(days=1),
  87. end=now + timedelta(days=1),
  88. selected_columns=["event_id", "group_id"],
  89. groupby=None,
  90. filter_keys={"project_id": [self.project.id], "event_id": [event.event_id]},
  91. )
  92. assert len(result["data"]) == 1
  93. assert result["data"][0]["group_id"] is None