
test(eventstream): Verify eventstream functionality from EventManager to Snuba (#10790)

Brett Hoerner · 6 years ago · parent commit ccde26ed10

+ 1 - 1
src/sentry/eventstream/kafka.py

@@ -96,7 +96,7 @@ class KafkaEventStream(EventStream):
 
         try:
             self.producer.produce(
-                self.publish_topic,
+                topic=self.publish_topic,
                 key=key.encode('utf-8'),
                 value=json.dumps(
                     (EVENT_PROTOCOL_VERSION, _type) + extra_data
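
Not part of the commit, a minimal sketch of why this one-keyword change matters for the new test below: Mock records positional and keyword arguments separately, so the test's assertion on produce_kwargs['topic'] only holds once the topic is passed by name.

    from mock import Mock

    producer = Mock()

    producer.produce('events', key='1')        # topic passed positionally
    args, kwargs = producer.produce.call_args
    assert args == ('events',) and 'topic' not in kwargs

    producer.produce(topic='events', key='1')  # topic passed by keyword
    args, kwargs = producer.produce.call_args
    assert not args and kwargs['topic'] == 'events'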

+ 4 - 1
src/sentry/utils/snuba.py

@@ -585,10 +585,13 @@ def insert_raw(data):
     data = json.dumps(data)
     try:
         with timer('snuba_insert_raw'):
-            return _snuba_pool.urlopen(
+            resp = _snuba_pool.urlopen(
                 'POST', '/tests/insert',
                 body=data,
             )
+            if resp.status != 200:
+                raise SnubaError("Non-200 response from Snuba insert!")
+            return resp
     except urllib3.exceptions.HTTPError as err:
         raise SnubaError(err)
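
Context for the new status check (not from the commit): urllib3's urlopen returns non-2xx responses rather than raising, so a failed insert previously came back as a normal-looking response object. A minimal sketch, assuming a local Snuba; the host and port here are assumptions:

    import urllib3

    # urlopen does not raise on HTTP error statuses; callers must check
    # resp.status themselves, which is exactly what the hunk above adds.
    pool = urllib3.HTTPConnectionPool('127.0.0.1', port=1218)  # assumed address
    resp = pool.urlopen('POST', '/tests/insert', body='[]')
    assert resp.status == 200, 'Non-200 response from Snuba insert!'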
 

+ 1 - 0
tests/snuba/eventstream/__init__.py

@@ -0,0 +1 @@
+from __future__ import absolute_import

+ 78 - 0
tests/snuba/eventstream/test_eventstream.py

@@ -0,0 +1,78 @@
+from __future__ import absolute_import
+
+from datetime import datetime, timedelta
+import six
+import time
+import logging
+from mock import patch, Mock
+
+from sentry.event_manager import EventManager
+from sentry.eventstream.kafka import KafkaEventStream
+from sentry.testutils import SnubaTestCase
+from sentry.utils import snuba, json
+
+
+class SnubaEventStreamTest(SnubaTestCase):
+    def setUp(self):
+        super(SnubaEventStreamTest, self).setUp()
+
+        self.kafka_eventstream = KafkaEventStream()
+        self.kafka_eventstream.producer = Mock()
+
+    @patch('sentry.eventstream.insert')
+    def test(self, mock_eventstream_insert):
+        now = datetime.utcnow()
+
+        def _get_event_count():
+            return snuba.query(
+                start=now - timedelta(days=1),
+                end=now + timedelta(days=1),
+                groupby=['project_id'],
+                filter_keys={'project_id': [self.project.id]},
+            ).get(self.project.id, 0)
+
+        assert _get_event_count() == 0
+
+        raw_event = {
+            'event_id': 'a' * 32,
+            'message': 'foo',
+            'timestamp': time.mktime(now.timetuple()),
+            'level': logging.ERROR,
+            'logger': 'default',
+            'tags': [],
+        }
+
+        manager = EventManager(raw_event)
+        manager.normalize()
+        event = manager.save(self.project.id)
+
+        # verify eventstream was called by EventManager
+        insert_args, insert_kwargs = list(mock_eventstream_insert.call_args)
+        assert not insert_args
+        assert insert_kwargs == {
+            'event': event,
+            'group': event.group,
+            'is_new_group_environment': True,
+            'is_new': True,
+            'is_regression': False,
+            'is_sample': False,
+            'primary_hash': 'acbd18db4cc2f85cedef654fccc4a4d8',
+            'skip_consume': False
+        }
+
+        # pass arguments on to the Kafka EventStream
+        self.kafka_eventstream.insert(*insert_args, **insert_kwargs)
+
+        produce_args, produce_kwargs = list(self.kafka_eventstream.producer.produce.call_args)
+        assert not produce_args
+        assert produce_kwargs['topic'] == 'events'
+        assert produce_kwargs['key'] == six.text_type(self.project.id)
+
+        version, type_, primary_payload = json.loads(produce_kwargs['value'])[:3]
+        assert version == 2
+        assert type_ == 'insert'
+
+        # insert what would have been the Kafka payload directly
+        # into Snuba, expect an HTTP 200 and for the event to now exist
+        snuba.insert_raw([primary_payload])
+        assert _get_event_count() == 1
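
For reference, a minimal sketch (not from the commit; the payload keys are illustrative) of the envelope the test decodes: the Kafka message value is a JSON-encoded tuple of protocol version, operation type, and payload, matching the (EVENT_PROTOCOL_VERSION, _type) + extra_data tuple built in kafka.py above.

    import json

    # Illustrative payload only; the real one is built by the eventstream.
    value = json.dumps((2, 'insert', {'event_id': 'a' * 32, 'project_id': 1}))

    version, type_, primary_payload = json.loads(value)[:3]
    assert (version, type_) == (2, 'insert')
    assert primary_payload['event_id'] == 'a' * 32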