
perf(issue_alerts): Add jitter to issue alert queries to even out requests to snuba (#34783)

This PR adds jitter to the start/end dates of our issue alert queries so that we query snuba in a
less spiky way.

The query cache (both in snuba and on our end) keys on identical queries. When we make queries
from issue alerts, we roll the start/end dates up to the nearest 10-second or one-hour bucket,
depending on how large the alert's interval is. In practice this means that for 10-second rollups
we cache all the queries for 10 seconds, and for hourly rollups we cache for the cache TTL,
probably around 5 minutes.
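
As a rough illustration of that bucketing, here's a minimal sketch (the `round_to_rollup` helper
is hypothetical, not the actual TSDB code):

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def round_to_rollup(dt: datetime, rollup_s: int) -> datetime:
    """Round a datetime down to its rollup bucket boundary."""
    secs = int((dt - EPOCH).total_seconds())
    return EPOCH + timedelta(seconds=secs - secs % rollup_s)

now = datetime(2022, 5, 18, 22, 33, 8)
# Every query made between 22:33:00 and 22:33:09 rounds to the same 10s
# bucket, so identical queries within that window share a cache entry.
print(round_to_rollup(now, 10))    # 2022-05-18 22:33:00
print(round_to_rollup(now, 3600))  # 2022-05-18 22:00:00
```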

The other effect of this is that for 10-second rollups, our caches are effectively invalidated
every 10 seconds. So for any high-volume groups that are constantly querying, we'll be querying at
seconds 0, 10, 20, etc. of every minute fairly consistently. This leads to spikiness, where we
hammer snuba with queries at cache invalidation and then don't do much for the other seconds in
the bucket.

Some examples of how rollups work currently:

interval: 1 hour, rollup: 10s, time: 2022-05-18 22:33:08
Rolls up into start/end: 2022-05-18 21:33:00 -> 2022-05-18 22:33:10
Note that we add an additional rollup bucket to the end here, so this ends up querying a
1 hour 10 second period. Since the end is slightly in the future, in practice it queries
1 hour 8 seconds.

interval: 1 minute, rollup: 10s, time: 2022-05-18 22:33:05
Rolls up into start/end: 2022-05-18 22:32:00 -> 2022-05-18 22:33:10
Again an extra rollup bucket is added to the end, so this queries a 1 minute 10 second period,
though since the end is in the future it queries a 1 minute 5 second window in practice.
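
Under the same assumptions as the sketch above, the start/end computation (including the extra
trailing bucket) works out roughly like this; `rolled_up_bounds` is again a hypothetical stand-in
for what the TSDB derives via `get_optimal_rollup_series`:

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def rolled_up_bounds(now: datetime, interval: timedelta, rollup_s: int):
    secs = int((now - EPOCH).total_seconds())
    bucket = EPOCH + timedelta(seconds=secs - secs % rollup_s)
    # One extra rollup bucket is added to the end, so the window is
    # `interval` + `rollup_s` long; the tail of it sits in the future.
    return bucket - interval, bucket + timedelta(seconds=rollup_s)

print(*rolled_up_bounds(datetime(2022, 5, 18, 22, 33, 8), timedelta(hours=1), 10))
# 2022-05-18 21:33:00 2022-05-18 22:33:10
print(*rolled_up_bounds(datetime(2022, 5, 18, 22, 33, 5), timedelta(minutes=1), 10))
# 2022-05-18 22:32:00 2022-05-18 22:33:10
```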

To improve this we want to add jitter to how these buckets work. Since we already vary the queries
by up to <rollup> seconds, we want to stay within those boundaries. We use the `group_id` so that
queries for a given group always run with the same jitter. To do this, we essentially shift the
buckets that we round to; see the sketch after the examples below.

Examples:
interval: 1 day, rollup: 1 hour, time: 2022-05-18 22:33:08
Group id 609, so jitter is 609 seconds.
Previous start/end: 2022-05-17 22:00:00 -> 2022-05-18 23:00:00 (note that since our rollup is an
hour, it adds a whole extra hour here, though in practice that's 33 minutes extra)
Jitter start/end: 2022-05-17 22:10:09 -> 2022-05-18 23:10:09 (in practice the end time is still
2022-05-18 22:33:08, since anything after that is in the future)

interval: 1 hour, rollup: 10s, time: 2022-05-18 23:22:38
Group id 109, so jitter is 9 seconds.
Previous start/end: 2022-05-18 22:22:30 -> 2022-05-18 23:22:40
Jitter start/end: 2022-05-18 22:22:29 -> 2022-05-18 23:22:39

interval: 1 minute, rollup: 10s, time: 2022-05-18 22:33:05
Group id 103, so jitter is 3 seconds.
Previously rolls up into start/end: 2022-05-18 22:32:00 -> 2022-05-18 22:33:10
With jitter we roll up to: 2022-05-18 22:32:03 -> 2022-05-18 22:33:13
Since the jitter is 3 seconds, we round down to the nearest second where second % 10 = 3.

For the same group, but with time 2022-05-18 22:33:02:
Previously rolls up into start/end: 2022-05-18 22:32:00 -> 2022-05-18 22:33:10
With jitter we roll up to: 2022-05-18 22:31:53 -> 2022-05-18 22:33:03
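
To make the shifted rounding concrete, here is a minimal self-contained sketch that reproduces
the examples above (`jittered_bounds` is a hypothetical helper; the real change is
`_add_jitter_to_series` in the diff below):

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def jittered_bounds(now: datetime, interval: timedelta, rollup_s: int, group_id: int):
    jitter = group_id % rollup_s
    secs = int((now - EPOCH).total_seconds())
    # Round down to the nearest bucket boundary shifted by `jitter` seconds,
    # i.e. the nearest second where seconds % rollup_s == jitter.
    bucket = EPOCH + timedelta(seconds=secs - (secs - jitter) % rollup_s)
    # The extra trailing rollup bucket is kept, same as before.
    return bucket - interval, bucket + timedelta(seconds=rollup_s)

print(*jittered_bounds(datetime(2022, 5, 18, 22, 33, 8), timedelta(days=1), 3600, 609))
# 2022-05-17 22:10:09 2022-05-18 23:10:09
print(*jittered_bounds(datetime(2022, 5, 18, 23, 22, 38), timedelta(hours=1), 10, 109))
# 2022-05-18 22:22:29 2022-05-18 23:22:39
print(*jittered_bounds(datetime(2022, 5, 18, 22, 33, 5), timedelta(minutes=1), 10, 103))
# 2022-05-18 22:32:03 2022-05-18 22:33:13
print(*jittered_bounds(datetime(2022, 5, 18, 22, 33, 2), timedelta(minutes=1), 10, 103))
# 2022-05-18 22:31:53 2022-05-18 22:33:03
```

Since queries for the same group always get the same jitter, their cache keys still line up with
each other, while different groups invalidate at different seconds within the bucket.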
Dan Fuller · 2 years ago · commit 4888bf6769

+ 3 - 0
src/sentry/rules/conditions/event_frequency.py

@@ -193,6 +193,7 @@ class EventFrequencyCondition(BaseEventFrequencyCondition):
             end=end,
             environment_id=environment_id,
             use_cache=True,
+            jitter_value=event.group_id,
         )
         return sums[event.group_id]
 
@@ -209,6 +210,7 @@ class EventUniqueUserFrequencyCondition(BaseEventFrequencyCondition):
             end=end,
             environment_id=environment_id,
             use_cache=True,
+            jitter_value=event.group_id,
         )
         return totals[event.group_id]
 
@@ -309,6 +311,7 @@ class EventFrequencyPercentCondition(BaseEventFrequencyCondition):
                 end=end,
                 environment_id=environment_id,
                 use_cache=True,
+                jitter_value=event.group_id,
             )[event.group_id]
             if issue_count > avg_sessions_in_interval:
                 # We want to better understand when and why this is happening, so we're logging it for now

+ 21 - 1
src/sentry/tsdb/base.py

@@ -361,7 +361,17 @@ class BaseTSDB(Service):
         """
         raise NotImplementedError
 
-    def get_sums(self, model, keys, start, end, rollup=None, environment_id=None, use_cache=False):
+    def get_sums(
+        self,
+        model,
+        keys,
+        start,
+        end,
+        rollup=None,
+        environment_id=None,
+        use_cache=False,
+        jitter_value=None,
+    ):
         range_set = self.get_range(
             model,
             keys,
@@ -370,10 +380,19 @@ class BaseTSDB(Service):
             rollup,
             environment_ids=[environment_id] if environment_id is not None else None,
             use_cache=use_cache,
+            jitter_value=jitter_value,
         )
         sum_set = {key: sum(p for _, p in points) for (key, points) in range_set.items()}
         return sum_set
 
+    def _add_jitter_to_series(self, series, start, rollup, jitter_value):
+        if jitter_value and series:
+            jitter = jitter_value % rollup
+            if (start - to_datetime(series[0])).total_seconds() < jitter:
+                jitter -= rollup
+            return [value + jitter for value in series]
+        return series
+
     def rollup(self, values, rollup):
         """
         Given a set of values (as returned from ``get_range``), roll them up
@@ -423,6 +442,7 @@ class BaseTSDB(Service):
         rollup=None,
         environment_id=None,
         use_cache=False,
+        jitter_value=None,
     ):
         """
         Count distinct items during a time range.

+ 18 - 2
src/sentry/tsdb/dummy.py

@@ -22,7 +22,15 @@ class DummyTSDB(BaseTSDB):
         self.validate_arguments(models, environment_ids)
 
     def get_range(
-        self, model, keys, start, end, rollup=None, environment_ids=None, use_cache=False
+        self,
+        model,
+        keys,
+        start,
+        end,
+        rollup=None,
+        environment_ids=None,
+        use_cache=False,
+        jitter_value=None,
     ):
         self.validate_arguments([model], environment_ids if environment_ids is not None else [None])
         _, series = self.get_optimal_rollup_series(start, end, rollup)
@@ -39,7 +47,15 @@ class DummyTSDB(BaseTSDB):
         return {k: [(ts, 0) for ts in series] for k in keys}
 
     def get_distinct_counts_totals(
-        self, model, keys, start, end=None, rollup=None, environment_id=None, use_cache=False
+        self,
+        model,
+        keys,
+        start,
+        end=None,
+        rollup=None,
+        environment_id=None,
+        use_cache=False,
+        jitter_value=None,
     ):
         self.validate_arguments([model], [environment_id])
         return {k: 0 for k in keys}

+ 18 - 2
src/sentry/tsdb/inmemory.py

@@ -61,7 +61,15 @@ class InMemoryTSDB(BaseTSDB):
                             data.pop(self.normalize_to_rollup(timestamp, rollup), 0)
 
     def get_range(
-        self, model, keys, start, end, rollup=None, environment_ids=None, use_cache=False
+        self,
+        model,
+        keys,
+        start,
+        end,
+        rollup=None,
+        environment_ids=None,
+        use_cache=False,
+        jitter_value=None,
     ):
         self.validate_arguments([model], environment_ids if environment_ids is not None else [None])
 
@@ -120,7 +128,15 @@ class InMemoryTSDB(BaseTSDB):
         return results
 
     def get_distinct_counts_totals(
-        self, model, keys, start, end=None, rollup=None, environment_id=None, use_cache=False
+        self,
+        model,
+        keys,
+        start,
+        end=None,
+        rollup=None,
+        environment_id=None,
+        use_cache=False,
+        jitter_value=None,
     ):
         self.validate_arguments([model], [environment_id])
 

+ 19 - 2
src/sentry/tsdb/redis.py

@@ -261,7 +261,15 @@ class RedisTSDB(BaseTSDB):
                         client.expireat(hash_key, key_expiries.pop(hash_key))
 
     def get_range(
-        self, model, keys, start, end, rollup=None, environment_ids=None, use_cache=False
+        self,
+        model,
+        keys,
+        start,
+        end,
+        rollup=None,
+        environment_ids=None,
+        use_cache=False,
+        jitter_value=None,
     ):
         """
         To get a range of data for group ID=[1, 2, 3]:
@@ -435,7 +443,15 @@ class RedisTSDB(BaseTSDB):
         }
 
     def get_distinct_counts_totals(
-        self, model, keys, start, end=None, rollup=None, environment_id=None, use_cache=False
+        self,
+        model,
+        keys,
+        start,
+        end=None,
+        rollup=None,
+        environment_id=None,
+        use_cache=False,
+        jitter_value=None,
     ):
         """
         Count distinct items during a time range.
@@ -668,6 +684,7 @@ class RedisTSDB(BaseTSDB):
                     # Figure out all of the keys we need to be incrementing, as
                     # well as their expiration policies.
                     for rollup, max_values in self.rollups.items():
+                        chunk = []
                         for environment_id in environment_ids:
                             chunk = self.make_frequency_table_keys(
                                 model, rollup, ts, key, environment_id

+ 18 - 1
src/sentry/tsdb/snuba.py

@@ -167,6 +167,7 @@ class SnubaTSDB(BaseTSDB):
         group_on_time=False,
         conditions=None,
         use_cache=False,
+        jitter_value=None,
     ):
         """
         Normalizes all the TSDB parameters and sends a query to snuba.
@@ -219,6 +220,11 @@ class SnubaTSDB(BaseTSDB):
         # we grab the original bucketed series and add the rollup time to the
         # timestamp of the last bucket to get the end time.
         rollup, series = self.get_optimal_rollup_series(start, end, rollup)
+
+        # If jitter_value is provided then we use it to offset the buckets we round start/end to by
+        # up to `rollup` seconds.
+        series = self._add_jitter_to_series(series, start, rollup, jitter_value)
+
         start = to_datetime(series[0])
         end = to_datetime(series[-1] + rollup)
         limit = min(10000, int(len(keys) * ((end - start).total_seconds() / rollup)))
@@ -309,6 +315,7 @@ class SnubaTSDB(BaseTSDB):
         environment_ids=None,
         conditions=None,
         use_cache=False,
+        jitter_value=None,
     ):
         model_query_settings = self.model_query_settings.get(model)
         assert model_query_settings is not None, f"Unsupported TSDBModel: {model.name}"
@@ -329,6 +336,7 @@ class SnubaTSDB(BaseTSDB):
             group_on_time=True,
             conditions=conditions,
             use_cache=use_cache,
+            jitter_value=jitter_value,
         )
         # convert
         #    {group:{timestamp:count, ...}}
@@ -356,7 +364,15 @@ class SnubaTSDB(BaseTSDB):
         return {k: sorted(result[k].items()) for k in result}
 
     def get_distinct_counts_totals(
-        self, model, keys, start, end=None, rollup=None, environment_id=None, use_cache=False
+        self,
+        model,
+        keys,
+        start,
+        end=None,
+        rollup=None,
+        environment_id=None,
+        use_cache=False,
+        jitter_value=None,
     ):
         return self.get_data(
             model,
@@ -367,6 +383,7 @@ class SnubaTSDB(BaseTSDB):
             [environment_id] if environment_id is not None else None,
             aggregation="uniq",
             use_cache=use_cache,
+            jitter_value=jitter_value,
         )
 
     def get_distinct_counts_union(

+ 49 - 1
tests/snuba/tsdb/test_tsdb_backend.py

@@ -8,7 +8,7 @@ from sentry.testutils import SnubaTestCase, TestCase
 from sentry.testutils.helpers.datetime import iso_format
 from sentry.tsdb.base import TSDBModel
 from sentry.tsdb.snuba import SnubaTSDB
-from sentry.utils.dates import to_timestamp
+from sentry.utils.dates import to_datetime, to_timestamp
 
 
 def timestamp(d):
@@ -461,3 +461,51 @@ class SnubaTSDBTest(TestCase, SnubaTestCase):
             start = end + timedelta(hours=-1, seconds=rollup)
             self.db.get_data(TSDBModel.group, [1, 2, 3, 4, 5], start, end, rollup=rollup)
             assert snuba.query.call_args[1]["limit"] == 5
+
+
+class AddJitterToSeriesTest(TestCase):
+    def setUp(self):
+        self.db = SnubaTSDB()
+
+    def run_test(self, end, interval, jitter, expected_start, expected_end):
+        end = end.replace(tzinfo=pytz.UTC)
+        start = end - interval
+        rollup, rollup_series = self.db.get_optimal_rollup_series(start, end)
+        series = self.db._add_jitter_to_series(rollup_series, start, rollup, jitter)
+        assert to_datetime(series[0]) == expected_start.replace(tzinfo=pytz.UTC)
+        assert to_datetime(series[-1]) == expected_end.replace(tzinfo=pytz.UTC)
+
+    def test(self):
+        self.run_test(
+            end=datetime(2022, 5, 18, 10, 23, 4),
+            interval=timedelta(hours=1),
+            jitter=5,
+            expected_start=datetime(2022, 5, 18, 9, 22, 55),
+            expected_end=datetime(2022, 5, 18, 10, 22, 55),
+        )
+        self.run_test(
+            end=datetime(2022, 5, 18, 10, 23, 8),
+            interval=timedelta(hours=1),
+            jitter=5,
+            expected_start=datetime(2022, 5, 18, 9, 23, 5),
+            expected_end=datetime(2022, 5, 18, 10, 23, 5),
+        )
+        # Jitter should be the same
+        self.run_test(
+            end=datetime(2022, 5, 18, 10, 23, 8),
+            interval=timedelta(hours=1),
+            jitter=55,
+            expected_start=datetime(2022, 5, 18, 9, 23, 5),
+            expected_end=datetime(2022, 5, 18, 10, 23, 5),
+        )
+        self.run_test(
+            end=datetime(2022, 5, 18, 22, 33, 2),
+            interval=timedelta(minutes=1),
+            jitter=3,
+            expected_start=datetime(2022, 5, 18, 22, 31, 53),
+            expected_end=datetime(2022, 5, 18, 22, 32, 53),
+        )
+
+    def test_empty_series(self):
+        assert self.db._add_jitter_to_series([], datetime(2022, 5, 18, 10, 23, 4), 60, 127) == []
+        assert self.db._add_jitter_to_series([], datetime(2022, 5, 18, 10, 23, 4), 60, None) == []