Browse Source

perf: move assemble status to redis (#70414)

This PR re-applies the previously reverted change, now that
https://github.com/getsentry/getsentry/commit/6a76fde9a0988cc619bd3fbef5f8910a3f30a9c2
has landed in getsentry.
Yagiz Nizipli 10 months ago
parent
commit
c13fbca046

+ 1 - 0
src/sentry/conf/server.py

@@ -135,6 +135,7 @@ SENTRY_STATISTICAL_DETECTORS_REDIS_CLUSTER = "default"
 SENTRY_METRIC_META_REDIS_CLUSTER = "default"
 SENTRY_ESCALATION_THRESHOLDS_REDIS_CLUSTER = "default"
 SENTRY_SPAN_BUFFER_CLUSTER = "default"
+SENTRY_ASSEMBLE_CLUSTER = "default"
 
 # Hosts that are allowed to use system token authentication.
 # http://en.wikipedia.org/wiki/Reserved_IP_addresses

+ 4 - 0
src/sentry/options/defaults.py

@@ -2300,6 +2300,10 @@ register(
     flags=FLAG_BOOL | FLAG_AUTOMATOR_MODIFIABLE,
 )
 
+
+# Switch to read assemble status from Redis instead of memcache
+register("assemble.read_from_redis", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE)
+
 # Sampling rates for testing Rust-based grouping enhancers
 
 # Rate at which to run the Rust implementation of `assemble_stacktrace_component`

+ 31 - 4
src/sentry/tasks/assemble.py

@@ -6,9 +6,11 @@ import uuid
 from abc import ABC, abstractmethod
 from datetime import datetime
 from os import path
-from typing import IO, Generic, NamedTuple, Protocol, TypeVar
+from typing import IO, TYPE_CHECKING, Generic, NamedTuple, Protocol, TypeVar
 
+import orjson
 import sentry_sdk
+from django.conf import settings
 from django.db import IntegrityError, router
 from django.db.models import Q
 from django.utils import timezone
@@ -39,13 +41,16 @@ from sentry.models.release import Release
 from sentry.models.releasefile import ReleaseArchive, ReleaseFile, update_artifact_index
 from sentry.silo.base import SiloMode
 from sentry.tasks.base import instrumented_task
-from sentry.utils import metrics
+from sentry.utils import metrics, redis
 from sentry.utils.db import atomic_transaction
 from sentry.utils.files import get_max_file_size
 from sentry.utils.sdk import bind_organization_context, configure_scope
 
 logger = logging.getLogger(__name__)
 
+if TYPE_CHECKING:
+    from rediscluster import RedisCluster
+
 
 class ChunkFileState:
     OK = "ok"  # File in database
@@ -164,12 +169,18 @@ def _get_cache_key(task, scope, checksum):
             % (
                 str(scope).encode("ascii"),
                 checksum.encode("ascii"),
-                str(task).encode("utf-8"),
+                str(task).encode(),
             )
         ).hexdigest()
     )
 
 
+def _get_redis_cluster_for_assemble() -> RedisCluster:
+    cluster_key = settings.SENTRY_ASSEMBLE_CLUSTER
+    return redis.redis_clusters.get(cluster_key)  # type: ignore[return-value]
+
+
+@sentry_sdk.tracing.trace
 def get_assemble_status(task, scope, checksum):
     """
     Checks the current status of an assembling task.
@@ -179,26 +190,42 @@ def get_assemble_status(task, scope, checksum):
     notice or error message.
     """
     cache_key = _get_cache_key(task, scope, checksum)
-    rv = default_cache.get(cache_key)
+
+    if options.get("assemble.read_from_redis"):
+        client = _get_redis_cluster_for_assemble()
+        rv = client.get(cache_key)
+
+        # Redis stores the status as orjson-encoded bytes of [state, detail].
+        if rv:
+            rv = orjson.loads(rv)
+    else:
+        rv = default_cache.get(cache_key)
+
     if rv is None:
         return None, None
     return tuple(rv)
 
 
+@sentry_sdk.tracing.trace
 def set_assemble_status(task, scope, checksum, state, detail=None):
     """
     Updates the status of an assembling task. It is cached for 10 minutes.
     """
     cache_key = _get_cache_key(task, scope, checksum)
     default_cache.set(cache_key, (state, detail), 600)
+    redis_client = _get_redis_cluster_for_assemble()
+    redis_client.set(name=cache_key, value=orjson.dumps([state, detail]), ex=600)
 
 
+@sentry_sdk.tracing.trace
 def delete_assemble_status(task, scope, checksum):
     """
     Deletes the status of an assembling task.
     """
     cache_key = _get_cache_key(task, scope, checksum)
     default_cache.delete(cache_key)
+    redis_client = _get_redis_cluster_for_assemble()
+    redis_client.delete(cache_key)
 
 
 @instrumented_task(

+ 4 - 0
src/sentry/testutils/helpers/redis.py

@@ -12,6 +12,7 @@ def use_redis_cluster(
     cluster_id: str = "cluster",
     high_watermark: int = 100,
     with_settings: dict[str, Any] | None = None,
+    with_options: dict[str, Any] | None = None,
 ) -> Generator[None, None, None]:
     # Cluster id needs to be different than "default" to distinguish redis instance with redis cluster.
 
@@ -32,6 +33,9 @@ def use_redis_cluster(
         },
     }
 
+    if with_options:
+        options.update(with_options)
+
     settings = dict(with_settings or {})
     settings["SENTRY_PROCESSING_SERVICES"] = {"redis": {"redis": cluster_id}}
 

+ 25 - 0
tests/sentry/tasks/test_assemble.py

@@ -1,5 +1,6 @@
 import io
 import os
+import uuid
 from datetime import UTC, datetime, timedelta
 from hashlib import sha1
 from unittest import mock
@@ -28,10 +29,13 @@ from sentry.tasks.assemble import (
     assemble_artifacts,
     assemble_dif,
     assemble_file,
+    delete_assemble_status,
     get_assemble_status,
+    set_assemble_status,
 )
 from sentry.testutils.cases import TestCase
 from sentry.testutils.helpers.datetime import freeze_time
+from sentry.testutils.helpers.redis import use_redis_cluster
 
 
 class BaseAssembleTest(TestCase):
@@ -1047,3 +1051,24 @@ class ArtifactBundleIndexingTest(TestCase):
             organization_id=self.organization.id,
             artifact_bundles=[(artifact_bundle_1, mock.ANY)],
         )
+
+
+@use_redis_cluster(with_options={"assemble.read_from_redis": True})
+def test_redis_assemble_status():
+    task = AssembleTask.DIF
+    project_id = uuid.uuid4().hex
+    checksum = uuid.uuid4().hex
+
+    # With no status stored yet, the getter returns (None, None).
+    assert get_assemble_status(task=task, scope=project_id, checksum=checksum) == (None, None)
+
+    # Test setter
+    set_assemble_status(task, project_id, checksum, ChunkFileState.CREATED, detail="cylons")
+    assert get_assemble_status(task=task, scope=project_id, checksum=checksum) == (
+        "created",
+        "cylons",
+    )
+
+    # Deleting should actually delete it.
+    delete_assemble_status(task, project_id, checksum=checksum)
+    assert get_assemble_status(task=task, scope=project_id, checksum=checksum) == (None, None)