fix(relay): Defer cache invalidation until after DB transaction (#35523)

Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
Markus Unterwaditzer, 2 years ago
commit 81780cfc3c
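The pattern applied throughout this commit: instead of scheduling the project-config invalidation task directly from model hooks, which can fire while a database transaction is still open (so the task may read data that is not committed yet, or that is later rolled back), the call is wrapped in django.db.transaction.on_commit. A minimal sketch of that pattern, with a hypothetical invalidate() and on_project_option_saved() standing in for the real code (assumes a configured Django project):

    from django.db import transaction


    def invalidate(project_id):
        # Hypothetical stand-in for the real scheduling call
        # (schedule_invalidate_project_config in the diffs below).
        print(f"invalidating project config for {project_id}")


    def on_project_option_saved(project_id):
        # on_commit defers the call: inside an open transaction.atomic()
        # block it runs only after the outermost block commits; on rollback
        # the callback is discarded; outside any transaction it runs
        # immediately.
        transaction.on_commit(lambda: invalidate(project_id))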

+ 1 - 1
requirements-dev.txt

@@ -6,7 +6,7 @@ mypy>=0.800,<0.900
 openapi-core==0.14.2
 pytest==6.1.0
 pytest-cov==2.11.1
-pytest-django==3.10.0
+pytest-django==4.4.0
 pytest-sentry==0.1.9
 pytest-rerunfailures==9.1.1
 responses==0.10.12

+ 11 - 3
src/sentry/models/options/organization_option.py

@@ -2,7 +2,7 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING, Any, Mapping, Sequence
 
-from django.db import models
+from django.db import models, transaction
 
 from sentry.db.models import FlexibleForeignKey, Model, sane_repr
 from sentry.db.models.fields import EncryptedPickledObjectField
@@ -63,8 +63,16 @@ class OrganizationOptionManager(OptionManager["Organization"]):
 
     def reload_cache(self, organization_id: int, update_reason: str) -> Mapping[str, Value]:
         if update_reason != "organizationoption.get_all_values":
-            schedule_invalidate_project_config(
-                organization_id=organization_id, trigger=update_reason
+            # This method may be called from model hooks while a transaction
+            # is still open. In that case, defer scheduling until the current
+            # transaction has been committed, so the task does not read stale
+            # data; if the transaction is rolled back, the callback is dropped.
+            #
+            # If no transaction is open, on_commit runs the callback immediately.
+            transaction.on_commit(
+                lambda: schedule_invalidate_project_config(
+                    organization_id=organization_id, trigger=update_reason
+                )
             )
 
         cache_key = self._make_key(organization_id)

+ 12 - 2
src/sentry/models/options/project_option.py

@@ -2,7 +2,7 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING, Any, Mapping, Sequence
 
-from django.db import models
+from django.db import models, transaction
 
 from sentry import projectoptions
 from sentry.db.models import FlexibleForeignKey, Model, sane_repr
@@ -73,7 +73,17 @@ class ProjectOptionManager(OptionManager["Project"]):
 
     def reload_cache(self, project_id: int, update_reason: str) -> Mapping[str, Value]:
         if update_reason != "projectoption.get_all_values":
-            schedule_invalidate_project_config(project_id=project_id, trigger=update_reason)
+            # This method may be called from model hooks while a transaction
+            # is still open. In that case, defer scheduling until the current
+            # transaction has been committed, so the task does not read stale
+            # data; if the transaction is rolled back, the callback is dropped.
+            #
+            # If no transaction is open, on_commit runs the callback immediately.
+            transaction.on_commit(
+                lambda: schedule_invalidate_project_config(
+                    project_id=project_id, trigger=update_reason
+                )
+            )
         cache_key = self._make_key(project_id)
         result = {i.key: i.value for i in self.filter(project=project_id)}
         cache.set(cache_key, result)

+ 21 - 5
src/sentry/models/projectkey.py

@@ -4,7 +4,7 @@ from uuid import uuid4
 
 import petname
 from django.conf import settings
-from django.db import models
+from django.db import models, transaction
 from django.urls import reverse
 from django.utils import timezone
 from django.utils.translation import ugettext_lazy as _
@@ -33,13 +33,29 @@ class ProjectKeyStatus:
 
 class ProjectKeyManager(BaseManager):
     def post_save(self, instance, **kwargs):
-        schedule_invalidate_project_config(
-            public_key=instance.public_key, trigger="projectkey.post_save"
+        # This hook may run while a transaction is still open. In that case,
+        # defer scheduling until the current transaction has been committed,
+        # so the task does not read stale data; if the transaction is rolled
+        # back, the callback is dropped.
+        #
+        # If no transaction is open, on_commit runs the callback immediately.
+        transaction.on_commit(
+            lambda: schedule_invalidate_project_config(
+                public_key=instance.public_key, trigger="projectkey.post_save"
+            )
         )
 
     def post_delete(self, instance, **kwargs):
-        schedule_invalidate_project_config(
-            public_key=instance.public_key, trigger="projectkey.post_delete"
+        # This hook may run while a transaction is still open. In that case,
+        # defer scheduling until the current transaction has been committed,
+        # so the task does not read stale data; if the transaction is rolled
+        # back, the callback is dropped.
+        #
+        # If no transaction is open, on_commit runs the callback immediately.
+        transaction.on_commit(
+            lambda: schedule_invalidate_project_config(
+                public_key=instance.public_key, trigger="projectkey.post_delete"
+            )
         )
 
 

+ 7 - 16
src/sentry/testutils/cases.py

@@ -156,24 +156,15 @@ class BaseTestCase(Fixtures, Exam):
     def tasks(self):
         return TaskRunner()
 
-    @classmethod
-    @contextmanager
-    def capture_on_commit_callbacks(cls, using=DEFAULT_DB_ALIAS, execute=False):
+    @pytest.fixture(autouse=True)
+    def polyfill_capture_on_commit_callbacks(self, django_capture_on_commit_callbacks):
         """
-        Context manager to capture transaction.on_commit() callbacks.
-        Backported from Django:
-        https://github.com/django/django/pull/12944
+        https://pytest-django.readthedocs.io/en/latest/helpers.html#django_capture_on_commit_callbacks
+
+        pytest-django comes with its own polyfill of this Django helper for
+        older Django versions, so we're using that.
         """
-        callbacks = []
-        start_count = len(connections[using].run_on_commit)
-        try:
-            yield callbacks
-        finally:
-            run_on_commit = connections[using].run_on_commit[start_count:]
-            callbacks[:] = [func for sids, func in run_on_commit]
-            if execute:
-                for callback in callbacks:
-                    callback()
+        self.capture_on_commit_callbacks = django_capture_on_commit_callbacks
 
     def feature(self, names):
         """

+ 2 - 0
src/sentry/testutils/helpers/task_runner.py

@@ -44,5 +44,7 @@ def BurstTaskRunner():
         if queue:
             raise RuntimeError("Could not empty queue, last task items: %s" % repr(queue))
 
+    work.queue = queue
+
     with patch("celery.app.task.Task.apply_async", apply_async):
         yield work
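BurstTaskRunner now also exposes its pending-task list as work.queue. The emulate_transactions fixture in the relay tests below uses this to check that no relay task has been enqueued before the on_commit callbacks run (by iterating burst.queue), to drop unrelated jobs with burst.queue.clear(), and then to execute the remaining work via burst(max_jobs=10).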

+ 95 - 17
tests/sentry/tasks/test_relay.py

@@ -1,6 +1,8 @@
+import contextlib
 from unittest.mock import patch
 
 import pytest
+from django.db import transaction
 
 from sentry.models import Project, ProjectKey, ProjectKeyStatus, ProjectOption
 from sentry.relay.projectconfig_cache.redis import RedisProjectConfigCache
@@ -27,6 +29,45 @@ def _cache_keys_for_org(org):
             yield key.public_key
 
 
+@pytest.fixture
+def emulate_transactions(burst_task_runner, django_capture_on_commit_callbacks):
+    # This contraption helps test the use of `transaction.on_commit` around
+    # schedule_invalidate_project_config. Normally tests involving transactions would
+    # require us to use the transactional testcase (or
+    # `pytest.mark.django_db(transaction=True)`), but that incurs a 2x slowdown
+    # in test speed and we're trying to keep our testcases fast.
+    @contextlib.contextmanager
+    def inner(assert_num_callbacks=1):
+        with burst_task_runner() as burst:
+            with django_capture_on_commit_callbacks(execute=True) as callbacks:
+                yield
+
+                # Assert that there are no relay-related jobs in the queue yet; we
+                # should have on_commit callbacks instead. If there are, a model
+                # hook has scheduled the invalidation task prematurely.
+                #
+                # Remove any other jobs from the queue that may have been triggered via model hooks
+                assert not any("relay" in task.__name__ for task, _, _ in burst.queue)
+                burst.queue.clear()
+
+            # for some reason, the callbacks array is only populated by
+            # pytest-django's implementation after the context manager has
+            # exited, not while they are being registered
+            assert len(callbacks) == assert_num_callbacks
+
+        # Callbacks have been executed, job(s) should've been scheduled now, so
+        # let's execute them.
+        #
+        # Note: We can't directly assert that the data race has not occurred, as
+        # there are no real DB transactions available in this testcase. The
+        # entire test runs in one transaction because that's how pytest-django
+        # sets things up unless one uses
+        # pytest.mark.django_db(transaction=True).
+        burst(max_jobs=10)
+
+    return inner
+
+
 @pytest.fixture
 def redis_cache(monkeypatch):
     monkeypatch.setattr(
@@ -97,13 +138,11 @@ def test_generate(
     default_project,
     default_organization,
     default_projectkey,
-    task_runner,
     redis_cache,
 ):
     assert not redis_cache.get(default_projectkey.public_key)
 
-    with task_runner():
-        build_project_config(default_projectkey.public_key)
+    build_project_config(default_projectkey.public_key)
 
     cfg = redis_cache.get(default_projectkey.public_key)
 
@@ -120,8 +159,11 @@ def test_generate(
 
 
 @pytest.mark.django_db
-def test_project_update_option(default_projectkey, default_project, task_runner, redis_cache):
-    with task_runner():
+def test_project_update_option(
+    default_projectkey, default_project, emulate_transactions, redis_cache
+):
+    # XXX: only one callback should be registered here, regardless of debouncing
+    with emulate_transactions(assert_num_callbacks=4):
         default_project.update_option(
             "sentry:relay_pii_config", '{"applications": {"$string": ["@creditcard:mask"]}}'
         )
@@ -130,7 +172,8 @@ def test_project_update_option(default_projectkey, default_project, task_runner,
         "applications": {"$string": ["@creditcard:mask"]}
     }
 
-    with task_runner():
+    # XXX: only one callback should be registered here, regardless of debouncing
+    with emulate_transactions(assert_num_callbacks=2):
         default_project.organization.update_option(
             "sentry:relay_pii_config", '{"applications": {"$string": ["@creditcard:mask"]}}'
         )
@@ -140,8 +183,9 @@ def test_project_update_option(default_projectkey, default_project, task_runner,
 
 
 @pytest.mark.django_db
-def test_project_delete_option(default_project, task_runner, redis_cache):
-    with task_runner():
+def test_project_delete_option(default_project, emulate_transactions, redis_cache):
+    # XXX: only one callback should be registered here, regardless of debouncing
+    with emulate_transactions(assert_num_callbacks=3):
         default_project.delete_option("sentry:relay_pii_config")
 
     for cache_key in _cache_keys_for_project(default_project):
@@ -149,9 +193,9 @@ def test_project_delete_option(default_project, task_runner, redis_cache):
 
 
 @pytest.mark.django_db
-def test_project_get_option_does_not_reload(default_project, task_runner, monkeypatch):
+def test_project_get_option_does_not_reload(default_project, emulate_transactions, monkeypatch):
     ProjectOption.objects._option_cache.clear()
-    with task_runner():
+    with emulate_transactions(assert_num_callbacks=0):
         with patch("sentry.utils.cache.cache.get", return_value=None):
             with patch("sentry.tasks.relay.schedule_build_project_config") as build_project_config:
                 default_project.get_option(
@@ -162,7 +206,7 @@ def test_project_get_option_does_not_reload(default_project, task_runner, monkey
 
 
 @pytest.mark.django_db
-def test_invalidation_project_deleted(default_project, task_runner, redis_cache):
+def test_invalidation_project_deleted(default_project, emulate_transactions, redis_cache):
     # Ensure we have a ProjectKey
     project_key = next(_cache_keys_for_project(default_project))
     assert project_key
@@ -174,7 +218,7 @@ def test_invalidation_project_deleted(default_project, task_runner, redis_cache)
     project_id = default_project.id
 
     # Delete the project normally, this will delete it from the cache
-    with task_runner():
+    with emulate_transactions(assert_num_callbacks=4):
         default_project.delete()
     assert redis_cache.get(project_key) is None
 
@@ -184,10 +228,12 @@ def test_invalidation_project_deleted(default_project, task_runner, redis_cache)
 
 
 @pytest.mark.django_db
-def test_projectkeys(default_project, task_runner, redis_cache):
+def test_projectkeys(default_project, emulate_transactions, redis_cache):
     # When a projectkey is deleted the invalidation task should be triggered and the project
     # should be cached as disabled.
-    with task_runner():
+
+    # XXX: only one callback should be registered here, regardless of debouncing
+    with emulate_transactions(assert_num_callbacks=2):
         deleted_pks = list(ProjectKey.objects.filter(project=default_project))
         for key in deleted_pks:
             key.delete()
@@ -201,13 +247,13 @@ def test_projectkeys(default_project, task_runner, redis_cache):
     (pk_json,) = redis_cache.get(pk.public_key)["publicKeys"]
     assert pk_json["publicKey"] == pk.public_key
 
-    with task_runner():
+    with emulate_transactions():
         pk.status = ProjectKeyStatus.INACTIVE
         pk.save()
 
     assert redis_cache.get(pk.public_key)["disabled"]
 
-    with task_runner():
+    with emulate_transactions():
         pk.delete()
 
     assert redis_cache.get(pk.public_key) is None
@@ -216,6 +262,38 @@ def test_projectkeys(default_project, task_runner, redis_cache):
         assert not redis_cache.get(key.public_key)
 
 
+@pytest.mark.django_db(transaction=True)
+def test_db_transaction(default_project, default_projectkey, redis_cache, task_runner):
+    with task_runner(), transaction.atomic():
+        default_project.update_option(
+            "sentry:relay_pii_config", '{"applications": {"$string": ["@creditcard:mask"]}}'
+        )
+
+        # Assert that cache entry hasn't been created yet, only after the
+        # transaction has committed.
+        assert not redis_cache.get(default_projectkey.public_key)
+
+    assert redis_cache.get(default_projectkey.public_key)["config"]["piiConfig"] == {
+        "applications": {"$string": ["@creditcard:mask"]}
+    }
+
+    try:
+        with task_runner(), transaction.atomic():
+            default_project.update_option(
+                "sentry:relay_pii_config", '{"applications": {"$string": ["@password:mask"]}}'
+            )
+
+            raise Exception("rollback!")
+
+    except Exception:
+        pass
+
+    # Assert that database rollback is honored
+    assert redis_cache.get(default_projectkey.public_key)["config"]["piiConfig"] == {
+        "applications": {"$string": ["@creditcard:mask"]}
+    }
+
+
 class TestInvalidationTask:
     @pytest.fixture
     def debounce_cache(self, monkeypatch):
@@ -312,8 +390,8 @@ class TestInvalidationTask:
         default_project,
         default_organization,
         default_projectkey,
-        task_runner,
         redis_cache,
+        task_runner,
     ):
         # Currently for org-wide we delete the config instead of computing it.
         cfg = {"dummy-key": "val"}