Просмотр исходного кода

feat(hc): Set up region resolution on RPC services (#46638)

Ryan Skonnord 1 год назад
Родитель
Сommit
859e76ead9

+ 5 - 2
src/sentry/models/commitauthor.py

@@ -1,10 +1,12 @@
-from typing import List
+from typing import TYPE_CHECKING, List
 
 from django.db import models
 
 from sentry.db.models import BoundedBigIntegerField, Model, region_silo_only_model, sane_repr
 from sentry.db.models.manager import BaseManager
-from sentry.services.hybrid_cloud.user import RpcUser, user_service
+
+if TYPE_CHECKING:
+    from sentry.services.hybrid_cloud.user import RpcUser
 
 
 class CommitAuthorManager(BaseManager):
@@ -37,6 +39,7 @@ class CommitAuthor(Model):
 
     def find_users(self) -> List["RpcUser"]:
         from sentry.models import OrganizationMember
+        from sentry.services.hybrid_cloud.user import user_service
 
         users = user_service.get_many_by_email(emails=[self.email])
         org_member_user_ids = set(

+ 2 - 2
src/sentry/models/team.py

@@ -19,18 +19,18 @@ from sentry.db.models import (
 from sentry.db.models.utils import slugify_instance
 from sentry.locks import locks
 from sentry.models.actor import Actor
-from sentry.services.hybrid_cloud.user import RpcUser
 from sentry.utils.retries import TimedRetryPolicy
 
 if TYPE_CHECKING:
     from sentry.models import Organization, Project, User
+    from sentry.services.hybrid_cloud.user import RpcUser
 
 
 class TeamManager(BaseManager):
     def get_for_user(
         self,
         organization: "Organization",
-        user: Union["User", RpcUser],
+        user: Union["User", "RpcUser"],
         scope: Optional[str] = None,
         with_projects: bool = False,
     ) -> Union[Sequence["Team"], Sequence[Tuple["Team", Sequence["Project"]]]]:

+ 19 - 12
src/sentry/services/hybrid_cloud/organization/__init__.py

@@ -12,7 +12,14 @@ from sentry.models.organization import OrganizationStatus
 from sentry.roles import team_roles
 from sentry.roles.manager import TeamRole
 from sentry.services.hybrid_cloud import RpcModel
-from sentry.services.hybrid_cloud.rpc import RpcService, rpc_method
+from sentry.services.hybrid_cloud.region import (
+    ByOrganizationId,
+    ByOrganizationIdAttribute,
+    ByOrganizationObject,
+    ByOrganizationSlug,
+    UnimplementedRegionResolution,
+)
+from sentry.services.hybrid_cloud.rpc import RpcService, regional_rpc_method
 from sentry.services.hybrid_cloud.user import RpcUser
 from sentry.silo import SiloMode
 
@@ -179,7 +186,7 @@ class OrganizationService(RpcService):
 
         return DatabaseBackedOrganizationService()
 
-    @rpc_method
+    @regional_rpc_method(resolve=ByOrganizationId("id"))
     @abstractmethod
     def get_organization_by_id(
         self, *, id: int, user_id: Optional[int] = None, slug: Optional[str] = None
@@ -193,7 +200,7 @@ class OrganizationService(RpcService):
 
     # TODO: This should return RpcOrganizationSummary objects, since we cannot realistically span out requests and
     #  capture full org objects / teams / permissions.  But we can gather basic summary data from the control silo.
-    @rpc_method
+    @regional_rpc_method(resolve=UnimplementedRegionResolution())
     @abstractmethod
     def get_organizations(
         self,
@@ -217,7 +224,7 @@ class OrganizationService(RpcService):
         """
         pass
 
-    @rpc_method
+    @regional_rpc_method(resolve=ByOrganizationId())
     @abstractmethod
     def check_membership_by_email(
         self, *, organization_id: int, email: str
@@ -227,7 +234,7 @@ class OrganizationService(RpcService):
         """
         pass
 
-    @rpc_method
+    @regional_rpc_method(resolve=ByOrganizationId())
     @abstractmethod
     def check_membership_by_id(
         self, *, organization_id: int, user_id: int
@@ -237,7 +244,7 @@ class OrganizationService(RpcService):
         """
         pass
 
-    @rpc_method
+    @regional_rpc_method(resolve=ByOrganizationSlug())
     @abstractmethod
     def check_organization_by_slug(self, *, slug: str, only_visible: bool) -> Optional[int]:
         """
@@ -257,7 +264,7 @@ class OrganizationService(RpcService):
 
         return self.get_organization_by_id(id=org_id, user_id=user_id)
 
-    @rpc_method
+    @regional_rpc_method(resolve=ByOrganizationObject())
     @abstractmethod
     def add_organization_member(
         self,
@@ -269,22 +276,22 @@ class OrganizationService(RpcService):
     ) -> RpcOrganizationMember:
         pass
 
-    @rpc_method
+    @regional_rpc_method(resolve=ByOrganizationIdAttribute("organization_member"))
     @abstractmethod
     def add_team_member(self, *, team_id: int, organization_member: RpcOrganizationMember) -> None:
         pass
 
-    @rpc_method
+    @regional_rpc_method(resolve=UnimplementedRegionResolution())
     @abstractmethod
     def get_team_members(self, *, team_id: int) -> Iterable[RpcOrganizationMember]:
         pass
 
-    @rpc_method
+    @regional_rpc_method(resolve=ByOrganizationIdAttribute("organization_member"))
     @abstractmethod
     def update_membership_flags(self, *, organization_member: RpcOrganizationMember) -> None:
         pass
 
-    @rpc_method
+    @regional_rpc_method(resolve=ByOrganizationIdAttribute("organization_member"))
     @abstractmethod
     def get_all_org_roles(
         self,
@@ -294,7 +301,7 @@ class OrganizationService(RpcService):
     ) -> List[str]:
         pass
 
-    @rpc_method
+    @regional_rpc_method(resolve=ByOrganizationId())
     @abstractmethod
     def get_top_dog_team_member_ids(self, *, organization_id: int) -> List[int]:
         pass

+ 3 - 2
src/sentry/services/hybrid_cloud/project_key/__init__.py

@@ -8,7 +8,8 @@ from enum import Enum
 from typing import Any, Optional, cast
 
 from sentry.services.hybrid_cloud import RpcModel
-from sentry.services.hybrid_cloud.rpc import RpcService, rpc_method
+from sentry.services.hybrid_cloud.region import UnimplementedRegionResolution
+from sentry.services.hybrid_cloud.rpc import RpcService, regional_rpc_method
 from sentry.silo import SiloMode
 
 
@@ -41,7 +42,7 @@ class ProjectKeyService(RpcService):
 
         return DatabaseBackedProjectKeyService()
 
-    @rpc_method
+    @regional_rpc_method(resolve=UnimplementedRegionResolution())
     @abstractmethod
     def get_project_key(self, *, project_id: str, role: ProjectKeyRole) -> Optional[RpcProjectKey]:
         pass

+ 95 - 0
src/sentry/services/hybrid_cloud/region.py

@@ -0,0 +1,95 @@
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import TYPE_CHECKING
+
+from sentry.db.models import BaseManager
+from sentry.services.hybrid_cloud import ArgumentDict
+from sentry.services.hybrid_cloud.rpc import RpcServiceUnimplementedException
+from sentry.types.region import Region, get_region_by_name
+
+if TYPE_CHECKING:
+    from sentry.models import OrganizationMapping
+
+
+class RegionResolution(ABC):
+    """Interface for directing a service call to a remote region."""
+
+    @abstractmethod
+    def resolve(self, arguments: ArgumentDict) -> Region:
+        """Return the region determined by a service call's arguments."""
+        raise NotImplementedError
+
+    @staticmethod
+    def _resolve_from_mapping(mapping: OrganizationMapping) -> Region:
+        return get_region_by_name(mapping.region_name)
+
+    @property
+    def organization_mapping_manager(self) -> BaseManager[OrganizationMapping]:
+        from sentry.models import OrganizationMapping
+
+        # Convenience method to avoid repeating the local import
+        return OrganizationMapping.objects
+
+
+@dataclass(frozen=True)
+class ByOrganizationObject(RegionResolution):
+    """Resolve from a parameter representing an organization object."""
+
+    parameter_name: str = "organization"
+
+    def resolve(self, arguments: ArgumentDict) -> Region:
+        value = arguments[self.parameter_name]
+        mapping = self.organization_mapping_manager.get(organization_id=value.id)
+        return self._resolve_from_mapping(mapping)
+
+
+@dataclass(frozen=True)
+class ByOrganizationId(RegionResolution):
+    """Resolve from an `int` parameter representing an organization ID."""
+
+    parameter_name: str = "organization_id"
+
+    def resolve(self, arguments: ArgumentDict) -> Region:
+        organization_id = arguments[self.parameter_name]
+        mapping = self.organization_mapping_manager.get(id=organization_id)
+        return self._resolve_from_mapping(mapping)
+
+
+@dataclass(frozen=True)
+class ByOrganizationSlug(RegionResolution):
+    """Resolve from a `str` parameter representing an organization slug."""
+
+    parameter_name: str = "slug"
+
+    def resolve(self, arguments: ArgumentDict) -> Region:
+        slug = arguments[self.parameter_name]
+        mapping = self.organization_mapping_manager.get(slug=slug)
+        return self._resolve_from_mapping(mapping)
+
+
+@dataclass(frozen=True)
+class ByOrganizationIdAttribute(RegionResolution):
+    """Resolve from an object with an organization ID as one of its attributes."""
+
+    parameter_name: str
+    attribute_name: str = "organization_id"
+
+    def resolve(self, arguments: ArgumentDict) -> Region:
+        argument = arguments[self.parameter_name]
+        organization_id = getattr(argument, self.attribute_name)
+        mapping = self.organization_mapping_manager.get(id=organization_id)
+        return self._resolve_from_mapping(mapping)
+
+
+class UnimplementedRegionResolution(RegionResolution):
+    """Indicate that a method's region resolution logic has not been implemented yet.
+
+    A remote call to the method will be interrupted and will default to the
+    monolithic fallback implementation. See the RpcServiceUnimplementedException
+    documentation for details.
+    """
+
+    def resolve(self, arguments: ArgumentDict) -> Region:
+        raise RpcServiceUnimplementedException("Need to resolve to remote region silo")

+ 39 - 3
src/sentry/services/hybrid_cloud/rpc.py

@@ -3,7 +3,7 @@ from __future__ import annotations
 import inspect
 import logging
 from abc import abstractmethod
-from typing import Any, Callable, Dict, Iterator, Mapping, Tuple, Type, TypeVar, cast
+from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, Mapping, Tuple, Type, TypeVar, cast
 
 import pydantic
 
@@ -16,11 +16,15 @@ from sentry.services.hybrid_cloud import (
 from sentry.silo import SiloMode
 from sentry.types.region import Region
 
+if TYPE_CHECKING:
+    from sentry.services.hybrid_cloud.region import RegionResolution
+
 logger = logging.getLogger(__name__)
 
 _T = TypeVar("_T")
 
 _IS_RPC_METHOD_ATTR = "__is_rpc_method"
+_REGION_RESOLUTION_ATTR = "__region_resolution"
 
 
 class RpcServiceSetupException(Exception):
@@ -50,6 +54,7 @@ class RpcMethodSignature:
         self._base_service_cls = base_service_cls
         self._base_method = base_method
         self._model = self._create_pydantic_model()
+        self._region_resolution = self._extract_region_resolution()
 
     @property
     def service_name(self) -> str:
@@ -73,6 +78,24 @@ class RpcMethodSignature:
         field_definitions = {p.name: create_field(p) for p in parameters}
         return pydantic.create_model(name, **field_definitions)  # type: ignore
 
+    def _extract_region_resolution(self) -> RegionResolution | None:
+        region_resolution = getattr(self._base_method, _REGION_RESOLUTION_ATTR, None)
+
+        is_region_service = self._base_service_cls.local_mode == SiloMode.REGION
+        if not is_region_service and region_resolution is not None:
+            raise RpcServiceSetupException(
+                "@regional_rpc_method should be used only on a service with "
+                "`local_mode = SiloMode.REGION`"
+                f" ({self.service_name} is {self._base_service_cls.local_mode})"
+            )
+        if is_region_service and region_resolution is None:
+            # Use RpcServiceUnimplementedException as a placeholder if needed
+            raise RpcServiceSetupException(
+                f"Method {self.service_name}.{self.method_name} needs @regional_rpc_method"
+            )
+
+        return region_resolution
+
     def serialize_arguments(self, raw_arguments: ArgumentDict) -> ArgumentDict:
         model_instance = self._model(**raw_arguments)
         return model_instance.dict()
@@ -81,10 +104,13 @@ class RpcMethodSignature:
         return self._model.parse_obj(serial_arguments)
 
     def resolve_to_region(self, arguments: ArgumentDict) -> Region:
-        if self._base_service_cls.local_mode != SiloMode.REGION:
+        if self._region_resolution is None:
             raise RpcServiceSetupException(f"{self.service_name} does not run on the region silo")
 
-        raise RpcServiceUnimplementedException("Need to resolve region")  # TODO
+        try:
+            return self._region_resolution.resolve(arguments)
+        except Exception as e:
+            raise RpcServiceUnimplementedException("Error while resolving region") from e
 
 
 class DelegatingRpcService(DelegatedBySiloMode["RpcService"]):
@@ -121,6 +147,16 @@ def rpc_method(method: Callable[..., _T]) -> Callable[..., _T]:
     return method
 
 
+def regional_rpc_method(
+    resolve: RegionResolution,
+) -> Callable[[Callable[..., _T]], Callable[..., _T]]:
+    def decorator(method: Callable[..., _T]) -> Callable[..., _T]:
+        setattr(method, _REGION_RESOLUTION_ATTR, resolve)
+        return rpc_method(method)
+
+    return decorator
+
+
 _global_service_registry: Dict[str, DelegatingRpcService] = {}
 
 

+ 32 - 4
tests/sentry/hybrid_cloud/test_rpc.py

@@ -2,6 +2,7 @@ from unittest import mock
 
 from django.test import override_settings
 
+from sentry.models import OrganizationMapping
 from sentry.services.hybrid_cloud.actor import RpcActor
 from sentry.services.hybrid_cloud.organization import (
     OrganizationService,
@@ -12,19 +13,33 @@ from sentry.services.hybrid_cloud.rpc import dispatch_to_local_service
 from sentry.services.hybrid_cloud.user import RpcUser
 from sentry.silo import SiloMode
 from sentry.testutils import TestCase
+from sentry.testutils.region import override_regions
+from sentry.types.region import Region, RegionCategory
 
 
 class RpcServiceTest(TestCase):
-    def test_remote_service(self):
+    @mock.patch("sentry.services.hybrid_cloud.rpc.dispatch_remote_call")
+    def test_remote_service(self, mock_dispatch_remote_call):
+        regions = [
+            Region("north_america", 1, "na.sentry.io", RegionCategory.MULTI_TENANT),
+            Region("europe", 2, "eu.sentry.io", RegionCategory.MULTI_TENANT),
+        ]
+        target_region = regions[0]
+
         user = self.create_user()
         organization = self.create_organization()
+        OrganizationMapping.objects.create(
+            organization_id=organization.id,
+            slug=organization.slug,
+            name=organization.name,
+            region_name=target_region.name,
+        )
 
         serial_user = RpcUser(id=user.id)
         serial_org = DatabaseBackedOrganizationService.serialize_organization(organization)
 
-        with override_settings(SILO_MODE=SiloMode.CONTROL):
-            service = OrganizationService.create_delegation()
-
+        service = OrganizationService.create_delegation()
+        with override_regions(regions), override_settings(SILO_MODE=SiloMode.CONTROL):
             service.add_organization_member(
                 organization=serial_org,
                 user=serial_user,
@@ -32,6 +47,19 @@ class RpcServiceTest(TestCase):
                 role=None,
             )
 
+        assert mock_dispatch_remote_call.called
+        (
+            region,
+            service_name,
+            method_name,
+            serial_arguments,
+        ) = mock_dispatch_remote_call.call_args.args
+        assert region == target_region
+        assert service_name == OrganizationService.key
+        assert method_name == "add_organization_member"
+        assert serial_arguments.keys() == {"flags", "organization", "role", "user"}
+        assert serial_arguments["organization"]["id"] == organization.id
+
     @mock.patch("sentry.services.hybrid_cloud.report_pydantic_type_validation_error")
     def test_models_tolerate_invalid_types(self, mock_report):
         # Create an RpcModel instance whose fields don't obey type annotations and