Browse Source

feat(projectconfig): Compress redis value (#36926)

With transaction metrics and dynamic sampling config, the size of the
project config values written to Redis becomes a problem at scale.
Applying zstandard compression should mitigate this problem.
Joris Bayer 2 years ago
parent
commit
b24a97c6fd

+ 30 - 1
src/sentry/relay/projectconfig_cache/redis.py

@@ -1,8 +1,28 @@
+import logging
+import random
+
+import zstandard
+
+from sentry import options
 from sentry.relay.projectconfig_cache.base import ProjectConfigCache
 from sentry.utils import json, metrics, redis
 from sentry.utils.redis import validate_dynamic_cluster
 
 
 REDIS_CACHE_TIMEOUT = 3600  # 1 hr
+COMPRESSION_OPTION_DSNS = "relay.project-config-cache-compress"
+COMPRESSION_OPTION_SAMPLE_RATE = "relay.project-config-cache-compress-sample-rate"
+COMPRESSION_LEVEL = 3  # 3 is the default level of compression
+
+logger = logging.getLogger(__name__)
+
+
+def _use_compression(public_key: str) -> bool:
+    enabled_dsns = options.get(COMPRESSION_OPTION_DSNS)
+    if public_key in enabled_dsns:
+        return True
+
+    # Apply sample rate:
+    return random.random() < options.get(COMPRESSION_OPTION_SAMPLE_RATE)
 
 
 
 
 class RedisProjectConfigCache(ProjectConfigCache):
@@ -24,7 +44,11 @@ class RedisProjectConfigCache(ProjectConfigCache):
         # Note: Those are multiple pipelines, one per cluster node
         p = self.cluster.pipeline()
         for public_key, config in configs.items():
-            p.setex(self.__get_redis_key(public_key), REDIS_CACHE_TIMEOUT, json.dumps(config))
+            value = json.dumps(config)
+            if _use_compression(public_key):
+                value = zstandard.compress(value.encode(), level=COMPRESSION_LEVEL)
+            metrics.timing("relay.projectconfig_cache.size", len(value))
+            p.setex(self.__get_redis_key(public_key), REDIS_CACHE_TIMEOUT, value)
 
 
         p.execute()
         p.execute()
 
 
@@ -42,5 +66,10 @@ class RedisProjectConfigCache(ProjectConfigCache):
     def get(self, public_key):
         rv = self.cluster.get(self.__get_redis_key(public_key))
         if rv is not None:
+            try:
+                rv = zstandard.decompress(rv).decode()
+            except (TypeError, zstandard.ZstdError):
+                # assume raw json
+                pass
             return json.loads(rv)
         return None

+ 7 - 0
src/sentry/utils/pytest/relay.py

@@ -13,6 +13,7 @@ import pytest
 import requests

 from sentry.runner.commands.devservices import get_docker_client
+from sentry.utils.pytest.sentry import TEST_REDIS_DB

 _log = logging.getLogger(__name__)
 
 
@@ -78,11 +79,17 @@ def relay_server_setup(live_server, tmpdir_factory):
     # NOTE: if we ever need to start the test relay server at various ports here's where we need to change
     relay_port = 33331
 
 
+    redis_db = TEST_REDIS_DB
+    from sentry.relay import projectconfig_cache
+
+    assert redis_db == projectconfig_cache.backend.cluster.connection_pool.connection_kwargs["db"]
+
     template_vars = {
         "SENTRY_HOST": upstream_host,
         "RELAY_PORT": relay_port,
         "KAFKA_HOST": kafka_host,
         "REDIS_HOST": redis_host,
+        "REDIS_DB": redis_db,
     }
 
 
     for source in sources:

+ 3 - 1
src/sentry/utils/pytest/sentry.py

@@ -20,6 +20,8 @@ TEST_ROOT = os.path.normpath(
     os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir, os.pardir, "tests")
 )

+TEST_REDIS_DB = 9
+

 def pytest_configure(config):
     import warnings
@@ -141,7 +143,7 @@ def pytest_configure(config):
 
 
     settings.SENTRY_OPTIONS.update(
         {
-            "redis.clusters": {"default": {"hosts": {0: {"db": 9}}}},
+            "redis.clusters": {"default": {"hosts": {0: {"db": TEST_REDIS_DB}}}},
             "mail.backend": "django.core.mail.backends.locmem.EmailBackend",
             "system.url-prefix": "http://testserver",
             "system.base-hostname": "testserver",

+ 2 - 2
src/sentry/utils/pytest/template/config.yml

@@ -10,10 +10,10 @@ logging:
 processing:
   enabled: true
   kafka_config:
-    - {name: "bootstrap.servers", value: "${KAFKA_HOST}:9093"}
+    - {name: 'bootstrap.servers', value: '${KAFKA_HOST}:9093'}
   topics:
     events: ingest-events
     attachments: ingest-events
     transactions: ingest-events
     outcomes: outcomes
-  redis: redis://${REDIS_HOST}:6379
+  redis: redis://${REDIS_HOST}:6379/${REDIS_DB}

+ 17 - 0
tests/relay_integration/test_integration.py

@@ -1,13 +1,16 @@
 from io import BytesIO
+from unittest import mock
 from uuid import uuid4

 import pytest

 from sentry.models.eventattachment import EventAttachment
 from sentry.spans.grouping.utils import hash_values
+from sentry.tasks.relay import invalidate_project_config
 from sentry.testutils import RelayStoreHelper, TransactionTestCase
 from sentry.testutils.helpers import Feature
 from sentry.testutils.helpers.datetime import before_now, iso_format, timestamp_format
+from sentry.testutils.helpers.options import override_options


 class SentryRemoteTest(RelayStoreHelper, TransactionTestCase):
@@ -216,3 +219,17 @@ class SentryRemoteTest(RelayStoreHelper, TransactionTestCase):
                     "total.time": {"unit": "millisecond", "value": pytest.approx(1050, abs=2)},
                 }
             }
+
+    def test_project_config_compression(self):
+        # Populate redis cache with compressed config:
+        with override_options({"relay.project-config-cache-compress-sample-rate": 1.0}):
+            invalidate_project_config(public_key=self.projectkey, trigger="test")
+
+        # Disable project config endpoint, to make sure Relay gets its data
+        # from redis:
+        with mock.patch(
+            "sentry.api.endpoints.relay.project_configs.RelayProjectConfigsEndpoint.post"
+        ):
+            event_data = {"message": "hello", "timestamp": iso_format(before_now(seconds=1))}
+            event = self.post_and_retrieve_event(event_data)
+            assert event.message == "hello"

+ 27 - 0
tests/sentry/relay/test_projectconfig_cache.py

@@ -1,6 +1,9 @@
 from unittest import mock

+import pytest
+
 from sentry.relay.projectconfig_cache import redis
+from sentry.testutils.helpers import override_options


 def test_delete_count(monkeypatch):
@@ -14,3 +17,27 @@ def test_delete_count(monkeypatch):
     assert incr_mock.call_args == mock.call(
         "relay.projectconfig_cache.write", amount=1, tags={"action": "delete"}
     )
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize(
+    "enabled_dsns, sample_rate, should_use_compression",
+    [
+        (["fake-dsn-1", "fake-dsn-2"], 0.0, True),
+        (["fake-dsn-1", "fake-dsn-2"], 1.0, True),
+        ([], 1.0, True),
+        ([], 0.0, False),
+    ],
+)
+def test_read_write(enabled_dsns, sample_rate, should_use_compression):
+    cache = redis.RedisProjectConfigCache()
+    my_key = "fake-dsn-1"
+    with override_options(
+        {
+            "relay.project-config-cache-compress": enabled_dsns,
+            "relay.project-config-cache-compress-sample-rate": sample_rate,
+        }
+    ):
+        assert redis._use_compression(my_key) == should_use_compression
+        cache.set_many({my_key: "my-value"})
+        assert cache.get(my_key) == "my-value"