|
@@ -10,6 +10,7 @@ from __future__ import absolute_import
|
|
|
|
|
|
import six
|
|
|
|
|
|
+import logging
|
|
|
from collections import defaultdict
|
|
|
from datetime import timedelta
|
|
|
from django.db import connections, router, IntegrityError, transaction
|
|
@@ -26,7 +27,10 @@ from sentry.utils import db
|
|
|
from .models import EventTag, GroupTagKey, GroupTagValue, TagKey, TagValue
|
|
|
|
|
|
|
|
|
-class TagStorage(TagStorage):
|
|
|
+logger = logging.getLogger('sentry.tagstore.v2')
|
|
|
+
|
|
|
+
|
|
|
+class V2TagStorage(TagStorage):
|
|
|
"""\
|
|
|
The v2 tagstore backend stores and respects ``environment_id``.
|
|
|
|
|
@@ -36,7 +40,6 @@ class TagStorage(TagStorage):
|
|
|
|
|
|
def setup(self):
|
|
|
self.setup_deletions(
|
|
|
- tagkey_model=TagKey,
|
|
|
tagvalue_model=TagValue,
|
|
|
grouptagkey_model=GroupTagKey,
|
|
|
grouptagvalue_model=GroupTagValue,
|
|
@@ -63,7 +66,45 @@ class TagStorage(TagStorage):
|
|
|
grouptagvalue_model=GroupTagValue,
|
|
|
)
|
|
|
|
|
|
- # TODO(brett): v2-specific receivers for keeping environment aggregates up to date
|
|
|
+ def setup_deletions(self, **kwargs):
|
|
|
+ super(V2TagStorage, self).setup_deletions(**kwargs)
|
|
|
+
|
|
|
+ from sentry.deletions import default_manager as deletion_manager
|
|
|
+ from sentry.deletions.defaults import BulkModelDeletionTask
|
|
|
+ from sentry.deletions.base import ModelRelation
|
|
|
+ from sentry.models import Project
|
|
|
+
|
|
|
+ deletion_manager.register(TagKey, BulkModelDeletionTask)
|
|
|
+ deletion_manager.add_dependencies(Project, [
|
|
|
+ lambda instance: ModelRelation(TagKey, {'project_id': instance.id}),
|
|
|
+ ])
|
|
|
+
|
|
|
+ def setup_receivers(self, **kwargs):
|
|
|
+ super(V2TagStorage, self).setup_receivers(**kwargs)
|
|
|
+
|
|
|
+ from sentry.signals import buffer_incr_complete
|
|
|
+
|
|
|
+ @buffer_incr_complete.connect(sender=TagValue, weak=False)
|
|
|
+ def record_project_tag_count(filters, created, **kwargs):
|
|
|
+ if not created:
|
|
|
+ return
|
|
|
+
|
|
|
+ project_id = filters['project_id']
|
|
|
+ environment_id = filters['environment_id']
|
|
|
+
|
|
|
+ self.incr_tag_key_values_seen(project_id, environment_id, filters['_key_id'])
|
|
|
+
|
|
|
+ @buffer_incr_complete.connect(sender=GroupTagValue, weak=False)
|
|
|
+ def record_group_tag_count(filters, created, extra, **kwargs):
|
|
|
+ if not created:
|
|
|
+ return
|
|
|
+
|
|
|
+ project_id = extra['project_id']
|
|
|
+ group_id = filters['group_id']
|
|
|
+ environment_id = filters['environment_id']
|
|
|
+
|
|
|
+ self.incr_group_tag_key_values_seen(
|
|
|
+ project_id, group_id, environment_id, filters['_key_id'])
|
|
|
|
|
|
def create_tag_key(self, project_id, environment_id, key, **kwargs):
|
|
|
return TagKey.objects.create(
|
|
@@ -82,8 +123,12 @@ class TagStorage(TagStorage):
|
|
|
)
|
|
|
|
|
|
def create_tag_value(self, project_id, environment_id, key, value, **kwargs):
|
|
|
+ tag_key_kwargs = kwargs.copy()
|
|
|
+ for k in ['times_seen', 'first_seen', 'last_seen']:
|
|
|
+ tag_key_kwargs.pop(k, None)
|
|
|
+
|
|
|
tag_key, _ = self.get_or_create_tag_key(
|
|
|
- project_id, environment_id, key, **kwargs)
|
|
|
+ project_id, environment_id, key, **tag_key_kwargs)
|
|
|
|
|
|
return TagValue.objects.create(
|
|
|
project_id=project_id,
|
|
@@ -109,8 +154,11 @@ class TagStorage(TagStorage):
|
|
|
)
|
|
|
|
|
|
def create_group_tag_key(self, project_id, group_id, environment_id, key, **kwargs):
|
|
|
+ tag_key_kwargs = kwargs.copy()
|
|
|
+ tag_key_kwargs.pop('values_seen', None)
|
|
|
+
|
|
|
tag_key, _ = self.get_or_create_tag_key(
|
|
|
- project_id, environment_id, key, **kwargs)
|
|
|
+ project_id, environment_id, key, **tag_key_kwargs)
|
|
|
|
|
|
return GroupTagKey.objects.create(
|
|
|
project_id=project_id,
|
|
@@ -132,12 +180,17 @@ class TagStorage(TagStorage):
|
|
|
**kwargs
|
|
|
)
|
|
|
|
|
|
- def create_group_tag_value(self, project_id, group_id, environment_id, key, value, **kwargs):
|
|
|
+ def create_group_tag_value(self, project_id, group_id, environment_id,
|
|
|
+ key, value, **kwargs):
|
|
|
+ other_kwargs = kwargs.copy()
|
|
|
+ for k in ['times_seen', 'first_seen', 'last_seen']:
|
|
|
+ other_kwargs.pop(k, None)
|
|
|
+
|
|
|
tag_key, _ = self.get_or_create_tag_key(
|
|
|
- project_id, environment_id, key, **kwargs)
|
|
|
+ project_id, environment_id, key, **other_kwargs)
|
|
|
|
|
|
tag_value, _ = self.get_or_create_tag_value(
|
|
|
- project_id, environment_id, key, value, **kwargs)
|
|
|
+ project_id, environment_id, key, value, **other_kwargs)
|
|
|
|
|
|
return GroupTagValue.objects.create(
|
|
|
project_id=project_id,
|
|
@@ -166,6 +219,8 @@ class TagStorage(TagStorage):
|
|
|
)
|
|
|
|
|
|
def create_event_tags(self, project_id, group_id, environment_id, event_id, tags):
|
|
|
+ assert environment_id is not None
|
|
|
+
|
|
|
try:
|
|
|
# don't let a duplicate break the outer transaction
|
|
|
with transaction.atomic():
|
|
@@ -184,7 +239,15 @@ class TagStorage(TagStorage):
|
|
|
for key_id, value_id in tags
|
|
|
])
|
|
|
except IntegrityError:
|
|
|
- pass
|
|
|
+ logger.error(
|
|
|
+ 'tagstore.create_event_tags.integrity_error',
|
|
|
+ extra={
|
|
|
+ 'project_id': project_id,
|
|
|
+ 'environment_id': environment_id,
|
|
|
+ 'group_id': group_id,
|
|
|
+ 'event_id': event_id,
|
|
|
+ }
|
|
|
+ )
|
|
|
|
|
|
def get_tag_key(self, project_id, environment_id, key, status=TagKeyStatus.VISIBLE):
|
|
|
from sentry.tagstore.exceptions import TagKeyNotFound
|
|
@@ -323,31 +386,36 @@ class TagStorage(TagStorage):
|
|
|
).delete()
|
|
|
|
|
|
def incr_tag_key_values_seen(self, project_id, environment_id, key, count=1):
|
|
|
+ # key is passed `key_id` from `buffer_incr_complete.connect(sender=TagValue)`
|
|
|
buffer.incr(TagKey,
|
|
|
columns={
|
|
|
'values_seen': count,
|
|
|
},
|
|
|
filters={
|
|
|
+ 'id': key,
|
|
|
'project_id': project_id,
|
|
|
'environment_id': environment_id,
|
|
|
- 'key': key,
|
|
|
})
|
|
|
|
|
|
def incr_tag_value_times_seen(self, project_id, environment_id,
|
|
|
key, value, extra=None, count=1):
|
|
|
- buffer.incr(TagValue,
|
|
|
- columns={
|
|
|
- 'times_seen': count,
|
|
|
- },
|
|
|
- filters={
|
|
|
- 'project_id': project_id,
|
|
|
- 'environment_id': environment_id,
|
|
|
- 'key': key,
|
|
|
- 'value': value,
|
|
|
- },
|
|
|
- extra=extra)
|
|
|
+ for env in [environment_id, None]:
|
|
|
+ tagkey, _ = self.get_or_create_tag_key(project_id, env, key)
|
|
|
+
|
|
|
+ buffer.incr(TagValue,
|
|
|
+ columns={
|
|
|
+ 'times_seen': count,
|
|
|
+ },
|
|
|
+ filters={
|
|
|
+ 'project_id': project_id,
|
|
|
+ 'environment_id': env,
|
|
|
+ '_key_id': tagkey.id,
|
|
|
+ 'value': value,
|
|
|
+ },
|
|
|
+ extra=extra)
|
|
|
|
|
|
def incr_group_tag_key_values_seen(self, project_id, group_id, environment_id, key, count=1):
|
|
|
+ # key is passed `key_id` from `buffer_incr_complete.connect(sender=GroupTagValue)`
|
|
|
buffer.incr(GroupTagKey,
|
|
|
columns={
|
|
|
'values_seen': count,
|
|
@@ -356,48 +424,56 @@ class TagStorage(TagStorage):
|
|
|
'project_id': project_id,
|
|
|
'group_id': group_id,
|
|
|
'environment_id': environment_id,
|
|
|
- 'key': key,
|
|
|
+ '_key_id': key,
|
|
|
})
|
|
|
|
|
|
def incr_group_tag_value_times_seen(self, project_id, group_id, environment_id,
|
|
|
key, value, extra=None, count=1):
|
|
|
- buffer.incr(GroupTagValue,
|
|
|
- columns={
|
|
|
- 'times_seen': count,
|
|
|
- },
|
|
|
- filters={
|
|
|
- 'project_id': project_id,
|
|
|
- 'group_id': group_id,
|
|
|
- 'environment_id': environment_id,
|
|
|
- 'key': key,
|
|
|
- 'value': value,
|
|
|
- },
|
|
|
- extra=extra)
|
|
|
+ for env in [environment_id, None]:
|
|
|
+ tagkey, _ = self.get_or_create_tag_key(project_id, env, key)
|
|
|
+ tagvalue, _ = self.get_or_create_tag_value(project_id, env, key, value)
|
|
|
+
|
|
|
+ buffer.incr(GroupTagValue,
|
|
|
+ columns={
|
|
|
+ 'times_seen': count,
|
|
|
+ },
|
|
|
+ filters={
|
|
|
+ 'project_id': project_id,
|
|
|
+ 'group_id': group_id,
|
|
|
+ 'environment_id': env,
|
|
|
+ '_key_id': tagkey.id,
|
|
|
+ '_value_id': tagvalue.id,
|
|
|
+ },
|
|
|
+ extra=extra)
|
|
|
|
|
|
def get_group_event_ids(self, project_id, group_id, environment_id, tags):
|
|
|
- tagkeys = dict(
|
|
|
- TagKey.objects.filter(
|
|
|
- project_id=project_id,
|
|
|
- key__in=tags.keys(),
|
|
|
- status=TagKeyStatus.VISIBLE,
|
|
|
- **self._get_environment_filter(environment_id)
|
|
|
- ).values_list('key', 'id')
|
|
|
- )
|
|
|
+ # NOTE: `environment_id=None` needs to be filtered differently in this method.
|
|
|
+ # EventTag never has NULL `environment_id` fields (individual Events always have an environment),
|
|
|
+ # and so `environment_id=None` needs to query EventTag for *all* environments (except, ironically
|
|
|
+ # the aggregate environment, which is NULL).
|
|
|
|
|
|
- tagvalues = {
|
|
|
- (t[1], t[2]): t[0]
|
|
|
- for t in TagValue.objects.filter(
|
|
|
- reduce(or_, (Q(_key__key=k, value=v)
|
|
|
- for k, v in six.iteritems(tags))),
|
|
|
- project_id=project_id,
|
|
|
- **self._get_environment_filter(environment_id)
|
|
|
- ).values_list('id', '_key__key', 'value')
|
|
|
- }
|
|
|
+ if environment_id is None:
|
|
|
+ # filter for all 'real' environments
|
|
|
+ env_filter = {'environment_id__isnull': False}
|
|
|
+ else:
|
|
|
+ env_filter = {'environment_id': environment_id}
|
|
|
+
|
|
|
+ tagvalue_qs = TagValue.objects.filter(
|
|
|
+ reduce(or_, (Q(_key__key=k, _key__status=TagKeyStatus.VISIBLE, value=v)
|
|
|
+ for k, v in six.iteritems(tags))),
|
|
|
+ project_id=project_id,
|
|
|
+ **env_filter
|
|
|
+ ).values_list('_key_id', 'id', '_key__key', 'value')
|
|
|
+
|
|
|
+ tagvalues = defaultdict(list)
|
|
|
+ for key_id, value_id, key, value in tagvalue_qs:
|
|
|
+ tagvalues[(key, value)].append((key_id, value_id))
|
|
|
+ tagvalues = dict(tagvalues)
|
|
|
|
|
|
try:
|
|
|
- tag_lookups = [(tagkeys[k], tagvalues[(k, v)])
|
|
|
- for k, v in six.iteritems(tags)]
|
|
|
- # [(1, 10), ...]
|
|
|
+ # ensure all key/value pairs were found
|
|
|
+ tag_lookups = [tagvalues[(k, v)] for k, v in six.iteritems(tags)]
|
|
|
+ # [[(key0, value0), (key1, value1)], ...]
|
|
|
except KeyError:
|
|
|
# one or more tags were invalid, thus the result should be an empty
|
|
|
# set
|
|
@@ -407,28 +483,26 @@ class TagStorage(TagStorage):
|
|
|
# reasonable matches
|
|
|
|
|
|
# get initial matches to start the filter
|
|
|
- k, v = tag_lookups.pop()
|
|
|
+ kv_pairs = tag_lookups.pop()
|
|
|
matches = list(
|
|
|
EventTag.objects.filter(
|
|
|
+ reduce(or_, (Q(key_id=k, value_id=v)
|
|
|
+ for k, v in kv_pairs)),
|
|
|
project_id=project_id,
|
|
|
group_id=group_id,
|
|
|
- key_id=k,
|
|
|
- value_id=v,
|
|
|
- **self._get_environment_filter(environment_id)
|
|
|
).values_list('event_id', flat=True)[:1000]
|
|
|
)
|
|
|
|
|
|
# for each remaining tag, find matches contained in our
|
|
|
# existing set, pruning it down each iteration
|
|
|
- for k, v in tag_lookups:
|
|
|
+ for kv_pairs in tag_lookups:
|
|
|
matches = list(
|
|
|
EventTag.objects.filter(
|
|
|
+ reduce(or_, (Q(key_id=k, value_id=v)
|
|
|
+ for k, v in kv_pairs)),
|
|
|
project_id=project_id,
|
|
|
group_id=group_id,
|
|
|
event_id__in=matches,
|
|
|
- key_id=k,
|
|
|
- value_id=v,
|
|
|
- **self._get_environment_filter(environment_id)
|
|
|
).values_list('event_id', flat=True)[:1000]
|
|
|
)
|
|
|
if not matches:
|
|
@@ -456,17 +530,17 @@ class TagStorage(TagStorage):
|
|
|
"""
|
|
|
SELECT SUM(t)
|
|
|
FROM (
|
|
|
- SELECT times_seen as t
|
|
|
+ SELECT tagstore_grouptagvalue.times_seen as t
|
|
|
FROM tagstore_grouptagvalue
|
|
|
INNER JOIN tagstore_tagkey
|
|
|
ON (tagstore_grouptagvalue.key_id = tagstore_tagkey.id)
|
|
|
- WHERE tagstore_grouptagvalue.group_id = %s
|
|
|
- AND tagstore_grouptagvalue.environment_id = %s
|
|
|
- AND tagstore_tagkey.key = %s
|
|
|
+ WHERE tagstore_grouptagvalue.group_id = %%s
|
|
|
+ AND tagstore_grouptagvalue.environment_id %s %%s
|
|
|
+ AND tagstore_tagkey.key = %%s
|
|
|
ORDER BY last_seen DESC
|
|
|
LIMIT 10000
|
|
|
) as a
|
|
|
- """, [group_id, environment_id, key]
|
|
|
+ """ % ('IS' if environment_id is None else '='), [group_id, environment_id, key]
|
|
|
)
|
|
|
return cursor.fetchone()[0] or 0
|
|
|
|
|
@@ -500,14 +574,14 @@ class TagStorage(TagStorage):
|
|
|
INNER JOIN tagstore_tagkey
|
|
|
ON (tagstore_grouptagvalue.key_id = tagstore_tagkey.id)
|
|
|
WHERE tagstore_grouptagvalue.group_id = %%s
|
|
|
- AND tagstore_grouptagvalue.environment_id = %%s
|
|
|
+ AND tagstore_grouptagvalue.environment_id %s %%s
|
|
|
AND tagstore_tagkey.key = %%s
|
|
|
ORDER BY last_seen DESC
|
|
|
LIMIT 10000
|
|
|
) as a
|
|
|
ORDER BY times_seen DESC
|
|
|
LIMIT %d
|
|
|
- """ % limit, [group_id, environment_id, key]
|
|
|
+ """ % ('IS' if environment_id is None else '=', limit), [group_id, environment_id, key]
|
|
|
)
|
|
|
)
|
|
|
|
|
@@ -631,14 +705,14 @@ class TagStorage(TagStorage):
|
|
|
project_id=instance.project_id,
|
|
|
group_id=instance.group_id,
|
|
|
environment_id=instance.environment_id,
|
|
|
- key=instance.key,
|
|
|
+ _key_id=instance._key_id,
|
|
|
).count(),
|
|
|
)
|
|
|
|
|
|
def get_tag_value_qs(self, project_id, environment_id, key, query=None):
|
|
|
queryset = TagValue.objects.filter(
|
|
|
project_id=project_id,
|
|
|
- key=key,
|
|
|
+ _key__key=key,
|
|
|
**self._get_environment_filter(environment_id)
|
|
|
)
|
|
|
|
|
@@ -651,7 +725,7 @@ class TagStorage(TagStorage):
|
|
|
return GroupTagValue.objects.filter(
|
|
|
project_id=project_id,
|
|
|
group_id=group_id,
|
|
|
- key=key,
|
|
|
+ _key__key=key,
|
|
|
**self._get_environment_filter(environment_id)
|
|
|
)
|
|
|
|