Browse Source

feat(ds): Implements release boosting functionality for ds [TET-496] (#40403)

Sets releases that should be boosted with ds (dynamic sampling) into the
cache when a transaction is observed in the event manager. The logic is as
follows: once a transaction from a release that wasn't observed in the
previous 24 hours is received, a cache key for that release is set with an
expiration of one day, and then that release is added to a list of
boosted releases in the cache with an expiration of 1h. The
project config is then invalidated so we recompute the project config with
a new dynamic sampling rule to boost that release for a hardcoded
interval of one hour. If that release doesn't send any transactions in
the next 24 hours, i.e. the 24 hour cache key expires, and then
starts sending transactions again, we want to start boosting the release
again for an hour. This PR is one part of two, and only handles
the setting of the cache and the invalidation of the project config; it
does not include the dynamic sampling rules to be sent to relay. This is
by design so we can merge this into production and monitor the
performance impact of this logic before committing to adding the dynamic
sampling rules.

As a follow-up, add a PR that only runs this logic if the feature flags
for dynamic sampling are enabled; however, we want to merge this without
that check first in order to monitor production load.
Ahmed Etefy 2 years ago
parent
commit
0fc7bab05d

+ 0 - 1
mypy.ini

@@ -44,7 +44,6 @@ files = fixtures/mypy-stubs,
         src/sentry/db/models/query.py,
         src/sentry/db/models/utils.py,
         src/sentry/digests/,
-        src/sentry/dynamic_sampling/,
         src/sentry/eventstream/base.py,
         src/sentry/eventstream/snuba.py,
         src/sentry/eventstream/kafka/consumer_strategy.py,

+ 85 - 0
src/sentry/dynamic_sampling/latest_release_booster.py

@@ -0,0 +1,85 @@
+from datetime import datetime
+
+from django.conf import settings
+from pytz import UTC
+
+from sentry.utils import redis
+
+# How long a release stays boosted, in seconds (one hour). Compared against the boost timestamps stored in redis.
+BOOSTED_RELEASE_TIMEOUT = 60 * 60
+# One day, in seconds. Note that redis `expire` takes seconds while `pexpire` takes milliseconds.
+ONE_DAY_TIMEOUT = 60 * 60 * 24
+
+
+def get_redis_client_for_ds():
+    """
+    Returns the redis client used for dynamic sampling data. The cluster name is read from the
+    SENTRY_DYNAMIC_SAMPLING_RULES_REDIS_CLUSTER setting and falls back to "default" when the setting is absent.
+    """
+    return redis.redis_clusters.get(
+        getattr(settings, "SENTRY_DYNAMIC_SAMPLING_RULES_REDIS_CLUSTER", "default")
+    )
+
+
+def generate_cache_key_for_observed_release(project_id, release_id):
+    """
+    Builds the cache key that marks a release of a project as having had a transaction observed
+    in the last 24 hours
+    """
+    project_prefix = f"ds::p:{project_id}"
+    return f"{project_prefix}:r:{release_id}"
+
+
+def generate_cache_key_for_boosted_release(project_id):
+    """
+    Builds the cache key under which the boosted releases of the given project are stored.
+    """
+    project_prefix = f"ds::p:{project_id}"
+    return f"{project_prefix}:boosted_releases"
+
+
+def observe_release(project_id, release_id):
+    """
+    Marks a release as observed and reports whether it had already been observed in the last 24 hours.
+
+    The observed marker is rewritten on every call and its expiration is reset to ONE_DAY_TIMEOUT seconds, so the
+    24 hour window keeps sliding for as long as the release keeps being observed.
+
+    Returns True if the marker already existed before this call, otherwise returns False.
+    """
+    redis_client = get_redis_client_for_ds()
+    cache_key = generate_cache_key_for_observed_release(project_id, release_id)
+
+    # TODO(ahmed): Modify these two statements into one once we upgrade to a higher redis-py version as in newer
+    #  versions these two operations can be done in a single call.
+    release_observed = redis_client.getset(name=cache_key, value=1)
+    # ONE_DAY_TIMEOUT is expressed in seconds, so `expire` (seconds) must be used here. `pexpire` interprets its
+    # argument as milliseconds and would drop the marker after ~86 seconds instead of 24 hours.
+    redis_client.expire(cache_key, ONE_DAY_TIMEOUT)
+    # The comparison is against the string "1", i.e. it relies on the client returning decoded string values.
+    return release_observed == "1"
+
+
+def get_boosted_releases(project_id):
+    """
+    Returns the still-active boosted releases of a project as a list of (release_id, boost_timestamp) tuples.
+    Entries whose boost window of BOOSTED_RELEASE_TIMEOUT seconds has elapsed are left out of the result and
+    are deleted from the cache as a side effect.
+    """
+    redis_client = get_redis_client_for_ds()
+    cache_key = generate_cache_key_for_boosted_release(project_id)
+    now = datetime.utcnow().replace(tzinfo=UTC).timestamp()
+
+    active = []
+    stale = []
+    for raw_release_id, raw_timestamp in redis_client.hgetall(cache_key).items():
+        boosted_at = float(raw_timestamp)
+        # A release is active while the current time is within the boost window.
+        if now <= boosted_at + BOOSTED_RELEASE_TIMEOUT:
+            active.append((int(raw_release_id), boosted_at))
+        else:
+            stale.append(raw_release_id)
+
+    # Only issue the delete when there is something to evict.
+    if stale:
+        redis_client.hdel(cache_key, *stale)
+    return active
+
+
+def add_boosted_release(project_id, release_id):
+    """
+    Function that adds a release to the list of active boosted releases for a given project.
+
+    The release is stored in the project's boosted releases hash together with the current UTC timestamp, and the
+    expiration of the whole hash is reset to ONE_DAY_TIMEOUT seconds.
+    """
+    # Called here for expired releases cleanup
+    get_boosted_releases(project_id)
+
+    cache_key = generate_cache_key_for_boosted_release(project_id)
+    redis_client = get_redis_client_for_ds()
+    # TODO(ahmed): Modify these two statements into one once we upgrade to a higher redis-py version as in newer
+    #  versions these two operations can be done in a single call.
+    redis_client.hset(
+        cache_key,
+        release_id,
+        datetime.utcnow().replace(tzinfo=UTC).timestamp(),
+    )
+    # ONE_DAY_TIMEOUT is expressed in seconds, so `expire` (seconds) must be used here. `pexpire` interprets its
+    # argument as milliseconds and would drop the whole hash after ~86 seconds, i.e. well before the boost
+    # window of BOOSTED_RELEASE_TIMEOUT seconds has elapsed.
+    redis_client.expire(cache_key, ONE_DAY_TIMEOUT)

+ 14 - 0
src/sentry/event_manager.py

@@ -37,6 +37,7 @@ from sentry.constants import (
     DataCategory,
 )
 from sentry.culprit import generate_culprit
+from sentry.dynamic_sampling.latest_release_booster import add_boosted_release, observe_release
 from sentry.eventstore.processing import event_processing_store
 from sentry.grouping.api import (
     BackgroundGroupingConfigLoader,
@@ -92,6 +93,7 @@ from sentry.signals import first_event_received, first_transaction_received, iss
 from sentry.tasks.commits import fetch_commits
 from sentry.tasks.integrations import kick_off_status_syncs
 from sentry.tasks.process_buffer import buffer_incr
+from sentry.tasks.relay import schedule_invalidate_project_config
 from sentry.types.activity import ActivityType
 from sentry.types.issues import GroupCategory
 from sentry.utils import json, metrics
@@ -821,6 +823,18 @@ def _get_or_create_release_many(jobs, projects):
                     pop_tag(job["data"], "dist")
                     set_tag(job["data"], "sentry:dist", job["dist"].name)
 
+                # Dynamic Sampling - Boosting latest release functionality
+                if data.get("type") == "transaction":
+                    try:
+                        release_observed_in_last_24h = observe_release(project_id, release.id)
+                        if not release_observed_in_last_24h:
+                            add_boosted_release(project_id, release.id)
+                            schedule_invalidate_project_config(
+                                project_id=project_id, trigger="dynamic_sampling:boost_release"
+                            )
+                    except Exception:
+                        pass
+
 
 @metrics.wraps("save_event.get_event_user_many")
 def _get_event_user_many(jobs, projects):

+ 134 - 0
tests/sentry/event_manager/test_event_manager.py

@@ -23,6 +23,11 @@ from fixtures.github import (
 from sentry import audit_log, nodestore, tsdb
 from sentry.attachments import CachedAttachment, attachment_cache
 from sentry.constants import MAX_VERSION_LENGTH, DataCategory
+from sentry.dynamic_sampling.latest_release_booster import (
+    BOOSTED_RELEASE_TIMEOUT,
+    get_boosted_releases,
+    get_redis_client_for_ds,
+)
 from sentry.event_manager import (
     EventManager,
     EventUser,
@@ -2689,3 +2694,132 @@ class ReleaseIssueTest(TestCase):
             last_seen=self.timestamp + 100,
             first_seen=self.timestamp + 100,
         )
+
+
+@region_silo_test
+class ReleaseDSLatestReleaseBoost(TestCase):
+    """
+    Tests that saving transaction events through the EventManager records the release in the dynamic sampling
+    redis cache (observed marker + boosted releases hash) and triggers a project config invalidation.
+    """
+
+    def setUp(self):
+        """Creates a project with release "1.0", two environments, an event timestamp and the ds redis client."""
+        self.project = self.create_project()
+        self.release = Release.get_or_create(self.project, "1.0")
+        self.environment1 = Environment.get_or_create(self.project, "prod")
+        self.environment2 = Environment.get_or_create(self.project, "staging")
+        # Event timestamp five minutes in the past, truncated to whole seconds.
+        self.timestamp = float(int(time() - 300))
+        self.redis_client = get_redis_client_for_ds()
+
+    def make_transaction_event(self, **kwargs):
+        """Returns a minimal transaction event payload; any keyword argument overrides the matching key."""
+        result = {
+            "transaction": "wait",
+            "contexts": {
+                "trace": {
+                    "parent_span_id": "bce14471e0e9654d",
+                    "op": "foobar",
+                    "trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
+                    "span_id": "bf5be759039ede9a",
+                }
+            },
+            "spans": [],
+            "timestamp": self.timestamp + 0.23,
+            "start_timestamp": "2019-06-14T14:01:40Z",
+            "type": "transaction",
+        }
+        result.update(kwargs)
+        return result
+
+    def make_release_transaction(
+        self, release_version="1.0", environment_name="prod", project_id=1, **kwargs
+    ):
+        """
+        Builds a transaction event for the given release and environment, saves it through the EventManager
+        for `project_id` while running tasks, and returns the saved event. Extra keyword arguments are applied
+        on top of the generated payload.
+        """
+        transaction = self.make_transaction_event(
+            release=release_version, environment=environment_name, event_id=uuid.uuid1().hex
+        )
+        transaction.update(kwargs)
+        manager = EventManager(transaction)
+        with self.tasks():
+            event = manager.save(project_id)
+        return event
+
+    @freeze_time()
+    def test_boost_release_when_first_observed(self):
+        """
+        A transaction from a release without an observed marker sets the marker to "1" and adds the release to
+        the boosted releases hash; a second, different release is added next to the first one.
+        """
+        self.make_release_transaction(
+            release_version=self.release.version,
+            environment_name=self.environment1.name,
+            project_id=self.project.id,
+            checksum="a" * 32,
+            timestamp=self.timestamp,
+        )
+
+        # Time is frozen by the decorator, so this is expected to equal the timestamp written to the cache.
+        ts = time()
+
+        assert self.redis_client.get(f"ds::p:{self.project.id}:r:{self.release.id}") == "1"
+        assert self.redis_client.hgetall(f"ds::p:{self.project.id}:boosted_releases") == {
+            str(self.release.id): str(ts)
+        }
+
+        new_release = Release.get_or_create(self.project, "2.0")
+
+        self.make_release_transaction(
+            release_version=new_release.version,
+            environment_name=self.environment1.name,
+            project_id=self.project.id,
+            checksum="b" * 32,
+            timestamp=self.timestamp,
+        )
+
+        assert self.redis_client.get(f"ds::p:{self.project.id}:r:{new_release.id}") == "1"
+        assert self.redis_client.hgetall(f"ds::p:{self.project.id}:boosted_releases") == {
+            str(self.release.id): str(ts),
+            str(new_release.id): str(ts),
+        }
+
+    def test_ensure_release_not_boosted_when_it_is_not_first_observed(self):
+        """A release whose observed marker already exists is not added to the boosted releases."""
+        # Pre-seed the observed marker so the release counts as already observed.
+        self.redis_client.set(f"ds::p:{self.project.id}:r:{self.release.id}", 1, 60 * 60 * 24)
+        self.make_release_transaction(
+            release_version=self.release.version,
+            environment_name=self.environment1.name,
+            project_id=self.project.id,
+            checksum="b" * 32,
+            timestamp=self.timestamp,
+        )
+        assert self.redis_client.hgetall(f"ds::p:{self.project.id}:boosted_releases") == {}
+        assert get_boosted_releases(self.project.id) == []
+
+    @freeze_time()
+    def test_evict_expired_boosted_releases(self):
+        """
+        Boosted releases whose timestamp is older than BOOSTED_RELEASE_TIMEOUT are removed from the hash when a
+        new release gets boosted, leaving only the new release.
+        """
+        release_2 = Release.get_or_create(self.project, "2.0")
+        release_3 = Release.get_or_create(self.project, "3.0")
+
+        # Seed two releases as observed and boosted twice the boost timeout ago, i.e. already expired.
+        for release_id in (self.release.id, release_2.id):
+            self.redis_client.set(f"ds::p:{self.project.id}:r:{release_id}", 1, 60 * 60 * 24)
+            self.redis_client.hset(
+                f"ds::p:{self.project.id}:boosted_releases",
+                release_id,
+                time() - BOOSTED_RELEASE_TIMEOUT * 2,
+            )
+
+        self.make_release_transaction(
+            release_version=release_3.version,
+            environment_name=self.environment1.name,
+            project_id=self.project.id,
+            checksum="b" * 32,
+            timestamp=self.timestamp,
+        )
+        assert self.redis_client.hgetall(f"ds::p:{self.project.id}:boosted_releases") == {
+            str(release_3.id): str(time())
+        }
+        assert self.redis_client.get(f"ds::p:{self.project.id}:r:{release_3.id}") == "1"
+        assert get_boosted_releases(self.project.id) == [(release_3.id, time())]
+
+    @mock.patch("sentry.event_manager.schedule_invalidate_project_config")
+    def test_project_config_invalidation_is_triggered_when_new_release_is_observed(
+        self, mocked_invalidate
+    ):
+        """Saving a transaction for a newly observed release schedules a project config invalidation."""
+        self.make_release_transaction(
+            release_version=self.release.version,
+            environment_name=self.environment1.name,
+            project_id=self.project.id,
+            checksum="a" * 32,
+            timestamp=self.timestamp,
+        )
+        # At least one call to the patched function must carry the boost-release trigger.
+        assert any(
+            o.kwargs["trigger"] == "dynamic_sampling:boost_release"
+            for o in mocked_invalidate.mock_calls
+        )