# tests/sentry/eventstream/test_eventstream.py
from __future__ import absolute_import

import logging
import time
from datetime import datetime, timedelta

import six
from mock import Mock, patch

from sentry.event_manager import EventManager
from sentry.eventstream.kafka import KafkaEventStream
from sentry.eventstream.snuba import SnubaEventStream
from sentry.testutils import SnubaTestCase, TestCase
from sentry.utils import json, snuba
  12. class SnubaEventStreamTest(TestCase, SnubaTestCase):
  13. def setUp(self):
  14. super(SnubaEventStreamTest, self).setUp()
  15. self.kafka_eventstream = KafkaEventStream()
  16. self.kafka_eventstream.producer = Mock()
  17. @patch('sentry.eventstream.insert')
  18. @patch('sentry.tagstore.delay_index_event_tags')
  19. def test(self, mock_delay_index_event_tags, mock_eventstream_insert):
  20. now = datetime.utcnow()
  21. def _get_event_count():
  22. return snuba.query(
  23. start=now - timedelta(days=1),
  24. end=now + timedelta(days=1),
  25. groupby=['project_id'],
  26. filter_keys={'project_id': [self.project.id]},
  27. ).get(self.project.id, 0)
  28. assert _get_event_count() == 0
  29. raw_event = {
  30. 'event_id': 'a' * 32,
  31. 'message': 'foo',
  32. 'timestamp': time.mktime(now.timetuple()),
  33. 'level': logging.ERROR,
  34. 'logger': 'default',
  35. 'tags': [],
  36. }
  37. manager = EventManager(raw_event)
  38. manager.normalize()
  39. event = manager.save(self.project.id)
  40. # verify eventstream was called by EventManager
  41. insert_args, insert_kwargs = list(mock_eventstream_insert.call_args)
  42. assert not insert_args
  43. assert insert_kwargs == {
  44. 'event': event,
  45. 'group': event.group,
  46. 'is_new_group_environment': True,
  47. 'is_new': True,
  48. 'is_regression': False,
  49. 'is_sample': False,
  50. 'primary_hash': 'acbd18db4cc2f85cedef654fccc4a4d8',
  51. 'skip_consume': False
  52. }
  53. assert mock_delay_index_event_tags.call_count == 1
  54. # pass arguments on to Kafka EventManager
  55. self.kafka_eventstream.insert(*insert_args, **insert_kwargs)
  56. produce_args, produce_kwargs = list(self.kafka_eventstream.producer.produce.call_args)
  57. assert not produce_args
  58. assert produce_kwargs['topic'] == 'events'
  59. assert produce_kwargs['key'] == six.text_type(self.project.id)
  60. version, type_, payload1, payload2 = json.loads(produce_kwargs['value'])
  61. assert version == 2
  62. assert type_ == 'insert'
  63. # insert what would have been the Kafka payload directly
  64. # into Snuba, expect an HTTP 200 and for the event to now exist
  65. snuba_eventstream = SnubaEventStream()
  66. snuba_eventstream._send(self.project.id, 'insert', (payload1, payload2))
  67. assert _get_event_count() == 1