Browse Source

feat(hybridcloud) Add shared secret and signatures to RPC requests (#52842)

When doing RPC requests we should have authentication on requests. I've
gone with a simple request signature HMAC verfication as it felt like
the simplest solution that was tamper & forgery resistant. Previously we
had planned on using mTLS as authentication between regions. Setting up
a cerfiticate authority and managing certificates is more scope than we
presently have capacity for.

I'm anticipating us needing to do key rotation, or signature upgrades in
the future and have accounted for both in the current design.

Refs HC-730
Mark Story 1 year ago
parent
commit
2678010f17

+ 25 - 1
src/sentry/api/authentication.py

@@ -1,4 +1,4 @@
-from typing import Optional, Tuple
+from typing import List, Optional, Tuple
 
 from django.conf import settings
 from django.contrib.auth.models import AnonymousUser
@@ -13,6 +13,7 @@ from sentry import options
 from sentry.auth.system import SystemToken, is_internal_ip
 from sentry.models import ApiApplication, ApiKey, ApiToken, OrgAuthToken, ProjectKey, Relay
 from sentry.relay.utils import get_header_relay_id, get_header_relay_signature
+from sentry.services.hybrid_cloud.rpc import compare_signature
 from sentry.utils.sdk import configure_scope
 from sentry.utils.security.orgauthtoken_token import SENTRY_ORG_AUTH_TOKEN_PREFIX, hash_token
 
@@ -282,3 +283,26 @@ class DSNAuthentication(StandardAuthentication):
             scope.set_tag("api_project_key", key.id)
 
         return (AnonymousUser(), key)
+
+
+class RpcSignatureAuthentication(StandardAuthentication):
+    """
+    Authentication for cross-region RPC requests.
+    Requests are sent with an HMAC signed by a shared private key.
+    """
+
+    token_name = b"rpcsignature"
+
+    def accepts_auth(self, auth: List[bytes]) -> bool:
+        if not auth or len(auth) < 2:
+            return False
+        return auth[0].lower() == self.token_name
+
+    def authenticate_credentials(self, request: Request, token: str):
+        if not compare_signature(request.path_info, request.body, token):
+            raise AuthenticationFailed("Invalid signature")
+
+        with configure_scope() as scope:
+            scope.set_tag("rpc_auth", True)
+
+        return (AnonymousUser(), token)

+ 7 - 14
src/sentry/api/endpoints/rpc.py

@@ -2,33 +2,26 @@ from rest_framework.exceptions import NotFound, ParseError, PermissionDenied, Va
 from rest_framework.request import Request
 from rest_framework.response import Response
 
+from sentry.api.authentication import RpcSignatureAuthentication
 from sentry.api.base import Endpoint, all_silo_endpoint
 from sentry.services.hybrid_cloud.rpc import (
     RpcArgumentException,
     RpcResolutionException,
-    RpcSenderCredentials,
     dispatch_to_local_service,
 )
 
 
 @all_silo_endpoint
 class RpcServiceEndpoint(Endpoint):
+    authentication_classes = (RpcSignatureAuthentication,)
     permission_classes = ()
 
     def _is_authorized(self, request: Request) -> bool:
-        """Check whether the remote procedure call is authorized.
-
-        We currently know that RPC authorization is going to look a bit different
-        from any user-based auth. The authority to make arbitrary RPCs surpasses
-        anything in our current system of permission scopes; it's basically "system
-        access" only.
-
-        As a placeholder, use a global system flag (to be set only in dev
-        environments) that allows access if set, and disables it entirely otherwise.
-
-        TODO: Real solution
-        """
-        return RpcSenderCredentials.read_from_settings().is_allowed
+        if request.auth and isinstance(
+            request.successful_authenticator, RpcSignatureAuthentication
+        ):
+            return True
+        return False
 
     def post(self, request: Request, service_name: str, method_name: str) -> Response:
         if not self._is_authorized(request):

+ 12 - 10
src/sentry/conf/server.py

@@ -17,7 +17,6 @@ from urllib.parse import urlparse
 import sentry
 from sentry.conf.types.consumer_definition import ConsumerDefinition
 from sentry.conf.types.topic_definition import TopicDefinition
-from sentry.utils import json
 from sentry.utils.celery import crontab_with_minute_jitter
 from sentry.utils.types import type_from_value
 
@@ -630,12 +629,18 @@ USE_SILOS = os.environ.get("SENTRY_USE_SILOS", None)
 # that is parsed.
 SENTRY_REGION_CONFIG: Any = tuple()
 
+# Shared secret used to sign cross-region RPC requests.
+RPC_SHARED_SECRET = None
+
+# The protocol, host and port for control silo
+SENTRY_CONTROL_ADDRESS = ""
+
 # Fallback region name for monolith deployments
 SENTRY_MONOLITH_REGION: str = "--monolith--"
 
 # Control silo address (public or private).
 # Usecases include sending requests to the Integration Proxy Endpoint.
-SENTRY_CONTROL_ADDRESS = os.environ.get("SENTRY_CONTROL_ADDRESS", None)
+SENTRY_CONTROL_ADDRESS = os.environ.get("SENTRY_CONTROL_ADDRESS", "")
 
 # The key used for generating or verifying the HMAC signature for Integration Proxy Endpoint requests.
 SENTRY_SUBNET_SECRET = os.environ.get("SENTRY_SUBNET_SECRET", None)
@@ -3451,7 +3456,6 @@ SENTRY_FUNCTIONS_REGION = "us-central1"
 
 # Settings related to SiloMode
 FAIL_ON_UNAVAILABLE_API_CALL = False
-DEV_HYBRID_CLOUD_RPC_SENDER = os.environ.get("SENTRY_DEV_HYBRID_CLOUD_RPC_SENDER", None)
 
 DISALLOWED_CUSTOMER_DOMAINS: list[str] = []
 
@@ -3484,14 +3488,12 @@ if USE_SILOS:
             "api_token": "dev-region-silo-token",
         }
     ]
+    # RPC authentication and address information
+    RPC_SHARED_SECRET = [
+        "a-long-value-that-is-shared-but-also-secret",
+    ]
     control_port = os.environ.get("SENTRY_CONTROL_SILO_PORT", "8000")
-    DEV_HYBRID_CLOUD_RPC_SENDER = json.dumps(
-        {
-            "is_allowed": True,
-            "control_silo_api_token": "dev-control-silo-token",
-            "control_silo_address": f"http://127.0.0.1:{control_port}",
-        }
-    )
+    SENTRY_CONTROL_ADDRESS = f"http://127.0.0.1:{control_port}"
 
 
 # How long we should wait for a gateway proxy request to return before giving up

+ 71 - 25
src/sentry/services/hybrid_cloud/rpc.py

@@ -1,17 +1,18 @@
 from __future__ import annotations
 
 import abc
+import hashlib
+import hmac
 import inspect
 import logging
-import urllib.response
 from abc import abstractmethod
 from collections.abc import Iterable
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, Mapping, Tuple, Type, TypeVar, cast
-from urllib.request import Request, urlopen
 
 import django.urls
 import pydantic
+import requests
 import sentry_sdk
 from django.conf import settings
 
@@ -469,20 +470,13 @@ def dispatch_remote_call(
 ) -> Any:
     service, _ = _look_up_service_method(service_name, method_name)
 
-    creds = RpcSenderCredentials.read_from_settings()
-    if not creds.is_allowed:
-        raise RpcSendException("RPC calls are not globally enabled")
-
     if region is None:
-        address = creds.control_silo_address
-        api_token = creds.control_silo_api_token
-        if not (address and api_token):
-            raise RpcSendException("Not configured to remotely access control silo")
+        address = settings.SENTRY_CONTROL_ADDRESS
     else:
         address = region.address
-        api_token = region.api_token
-        if not (address and api_token):
-            raise RpcSendException(f"Not configured to remotely access region: {region.name}")
+
+    if not (address and settings.RPC_SHARED_SECRET):
+        raise RpcSendException("Not configured for RPC network requests")
 
     path = django.urls.reverse(
         "sentry-api-0-rpc-service",
@@ -501,12 +495,13 @@ def dispatch_remote_call(
     span = sentry_sdk.start_span(
         op="hybrid_cloud.dispatch_rpc", description=f"rpc to {service_name}.{method_name}"
     )
-    with span, timer, _fire_request(url, request_body, api_token) as response:
-        charset = response.headers.get_content_charset() or _RPC_CONTENT_CHARSET
-        metrics.incr("hybrid_cloud.dispatch_rpc.response_code", tags={"status": response.status})
-        response_body = response.read().decode(charset)
+    with span, timer:
+        response = _fire_request(url, path, request_body)
+        metrics.incr(
+            "hybrid_cloud.dispatch_rpc.response_code", tags={"status": response.status_code}
+        )
 
-    serial_response = json.loads(response_body)
+    serial_response = response.json()
     return_value = serial_response["value"]
     return (
         None
@@ -515,16 +510,67 @@ def dispatch_remote_call(
     )
 
 
-def _fire_request(url: str, body: Any, api_token: str) -> urllib.response.addinfourl:
+def _fire_request(url: str, path: str, body: Any) -> requests.Response:
     # TODO: Performance considerations (persistent connections, pooling, etc.)?
     data = json.dumps(body).encode(_RPC_CONTENT_CHARSET)
 
-    request = Request(url)
-    request.add_header("Content-Type", f"application/json; charset={_RPC_CONTENT_CHARSET}")
-    request.add_header("Content-Length", str(len(data)))
-    # TODO(hybridcloud) Re-enable this when we've implemented RPC authentication
-    # request.add_header("Authorization", f"Bearer {api_token}")
-    return urlopen(request, data)
+    signature = generate_request_signature(path, data)
+    headers = {
+        "Content-Type": f"application/json; charset={_RPC_CONTENT_CHARSET}",
+        "Authorization": f"Rpcsignature {signature}",
+    }
+    return requests.post(url, headers=headers, data=data)
+
+
+def compare_signature(url: str, body: bytes, signature: str) -> bool:
+    """
+    Compare request data + signature signed by one of the shared secrets.
+
+    Once a key has been able to validate the signature other keys will
+    not be attempted. We should only have multiple keys during key rotations.
+    """
+    if not settings.RPC_SHARED_SECRET:
+        raise RpcServiceSetupException(
+            "Cannot validate RPC request signatures without RPC_SHARED_SECRET"
+        )
+
+    if not signature.startswith("rpc0:"):
+        return False
+
+    # We aren't using the version bits currently, but might use them in the future.
+    _, signature_data = signature.split(":", 2)
+    signature_input = b"%s:%s" % (
+        url.encode("utf8"),
+        body,
+    )
+
+    for key in settings.RPC_SHARED_SECRET:
+        computed = hmac.new(key.encode("utf-8"), signature_input, hashlib.sha256).hexdigest()
+
+        is_valid = hmac.compare_digest(computed.encode("utf-8"), signature_data.encode("utf-8"))
+        if is_valid:
+            return True
+
+    return False
+
+
+def generate_request_signature(url_path: str, body: bytes) -> str:
+    """
+    Generate a signature for the request body
+    with the first shared secret. If there are other
+    shared secrets in the list they are only to be used
+    by control silo for verfication during key rotation.
+    """
+    if not settings.RPC_SHARED_SECRET:
+        raise RpcServiceSetupException("Cannot sign RPC requests without RPC_SHARED_SECRET")
+
+    signature_input = b"%s:%s" % (
+        url_path.encode("utf8"),
+        body,
+    )
+    secret = settings.RPC_SHARED_SECRET[0]
+    signature = hmac.new(secret.encode("utf-8"), signature_input, hashlib.sha256).hexdigest()
+    return f"rpc0:{signature}"
 
 
 @dataclass(frozen=True)

+ 50 - 26
tests/sentry/api/endpoints/test_rpc.py

@@ -1,3 +1,7 @@
+from __future__ import annotations
+
+from typing import Any, Dict
+
 from django.test import override_settings
 from django.urls import reverse
 from rest_framework.exceptions import ErrorDetail
@@ -9,10 +13,13 @@ from sentry.notifications.types import (
     NotificationSettingTypes,
 )
 from sentry.services.hybrid_cloud.organization import RpcUserOrganizationContext
+from sentry.services.hybrid_cloud.rpc import generate_request_signature
 from sentry.testutils import APITestCase
 from sentry.types.integrations import ExternalProviders
+from sentry.utils import json
 
 
+@override_settings(RPC_SHARED_SECRET=["a-long-value-that-is-hard-to-guess"])
 class RpcServiceEndpointTest(APITestCase):
     def setUp(self) -> None:
         super().setUp()
@@ -25,64 +32,77 @@ class RpcServiceEndpointTest(APITestCase):
             kwargs={"service_name": service_name, "method_name": method_name},
         )
 
+    def auth_header(self, path: str, data: dict | str) -> str:
+        if isinstance(data, dict):
+            data = json.dumps(data)
+        signature = generate_request_signature(path, data.encode("utf8"))
+
+        return f"rpcsignature {signature}"
+
     def test_auth(self):
         path = self._get_path("not_a_service", "not_a_method")
         response = self.client.post(path)
         assert response.status_code == 403
 
-    @override_settings(DEV_HYBRID_CLOUD_RPC_SENDER={"is_allowed": True})
     def test_bad_service_name(self):
         path = self._get_path("not_a_service", "not_a_method")
-        response = self.client.post(path)
+        response = self.client.post(path, HTTP_AUTHORIZATION=self.auth_header(path, ""))
         assert response.status_code == 404
 
-    @override_settings(DEV_HYBRID_CLOUD_RPC_SENDER={"is_allowed": True})
     def test_bad_method_name(self):
         path = self._get_path("user", "not_a_method")
-        response = self.client.post(path)
+        response = self.client.post(path, HTTP_AUTHORIZATION=self.auth_header(path, ""))
         assert response.status_code == 404
 
-    @override_settings(DEV_HYBRID_CLOUD_RPC_SENDER={"is_allowed": True})
     def test_no_arguments(self):
         path = self._get_path("organization", "get_organization_by_id")
-        response = self.client.post(path)
+        response = self.client.post(path, HTTP_AUTHORIZATION=self.auth_header(path, ""))
         assert response.status_code == 400
         assert response.data == {
             "detail": ErrorDetail(string="Malformed request.", code="parse_error")
         }
 
-    @override_settings(DEV_HYBRID_CLOUD_RPC_SENDER={"is_allowed": True})
     def test_missing_argument_key(self):
         path = self._get_path("organization", "get_organization_by_id")
-        response = self.client.post(path, {})
+        data: Dict[str, Any] = {}
+        response = self.client.post(
+            path, data=data, HTTP_AUTHORIZATION=self.auth_header(path, data)
+        )
         assert response.status_code == 400
         assert response.data == {
             "detail": ErrorDetail(string="Malformed request.", code="parse_error")
         }
 
-    @override_settings(DEV_HYBRID_CLOUD_RPC_SENDER={"is_allowed": True})
     def test_missing_argument_values(self):
         path = self._get_path("organization", "get_organization_by_id")
-        response = self.client.post(path, {"args": {}})
+        data: Dict[str, Any] = {"args": {}}
+        response = self.client.post(
+            path, data=data, HTTP_AUTHORIZATION=self.auth_header(path, data)
+        )
         assert response.status_code == 400
         assert response.data == {
             "detail": ErrorDetail(string="Malformed request.", code="parse_error")
         }
 
-    @override_settings(DEV_HYBRID_CLOUD_RPC_SENDER={"is_allowed": True})
     def test_with_empty_response(self):
         path = self._get_path("organization", "get_organization_by_id")
-        response = self.client.post(path, {"args": {"id": 0}})
+        data = {"args": {"id": 0}}
+        response = self.client.post(
+            path, data=data, HTTP_AUTHORIZATION=self.auth_header(path, data)
+        )
+
         assert response.status_code == 200
         assert "meta" in response.data
         assert response.data["value"] is None
 
-    @override_settings(DEV_HYBRID_CLOUD_RPC_SENDER={"is_allowed": True})
     def test_with_object_response(self):
         organization = self.create_organization()
 
         path = self._get_path("organization", "get_organization_by_id")
-        response = self.client.post(path, {"args": {"id": organization.id}})
+        data = {"args": {"id": organization.id}}
+        response = self.client.post(
+            path, data=data, HTTP_AUTHORIZATION=self.auth_header(path, data)
+        )
         assert response.status_code == 200
         assert response.data
         assert "meta" in response.data
@@ -92,20 +112,24 @@ class RpcServiceEndpointTest(APITestCase):
         assert response_obj.organization.slug == organization.slug
         assert response_obj.organization.name == organization.name
 
-    @override_settings(DEV_HYBRID_CLOUD_RPC_SENDER={"is_allowed": True})
     def test_with_invalid_arguments(self):
         path = self._get_path("organization", "get_organization_by_id")
-        response = self.client.post(path, {"args": {"id": "invalid type"}})
+        data = {"args": {"id": "invalid type"}}
+        response = self.client.post(
+            path, data=data, HTTP_AUTHORIZATION=self.auth_header(path, data)
+        )
         assert response.status_code == 400
         assert response.data == [ErrorDetail(string="Invalid input.", code="invalid")]
 
-        response = self.client.post(path, {"args": {"invalid": "invalid type"}})
+        data = {"args": {"invalid": "invalid type"}}
+        response = self.client.post(
+            path, data=data, HTTP_AUTHORIZATION=self.auth_header(path, data)
+        )
         assert response.status_code == 400
         assert response.data == {
             "detail": ErrorDetail(string="Malformed request.", code="parse_error")
         }
 
-    @override_settings(DEV_HYBRID_CLOUD_RPC_SENDER={"is_allowed": True})
     def test_with_enum_serialization(self):
         path = self._get_path("notifications", "get_settings_for_user_by_projects")
         NotificationSetting.objects.update_settings(
@@ -114,15 +138,15 @@ class RpcServiceEndpointTest(APITestCase):
             NotificationSettingOptionValues.ALWAYS,
             user_id=self.user.id,
         )
+        data = {
+            "args": {
+                "type": 20,
+                "user_id": self.user.id,
+                "parent_ids": [self.project.id],
+            }
+        }
         response = self.client.post(
-            path,
-            {
-                "args": {
-                    "type": 20,
-                    "user_id": self.user.id,
-                    "parent_ids": [self.project.id],
-                }
-            },
+            path, data=data, HTTP_AUTHORIZATION=self.auth_header(path, data)
         )
         assert response.status_code == 200
         response_body = response.json()

+ 67 - 0
tests/sentry/api/test_authentication.py

@@ -5,6 +5,7 @@ import pytest
 from django.http import HttpRequest
 from django.test import RequestFactory, override_settings
 from rest_framework.exceptions import AuthenticationFailed
+from rest_framework.request import Request
 from sentry_relay import generate_key_pair
 
 from sentry.api.authentication import (
@@ -12,11 +13,13 @@ from sentry.api.authentication import (
     DSNAuthentication,
     OrgAuthTokenAuthentication,
     RelayAuthentication,
+    RpcSignatureAuthentication,
     TokenAuthentication,
 )
 from sentry.models import ProjectKeyStatus, Relay
 from sentry.models.apitoken import ApiToken
 from sentry.models.orgauthtoken import OrgAuthToken
+from sentry.services.hybrid_cloud.rpc import RpcServiceSetupException, generate_request_signature
 from sentry.testutils import TestCase
 from sentry.testutils.silo import control_silo_test
 from sentry.utils.pytest.fixtures import django_db_all
@@ -242,3 +245,67 @@ def test_statically_configured_relay(settings, internal):
     assert relay.public_key == str(pk)
     # data should be deserialized in request.relay_request_data
     assert request.relay_request_data == data
+
+
+@override_settings(RPC_SHARED_SECRET=["a-long-secret-key"])
+@control_silo_test(stable=True)
+class TestRpcSignatureAuthentication(TestCase):
+    def setUp(self):
+        super().setUp()
+
+        self.auth = RpcSignatureAuthentication()
+        self.org = self.create_organization(owner=self.user)
+
+    def test_authenticate_success(self):
+        data = b'{"meta":{},"args":{"id":1}'
+        request = RequestFactory().post("/", data=data, content_type="application/json")
+        request = Request(request=request)
+
+        signature = generate_request_signature(request.path_info, request.body)
+        request.META["HTTP_AUTHORIZATION"] = f"rpcsignature {signature}"
+
+        user, token = self.auth.authenticate(request)
+        assert user.is_anonymous
+        assert token == signature
+
+    def test_authenticate_old_key_validates(self):
+        request = RequestFactory().post("/", data="", content_type="application/json")
+        with override_settings(RPC_SHARED_SECRET=["an-old-key"]):
+            signature = generate_request_signature(request.path_info, request.body)
+            request.META["HTTP_AUTHORIZATION"] = f"rpcsignature {signature}"
+
+        request = Request(request=request)
+
+        # Update settings so that we have a new key
+        with override_settings(RPC_SHARED_SECRET=["a-long-secret-key", "an-old-key"]):
+            user, token = self.auth.authenticate(request)
+
+        assert user.is_anonymous
+        assert token == signature
+
+    def test_authenticate_without_signature(self):
+        request = RequestFactory().post("/", data="", content_type="application/json")
+        request.META["HTTP_AUTHORIZATION"] = "Bearer abcdef"
+
+        request = Request(request=request)
+
+        assert self.auth.authenticate(request) is None
+
+    def test_authenticate_invalid_signature(self):
+        request = RequestFactory().post("/", data="", content_type="application/json")
+        request.META["HTTP_AUTHORIZATION"] = "rpcsignature abcdef"
+
+        request = Request(request=request)
+
+        with pytest.raises(AuthenticationFailed):
+            self.auth.authenticate(request)
+
+    def test_authenticate_no_shared_secret(self):
+        request = RequestFactory().post("/", data="", content_type="application/json")
+        request.META["HTTP_AUTHORIZATION"] = "rpcsignature abcdef"
+
+        request = Request(request=request)
+
+        with override_settings(RPC_SHARED_SECRET=None):
+            with pytest.raises(RpcServiceSetupException):
+                self.auth.authenticate(request)

+ 47 - 36
tests/sentry/hybrid_cloud/test_rpc.py

@@ -1,8 +1,10 @@
-from typing import cast
+from __future__ import annotations
+
+from typing import Any, cast
 from unittest import mock
-from unittest.mock import MagicMock
 
 import pytest
+import responses
 from django.db import router
 from django.test import override_settings
 
@@ -131,76 +133,85 @@ class RpcServiceTest(TestCase):
             assert result[0]["organization_id"] == organization.id
 
 
+control_address = "https://control.example.com"
+shared_secret = ["a-long-token-you-could-not-guess"]
+
+
 class DispatchRemoteCallTest(TestCase):
     def test_while_not_allowed(self):
         with pytest.raises(RpcSendException):
             dispatch_remote_call(None, "user", "get_user", {"id": 0})
 
-    _REGION_SILO_CREDS = {
-        "is_allowed": True,
-        "control_silo_api_token": "letmein",
-        "control_silo_address": "http://localhost",
-    }
-
     @staticmethod
-    def _set_up_mock_response(mock_urlopen, response_value):
-        charset = "utf-8"
-        response_body = {"meta": {}, "value": response_value}
-        serial_response = json.dumps(response_body).encode(charset)
-
-        mock_response = MagicMock()
-        mock_response.headers.get_content_charset.return_value = charset
-        mock_response.read.return_value = serial_response
-        mock_urlopen.return_value.__enter__.return_value = mock_response
-
-    @mock.patch("sentry.services.hybrid_cloud.rpc.urlopen")
-    def test_region_to_control_happy_path(self, mock_urlopen):
+    def _set_up_mock_response(service_name: str, response_value: Any, address: str | None = None):
+        address = address or control_address
+        responses.add(
+            responses.POST,
+            f"{address}/api/0/internal/rpc/{service_name}/",
+            content_type="json",
+            body=json.dumps({"meta": {}, "value": response_value}),
+        )
+
+    @responses.activate
+    def test_region_to_control_happy_path(self):
         org = self.create_organization()
 
         with override_settings(
-            SILO_MODE=SiloMode.REGION,
-            DEV_HYBRID_CLOUD_RPC_SENDER=self._REGION_SILO_CREDS,
+            RPC_SHARED_SECRET=shared_secret, SENTRY_CONTROL_ADDRESS=control_address
         ):
             response_value = RpcUserOrganizationContext(
                 organization=serialize_rpc_organization(org)
             )
-            self._set_up_mock_response(mock_urlopen, response_value.dict())
+            self._set_up_mock_response("organization/get_organization_by_id", response_value.dict())
 
             result = dispatch_remote_call(
                 None, "organization", "get_organization_by_id", {"id": org.id}
             )
             assert result == response_value
 
-    @override_settings(SILO_MODE=SiloMode.REGION, DEV_HYBRID_CLOUD_RPC_SENDER=_REGION_SILO_CREDS)
-    @mock.patch("sentry.services.hybrid_cloud.rpc.urlopen")
-    def test_region_to_control_null_result(self, mock_urlopen):
-        self._set_up_mock_response(mock_urlopen, None)
+    @responses.activate
+    @override_settings(
+        SILO_MODE=SiloMode.REGION,
+        RPC_SHARED_SECRET=shared_secret,
+        SENTRY_CONTROL_ADDRESS=control_address,
+    )
+    def test_region_to_control_null_result(self):
+        self._set_up_mock_response("organization/get_organization_by_id", None)
 
         result = dispatch_remote_call(None, "organization", "get_organization_by_id", {"id": 0})
         assert result is None
 
+    @responses.activate
     @override_regions(_REGIONS)
-    @override_settings(SILO_MODE=SiloMode.CONTROL, DEV_HYBRID_CLOUD_RPC_SENDER={"is_allowed": True})
-    @mock.patch("sentry.services.hybrid_cloud.rpc.urlopen")
-    def test_control_to_region_happy_path(self, mock_urlopen):
+    @override_settings(
+        SILO_MODE=SiloMode.CONTROL,
+        RPC_SHARED_SECRET=shared_secret,
+        SENTRY_CONTROL_ADDRESS=control_address,
+    )
+    def test_control_to_region_happy_path(self):
         user = self.create_user()
         serial = serialize_rpc_user(user)
-        self._set_up_mock_response(mock_urlopen, serial.dict())
+        self._set_up_mock_response("user/get_user", serial.dict(), address="http://na.sentry.io")
 
         result = dispatch_remote_call(_REGIONS[0], "user", "get_user", {"id": 0})
         assert result == serial
 
+    @responses.activate
     @override_regions(_REGIONS)
-    @override_settings(SILO_MODE=SiloMode.CONTROL, DEV_HYBRID_CLOUD_RPC_SENDER={"is_allowed": True})
-    @mock.patch("sentry.services.hybrid_cloud.rpc.urlopen")
-    def test_control_to_region_with_list_result(self, mock_urlopen):
+    @override_settings(
+        SILO_MODE=SiloMode.CONTROL,
+        RPC_SHARED_SECRET=shared_secret,
+        SENTRY_CONTROL_ADDRESS=control_address,
+    )
+    def test_region_to_control_with_list_result(self):
         users = [self.create_user() for _ in range(3)]
         serial = [serialize_rpc_user(user) for user in users]
-        self._set_up_mock_response(mock_urlopen, [m.dict() for m in serial])
+        self._set_up_mock_response("user/get_many", [m.dict() for m in serial])
 
-        result = dispatch_remote_call(_REGIONS[0], "user", "get_many", {"filter": {}})
+        result = dispatch_remote_call(None, "user", "get_many", {"filter": {}})
         assert result == serial
 
+    @responses.activate
     @override_regions(_REGIONS)
     @override_settings(SILO_MODE=SiloMode.CONTROL, DEV_HYBRID_CLOUD_RPC_SENDER={"is_allowed": True})
     def test_early_halt_from_null_region_resolution(self):