perf(issue_alerts): Batch fetches to `GroupRuleStatus` table (WOR-1350) (#28558)

We currently fetch the `GroupRuleStatus` rows for every rule in `RuleProcessor` individually rather
than in bulk. Some projects have over 1000 rules, so fetching these one at a time costs us a lot of
IO wait time.

This changes `RuleProcessor` to fetch all the statuses in bulk and pass them through to
`apply_rule`.
Dan Fuller · commit e7dd0224e6

2 changed files with 136 additions and 14 deletions:
  1. src/sentry/rules/processor.py (+65 -14)
  2. tests/sentry/rules/test_processor.py (+71 -0)
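
In outline, the change collapses the per-rule round trips into a single batched pass. A rough
before/after sketch (simplified pseudocode, not the literal diff that follows):

    # Before: one cache lookup and a potential get_or_create query per rule (N+1).
    for rule in self.get_rules():
        self.apply_rule(rule)  # looked up its own GroupRuleStatus internally

    # After: one cache.get_many, then at most a few queries for all rules combined.
    rules = self.get_rules()
    rule_statuses = self.bulk_get_rule_status(rules)  # {rule_id: GroupRuleStatus}
    for rule in rules:
        self.apply_rule(rule, rule_statuses[rule.id])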

src/sentry/rules/processor.py (+65 -14)

@@ -2,6 +2,7 @@ import logging
 from collections import namedtuple
 from datetime import timedelta
 from random import randrange
+from typing import Dict, List, Mapping, Sequence, Set
 
 from django.core.cache import cache
 from django.utils import timezone
@@ -38,15 +39,66 @@ class RuleProcessor:
         """
         return Rule.get_for_project(self.project.id)
 
-    def get_rule_status(self, rule):
-        key = "grouprulestatus:1:%s" % hash_values([self.group.id, rule.id])
-        rule_status = cache.get(key)
-        if rule_status is None:
-            rule_status, _ = GroupRuleStatus.objects.get_or_create(
-                rule=rule, group=self.group, defaults={"project": self.project}
+    def _build_rule_status_cache_key(self, rule_id: int) -> str:
+        return "grouprulestatus:1:%s" % hash_values([self.group.id, rule_id])
+
+    def bulk_get_rule_status(self, rules: Sequence[Rule]) -> Mapping[int, GroupRuleStatus]:
+        keys = [self._build_rule_status_cache_key(rule.id) for rule in rules]
+        cache_results: Mapping[str, GroupRuleStatus] = cache.get_many(keys)
+        missing_rule_ids: Set[int] = set()
+        rule_statuses: Dict[int, GroupRuleStatus] = {}
+        for key, rule in zip(keys, rules):
+            rule_status = cache_results.get(key)
+            if not rule_status:
+                missing_rule_ids.add(rule.id)
+            else:
+                rule_statuses[rule.id] = rule_status
+
+        if missing_rule_ids:
+            # Not cached; attempt to fetch the statuses from the database
+            statuses = GroupRuleStatus.objects.filter(
+                group=self.group, rule_id__in=missing_rule_ids
             )
-            cache.set(key, rule_status, 300)
-        return rule_status
+            to_cache: List[GroupRuleStatus] = []
+            for status in statuses:
+                rule_statuses[status.rule_id] = status
+                missing_rule_ids.remove(status.rule_id)
+                to_cache.append(status)
+
+            # We might need to create some statuses if they don't already exist
+            if missing_rule_ids:
+                # We use `ignore_conflicts=True` here to avoid race conditions where the statuses
+                # might be created between when we queried above and attempt to create the rows now.
+                GroupRuleStatus.objects.bulk_create(
+                    [
+                        GroupRuleStatus(rule_id=rule_id, group=self.group, project=self.project)
+                        for rule_id in missing_rule_ids
+                    ],
+                    ignore_conflicts=True,
+                )
+                # Using `ignore_conflicts=True` prevents the pk from being set on the model
+                # instances. Re-query the database to fetch the rows, they should all exist at this
+                # point.
+                statuses = GroupRuleStatus.objects.filter(
+                    group=self.group, rule_id__in=missing_rule_ids
+                )
+                for status in statuses:
+                    rule_statuses[status.rule_id] = status
+                    missing_rule_ids.remove(status.rule_id)
+                    to_cache.append(status)
+
+                if missing_rule_ids:
+                    # Shouldn't happen, but log just in case
+                    self.logger.error(
+                        "Failed to fetch some GroupRuleStatuses in RuleProcessor",
+                        extra={"missing_rule_ids": missing_rule_ids, "group_id": self.group.id},
+                    )
+            if to_cache:
+                # Keep the same 5 minute TTL that the old per-rule cache.set used
+                cache.set_many(
+                    {self._build_rule_status_cache_key(item.rule_id): item for item in to_cache},
+                    300,
+                )
+
+        return rule_statuses
 
     def condition_matches(self, condition, state, rule):
         condition_cls = rules.get(condition["id"])
@@ -82,7 +134,7 @@ class RuleProcessor:
             return lambda bool_iter: not any(bool_iter)
         return None
 
-    def apply_rule(self, rule):
+    def apply_rule(self, rule, status):
         """
         If all conditions and filters pass, execute every action.
 
@@ -100,11 +152,8 @@ class RuleProcessor:
         ):
             return
 
-        status = self.get_rule_status(rule)
-
         now = timezone.now()
         freq_offset = now - timedelta(minutes=frequency)
-
         if status.last_active and status.last_active > freq_offset:
             return
 
@@ -194,6 +243,8 @@ class RuleProcessor:
             return {}.values()
 
         self.grouped_futures.clear()
-        for rule in self.get_rules():
-            self.apply_rule(rule)
+        rules = self.get_rules()
+        rule_statuses = self.bulk_get_rule_status(rules)
+        for rule in rules:
+            self.apply_rule(rule, rule_statuses[rule.id])
         return self.grouped_futures.values()
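
The re-fetch after `bulk_create` above is needed because `ignore_conflicts=True` stops Django from
setting primary keys on the created instances, even on PostgreSQL where a plain `bulk_create` would
populate them. A standalone sketch of the behavior (assuming `group`, `project`, and a set of
`missing_rule_ids`, mirroring the method above):

    created = GroupRuleStatus.objects.bulk_create(
        [GroupRuleStatus(rule_id=rid, group=group, project=project) for rid in missing_rule_ids],
        ignore_conflicts=True,
    )
    assert all(obj.pk is None for obj in created)  # pks are never populated in this mode
    # The usable rows therefore have to be re-queried; by this point every row exists,
    # whether we just inserted it or lost the race to a concurrent insert.
    statuses = GroupRuleStatus.objects.filter(group=group, rule_id__in=missing_rule_ids)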

tests/sentry/rules/test_processor.py (+71 -0)
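
The tests below assert on raw query counts via Django's `CaptureQueriesContext`. In isolation the
mechanism looks roughly like this (a sketch, not part of the diff):

    from django.db import DEFAULT_DB_ALIAS, connections
    from django.test.utils import CaptureQueriesContext

    with CaptureQueriesContext(connections[DEFAULT_DB_ALIAS]) as ctx:
        list(GroupRuleStatus.objects.filter(id=-1))
    # captured_queries holds {"sql": ..., "time": ...} dicts for queries run inside the block
    assert any("grouprulestatus" in q["sql"] for q in ctx.captured_queries)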

@@ -1,5 +1,9 @@
 from datetime import timedelta
+from unittest import mock
 
+from django.core.cache import cache
+from django.db import DEFAULT_DB_ALIAS, connections
+from django.test.utils import CaptureQueriesContext
 from django.utils import timezone
 
 from sentry.models import GroupRuleStatus, GroupStatus, Rule
@@ -84,6 +88,73 @@ class RuleProcessorTest(TestCase):
         results = list(rp.apply())
         assert len(results) == 0
 
+    def run_query_test(self, rp, expected_queries):
+        with CaptureQueriesContext(connections[DEFAULT_DB_ALIAS]) as queries:
+            results = list(rp.apply())
+        status_queries = [
+            q
+            for q in queries.captured_queries
+            if "grouprulestatus" in str(q) and "UPDATE" not in str(q)
+        ]
+        assert len(status_queries) == expected_queries, "\n".join(
+            "%d. %s" % (i, query["sql"]) for i, query in enumerate(status_queries, start=1)
+        )
+        assert len(results) == 2
+
+    def test_multiple_rules(self):
+        rule_2 = Rule.objects.create(
+            project=self.event.project,
+            data={"conditions": [EVERY_EVENT_COND_DATA], "actions": [EMAIL_ACTION_DATA]},
+        )
+        rp = RuleProcessor(
+            self.event,
+            is_new=True,
+            is_regression=True,
+            is_new_group_environment=True,
+            has_reappeared=True,
+        )
+        self.run_query_test(rp, 3)
+
+        GroupRuleStatus.objects.filter(rule__in=[self.rule, rule_2]).update(
+            last_active=timezone.now() - timedelta(minutes=Rule.DEFAULT_FREQUENCY + 1)
+        )
+
+        # The GroupRuleStatus rows should now be cached, so no queries should be needed
+        self.run_query_test(rp, 0)
+
+        cache.clear()
+        GroupRuleStatus.objects.filter(rule__in=[self.rule, rule_2]).update(
+            last_active=timezone.now() - timedelta(minutes=Rule.DEFAULT_FREQUENCY + 1)
+        )
+
+        # The GroupRuleStatus rows already exist in the database, so we perform two fewer
+        # queries: we don't need to create the rows or re-fetch them afterwards
+        self.run_query_test(rp, 1)
+
+        cache.clear()
+        GroupRuleStatus.objects.filter(rule__in=[self.rule, rule_2]).update(
+            last_active=timezone.now() - timedelta(minutes=Rule.DEFAULT_FREQUENCY + 1)
+        )
+
+        # Test that we don't get errors if we try to create statuses that already exist due to a
+        # race condition
+        with mock.patch("sentry.rules.processor.GroupRuleStatus") as mocked_GroupRuleStatus:
+            call_count = 0
+
+            def mock_filter(*args, **kwargs):
+                nonlocal call_count
+                if call_count == 0:
+                    call_count += 1
+                    # Issue a real query here so the query counts aren't thrown off
+                    return GroupRuleStatus.objects.filter(id=-1)
+                return GroupRuleStatus.objects.filter(*args, **kwargs)
+
+            mocked_GroupRuleStatus.objects.filter.side_effect = mock_filter
+            # Even though the rows already exist, we go through the creation step and make the
+            # extra queries. The conflicting insert isn't counted here because the mocked
+            # bulk_create never issues it.
+            self.run_query_test(rp, 2)
+
 
 # mock filter which always passes
 class MockFilterTrue(EventFilter):