fix(metrics): Query session.status groups together (#32487)

Since #31392, we limit the number of groups the sessions V2 API can
return. But for the metrics-based implementation, when
?groupBy=session.status is requested, every group in the API response is
actually composed of multiple groups in Snuba, e.g. healthy := started -
errored.

Putting a limit on the number of groups means that the session status
groups needed for this arithmetic are sometimes missing from the result.
This PR fixes that shortcoming by querying session status groups from
Snuba as columns (using conditional aggregates) rather than as rows.
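
For illustration (not part of the diff itself): the conditional aggregates built by the new get_column_for_status helper below boil down to ClickHouse "-If" aggregate combinators, one aliased column per session status. A minimal sketch of the generated expression, where the indexer-resolved tag key and value are hypothetical placeholders:

from snuba_sdk import Column, Function

# Hypothetical indexer lookups; the real code resolves these at runtime.
tag_key_session_status = "tags[9]"  # resolved key of the session.status tag
crashed = 11                        # resolved value of the string "crashed"

# Roughly what get_column_for_status("sum", "sessions", "crashed") builds:
#   sumIf(value, equals(tags[9], 11)) AS sessions_crashed
sessions_crashed = Function(
    "sumIf",
    [
        Column("value"),
        Function("equals", [Column(tag_key_session_status), crashed]),
    ],
    alias="sessions_crashed",
)

Because each status lands in its own aliased column, a single Snuba row carries all statuses for a group, so the row limit can no longer drop some of them.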
Joris Bayer committed 3 years ago
parent commit 10163a45f6

src/sentry/release_health/metrics_sessions_v2.py  (+85, -28)

@@ -400,6 +400,10 @@ def _get_snuba_query(
 
     groupby = {}
     for field in query.raw_groupby:
+        if field == "session.status":
+            # This will be handled by conditional aggregates
+            continue
+
         if field == "project":
             groupby["project"] = Column("project_id")
             continue
@@ -516,20 +520,58 @@ def _fetch_data_for_field(
     # Find the field that needs a specific column in a specific metric
     metric_to_output_field: MutableMapping[Tuple[MetricKey, _VirtualColumnName], _OutputField] = {}
 
+    group_by_status = "session.status" in query.raw_groupby
+
+    # We limit the number of groups returned, but because session status
+    # groups in the response are actually composed of multiple groups in storage,
+    # we need to make sure we get them all. For this, use conditional aggregates:
+    def get_column_for_status(function_name: str, prefix: str, status: str) -> Function:
+        return Function(
+            f"{function_name}If",
+            [
+                Column("value"),
+                Function(
+                    "equals",
+                    [Column(tag_key_session_status), indexer.resolve(status)],
+                ),
+            ],
+            alias=f"{prefix}_{status}",
+        )
+
     if "count_unique(user)" == raw_field:
         metric_id = indexer.resolve(MetricKey.USER.value)
         if metric_id is not None:
-            data.extend(
-                _get_snuba_query_data(
-                    org_id,
-                    query,
-                    EntityKey.MetricsSets,
-                    MetricKey.USER,
-                    metric_id,
-                    [Function("uniq", [Column("value")], "value")],
-                    limit_state,
+            if group_by_status:
+                data.extend(
+                    _get_snuba_query_data(
+                        org_id,
+                        query,
+                        EntityKey.MetricsSets,
+                        MetricKey.USER,
+                        metric_id,
+                        [
+                            # The order of these columns is important, because
+                            # the first column might get used in order by
+                            get_column_for_status("uniq", "users", "init"),
+                            get_column_for_status("uniq", "users", "abnormal"),
+                            get_column_for_status("uniq", "users", "crashed"),
+                            get_column_for_status("uniq", "users", "errored"),
+                        ],
+                        limit_state,
+                    )
+                )
+            else:
+                data.extend(
+                    _get_snuba_query_data(
+                        org_id,
+                        query,
+                        EntityKey.MetricsSets,
+                        MetricKey.USER,
+                        metric_id,
+                        [Function("uniq", [Column("value")], "value")],
+                        limit_state,
+                    )
                 )
-            )
             metric_to_output_field[(MetricKey.USER, "value")] = _UserField()
 
     if raw_field in _DURATION_FIELDS:
@@ -539,18 +581,13 @@ def _fetch_data_for_field(
             def get_virtual_column(field: SessionsQueryFunction) -> _VirtualColumnName:
                 return cast(_VirtualColumnName, field[:3])
 
-            group_by_status = "session.status" in query.raw_groupby
-
-            # If we're not grouping by status, we still need to filter down
+            # Filter down
             # to healthy sessions, because that's what sessions_v2 exposes:
-            if group_by_status:
-                column_condition = 1
-            else:
-                healthy = indexer.resolve("exited")
-                if healthy is None:
-                    # There are no healthy sessions, return
-                    return [], {}
-                column_condition = Function("equals", (Column(tag_key_session_status), healthy))
+            healthy = indexer.resolve("exited")
+            if healthy is None:
+                # There are no healthy sessions, return
+                return [], {}
+            column_condition = Function("equals", (Column(tag_key_session_status), healthy))
 
             snuba_column = _to_column(raw_field, column_condition)
 
@@ -576,7 +613,7 @@ def _fetch_data_for_field(
     if "sum(session)" == raw_field:
         metric_id = indexer.resolve(MetricKey.SESSION.value)
         if metric_id is not None:
-            if "session.status" in query.raw_groupby:
+            if group_by_status:
                 # We need session counters grouped by status, as well as the number of errored sessions
 
                 # 1 session counters
@@ -587,7 +624,14 @@ def _fetch_data_for_field(
                         EntityKey.MetricsCounters,
                         MetricKey.SESSION,
                         metric_id,
-                        [Function("sum", [Column("value")], "value")],
+                        [
+                            # The order of these columns is important, because
+                            # the first column might get used in order by
+                            get_column_for_status("sum", "sessions", "init"),
+                            get_column_for_status("sum", "sessions", "abnormal"),
+                            get_column_for_status("sum", "sessions", "crashed"),
+                            get_column_for_status("sum", "sessions", "errored_preaggr"),
+                        ],
                         limit_state,
                     )
                 )
@@ -647,8 +691,6 @@ def _flatten_data(org_id: int, data: _SnubaDataByMetric) -> _DataPoints:
     for metric_key, metric_data in data:
         for row in metric_data:
             raw_session_status = row.pop(tag_key_session_status, None) or None
-            if raw_session_status is not None:
-                raw_session_status = reverse_resolve(raw_session_status)
             flat_key = _DataPointKey(
                 metric_key=metric_key,
                 raw_session_status=raw_session_status,
@@ -665,10 +707,25 @@ def _flatten_data(org_id: int, data: _SnubaDataByMetric) -> _DataPoints:
                     percentile_key = replace(flat_key, column=percentile)
                     data_points[percentile_key] = percentiles[i]
 
+            # Check for special group-by-status columns
+            for col in list(row.keys()):
+                if col.startswith("sessions_"):
+                    # Map column back to metric key
+                    new_key = replace(
+                        flat_key, metric_key=MetricKey.SESSION, raw_session_status=col[9:]
+                    )
+                    data_points[new_key] = row.pop(col) or 0
+                elif col.startswith("users_"):
+                    # Map column back to metric key
+                    new_key = replace(
+                        flat_key, metric_key=MetricKey.USER, raw_session_status=col[6:]
+                    )
+                    data_points[new_key] = row.pop(col) or 0
+
             # Remaining data are simple columns:
-            for key in list(row.keys()):
-                assert key in ("avg", "max", "value")
-                data_points[replace(flat_key, column=key)] = row.pop(key)
+            for col in list(row.keys()):
+                assert col in ("avg", "max", "value")
+                data_points[replace(flat_key, column=col)] = row.pop(col)
 
             assert row == {}
 

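To make the new flattening branch above concrete, here is a minimal sketch (with made-up values) of how a counters row carrying the aliased columns is split back into per-status data points; the API-level healthy and errored groups are then derived from these by the existing arithmetic (e.g. healthy := started - errored):

# Hypothetical Snuba row for one (project, release, environment) group:
row = {
    "sessions_init": 10,
    "sessions_abnormal": 0,
    "sessions_crashed": 2,
    "sessions_errored_preaggr": 3,
}

# The status is recovered from the column alias rather than from a
# session.status group-by row, so no status can be cut off by the row limit.
by_status = {
    col[len("sessions_"):]: value
    for col, value in row.items()
    if col.startswith("sessions_")
}
assert by_status == {"init": 10, "abnormal": 0, "crashed": 2, "errored_preaggr": 3}
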
tests/snuba/api/endpoints/test_organization_sessions.py  (+102, -0)

@@ -756,6 +756,108 @@ class OrganizationSessionsEndpointTest(APITestCase, SnubaTestCase):
                 },
             ]
 
+    @freeze_time(MOCK_DATETIME)
+    def test_snuba_limit_exceeded_groupby_status(self):
+        """Get consistent result when grouping by status"""
+        # 2 * 3 => only show two groups
+        with patch("sentry.snuba.sessions_v2.SNUBA_LIMIT", 6), patch(
+            "sentry.release_health.metrics_sessions_v2.SNUBA_LIMIT", 6
+        ):
+
+            response = self.do_request(
+                {
+                    "project": [-1],
+                    "statsPeriod": "3d",
+                    "interval": "1d",
+                    "field": ["sum(session)", "count_unique(user)"],
+                    "groupBy": ["project", "release", "environment", "session.status"],
+                }
+            )
+
+            assert response.status_code == 200, response.content
+            assert result_sorted(response.data)["groups"] == [
+                {
+                    "by": {
+                        "project": self.project1.id,
+                        "release": "foo@1.0.0",
+                        "session.status": "abnormal",
+                        "environment": "production",
+                    },
+                    "totals": {"sum(session)": 0, "count_unique(user)": 0},
+                    "series": {"sum(session)": [0, 0, 0], "count_unique(user)": [0, 0, 0]},
+                },
+                {
+                    "by": {
+                        "project": self.project1.id,
+                        "release": "foo@1.0.0",
+                        "session.status": "crashed",
+                        "environment": "production",
+                    },
+                    "totals": {"sum(session)": 0, "count_unique(user)": 0},
+                    "series": {"sum(session)": [0, 0, 0], "count_unique(user)": [0, 0, 0]},
+                },
+                {
+                    "by": {
+                        "project": self.project1.id,
+                        "release": "foo@1.0.0",
+                        "environment": "production",
+                        "session.status": "errored",
+                    },
+                    "totals": {"sum(session)": 0, "count_unique(user)": 0},
+                    "series": {"sum(session)": [0, 0, 0], "count_unique(user)": [0, 0, 0]},
+                },
+                {
+                    "by": {
+                        "project": self.project1.id,
+                        "session.status": "healthy",
+                        "release": "foo@1.0.0",
+                        "environment": "production",
+                    },
+                    "totals": {"sum(session)": 3, "count_unique(user)": 0},
+                    "series": {"sum(session)": [0, 0, 3], "count_unique(user)": [0, 0, 0]},
+                },
+                {
+                    "by": {
+                        "session.status": "abnormal",
+                        "release": "foo@1.0.0",
+                        "project": self.project3.id,
+                        "environment": "production",
+                    },
+                    "totals": {"sum(session)": 0, "count_unique(user)": 0},
+                    "series": {"sum(session)": [0, 0, 0], "count_unique(user)": [0, 0, 0]},
+                },
+                {
+                    "by": {
+                        "release": "foo@1.0.0",
+                        "project": self.project3.id,
+                        "session.status": "crashed",
+                        "environment": "production",
+                    },
+                    "totals": {"sum(session)": 0, "count_unique(user)": 0},
+                    "series": {"sum(session)": [0, 0, 0], "count_unique(user)": [0, 0, 0]},
+                },
+                {
+                    "by": {
+                        "release": "foo@1.0.0",
+                        "project": self.project3.id,
+                        "environment": "production",
+                        "session.status": "errored",
+                    },
+                    "totals": {"sum(session)": 1, "count_unique(user)": 1},
+                    "series": {"sum(session)": [0, 0, 1], "count_unique(user)": [0, 0, 1]},
+                },
+                {
+                    "by": {
+                        "session.status": "healthy",
+                        "release": "foo@1.0.0",
+                        "project": self.project3.id,
+                        "environment": "production",
+                    },
+                    "totals": {"sum(session)": 1, "count_unique(user)": 0},
+                    "series": {"sum(session)": [0, 0, 1], "count_unique(user)": [0, 0, 0]},
+                },
+            ]
+
     @freeze_time(MOCK_DATETIME)
     def test_environment_filter_not_present_in_query(self):
         self.create_environment(name="abc")