Browse Source

ref(tagstore): Move `index_event_tags.delay` into `TagStorage` backend (#11857)

ted kaemming 6 years ago
parent
commit
5e41899f23

+ 2 - 4
src/sentry/event_manager.py

@@ -20,7 +20,7 @@ from django.utils.encoding import force_text
 from django.utils.functional import cached_property
 from sentry import options
 
-from sentry import buffer, eventtypes, eventstream, features, tsdb, filters
+from sentry import buffer, eventtypes, eventstream, features, tagstore, tsdb, filters
 from sentry.constants import (
     CLIENT_RESERVED_ATTRS, LOG_LEVELS, LOG_LEVELS_MAP, DEFAULT_LOG_LEVEL,
     DEFAULT_LOGGER_NAME, MAX_CULPRIT_LENGTH, VALID_PLATFORMS, MAX_TAG_VALUE_LENGTH,
@@ -849,8 +849,6 @@ class EventManager(object):
                 self.normalize()
             self._normalized = True
 
-        from sentry.tasks.post_process import index_event_tags
-
         data = self._data
 
         project = Project.objects.get_from_cache(id=project_id)
@@ -1160,7 +1158,7 @@ class EventManager(object):
                 )
                 return event
 
-            index_event_tags.delay(
+            tagstore.delay_index_event_tags(
                 organization_id=project.organization_id,
                 project_id=project.id,
                 group_id=group.id,

+ 6 - 0
src/sentry/tagstore/base.py

@@ -89,6 +89,8 @@ class TagStorage(Service):
         'incr_group_tag_value_times_seen',
         'update_group_tag_key_values_seen',
         'update_group_for_events',
+
+        'delay_index_event_tags',
     ])
 
     __all__ = frozenset([
@@ -440,3 +442,7 @@ class TagStorage(Service):
                     project_id, group_id, environment_id, tk.key)
 
         return tag_keys
+
+    def delay_index_event_tags(self, organization_id, project_id, group_id,
+                               environment_id, event_id, tags, date_added):
+        raise NotImplementedError

+ 13 - 0
src/sentry/tagstore/legacy/backend.py

@@ -26,6 +26,7 @@ from sentry.utils import db
 
 from . import models
 from sentry.tagstore.types import TagKey, TagValue, GroupTagKey, GroupTagValue
+from sentry.tasks.post_process import index_event_tags
 
 
 transformers = {
@@ -759,3 +760,15 @@ class LegacyTagStorage(TagStorage):
             project_id=project_id,
             event_id__in=event_ids,
         ).update(group_id=destination_id)
+
+    def delay_index_event_tags(self, organization_id, project_id, group_id,
+                               environment_id, event_id, tags, date_added):
+        index_event_tags.delay(
+            organization_id=organization_id,
+            project_id=project_id,
+            group_id=group_id,
+            environment_id=environment_id,
+            event_id=event_id,
+            tags=tags,
+            date_added=date_added,
+        )

+ 7 - 0
src/sentry/tagstore/snuba/backend.py

@@ -707,6 +707,7 @@ class SnubaCompatibilityTagStorage(SnubaTagStorage):
     in the future, this subclass can be removed (along with the entire
     ``TagStorage`` write interface from the base implementation.)
     """
+
     def get_or_create_group_tag_key(self, project_id, group_id, environment_id, key, **kwargs):
         # Called by ``unmerge.repair_tag_data``. The return value is not used.
         pass
@@ -753,3 +754,9 @@ class SnubaCompatibilityTagStorage(SnubaTagStorage):
     def update_group_tag_key_values_seen(self, project_id, group_ids):
         # Called by ``unmerge``. The return value is not used.
         pass
+
+    def delay_index_event_tags(self, organization_id, project_id, group_id,
+                               environment_id, event_id, tags, date_added):
+        # Called by ``EventManager.save``. The return value is not
+        # used.
+        pass

+ 13 - 0
src/sentry/tagstore/v2/backend.py

@@ -27,6 +27,7 @@ from sentry.utils import db
 
 from . import models
 from sentry.tagstore.types import TagKey, TagValue, GroupTagKey, GroupTagValue
+from sentry.tasks.post_process import index_event_tags
 
 
 logger = logging.getLogger('sentry.tagstore.v2')
@@ -1121,3 +1122,15 @@ class V2TagStorage(TagStorage):
             return queryset.filter(_key__environment_id=environment_id)
         else:
             raise ValueError("queryset of unsupported model '%s' provided" % queryset.model)
+
+    def delay_index_event_tags(self, organization_id, project_id, group_id,
+                               environment_id, event_id, tags, date_added):
+        index_event_tags.delay(
+            organization_id=organization_id,
+            project_id=project_id,
+            group_id=group_id,
+            environment_id=environment_id,
+            event_id=event_id,
+            tags=tags,
+            date_added=date_added,
+        )

+ 4 - 1
tests/snuba/eventstream/test_eventstream.py

@@ -20,7 +20,8 @@ class SnubaEventStreamTest(SnubaTestCase):
         self.kafka_eventstream.producer = Mock()
 
     @patch('sentry.eventstream.insert')
-    def test(self, mock_eventstream_insert):
+    @patch('sentry.tagstore.delay_index_event_tags')
+    def test(self, mock_delay_index_event_tags, mock_eventstream_insert):
         now = datetime.utcnow()
 
         def _get_event_count():
@@ -60,6 +61,8 @@ class SnubaEventStreamTest(SnubaTestCase):
             'skip_consume': False
         }
 
+        assert mock_delay_index_event_tags.call_count == 1
+
         # pass arguments on to Kafka EventManager
         self.kafka_eventstream.insert(*insert_args, **insert_kwargs)