Browse Source

ref(releasehealth): Implement get_release_sessions_time_bounds in metrics backend [INGEST-249] (#28824)

Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
Markus Unterwaditzer 3 years ago
parent
commit
538b229949

+ 3 - 7
src/sentry/api/endpoints/organization_release_details.py

@@ -2,7 +2,7 @@ from django.db.models import Q
 from rest_framework.exceptions import ParseError
 from rest_framework.response import Response
 
-from sentry import features
+from sentry import features, release_health
 from sentry.api.base import ReleaseAnalyticsMixin
 from sentry.api.bases.organization import OrganizationReleasesBaseEndpoint
 from sentry.api.endpoints.organization_releases import (
@@ -20,11 +20,7 @@ from sentry.api.serializers.rest_framework import (
 )
 from sentry.models import Activity, Project, Release, ReleaseCommitError, ReleaseStatus
 from sentry.models.release import UnsafeReleaseDeletion
-from sentry.snuba.sessions import (
-    STATS_PERIODS,
-    get_adjacent_releases_based_on_adoption,
-    get_release_sessions_time_bounds,
-)
+from sentry.snuba.sessions import STATS_PERIODS, get_adjacent_releases_based_on_adoption
 from sentry.utils.sdk import bind_organization_context, configure_scope
 
 
@@ -391,7 +387,7 @@ class OrganizationReleaseDetailsEndpoint(
             environments = set(request.GET.getlist("environment")) or None
             current_project_meta.update(
                 {
-                    **get_release_sessions_time_bounds(
+                    **release_health.get_release_sessions_time_bounds(
                         project_id=int(project_id),
                         release=release.version,
                         org_id=organization.id,

+ 38 - 2
src/sentry/release_health/base.py

@@ -1,5 +1,5 @@
 from datetime import datetime
-from typing import Mapping, Optional, Sequence, Set, Tuple, TypeVar
+from typing import Mapping, Optional, Sequence, Set, Tuple, TypeVar, Union
 
 from typing_extensions import TypedDict
 
@@ -9,9 +9,9 @@ ProjectId = int
 OrganizationId = int
 ReleaseName = str
 EnvironmentName = str
+FormattedIsoTime = str
 
 ProjectRelease = Tuple[ProjectId, ReleaseName]
-
 ProjectOrRelease = TypeVar("ProjectOrRelease", ProjectId, ProjectRelease)
 
 
@@ -23,6 +23,19 @@ class CurrentAndPreviousCrashFreeRate(TypedDict):
 CurrentAndPreviousCrashFreeRates = Mapping[ProjectId, CurrentAndPreviousCrashFreeRate]
 
 
+class _TimeBounds(TypedDict):
+    sessions_lower_bound: FormattedIsoTime
+    sessions_upper_bound: FormattedIsoTime
+
+
+class _NoTimeBounds(TypedDict):
+    sessions_lower_bound: None
+    sessions_upper_bound: None
+
+
+ReleaseSessionsTimeBounds = Union[_TimeBounds, _NoTimeBounds]
+
+
 class ReleaseAdoption(TypedDict):
     #: Adoption rate (based on usercount) for a project's release from 0..100
     adoption: Optional[float]
@@ -48,6 +61,7 @@ class ReleaseHealthBackend(Service):  # type: ignore
         "get_current_and_previous_crash_free_rates",
         "get_release_adoption",
         "check_has_health_data",
+        "get_release_sessions_time_bounds",
         "check_releases_have_health_data",
     )
 
@@ -114,6 +128,28 @@ class ReleaseHealthBackend(Service):  # type: ignore
 
         raise NotImplementedError()
 
+    def get_release_sessions_time_bounds(
+        self,
+        project_id: ProjectId,
+        release: ReleaseName,
+        org_id: OrganizationId,
+        environments: Optional[Sequence[EnvironmentName]] = None,
+    ) -> ReleaseSessionsTimeBounds:
+        """
+        Get the sessions time bounds, i.e. when the first session started and when the
+        last session started, for a specific (project_id, org_id, release, environments)
+        combination.
+        Inputs:
+            * project_id
+            * release
+            * org_id: Organisation Id
+            * environments
+        Return:
+            Dictionary with two keys "sessions_lower_bound" and "sessions_upper_bound" that
+            correspond to when the first session occurred and when the last session
+            occurred respectively. Both values are None when no sessions are found.
+        """
+        raise NotImplementedError()
+
     def check_has_health_data(
         self, projects_list: Sequence[ProjectOrRelease]
     ) -> Set[ProjectOrRelease]:

+ 153 - 9
src/sentry/release_health/metrics.py

@@ -2,7 +2,7 @@ from datetime import datetime, timedelta
 from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Set, Tuple, Union
 
 import pytz
-from snuba_sdk import BooleanCondition, Column, Condition, Entity, Op, Query
+from snuba_sdk import BooleanCondition, Column, Condition, Entity, Function, Op, Query
 from snuba_sdk.expressions import Granularity
 from snuba_sdk.query import SelectableExpression
 
@@ -17,27 +17,35 @@ from sentry.release_health.base import (
     ReleaseHealthBackend,
     ReleaseName,
     ReleasesAdoption,
+    ReleaseSessionsTimeBounds,
 )
 from sentry.sentry_metrics import indexer
-from sentry.snuba.dataset import Dataset
+from sentry.snuba.dataset import Dataset, EntityKey
 from sentry.utils.snuba import raw_snql_query
 
 
+class MetricIndexNotFound(Exception):
+    pass
+
+
 def metric_id(org_id: int, name: str) -> int:
     index = indexer.resolve(org_id, name)  # type: ignore
-    assert index is not None  # TODO: assert too strong?
+    if index is None:
+        raise MetricIndexNotFound(name)
     return index  # type: ignore
 
 
 def tag_key(org_id: int, name: str) -> str:
     index = indexer.resolve(org_id, name)  # type: ignore
-    assert index is not None
+    if index is None:
+        raise MetricIndexNotFound(name)
     return f"tags[{index}]"
 
 
 def tag_value(org_id: int, name: str) -> int:
     index = indexer.resolve(org_id, name)  # type: ignore
-    assert index is not None
+    if index is None:
+        raise MetricIndexNotFound(name)
     return index  # type: ignore
 
 
@@ -47,6 +55,9 @@ def try_get_tag_value(org_id: int, name: str) -> Optional[int]:
 
 def reverse_tag_value(org_id: int, index: int) -> str:
     str_value = indexer.reverse_resolve(org_id, index)  # type: ignore
+    # If the value can't be reversed it's very likely a real programming bug
+    # rather than a condition callers should catch: we probably got back a value
+    # from Snuba that's not in the indexer => partial data loss
     assert str_value is not None
     return str_value  # type: ignore
 
@@ -124,7 +135,7 @@ class MetricsReleaseHealthBackend(ReleaseHealthBackend):
 
         count_query = Query(
             dataset=Dataset.Metrics.value,
-            match=Entity("metrics_counters"),
+            match=Entity(EntityKey.MetricsCounters.value),
             select=[Column("value")],
             where=[
                 Condition(Column("org_id"), Op.EQ, org_id),
@@ -249,7 +260,7 @@ class MetricsReleaseHealthBackend(ReleaseHealthBackend):
         def _count_sessions(total: bool, referrer: str) -> Dict[Any, int]:
             query = Query(
                 dataset=Dataset.Metrics.value,
-                match=Entity("metrics_counters"),
+                match=Entity(EntityKey.MetricsCounters.value),
                 select=[Column("value")],
                 where=_get_common_where(total)
                 + [
@@ -270,7 +281,7 @@ class MetricsReleaseHealthBackend(ReleaseHealthBackend):
         def _count_users(total: bool, referrer: str) -> Dict[Any, int]:
             query = Query(
                 dataset=Dataset.Metrics.value,
-                match=Entity("metrics_sets"),
+                match=Entity(EntityKey.MetricsSets.value),
                 select=[Column("value")],
                 where=_get_common_where(total)
                 + [
@@ -348,6 +359,139 @@ class MetricsReleaseHealthBackend(ReleaseHealthBackend):
 
         return rv
 
+    def get_release_sessions_time_bounds(
+        self,
+        project_id: ProjectId,
+        release: ReleaseName,
+        org_id: OrganizationId,
+        environments: Optional[Sequence[EnvironmentName]] = None,
+    ) -> ReleaseSessionsTimeBounds:
+        select: List[SelectableExpression] = [
+            Function("min", [Column("timestamp")], "min"),
+            Function("max", [Column("timestamp")], "max"),
+        ]
+
+        try:
+            where: List[Union[BooleanCondition, Condition]] = [
+                Condition(Column("org_id"), Op.EQ, org_id),
+                Condition(Column("project_id"), Op.EQ, project_id),
+                Condition(Column(tag_key(org_id, "release")), Op.EQ, tag_value(org_id, release)),
+                Condition(Column("timestamp"), Op.GTE, datetime.min),
+                Condition(Column("timestamp"), Op.LT, datetime.now(pytz.utc)),
+            ]
+
+            if environments is not None:
+                env_filter = [
+                    x
+                    for x in [
+                        try_get_tag_value(org_id, environment) for environment in environments
+                    ]
+                    if x is not None
+                ]
+                if not env_filter:
+                    raise MetricIndexNotFound()
+
+                where.append(Condition(Column(tag_key(org_id, "environment")), Op.IN, env_filter))
+        except MetricIndexNotFound:
+            # Some filter condition can't be constructed and therefore can't be
+            # satisfied.
+            #
+            # Ignore return type because of https://github.com/python/mypy/issues/8533
+            return {"sessions_lower_bound": None, "sessions_upper_bound": None}  # type: ignore
+
+        # XXX(markus): We know that this combination of queries is not fully
+        # equivalent to the sessions-table based backend. Example:
+        #
+        # 1. Session sid=x is started with timestamp started=n
+        # 2. Same sid=x is updated with new payload with timestamp started=n - 1
+        #
+        # Old sessions backend would return [n - 1 ; n - 1] as range.
+        # New metrics backend would return [n - 1 ; n] as range.
+        #
+        # We don't yet know if this case is relevant. Session's started
+        # timestamp shouldn't really change as session status is updated
+        # though.
+
+        try:
+            # Take care of initial values for session.started by querying the
+            # init counter. This should take care of most cases on its own.
+            init_sessions_query = Query(
+                dataset=Dataset.Metrics.value,
+                match=Entity(EntityKey.MetricsCounters.value),
+                select=select,
+                where=where
+                + [
+                    Condition(Column("metric_id"), Op.EQ, metric_id(org_id, "session")),
+                    Condition(
+                        Column(tag_key(org_id, "session.status")), Op.EQ, tag_value(org_id, "init")
+                    ),
+                ],
+            )
+
+            rows = raw_snql_query(
+                init_sessions_query,
+                referrer="release_health.metrics.get_release_sessions_time_bounds.init_sessions",
+                use_cache=False,
+            )["data"]
+        except MetricIndexNotFound:
+            rows = []
+
+        try:
+            # Take care of potential timestamp updates by looking at the metric
+            # for session duration, which is emitted once a session is closed ("terminal state")
+            #
+            # There is a testcase checked in that tests specifically for a
+            # session update that lowers session.started. We don't know if that
+            # testcase matters particularly.
+            terminal_sessions_query = Query(
+                dataset=Dataset.Metrics.value,
+                match=Entity(EntityKey.MetricsDistributions.value),
+                select=select,
+                where=where
+                + [
+                    Condition(Column("metric_id"), Op.EQ, metric_id(org_id, "session.duration")),
+                ],
+            )
+            rows.extend(
+                raw_snql_query(
+                    terminal_sessions_query,
+                    referrer="release_health.metrics.get_release_sessions_time_bounds.terminal_sessions",
+                    use_cache=False,
+                )["data"]
+            )
+        except MetricIndexNotFound:
+            pass
+
+        # This check is added because if there are no sessions found, then the
+        # aggregation queries return both the sessions_lower_bound and the
+        # sessions_upper_bound as the `0` timestamp, and we do not want that
+        # behaviour by default.
+        # P.S. To avoid confusion: the `0` timestamp, which is '1970-01-01 00:00:00',
+        # is rendered as '0000-00-00 00:00:00' in the clickhouse shell
+        formatted_unix_start_time = datetime.utcfromtimestamp(0).strftime("%Y-%m-%dT%H:%M:%S+00:00")
+
+        lower_bound = None
+        upper_bound = None
+
+        for row in rows:
+            if set(row.values()) == {formatted_unix_start_time}:
+                continue
+            if lower_bound is None or row["min"] < lower_bound:
+                lower_bound = row["min"]
+            if upper_bound is None or row["max"] > upper_bound:
+                upper_bound = row["max"]
+
+        if lower_bound is None or upper_bound is None:
+            return {"sessions_lower_bound": None, "sessions_upper_bound": None}  # type: ignore
+
+        def iso_format_snuba_datetime(date: str) -> str:
+            return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S+00:00").isoformat()[:19] + "Z"
+
+        return {  # type: ignore
+            "sessions_lower_bound": iso_format_snuba_datetime(lower_bound),
+            "sessions_upper_bound": iso_format_snuba_datetime(upper_bound),
+        }
+
     def check_has_health_data(
         self, projects_list: Sequence[ProjectOrRelease]
     ) -> Set[ProjectOrRelease]:
@@ -408,7 +552,7 @@ class MetricsReleaseHealthBackend(ReleaseHealthBackend):
 
         query = Query(
             dataset=Dataset.Metrics.value,
-            match=Entity("metrics_counters"),
+            match=Entity(EntityKey.MetricsCounters.value),
             select=query_cols,
             where=where_clause,
             groupby=group_by_clause,

+ 13 - 0
src/sentry/release_health/sessions.py

@@ -10,11 +10,13 @@ from sentry.release_health.base import (
     ReleaseHealthBackend,
     ReleaseName,
     ReleasesAdoption,
+    ReleaseSessionsTimeBounds,
 )
 from sentry.snuba.sessions import (
     _check_has_health_data,
     _check_releases_have_health_data,
     _get_release_adoption,
+    _get_release_sessions_time_bounds,
     get_current_and_previous_crash_free_rates,
 )
 
@@ -52,6 +54,17 @@ class SessionsReleaseHealthBackend(ReleaseHealthBackend):
             project_releases=project_releases, environments=environments, now=now
         )
 
+    def get_release_sessions_time_bounds(
+        self,
+        project_id: ProjectId,
+        release: ReleaseName,
+        org_id: OrganizationId,
+        environments: Optional[Sequence[EnvironmentName]] = None,
+    ) -> ReleaseSessionsTimeBounds:
+        return _get_release_sessions_time_bounds(  # type: ignore
+            project_id=project_id, release=release, org_id=org_id, environments=environments
+        )
+
     def check_has_health_data(
         self, projects_list: Sequence[ProjectOrRelease]
     ) -> Set[ProjectOrRelease]:

+ 7 - 0
src/sentry/snuba/dataset.py

@@ -10,3 +10,10 @@ class Dataset(Enum):
     OutcomesRaw = "outcomes_raw"
     Sessions = "sessions"
     Metrics = "metrics"
+
+
+@unique
+class EntityKey(Enum):
+    MetricsSets = "metrics_sets"
+    MetricsCounters = "metrics_counters"
+    MetricsDistributions = "metrics_distributions"

+ 1 - 1
src/sentry/snuba/sessions.py

@@ -671,7 +671,7 @@ def get_project_release_stats(project_id, release, stat, rollup, start, end, env
     return stats, totals
 
 
-def get_release_sessions_time_bounds(project_id, release, org_id, environments=None):
+def _get_release_sessions_time_bounds(project_id, release, org_id, environments=None):
     """
     Get the sessions time bounds in terms of when the first session started and
     when the last session started according to a specific (project_id, org_id, release, environments)

+ 11 - 6
tests/snuba/sessions/test_sessions.py

@@ -14,7 +14,6 @@ from sentry.snuba.sessions import (
     get_project_releases_by_stability,
     get_project_releases_count,
     get_release_health_data_overview,
-    get_release_sessions_time_bounds,
 )
 from sentry.testutils import SnubaTestCase, TestCase
 from sentry.testutils.cases import SessionMetricsTestCase
@@ -513,19 +512,25 @@ class SnubaSessionsTest(TestCase, SnubaTestCase):
             }
         )
 
+        if isinstance(self.backend, MetricsReleaseHealthBackend):
+            truncation = {"second": 0}
+        else:
+            truncation = {"minute": 0}
+
         expected_formatted_lower_bound = (
             datetime.utcfromtimestamp(self.session_started - 3600 * 2)
-            .replace(minute=0)
+            .replace(**truncation)
             .isoformat()[:19]
             + "Z"
         )
 
         expected_formatted_upper_bound = (
-            datetime.utcfromtimestamp(self.session_started).replace(minute=0).isoformat()[:19] + "Z"
+            datetime.utcfromtimestamp(self.session_started).replace(**truncation).isoformat()[:19]
+            + "Z"
         )
 
         # Test for self.session_release
-        data = get_release_sessions_time_bounds(
+        data = self.backend.get_release_sessions_time_bounds(
             project_id=self.project.id,
             release=self.session_release,
             org_id=self.organization.id,
@@ -537,7 +542,7 @@ class SnubaSessionsTest(TestCase, SnubaTestCase):
         }
 
         # Test for self.session_crashed_release
-        data = get_release_sessions_time_bounds(
+        data = self.backend.get_release_sessions_time_bounds(
             project_id=self.project.id,
             release=self.session_crashed_release,
             org_id=self.organization.id,
@@ -553,7 +558,7 @@ class SnubaSessionsTest(TestCase, SnubaTestCase):
         Test that ensures if no sessions are available for a specific release then the bounds
         should be returned as None
         """
-        data = get_release_sessions_time_bounds(
+        data = self.backend.get_release_sessions_time_bounds(
             project_id=self.project.id,
             release="different_release",
             org_id=self.organization.id,