test_eventstream.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. from __future__ import absolute_import
  2. from datetime import datetime, timedelta
  3. import six
  4. import time
  5. import logging
  6. from mock import patch, Mock
  7. from sentry.event_manager import EventManager
  8. from sentry.eventstream.kafka import KafkaEventStream
  9. from sentry.testutils import SnubaTestCase
  10. from sentry.utils import snuba, json
  11. class SnubaEventStreamTest(SnubaTestCase):
  12. def setUp(self):
  13. super(SnubaEventStreamTest, self).setUp()
  14. self.kafka_eventstream = KafkaEventStream()
  15. self.kafka_eventstream.producer = Mock()
  16. @patch('sentry.eventstream.insert')
  17. def test(self, mock_eventstream_insert):
  18. now = datetime.utcnow()
  19. def _get_event_count():
  20. return snuba.query(
  21. start=now - timedelta(days=1),
  22. end=now + timedelta(days=1),
  23. groupby=['project_id'],
  24. filter_keys={'project_id': [self.project.id]},
  25. ).get(self.project.id, 0)
  26. assert _get_event_count() == 0
  27. raw_event = {
  28. 'event_id': 'a' * 32,
  29. 'message': 'foo',
  30. 'timestamp': time.mktime(now.timetuple()),
  31. 'level': logging.ERROR,
  32. 'logger': 'default',
  33. 'tags': [],
  34. }
  35. manager = EventManager(raw_event)
  36. manager.normalize()
  37. event = manager.save(self.project.id)
  38. # verify eventstream was called by EventManager
  39. insert_args, insert_kwargs = list(mock_eventstream_insert.call_args)
  40. assert not insert_args
  41. assert insert_kwargs == {
  42. 'event': event,
  43. 'group': event.group,
  44. 'is_new_group_environment': True,
  45. 'is_new': True,
  46. 'is_regression': False,
  47. 'is_sample': False,
  48. 'primary_hash': 'acbd18db4cc2f85cedef654fccc4a4d8',
  49. 'skip_consume': False
  50. }
  51. # pass arguments on to Kafka EventManager
  52. self.kafka_eventstream.insert(*insert_args, **insert_kwargs)
  53. produce_args, produce_kwargs = list(self.kafka_eventstream.producer.produce.call_args)
  54. assert not produce_args
  55. assert produce_kwargs['topic'] == 'events'
  56. assert produce_kwargs['key'] == six.text_type(self.project.id)
  57. version, type_, primary_payload = json.loads(produce_kwargs['value'])[:3]
  58. assert version == 2
  59. assert type_ == 'insert'
  60. # insert what would have been the Kafka payload directly
  61. # into Snuba, expect an HTTP 200 and for the event to now exist
  62. snuba.insert_raw([primary_payload])
  63. assert _get_event_count() == 1