Browse Source

feat(backup): Enable import over RPC (#56689)

feat(backup): Enable imports over RPC

This finishes out the work started in #57740 and enables imports over
RPC as well. As in that PR, imports are already done on a sequential,
per-model basis, so this change just consists of moving every such call
across an RPC boundary.

Closes getsentry/team-ospo#185
Closes getsentry/team-ospo#196
Closes getsentry/team-ospo#202
Alex Zaslavsky 1 year ago
parent
commit
57686ae075

+ 0 - 32
fixtures/backup/model_dependencies/detailed.json

@@ -5851,38 +5851,6 @@
     "table_name": "sentry_userrole_users",
     "uniques": []
   },
-  "sessions.session": {
-    "dangling": false,
-    "foreign_keys": {},
-    "model": "sessions.session",
-    "relocation_dependencies": [],
-    "relocation_scope": "Excluded",
-    "silos": [
-      "Monolith"
-    ],
-    "table_name": "django_session",
-    "uniques": [
-      [
-        "session_key"
-      ]
-    ]
-  },
-  "sites.site": {
-    "dangling": false,
-    "foreign_keys": {},
-    "model": "sites.site",
-    "relocation_dependencies": [],
-    "relocation_scope": "Excluded",
-    "silos": [
-      "Monolith"
-    ],
-    "table_name": "django_site",
-    "uniques": [
-      [
-        "domain"
-      ]
-    ]
-  },
   "social_auth.usersocialauth": {
     "dangling": false,
     "foreign_keys": {

+ 0 - 2
fixtures/backup/model_dependencies/flat.json

@@ -807,8 +807,6 @@
     "sentry.user",
     "sentry.userrole"
   ],
-  "sessions.session": [],
-  "sites.site": [],
   "social_auth.usersocialauth": [
     "sentry.user"
   ]

+ 0 - 2
fixtures/backup/model_dependencies/sorted.json

@@ -49,8 +49,6 @@
   "sentry.userpermission",
   "sentry.userrole",
   "sentry.userroleuser",
-  "sessions.session",
-  "sites.site",
   "social_auth.usersocialauth",
   "sentry.savedsearch",
   "sentry.release",

+ 0 - 2
fixtures/backup/model_dependencies/truncate.json

@@ -49,8 +49,6 @@
   "sentry_userpermission",
   "sentry_userrole",
   "sentry_userrole_users",
-  "django_session",
-  "django_site",
   "social_auth_usersocialauth",
   "sentry_savedsearch",
   "sentry_release",

+ 0 - 1
pyproject.toml

@@ -604,7 +604,6 @@ module = [
     "sentry.services.smtp",
     "sentry.shared_integrations.client.base",
     "sentry.shared_integrations.client.proxy",
-    "sentry.silo.base",
     "sentry.similarity.backends.dummy",
     "sentry.similarity.features",
     "sentry.snuba.discover",

+ 5 - 0
src/sentry/backup/dependencies.py

@@ -379,6 +379,11 @@ def dependencies() -> dict[NormalizedModelName, ModelRelations]:
         model_iterator = app_config.get_models()
 
         for model in model_iterator:
+            # Ignore some native Django models, since other models don't reference them and we don't
+            # really use them for business logic.
+            if model._meta.app_label in {"sessions", "sites"}:
+                continue
+
             foreign_keys: dict[str, ForeignField] = dict()
             uniques: set[frozenset[str]] = {
                 frozenset(combo) for combo in model._meta.unique_together

+ 2 - 8
src/sentry/backup/exports.py

@@ -1,10 +1,9 @@
 from __future__ import annotations
 
 import io
-from typing import BinaryIO, Type
+from typing import BinaryIO
 
 import click
-from django.db.models.base import Model
 
 from sentry.backup.dependencies import (
     PrimaryKeyMap,
@@ -93,11 +92,6 @@ def _export(
         else:
             raise ValueError("Filter arguments must only apply to `Organization` or `User` models")
 
-    def get_exporter_for_model(model: Type[Model]):
-        if SiloMode.CONTROL in model._meta.silo_limit.modes:  # type: ignore
-            return import_export_service.export_by_model
-        return ImportExportService.get_local_implementation().export_by_model  # type: ignore
-
     # TODO(getsentry/team-ospo#190): Another optimization opportunity to use a generator with ijson # to print the JSON objects in a streaming manner.
     for model in sorted_dependencies():
         from sentry.db.models.base import BaseModel
@@ -116,7 +110,7 @@ def _export(
             continue
 
         dep_models = {get_model_name(d) for d in model_relations.get_dependencies_for_relocation()}
-        export_by_model = get_exporter_for_model(model)
+        export_by_model = ImportExportService.get_exporter_for_model(model)
         result = export_by_model(
             model_name=str(model_name),
             scope=RpcExportScope.into_rpc(scope),

+ 61 - 82
src/sentry/backup/imports.py

@@ -3,20 +3,34 @@ from __future__ import annotations
 from typing import BinaryIO, Iterator, Optional, Tuple, Type
 
 import click
-from django.conf import settings
 from django.core import serializers
-from django.core.exceptions import ValidationError as DjangoValidationError
-from django.db import IntegrityError, connections, router, transaction
+from django.db import transaction
 from django.db.models.base import Model
-from rest_framework.serializers import ValidationError as DjangoRestFrameworkValidationError
 
-from sentry.backup.dependencies import NormalizedModelName, PrimaryKeyMap, get_model, get_model_name
-from sentry.backup.helpers import EXCLUDED_APPS, Filter, ImportFlags, decrypt_encrypted_tarball
+from sentry.backup.dependencies import (
+    NormalizedModelName,
+    PrimaryKeyMap,
+    dependencies,
+    get_model_name,
+)
+from sentry.backup.helpers import Filter, ImportFlags, decrypt_encrypted_tarball
 from sentry.backup.scopes import ImportScope
-from sentry.silo import unguarded_write
+from sentry.services.hybrid_cloud.import_export.model import (
+    RpcFilter,
+    RpcImportError,
+    RpcImportErrorKind,
+    RpcImportFlags,
+    RpcImportScope,
+    RpcPrimaryKeyMap,
+)
+from sentry.services.hybrid_cloud.import_export.service import ImportExportService
+from sentry.silo.base import SiloMode
+from sentry.silo.safety import unguarded_write
 from sentry.utils import json
+from sentry.utils.env import is_split_db
 
 __all__ = (
+    "ImportingError",
     "import_in_user_scope",
     "import_in_organization_scope",
     "import_in_config_scope",
@@ -24,6 +38,11 @@ __all__ = (
 )
 
 
+class ImportingError(Exception):
+    def __init__(self, context: RpcImportError) -> None:
+        self.context = context
+
+
 def _import(
     src: BinaryIO,
     scope: ImportScope,
@@ -45,6 +64,11 @@ def _import(
     from sentry.models.organizationmember import OrganizationMember
     from sentry.models.user import User
 
+    if SiloMode.get_current_mode() == SiloMode.CONTROL:
+        errText = "Imports must be run in REGION or MONOLITH instances only"
+        printer(errText, err=True)
+        raise RuntimeError(errText)
+
     flags = flags if flags is not None else ImportFlags()
     user_model_name = get_model_name(User)
     org_model_name = get_model_name(Organization)
@@ -154,83 +178,38 @@ def _import(
     # of how we do atomicity: on a per-model (if using multiple dbs) or global (if using a single
     # db) basis.
     def do_write():
-        allowed_relocation_scopes = scope.value
         pk_map = PrimaryKeyMap()
-        for (batch_model_name, batch) in yield_json_models(src):
-            model = get_model(batch_model_name)
-            if model is None:
-                raise ValueError("Unknown model name")
-
-            using = router.db_for_write(model)
-            with transaction.atomic(using=using):
-                count = 0
-                for obj in serializers.deserialize("json", batch, use_natural_keys=False):
-                    o = obj.object
-                    if o._meta.app_label not in EXCLUDED_APPS or o:
-                        if o.get_possible_relocation_scopes() & allowed_relocation_scopes:
-                            o = obj.object
-                            model_name = get_model_name(o)
-                            for f in filters:
-                                if f.model == type(o) and getattr(o, f.field, None) not in f.values:
-                                    break
-                            else:
-                                # We can only be sure `get_relocation_scope()` will be correct if it
-                                # is fired AFTER normalization, as some `get_relocation_scope()`
-                                # methods rely on being able to correctly resolve foreign keys,
-                                # which is only possible after normalization.
-                                old_pk = o.normalize_before_relocation_import(pk_map, scope, flags)
-                                if old_pk is None:
-                                    continue
-
-                                # Now that the model has been normalized, we can ensure that this
-                                # particular instance has a `RelocationScope` that permits
-                                # importing.
-                                if not o.get_relocation_scope() in allowed_relocation_scopes:
-                                    continue
-
-                                written = o.write_relocation_import(scope, flags)
-                                if written is None:
-                                    continue
-
-                                new_pk, import_kind = written
-                                slug = getattr(o, "slug", None)
-                                pk_map.insert(model_name, old_pk, new_pk, import_kind, slug)
-                                count += 1
-
-                # If we wrote at least one model, make sure to update the sequences too.
-                if count > 0:
-                    table = o._meta.db_table
-                    seq = f"{table}_id_seq"
-                    with connections[using].cursor() as cursor:
-                        cursor.execute(f"SELECT setval(%s, (SELECT MAX(id) FROM {table}))", [seq])
-
-    try:
-        if len(settings.DATABASES) == 1:
-            # TODO(getsentry/team-ospo#185): This is currently untested in single-db mode. Fix ASAP!
-            with unguarded_write(using="default"), transaction.atomic("default"):
-                do_write()
-        else:
+        for model_name, json_data in yield_json_models(src):
+            model_relations = dependencies().get(model_name)
+            if not model_relations:
+                continue
+
+            dep_models = {
+                get_model_name(d) for d in model_relations.get_dependencies_for_relocation()
+            }
+            import_by_model = ImportExportService.get_importer_for_model(model_relations.model)
+            result = import_by_model(
+                model_name=str(model_name),
+                scope=RpcImportScope.into_rpc(scope),
+                flags=RpcImportFlags.into_rpc(flags),
+                filter_by=[RpcFilter.into_rpc(f) for f in filters],
+                pk_map=RpcPrimaryKeyMap.into_rpc(pk_map.partition(dep_models)),
+                json_data=json_data,
+            )
+
+            if isinstance(result, RpcImportError):
+                printer(result.pretty(), err=True)
+                if result.get_kind() == RpcImportErrorKind.IntegrityError:
+                    warningText = ">> Are you restoring from a backup of the same version of Sentry?\n>> Are you restoring onto a clean database?\n>> If so then this IntegrityError might be our fault, you can open an issue here:\n>> https://github.com/getsentry/sentry/issues/new/choose"
+                    printer(warningText, err=True)
+                raise ImportingError(result)
+            pk_map.extend(result.mapped_pks)
+
+    if SiloMode.get_current_mode() == SiloMode.MONOLITH and not is_split_db():
+        with unguarded_write(using="default"), transaction.atomic(using="default"):
             do_write()
-
-    # For all database integrity errors, let's warn users to follow our
-    # recommended backup/restore workflow before reraising exception. Most of
-    # these errors come from restoring on a different version of Sentry or not restoring
-    # on a clean install.
-    except IntegrityError as e:
-        warningText = ">> Are you restoring from a backup of the same version of Sentry?\n>> Are you restoring onto a clean database?\n>> If so then this IntegrityError might be our fault, you can open an issue here:\n>> https://github.com/getsentry/sentry/issues/new/choose"
-        printer(
-            warningText,
-            err=True,
-        )
-        raise (e)
-
-    # Calls to `write_relocation_import` may fail validation and throw either a
-    # `DjangoValidationError` when a call to `.full_clean()` failed, or a
-    # `DjangoRestFrameworkValidationError` when a call to a custom DRF serializer failed. This
-    # exception catcher converts instances of the former to the latter.
-    except DjangoValidationError as e:
-        errs = {field: error for field, error in e.message_dict.items()}
-        raise DjangoRestFrameworkValidationError(errs) from e
+    else:
+        do_write()
 
 
 def import_in_user_scope(

+ 8 - 6
src/sentry/runner/commands/backup.py

@@ -5,12 +5,6 @@ import click
 from sentry.backup.comparators import get_default_comparators
 from sentry.backup.findings import FindingJSONEncoder
 from sentry.backup.helpers import ImportFlags
-from sentry.backup.imports import (
-    import_in_config_scope,
-    import_in_global_scope,
-    import_in_organization_scope,
-    import_in_user_scope,
-)
 from sentry.backup.validate import validate
 from sentry.runner.decorators import configuration
 from sentry.utils import json
@@ -136,6 +130,8 @@ def import_users(src, decrypt_with, filter_usernames, merge_users, silent):
     Import the Sentry users from an exported JSON file.
     """
 
+    from sentry.backup.imports import import_in_user_scope
+
     import_in_user_scope(
         src,
         decrypt_with=decrypt_with,
@@ -173,6 +169,8 @@ def import_organizations(src, decrypt_with, filter_org_slugs, merge_users, silen
     Import the Sentry organizations, and all constituent Sentry users, from an exported JSON file.
     """
 
+    from sentry.backup.imports import import_in_organization_scope
+
     import_in_organization_scope(
         src,
         decrypt_with=decrypt_with,
@@ -208,6 +206,8 @@ def import_config(src, decrypt_with, merge_users, overwrite_configs, silent):
     Import all configuration and administrator accounts needed to set up this Sentry instance.
     """
 
+    from sentry.backup.imports import import_in_config_scope
+
     import_in_config_scope(
         src,
         decrypt_with=decrypt_with,
@@ -236,6 +236,8 @@ def import_global(src, decrypt_with, silent, overwrite_configs):
     Import all Sentry data from an exported JSON file.
     """
 
+    from sentry.backup.imports import import_in_global_scope
+
     import_in_global_scope(
         src,
         decrypt_with=decrypt_with,

+ 185 - 4
src/sentry/services/hybrid_cloud/import_export/impl.py

@@ -5,8 +5,12 @@
 
 from typing import List, Optional, Set
 
-from django.core.serializers import serialize
+from django.core.exceptions import ValidationError as DjangoValidationError
+from django.core.serializers import deserialize, serialize
+from django.core.serializers.base import DeserializationError
+from django.db import DatabaseError, IntegrityError, connections, router, transaction
 from django.db.models import Q
+from rest_framework.serializers import ValidationError as DjangoRestFrameworkValidationError
 
 from sentry.backup.dependencies import (
     ImportKind,
@@ -17,7 +21,7 @@ from sentry.backup.dependencies import (
     get_model_name,
 )
 from sentry.backup.findings import InstanceID
-from sentry.backup.helpers import DatetimeSafeDjangoJSONEncoder, Filter
+from sentry.backup.helpers import EXCLUDED_APPS, DatetimeSafeDjangoJSONEncoder, Filter
 from sentry.backup.scopes import ExportScope
 from sentry.models.user import User
 from sentry.models.userpermission import UserPermission
@@ -29,6 +33,12 @@ from sentry.services.hybrid_cloud.import_export.model import (
     RpcExportResult,
     RpcExportScope,
     RpcFilter,
+    RpcImportError,
+    RpcImportErrorKind,
+    RpcImportFlags,
+    RpcImportOk,
+    RpcImportResult,
+    RpcImportScope,
     RpcPrimaryKeyMap,
 )
 from sentry.services.hybrid_cloud.import_export.service import ImportExportService
@@ -50,6 +60,175 @@ class UniversalImportExportService(ImportExportService):
             ImportExportService.get_local_implementation().export_by_model(...)
     """
 
+    def import_by_model(
+        self,
+        *,
+        model_name: str,
+        scope: Optional[RpcImportScope] = None,
+        flags: RpcImportFlags,
+        filter_by: List[RpcFilter],
+        pk_map: RpcPrimaryKeyMap,
+        json_data: str,
+    ) -> RpcImportResult:
+        import_flags = flags.from_rpc()
+        batch_model_name = NormalizedModelName(model_name)
+        model = get_model(batch_model_name)
+        if model is None:
+            return RpcImportError(
+                kind=RpcImportErrorKind.UnknownModel,
+                on=InstanceID(model_name),
+                reason=f"The model `{model_name}` could not be found",
+            )
+
+        silo_mode = SiloMode.get_current_mode()
+        model_modes = model._meta.silo_limit.modes  # type: ignore
+        if silo_mode != SiloMode.MONOLITH and silo_mode not in model_modes:
+            return RpcImportError(
+                kind=RpcImportErrorKind.IncorrectSiloModeForModel,
+                on=InstanceID(model_name),
+                reason=f"The model `{model_name}` was forwarded to the incorrect silo (it cannot be imported from the {silo_mode} silo)",
+            )
+
+        if scope is None:
+            return RpcImportError(
+                kind=RpcImportErrorKind.UnspecifiedScope,
+                on=InstanceID(model_name),
+                reason="The RPC was called incorrectly, please set an `ImportScope` parameter",
+            )
+
+        import_scope = scope.from_rpc()
+        in_pk_map = pk_map.from_rpc()
+        filters: List[Filter] = []
+        for fb in filter_by:
+            if NormalizedModelName(fb.model_name) == batch_model_name:
+                filters.append(fb.from_rpc())
+
+        try:
+            using = router.db_for_write(model)
+            with transaction.atomic(using=using):
+                ok_relocation_scopes = import_scope.value
+                out_pk_map = PrimaryKeyMap()
+                max_pk = 0
+                counter = 0
+                for deserialized_object in deserialize("json", json_data, use_natural_keys=False):
+                    counter += 1
+                    model_instance = deserialized_object.object
+                    if model_instance._meta.app_label not in EXCLUDED_APPS or model_instance:
+                        if model_instance.get_possible_relocation_scopes() & ok_relocation_scopes:
+                            inst_model_name = get_model_name(model_instance)
+                            if inst_model_name != batch_model_name:
+                                return RpcImportError(
+                                    kind=RpcImportErrorKind.UnexpectedModel,
+                                    on=InstanceID(model=str(inst_model_name), ordinal=1),
+                                    left_pk=model_instance.pk,
+                                    reason=f"Received model of kind `{str(inst_model_name)}` when `{str(batch_model_name)}` was expected",
+                                )
+
+                            for f in filters:
+                                if getattr(model_instance, f.field, None) not in f.values:
+                                    break
+                            else:
+                                try:
+                                    # We can only be sure `get_relocation_scope()` will be correct
+                                    # if it is fired AFTER normalization, as some
+                                    # `get_relocation_scope()` methods rely on being able to
+                                    # correctly resolve foreign keys, which is only possible after
+                                    # normalization.
+                                    old_pk = model_instance.normalize_before_relocation_import(
+                                        in_pk_map, import_scope, import_flags
+                                    )
+                                    if old_pk is None:
+                                        continue
+
+                                    # Now that the model has been normalized, we can ensure that
+                                    # this particular instance has a `RelocationScope` that permits
+                                    # importing.
+                                    if (
+                                        not model_instance.get_relocation_scope()
+                                        in ok_relocation_scopes
+                                    ):
+                                        continue
+
+                                    # Perform the actual database write.
+                                    written = model_instance.write_relocation_import(
+                                        import_scope, import_flags
+                                    )
+                                    if written is None:
+                                        continue
+
+                                    # For models that may have circular references to themselves
+                                    # (unlikely), keep track of the new pk in the input map as well.
+                                    new_pk, import_kind = written
+                                    slug = getattr(model_instance, "slug", None)
+                                    in_pk_map.insert(
+                                        inst_model_name, old_pk, new_pk, import_kind, slug
+                                    )
+                                    out_pk_map.insert(
+                                        inst_model_name, old_pk, new_pk, import_kind, slug
+                                    )
+                                    if new_pk > max_pk:
+                                        max_pk = new_pk
+
+                                except DjangoValidationError as e:
+                                    errs = {field: error for field, error in e.message_dict.items()}
+                                    return RpcImportError(
+                                        kind=RpcImportErrorKind.ValidationError,
+                                        on=InstanceID(model_name, ordinal=counter),
+                                        left_pk=model_instance.pk,
+                                        reason=f"Django validation error encountered: {errs}",
+                                    )
+
+                                except DjangoRestFrameworkValidationError as e:
+                                    return RpcImportError(
+                                        kind=RpcImportErrorKind.ValidationError,
+                                        on=InstanceID(model_name, ordinal=counter),
+                                        left_pk=model_instance.pk,
+                                        reason=str(e),
+                                    )
+
+            # If we wrote at least one model, make sure to update the sequences too.
+            if counter > 0:
+                table = model_instance._meta.db_table
+                seq = f"{table}_id_seq"
+                with connections[using].cursor() as cursor:
+                    cursor.execute(f"SELECT setval(%s, (SELECT MAX(id) FROM {table}))", [seq])
+
+            return RpcImportOk(
+                mapped_pks=RpcPrimaryKeyMap.into_rpc(out_pk_map),
+                max_pk=max_pk,
+                num_imported=counter,
+            )
+
+        except DeserializationError:
+            return RpcImportError(
+                kind=RpcImportErrorKind.DeserializationFailed,
+                on=InstanceID(model_name),
+                reason="The submitted JSON could not be deserialized into Django model instances",
+            )
+
+        # Catch `IntegrityError` before `DatabaseError`, since the former is a subclass of the
+        # latter.
+        except IntegrityError as e:
+            return RpcImportError(
+                kind=RpcImportErrorKind.IntegrityError,
+                on=InstanceID(model_name),
+                reason=str(e),
+            )
+
+        except DatabaseError as e:
+            return RpcImportError(
+                kind=RpcImportErrorKind.DatabaseError,
+                on=InstanceID(model_name),
+                reason=str(e),
+            )
+
+        except Exception as e:
+            return RpcImportError(
+                kind=RpcImportErrorKind.Unknown,
+                on=InstanceID(model_name),
+                reason=f"Unknown internal error occurred: {e}",
+            )
+
     def export_by_model(
         self,
         *,
@@ -94,7 +273,7 @@ class UniversalImportExportService(ImportExportService):
             allowed_relocation_scopes = export_scope.value
             possible_relocation_scopes = model.get_possible_relocation_scopes()
             includable = possible_relocation_scopes & allowed_relocation_scopes
-            if not includable or model._meta.proxy:
+            if not includable:
                 return RpcExportError(
                     kind=RpcExportErrorKind.UnexportableModel,
                     on=InstanceID(model_name),
@@ -142,7 +321,9 @@ class UniversalImportExportService(ImportExportService):
                             # For models that may have circular references to themselves (unlikely),
                             # keep track of the new pk in the input map as well.
                             nonlocal max_pk
-                            max_pk = item.pk
+                            if item.pk > max_pk:
+                                max_pk = item.pk
+
                             in_pk_map.insert(model_name, item.pk, item.pk, ImportKind.Inserted)
                             out_pk_map.insert(model_name, item.pk, item.pk, ImportKind.Inserted)
                             yield item

Some files were not shown because too many files changed in this diff