|
@@ -5,6 +5,7 @@ import pytest
|
|
|
|
|
|
from sentry.coreapi import APIUnauthorized
|
|
|
from sentry.mediators.sentry_app_installations.installation_notifier import InstallationNotifier
|
|
|
+from sentry.services.hybrid_cloud.user.service import user_service
|
|
|
from sentry.testutils.cases import TestCase
|
|
|
from sentry.testutils.silo import control_silo_test
|
|
|
from sentry.utils import json
|
|
@@ -39,10 +40,11 @@ class TestInstallationNotifier(TestCase):
|
|
|
user=self.user,
|
|
|
prevent_token_exchange=True,
|
|
|
)
|
|
|
+ self.rpc_user = user_service.get_user(user_id=self.user.id)
|
|
|
|
|
|
@patch("sentry.utils.sentry_apps.webhooks.safe_urlopen", return_value=MockResponseInstance)
|
|
|
def test_task_enqueued(self, safe_urlopen):
|
|
|
- InstallationNotifier.run(install=self.install, user=self.user, action="created")
|
|
|
+ InstallationNotifier.run(install=self.install, user=self.rpc_user, action="created")
|
|
|
|
|
|
((args, kwargs),) = safe_urlopen.call_args_list
|
|
|
|
|
@@ -71,7 +73,7 @@ class TestInstallationNotifier(TestCase):
|
|
|
|
|
|
@patch("sentry.utils.sentry_apps.webhooks.safe_urlopen", return_value=MockResponseInstance)
|
|
|
def test_uninstallation_enqueued(self, safe_urlopen):
|
|
|
- InstallationNotifier.run(install=self.install, user=self.user, action="deleted")
|
|
|
+ InstallationNotifier.run(install=self.install, user=self.rpc_user, action="deleted")
|
|
|
|
|
|
((args, kwargs),) = safe_urlopen.call_args_list
|
|
|
|
|
@@ -101,14 +103,14 @@ class TestInstallationNotifier(TestCase):
|
|
|
@patch("sentry.utils.sentry_apps.webhooks.safe_urlopen")
|
|
|
def test_invalid_installation_action(self, safe_urlopen):
|
|
|
with pytest.raises(APIUnauthorized):
|
|
|
- InstallationNotifier.run(install=self.install, user=self.user, action="updated")
|
|
|
+ InstallationNotifier.run(install=self.install, user=self.rpc_user, action="updated")
|
|
|
|
|
|
assert not safe_urlopen.called
|
|
|
|
|
|
@patch("sentry.utils.sentry_apps.webhooks.safe_urlopen", return_value=MockResponseInstance)
|
|
|
def test_webhook_request_saved(self, safe_urlopen):
|
|
|
- InstallationNotifier.run(install=self.install, user=self.user, action="created")
|
|
|
- InstallationNotifier.run(install=self.install, user=self.user, action="deleted")
|
|
|
+ InstallationNotifier.run(install=self.install, user=self.rpc_user, action="created")
|
|
|
+ InstallationNotifier.run(install=self.install, user=self.rpc_user, action="deleted")
|
|
|
|
|
|
buffer = SentryAppWebhookRequestsBuffer(self.sentry_app)
|
|
|
requests = buffer.get_requests()
|