Browse Source

ref(hc): Implement RPC mechanism for log service. (#51000)

1. Simplify all our hybrid cloud services by making them stateless and
removing `close()`
2.  Move the `hook` service into Rpc framework
3. Create additional service for log so that we have an RPC
implementation servicing the outbox logic.
Zach Collins 1 year ago
parent
commit
1570663a16

+ 3 - 6
src/sentry/receivers/outbox/region.py

@@ -22,8 +22,7 @@ from sentry.models import (
 from sentry.models.team import Team
 from sentry.receivers.outbox import maybe_process_tombstone
 from sentry.services.hybrid_cloud.identity import identity_service
-from sentry.services.hybrid_cloud.log import AuditLogEvent, UserIpEvent
-from sentry.services.hybrid_cloud.log.impl import DatabaseBackedLogService
+from sentry.services.hybrid_cloud.log import AuditLogEvent, UserIpEvent, log_rpc_service
 from sentry.services.hybrid_cloud.organization_mapping import organization_mapping_service
 from sentry.services.hybrid_cloud.organization_mapping.serial import (
     update_organization_mapping_from_instance,
@@ -38,16 +37,14 @@ from sentry.types.region import get_local_region
 
 @receiver(process_region_outbox, sender=OutboxCategory.AUDIT_LOG_EVENT)
 def process_audit_log_event(payload: Any, **kwds: Any):
-    # TODO: This will become explicit rpc
     if payload is not None:
-        DatabaseBackedLogService().record_audit_log(event=AuditLogEvent(**payload))
+        log_rpc_service.record_audit_log(event=AuditLogEvent(**payload))
 
 
 @receiver(process_region_outbox, sender=OutboxCategory.USER_IP_EVENT)
 def process_user_ip_event(payload: Any, **kwds: Any):
-    # TODO: This will become explicit rpc
     if payload is not None:
-        DatabaseBackedLogService().record_user_ip(event=UserIpEvent(**payload))
+        log_rpc_service.record_user_ip(event=UserIpEvent(**payload))
 
 
 def maybe_handle_joined_user(org_member: OrganizationMember) -> None:

+ 1 - 31
src/sentry/services/hybrid_cloud/__init__.py

@@ -6,7 +6,6 @@ import functools
 import inspect
 import logging
 import threading
-from abc import ABC, abstractmethod
 from typing import (
     Any,
     Callable,
@@ -14,7 +13,6 @@ from typing import (
     Generator,
     Generic,
     Iterable,
-    List,
     Mapping,
     Optional,
     Tuple,
@@ -43,12 +41,6 @@ REGION_NAME_LENGTH = 48
 DEFAULT_DATE = datetime.datetime(2000, 1, 1)
 
 
-class InterfaceWithLifecycle(ABC):
-    @abstractmethod
-    def close(self) -> None:
-        pass
-
-
 def report_pydantic_type_validation_error(
     field: pydantic.fields.ModelField,
     value: Any,
@@ -170,7 +162,7 @@ class RpcModel(pydantic.BaseModel):
         return cls(**fields)
 
 
-ServiceInterface = TypeVar("ServiceInterface", bound=InterfaceWithLifecycle)
+ServiceInterface = TypeVar("ServiceInterface")
 
 
 class DelegatedBySiloMode(Generic[ServiceInterface]):
@@ -204,7 +196,6 @@ class DelegatedBySiloMode(Generic[ServiceInterface]):
             yield
         finally:
             with self._lock:
-                self.close(silo_mode)
                 self._singleton[silo_mode] = prev
 
     def __getattr__(self, item: str) -> Any:
@@ -214,28 +205,11 @@ class DelegatedBySiloMode(Generic[ServiceInterface]):
             if impl := self._singleton.get(cur_mode, None):
                 return getattr(impl, item)
             if con := self._constructors.get(cur_mode, None):
-                self.close(cur_mode)
                 self._singleton[cur_mode] = inst = con()
                 return getattr(inst, item)
 
         raise KeyError(f"No implementation found for {cur_mode}.")
 
-    def close(self, mode: SiloMode | None = None) -> None:
-        to_close: List[ServiceInterface] = []
-        with self._lock:
-            if mode is None:
-                to_close.extend(s for s in self._singleton.values() if s is not None)
-                self._singleton = dict()
-            else:
-                existing = self._singleton.get(mode)
-                if existing:
-                    to_close.append(existing)
-                self._singleton = self._singleton.copy()
-                self._singleton[mode] = None
-
-        for service in to_close:
-            service.close()
-
 
 hc_test_stub: Any = threading.local()
 
@@ -256,9 +230,6 @@ def CreateStubFromBase(
     def __init__(self: Any, backing_service: ServiceInterface) -> None:
         self.backing_service = backing_service
 
-    def close(self: Any) -> None:
-        self.backing_service.close()
-
     def make_method(method_name: str) -> Any:
         def method(self: Any, *args: Any, **kwds: Any) -> Any:
             from sentry.services.hybrid_cloud.auth import AuthenticationContext
@@ -285,7 +256,6 @@ def CreateStubFromBase(
             if getattr(getattr(Super, name), "__isabstractmethod__", False):
                 methods[name] = make_method(name)
 
-    methods["close"] = close
     methods["__init__"] = __init__
 
     return cast(

+ 0 - 3
src/sentry/services/hybrid_cloud/app/impl.py

@@ -198,6 +198,3 @@ class DatabaseBackedAppService(AppService):
             return serialize_sentry_app(SentryApp.objects.get(application_id=api_application_id))
         except SentryApp.DoesNotExist:
             return None
-
-    def close(self) -> None:
-        pass

+ 0 - 3
src/sentry/services/hybrid_cloud/auth/impl.py

@@ -230,9 +230,6 @@ class DatabaseBackedAuthService(AuthService):
             permissions=permissions,
         )
 
-    def close(self) -> None:
-        pass
-
     def get_org_ids_with_scim(
         self,
     ) -> List[int]:

+ 1 - 4
src/sentry/services/hybrid_cloud/hook/impl.py

@@ -11,7 +11,7 @@ from sentry.services.hybrid_cloud.hook import HookService, RpcServiceHook
 from sentry.services.hybrid_cloud.hook.serial import serialize_service_hook
 
 
-class DatabaseBackedAppService(HookService):
+class DatabaseBackedHookService(HookService):
     def update_webhook_and_events(
         self,
         *,
@@ -60,6 +60,3 @@ class DatabaseBackedAppService(HookService):
                     hook.add_project(project_id)
 
             return serialize_service_hook(hook)
-
-    def close(self) -> None:
-        pass

+ 15 - 16
src/sentry/services/hybrid_cloud/hook/service.py

@@ -4,14 +4,24 @@
 # defined, because we want to reflect on type annotations and avoid forward references.
 
 import abc
-from typing import List, Optional
+from typing import List, Optional, cast
 
-from sentry.services.hybrid_cloud import InterfaceWithLifecycle, silo_mode_delegation, stubbed
 from sentry.services.hybrid_cloud.hook import RpcServiceHook
+from sentry.services.hybrid_cloud.rpc import RpcService, rpc_method
 from sentry.silo import SiloMode
 
 
-class HookService(InterfaceWithLifecycle):
+class HookService(RpcService):
+    key = "hook"
+    local_mode = SiloMode.REGION
+
+    @classmethod
+    def get_local_implementation(cls) -> RpcService:
+        from sentry.services.hybrid_cloud.hook.impl import DatabaseBackedHookService
+
+        return DatabaseBackedHookService()
+
+    @rpc_method
     @abc.abstractmethod
     def create_service_hook(
         self,
@@ -26,6 +36,7 @@ class HookService(InterfaceWithLifecycle):
     ) -> RpcServiceHook:
         pass
 
+    @rpc_method
     @abc.abstractmethod
     def update_webhook_and_events(
         self,
@@ -37,16 +48,4 @@ class HookService(InterfaceWithLifecycle):
         pass
 
 
-def impl_with_db() -> HookService:
-    from sentry.services.hybrid_cloud.hook.impl import DatabaseBackedAppService
-
-    return DatabaseBackedAppService()
-
-
-hook_service: HookService = silo_mode_delegation(
-    {
-        SiloMode.MONOLITH: impl_with_db,
-        SiloMode.REGION: impl_with_db,
-        SiloMode.CONTROL: stubbed(impl_with_db, SiloMode.REGION),
-    }
-)
+hook_service: HookService = cast(HookService, HookService.create_delegation())

+ 0 - 3
src/sentry/services/hybrid_cloud/identity/impl.py

@@ -21,9 +21,6 @@ from sentry.services.hybrid_cloud.identity.service import IdentityService
 
 
 class DatabaseBackedIdentityService(IdentityService):
-    def close(self) -> None:
-        pass
-
     def get_provider(
         self,
         *,

+ 0 - 3
src/sentry/services/hybrid_cloud/integration/impl.py

@@ -49,9 +49,6 @@ class DatabaseBackedIntegrationService(IntegrationService):
 
         return False
 
-    def close(self) -> None:
-        pass
-
     def page_integration_ids(
         self,
         *,

+ 2 - 72
src/sentry/services/hybrid_cloud/log/__init__.py

@@ -1,72 +1,2 @@
-# Please do not use
-#     from __future__ import annotations
-# in modules such as this one where hybrid cloud service classes and data models are
-# defined, because we want to reflect on type annotations and avoid forward references.
-
-import abc
-import datetime
-from typing import Any, Mapping, Optional
-
-from sentry.services.hybrid_cloud import (
-    DEFAULT_DATE,
-    InterfaceWithLifecycle,
-    RpcModel,
-    silo_mode_delegation,
-)
-from sentry.silo import SiloMode
-
-
-class UserIpEvent(RpcModel):
-    user_id: int = -1
-    ip_address: str = "127.0.0.1"
-    last_seen: datetime.datetime = DEFAULT_DATE
-    country_code: Optional[str] = None
-    region_code: Optional[str] = None
-
-
-class AuditLogEvent(RpcModel):
-    organization_id: int = -1
-    # 'datetime' is apparently reserved attribute name for dataclasses.
-    date_added: datetime.datetime = DEFAULT_DATE
-    event_id: int = -1
-    actor_label: str = ""
-    actor_user_id: Optional[int] = None
-    ip_address: Optional[str] = None
-    target_object_id: Optional[int] = None
-    data: Optional[Mapping[str, Any]] = None
-    target_user_id: Optional[int] = None
-
-
-class LogService(InterfaceWithLifecycle):
-    @abc.abstractmethod
-    def close(self) -> None:
-        pass
-
-    @abc.abstractmethod
-    def record_audit_log(self, *, event: AuditLogEvent) -> None:
-        pass
-
-    @abc.abstractmethod
-    def record_user_ip(self, *, event: UserIpEvent) -> None:
-        pass
-
-
-def impl_by_db() -> LogService:
-    from .impl import DatabaseBackedLogService
-
-    return DatabaseBackedLogService()
-
-
-def impl_by_outbox() -> LogService:
-    from .impl import OutboxBackedLogService
-
-    return OutboxBackedLogService()
-
-
-log_service = silo_mode_delegation(
-    {
-        SiloMode.REGION: impl_by_outbox,
-        SiloMode.CONTROL: impl_by_db,
-        SiloMode.MONOLITH: impl_by_db,
-    }
-)
+from .model import *  # noqa
+from .service import *  # noqa

+ 0 - 6
src/sentry/services/hybrid_cloud/log/impl.py

@@ -7,9 +7,6 @@ from sentry.utils import metrics
 
 
 class DatabaseBackedLogService(LogService):
-    def close(self) -> None:
-        pass
-
     def record_audit_log(self, *, event: AuditLogEvent) -> None:
         entry = AuditLogEntry.from_event(event)
         try:
@@ -55,9 +52,6 @@ class DatabaseBackedLogService(LogService):
 
 
 class OutboxBackedLogService(LogService):
-    def close(self) -> None:
-        pass
-
     def record_audit_log(self, *, event: AuditLogEvent) -> None:
         RegionOutbox(
             shard_scope=OutboxScope.AUDIT_LOG_SCOPE,

Some files were not shown because too many files changed in this diff