Browse Source

feat(release-health): Exclude current and previous hour from comparisons (#32211)

These are highly volatile since the bulk of real-time sessions are added
in this time frame. Ingestion delays in metrics vs sessions make the
comparisons indeterministic.
Joris Bayer 3 years ago
parent
commit
1d78c4027c
2 changed files with 129 additions and 17 deletions
  1. 88 15
      src/sentry/release_health/duplex.py
  2. 41 2
      tests/sentry/release_health/test_duplex.py

+ 88 - 15
src/sentry/release_health/duplex.py

@@ -46,6 +46,7 @@ from sentry.release_health.base import (
 )
 from sentry.release_health.metrics import MetricsReleaseHealthBackend
 from sentry.release_health.sessions import SessionsReleaseHealthBackend
+from sentry.snuba.metrics.helpers import get_intervals
 from sentry.snuba.sessions import get_rollup_starts_and_buckets
 from sentry.snuba.sessions_v2 import QueryDefinition
 from sentry.utils.metrics import incr, timer, timing
@@ -96,7 +97,25 @@ class ListSet:
             self.index_by = cast(Callable[[Any], Any], index_by)
 
 
-Schema = Union[ComparatorType, List[Any], Mapping[str, Any], Set[Any], ListSet, Tuple[Any, ...]]
+class FixedList:
+    """
+    List with a fixed number of elements, where each element has a separate
+    schema.
+    """
+
+    def __init__(self, child_schemas: List["Schema"]):
+        self.child_schemas = child_schemas
+
+    def __eq__(self, other: object) -> bool:
+        return isinstance(other, FixedList) and self.child_schemas == other.child_schemas
+
+    def __repr__(self) -> str:
+        return f"FixedList({self.child_schemas})"
+
+
+Schema = Union[
+    ComparatorType, List[Any], Mapping[str, Any], Set[Any], ListSet, FixedList, Tuple[Any, ...]
+]
 
 
 class ComparisonError:
@@ -474,6 +493,37 @@ def compare_list_set(
     )
 
 
+def compare_fixed_list(
+    sessions: ReleaseHealthResult,
+    metrics: ReleaseHealthResult,
+    rollup: int,
+    path: str,
+    schema: FixedList,
+) -> List[ComparisonError]:
+    errors = []
+    expected_length = len(schema.child_schemas)
+    if len(sessions) != expected_length:
+        errors.append(
+            ComparisonError(
+                f"Wrong number of elements in sessions list: expected {expected_length}, got {len(sessions)}"
+            )
+        )
+    if len(metrics) != expected_length:
+        errors.append(
+            ComparisonError(
+                f"Wrong number of elements in metrics list: expected {expected_length}, got {len(metrics)}"
+            )
+        )
+
+    for idx, (child_schema, session, metric) in enumerate(
+        zip(schema.child_schemas, sessions, metrics)
+    ):
+        elm_path = f"{path}[{idx}]"
+        errors += compare_results(session, metric, rollup, elm_path, child_schema)
+
+    return errors
+
+
 def compare_results(
     sessions: ReleaseHealthResult,
     metrics: ReleaseHealthResult,
@@ -505,6 +555,8 @@ def compare_results(
             return []
     elif isinstance(schema, ListSet):  # we only support ListSet in Schemas (not in the results)
         return compare_list_set(sessions, metrics, rollup, path, schema)
+    elif isinstance(schema, FixedList):
+        return compare_fixed_list(sessions, metrics, rollup, path, schema)
     elif isinstance(discriminator, tuple):
         assert schema is None or isinstance(schema, tuple)
         return compare_tuples(sessions, metrics, rollup, path, schema)
@@ -533,6 +585,36 @@ def tag_delta(errors: List[ComparisonError], tags: Mapping[str, str]) -> None:
         set_tag("rh.duplex.rel_change", tag_value)
 
 
+def get_sessionsv2_schema(now: datetime, query: QueryDefinition) -> Mapping[str, FixedList]:
+    schema_for_totals = {
+        "sum(session)": ComparatorType.Counter,
+        "count_unique(user)": ComparatorType.Counter,
+        "avg(session.duration)": ComparatorType.Quantile,
+        "p50(session.duration)": ComparatorType.Quantile,
+        "p75(session.duration)": ComparatorType.Quantile,
+        "p90(session.duration)": ComparatorType.Quantile,
+        "p95(session.duration)": ComparatorType.Quantile,
+        "p99(session.duration)": ComparatorType.Quantile,
+        "max(session.duration)": ComparatorType.Quantile,
+    }
+
+    # Exclude recent timestamps from comparisons
+    start_of_hour = now.replace(minute=0, second=0, microsecond=0)
+    max_timestamp = start_of_hour - timedelta(hours=1)
+    return {
+        field: FixedList(
+            [
+                # Use exclusive range here, because with hourly buckets,
+                # timestamp 09:00 contains data for the range 09:00 - 10:00,
+                # And we want to still exclude that at 10:01
+                comparator if timestamp < max_timestamp else ComparatorType.Ignore
+                for timestamp in get_intervals(query)
+            ]
+        )
+        for field, comparator in schema_for_totals.items()
+    }
+
+
 class DuplexReleaseHealthBackend(ReleaseHealthBackend):
     DEFAULT_ROLLUP = 60 * 60  # 1h
 
@@ -754,22 +836,13 @@ class DuplexReleaseHealthBackend(ReleaseHealthBackend):
     ) -> SessionsQueryResult:
         rollup = query.rollup
 
-        schema_for_totals = {
-            "sum(session)": ComparatorType.Counter,
-            "count_unique(user)": ComparatorType.Counter,
-            "avg(session.duration)": ComparatorType.Quantile,
-            "p50(session.duration)": ComparatorType.Quantile,
-            "p75(session.duration)": ComparatorType.Quantile,
-            "p90(session.duration)": ComparatorType.Quantile,
-            "p95(session.duration)": ComparatorType.Quantile,
-            "p99(session.duration)": ComparatorType.Quantile,
-            "max(session.duration)": ComparatorType.Quantile,
-        }
-        schema_for_series = {field: [comparator] for field, comparator in schema_for_totals.items()}
+        now = datetime.now(timezone.utc)
+
+        schema_for_series = get_sessionsv2_schema(now, query)
 
         # Tag sentry event with relative end time, so we can see if live queries
         # cause greater deltas:
-        relative_hours = math.ceil((query.end - datetime.now(timezone.utc)).total_seconds() / 3600)
+        relative_hours = math.ceil((query.end - now).total_seconds() / 3600)
         set_tag("run_sessions_query.rel_end", f"{relative_hours}h")
 
         project_ids = query.filter_keys.get("project_id")
@@ -795,7 +868,7 @@ class DuplexReleaseHealthBackend(ReleaseHealthBackend):
                 schema={
                     "by": ComparatorType.Ignore,
                     "series": schema_for_series,
-                    "totals": schema_for_totals,
+                    "totals": ComparatorType.Ignore,
                 },
                 index_by=index_by,
             ),

+ 41 - 2
tests/sentry/release_health/test_duplex.py

@@ -1,11 +1,19 @@
-from datetime import datetime
+from datetime import datetime, timezone
 from unittest.mock import MagicMock
 
 import pytest
+from django.utils.datastructures import MultiValueDict
+from freezegun import freeze_time
 
 from sentry.release_health import duplex
 from sentry.release_health.duplex import ComparatorType as Ct
-from sentry.release_health.duplex import DuplexReleaseHealthBackend, ListSet
+from sentry.release_health.duplex import (
+    DuplexReleaseHealthBackend,
+    FixedList,
+    ListSet,
+    get_sessionsv2_schema,
+)
+from sentry.snuba.sessions_v2 import AllowedResolution, QueryDefinition
 
 
 @pytest.mark.parametrize(
@@ -218,6 +226,19 @@ def test_compare_list_set(sessions, metrics, schema, are_equal):
     assert (len(result) == 0) == are_equal
 
 
+@pytest.mark.parametrize(
+    "sessions,metrics,schema, are_equal",
+    [
+        ([1, 2], [1], FixedList([Ct.Exact, Ct.Exact]), False),
+        ([1, 2], [1, 2], FixedList([Ct.Exact, Ct.Exact]), True),
+        ([1, 2, 3], [1, 2, 4], FixedList([Ct.Exact, Ct.Exact, Ct.Ignore]), True),
+    ],
+)
+def test_compare_fixed_list(sessions, metrics, schema, are_equal):
+    result = duplex.compare_fixed_list(sessions, metrics, 60, "", schema)
+    assert (len(result) == 0) == are_equal
+
+
 @pytest.mark.parametrize(
     "sessions,metrics,schema, are_equal",
     [
@@ -489,3 +510,21 @@ def test_function_dispatch_is_working():
     duplex.sessions.get_current_and_previous_crash_free_rates.assert_called_with(*call_params)
     # check metrics backend were not called again (only one original call)
     assert duplex.metrics.get_current_and_previous_crash_free_rates.call_count == 1
+
+
+@freeze_time("2022-03-02 15:17")
+def test_get_sessionsv2_schema():
+    query = QueryDefinition(
+        query=MultiValueDict(
+            {
+                "statsPeriod": ["24h"],
+                "interval": ["1h"],
+                "field": ["sum(session)", "avg(session.duration)"],
+            }
+        ),
+        params={},
+        allowed_resolution=AllowedResolution.one_hour,
+    )
+    schema = get_sessionsv2_schema(datetime.now(timezone.utc), query)
+    assert schema["sum(session)"] == FixedList(22 * [Ct.Counter] + 2 * [Ct.Ignore])
+    assert schema["avg(session.duration)"] == FixedList(22 * [Ct.Quantile] + 2 * [Ct.Ignore])