Browse Source

feat(backup): Add preprocessing relocation tasks (#59199)

These tasks deal with all of the little preparatory steps we need to
complete before we can do a full-fledged, Google CloudBuild backed
validation run. This includes performing some basic validation that we
can do cheaply up front, as well as ensuring that all of the necessary
data has been collected and moved to the correct location in GCS.

Issue: getsentry/team-ospo#203
Alex Zaslavsky 1 year ago
parent
commit
795b976cf3

+ 27 - 0
fixtures/backup/invalid-user.json

@@ -0,0 +1,27 @@
+[
+    {
+        "model": "sentry.user",
+        "pk": 1,
+        "fields": {
+            "password": "pbkdf2_sha256$150000$iEvdIknqYjTr$+QsGn0tfIJ1FZLxQI37mVU1gL2KbL/wqjMtG/dFhsMA=",
+            "last_login": null,
+            "name": "",
+            "email": "maximum@example.com",
+            "is_staff": true,
+            "is_active": true,
+            "is_superuser": true,
+            "is_managed": false,
+            "is_sentry_app": null,
+            "is_password_expired": false,
+            "is_unclaimed": false,
+            "last_password_change": "2023-06-22T22:59:57.023Z",
+            "flags": "0",
+            "session_nonce": null,
+            "date_joined": "2023-06-22T22:59:55.488Z",
+            "last_active": "2023-06-22T22:59:55.489Z",
+            "avatar_type": 0,
+            "avatar_url": null,
+            "doesnotexist": "foo"
+        }
+    }
+]

+ 4 - 4
fixtures/backup/single-option.json

@@ -3,10 +3,10 @@
         "model": "sentry.option",
         "pk": 1,
         "fields": {
-        "key": "sentry:latest_version",
-        "last_updated": "2023-06-22T00:00:00.000Z",
-        "last_updated_by": "unknown",
-        "value": "\"23.6.1\""
+            "key": "sentry:latest_version",
+            "last_updated": "2023-06-22T00:00:00.000Z",
+            "last_updated_by": "unknown",
+            "value": "\"23.6.1\""
         }
     }
 ]

+ 25 - 0
src/sentry/backup/helpers.py

@@ -18,6 +18,7 @@ from google_crc32c import value as crc32c
 
 from sentry.backup.scopes import RelocationScope
 from sentry.utils import json
+from sentry.utils.env import gcp_project_id
 
 # Django apps we take care to never import or export from.
 EXCLUDED_APPS = frozenset(("auth", "contenttypes", "fixtures"))
@@ -180,10 +181,34 @@ class CryptoKeyVersion(NamedTuple):
     version: str
 
 
+DEFAULT_CRYPTO_KEY_VERSION = CryptoKeyVersion(
+    project_id=gcp_project_id(),
+    location="global",
+    key_ring="relocation",
+    key="relocation",
+    # TODO(getsentry/team-ospo#190): This version should be pulled from an option, rather than hard
+    # coded.
+    version="1",
+)
+
+
 class DecryptionError(Exception):
     pass
 
 
+def get_public_key_using_gcp_kms(crypto_key_version: CryptoKeyVersion) -> bytes:
+    kms_client = KeyManagementServiceClient()
+    key_name = kms_client.crypto_key_version_path(
+        project=crypto_key_version.project_id,
+        location=crypto_key_version.location,
+        key_ring=crypto_key_version.key_ring,
+        crypto_key=crypto_key_version.key,
+        crypto_key_version=crypto_key_version.version,
+    )
+    public_key = kms_client.get_public_key(request={"name": key_name})
+    return public_key.pem.encode("utf-8")
+
+
 def decrypt_data_encryption_key_using_gcp_kms(
     unwrapped: UnwrappedEncryptedExportTarball, gcp_kms_config: bytes
 ) -> bytes:

+ 12 - 0
src/sentry/models/relocation.py

@@ -161,6 +161,18 @@ class RelocationFile(DefaultFieldsModel):
         def get_choices(cls) -> list[tuple[int, str]]:
             return [(key.value, key.name) for key in cls]
 
+        def to_filename(self, suffix: str):
+            if self.name == "RAW_USER_DATA":
+                return f"raw-relocation-data.{suffix}"
+            elif self.name == "NORMALIZED_USER_DATA":
+                return f"normalized-relocation-data.{suffix}"
+            elif self.name == "BASELINE_CONFIG_VALIDATION_DATA":
+                return f"baseline-config.{suffix}"
+            elif self.name == "COLLIDING_USERS_VALIDATION_DATA":
+                return f"colliding-users.{suffix}"
+            else:
+                raise ValueError("Cannot extract a filename from `RelocationFile.Kind.UNKNOWN`.")
+
     relocation = FlexibleForeignKey("sentry.Relocation")
     file = FlexibleForeignKey("sentry.File")
     kind = models.SmallIntegerField(choices=Kind.get_choices())

+ 375 - 5
src/sentry/tasks/relocation.py

@@ -1,12 +1,37 @@
 from __future__ import annotations
 
 import logging
+from io import BytesIO
+from string import Template
 from typing import Optional
 
+from cryptography.fernet import Fernet
+
+from sentry.backup.dependencies import NormalizedModelName, get_model
+from sentry.backup.exports import export_in_config_scope, export_in_user_scope
+from sentry.backup.helpers import (
+    DEFAULT_CRYPTO_KEY_VERSION,
+    decrypt_data_encryption_key_using_gcp_kms,
+    get_public_key_using_gcp_kms,
+    unwrap_encrypted_export_tarball,
+)
+from sentry.filestore.gcs import GoogleCloudStorage
+from sentry.models.files.file import File
+from sentry.models.files.utils import get_storage
+from sentry.models.organization import Organization
 from sentry.models.relocation import Relocation, RelocationFile
+from sentry.models.user import User
 from sentry.silo import SiloMode
 from sentry.tasks.base import instrumented_task
-from sentry.utils.relocation import OrderedTask, retry_task_or_fail_relocation, start_task
+from sentry.utils import json
+from sentry.utils.relocation import (
+    RELOCATION_BLOB_SIZE,
+    RELOCATION_FILE_TYPE,
+    OrderedTask,
+    fail_relocation,
+    retry_task_or_fail_relocation,
+    start_relocation_task,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -29,8 +54,25 @@ RELOCATION_FILES_TO_BE_VALIDATED = [
     RelocationFile.Kind.RAW_USER_DATA,
 ]
 
-# Various error strings that we want to surface to users.
-ERR_FILE_UPLOAD = "Internal error during file upload"
+# Various error strings that we want to surface to users, grouped by step.
+ERR_UPLOADING_FAILED = "Internal error during file upload."
+
+ERR_PREPROCESSING_DECRYPTION = """Could not decrypt the imported JSON - are you sure you used the
+                                  correct public key?"""
+ERR_PREPROCESSING_INTERNAL = "Internal error during preprocessing."
+ERR_PREPROCESSING_INVALID_JSON = "Invalid input JSON."
+ERR_PREPROCESSING_INVALID_TARBALL = "The import tarball you provided was invalid."
+ERR_PREPROCESSING_NO_USERS = "The provided JSON must contain at least one user."
+ERR_PREPROCESSING_TOO_MANY_USERS = Template(
+    f"The provided JSON must contain $count users but must not exceed the limit of {MAX_USERS_PER_RELOCATION}."
+)
+ERR_PREPROCESSING_NO_ORGS = "The provided JSON must contain at least one organization."
+ERR_PREPROCESSING_TOO_MANY_ORGS = Template(
+    f"The provided JSON must contain $count organizations, but must not exceed the limit of {MAX_ORGS_PER_RELOCATION}."
+)
+ERR_PREPROCESSING_MISSING_ORGS = Template(
+    "The following organization slug imports were requested, but could not be found in your submitted JSON: $orgs."
+)
 
 
 # TODO(getsentry/team-ospo#203): We should split this task in two, one for "small" imports of say
@@ -52,7 +94,7 @@ def uploading_complete(uuid: str) -> None:
 
     relocation: Optional[Relocation]
     attempts_left: int
-    (relocation, attempts_left) = start_task(
+    (relocation, attempts_left) = start_relocation_task(
         uuid=uuid,
         step=Relocation.Step.UPLOADING,
         task=OrderedTask.UPLOADING_COMPLETE,
@@ -69,7 +111,7 @@ def uploading_complete(uuid: str) -> None:
         relocation,
         OrderedTask.UPLOADING_COMPLETE,
         attempts_left,
-        ERR_FILE_UPLOAD,
+        ERR_UPLOADING_FAILED,
     ):
         raw_relocation_file = (
             RelocationFile.objects.filter(
@@ -95,5 +137,333 @@ def uploading_complete(uuid: str) -> None:
     silo_mode=SiloMode.REGION,
 )
 def preprocessing_scan(uuid: str) -> None:
+    """
+    Performs the very first part of the `PREPROCESSING` step of a `Relocation`, which involves
+    decrypting the user-supplied tarball and picking out some useful information for it. This let's
+    us validate a few things:
+
+        - Ensuring that the user gave us properly encrypted data (was it encrypted? With the right
+          key?).
+        - Ensuring that the org slug the user supplied exists in the provided JSON data.
+        - Recording the slugs of the orgs the relocation is attempting to import.
+        - Recording the usernames of the users the relocation is attempting to import.
+
+    Of the preprocessing tasks, this is the most resource-onerous (what if the importer provides a
+    2GB JSON blob? What if they have 20,000 usernames? Etc...) so we should take care with our retry
+    logic and set careful limits.
+
+    This function is meant to be idempotent, and should be retried with an exponential backoff.
+    """
+
+    relocation: Optional[Relocation]
+    attempts_left: int
+    (relocation, attempts_left) = start_relocation_task(
+        uuid=uuid,
+        step=Relocation.Step.PREPROCESSING,
+        task=OrderedTask.PREPROCESSING_SCAN,
+        allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
+    )
+    if relocation is None:
+        return
+
+    with retry_task_or_fail_relocation(
+        relocation,
+        OrderedTask.PREPROCESSING_SCAN,
+        attempts_left,
+        ERR_PREPROCESSING_INTERNAL,
+    ):
+        # The `uploading_complete` task above should have verified that this is ready for use.
+        raw_relocation_file = (
+            RelocationFile.objects.filter(
+                relocation=relocation,
+                kind=RelocationFile.Kind.RAW_USER_DATA.value,
+            )
+            .select_related("file")
+            .first()
+        )
+        fp = raw_relocation_file.file.getfile()
+
+        with fp:
+            try:
+                unwrapped = unwrap_encrypted_export_tarball(fp)
+            except Exception:
+                return fail_relocation(
+                    relocation,
+                    OrderedTask.PREPROCESSING_SCAN,
+                    ERR_PREPROCESSING_INVALID_TARBALL,
+                )
+
+            # Decrypt the DEK using Google KMS, and use the decrypted DEK to decrypt the encoded
+            # JSON.
+            try:
+                plaintext_data_encryption_key = decrypt_data_encryption_key_using_gcp_kms(
+                    unwrapped,
+                    json.dumps(DEFAULT_CRYPTO_KEY_VERSION).encode("utf-8"),
+                )
+                decryptor = Fernet(plaintext_data_encryption_key)
+                json_data = decryptor.decrypt(unwrapped.encrypted_json_blob).decode("utf-8")
+            except Exception:
+                return fail_relocation(
+                    relocation,
+                    OrderedTask.PREPROCESSING_SCAN,
+                    ERR_PREPROCESSING_DECRYPTION,
+                )
+
+            # Grab usernames and org slugs from the JSON data.
+            usernames = []
+            org_slugs = []
+            try:
+                for json_model in json.loads(json_data):
+                    model_name = NormalizedModelName(json_model["model"])
+                    if get_model(model_name) == Organization:
+                        org_slugs.append(json_model["fields"]["slug"])
+                        # TODO(getsentry/team-ospo#190): Validate slug using regex, so that we can
+                        # fail early on obviously invalid slugs. Also keeps the database `JSONField`
+                        # from ballooning on bad input.
+                    if get_model(model_name) == User:
+                        usernames.append(json_model["fields"]["username"])
+                        # TODO(getsentry/team-ospo#190): Validate username using regex, so that we
+                        # can fail early on obviously invalid usernames. Also keeps the database
+                        # `JSONField` from ballooning on bad input.
+            except KeyError:
+                return fail_relocation(
+                    relocation, OrderedTask.PREPROCESSING_SCAN, ERR_PREPROCESSING_INVALID_JSON
+                )
+
+            # Ensure that the data is reasonable and within our set bounds before we start on the
+            # next task.
+            if len(usernames) == 0:
+                return fail_relocation(
+                    relocation,
+                    OrderedTask.PREPROCESSING_SCAN,
+                    ERR_PREPROCESSING_NO_USERS,
+                )
+            if len(usernames) > MAX_USERS_PER_RELOCATION:
+                return fail_relocation(
+                    relocation,
+                    OrderedTask.PREPROCESSING_SCAN,
+                    ERR_PREPROCESSING_TOO_MANY_USERS.substitute(count=len(usernames)),
+                )
+            if len(org_slugs) == 0:
+                return fail_relocation(
+                    relocation,
+                    OrderedTask.PREPROCESSING_SCAN,
+                    ERR_PREPROCESSING_NO_ORGS,
+                )
+            if len(org_slugs) > MAX_ORGS_PER_RELOCATION:
+                return fail_relocation(
+                    relocation,
+                    OrderedTask.PREPROCESSING_SCAN,
+                    ERR_PREPROCESSING_TOO_MANY_ORGS.substitute(count=len(org_slugs)),
+                )
+            missing_org_slugs = set(relocation.want_org_slugs) - set(org_slugs)
+            if len(missing_org_slugs):
+                return fail_relocation(
+                    relocation,
+                    OrderedTask.PREPROCESSING_SCAN,
+                    ERR_PREPROCESSING_MISSING_ORGS.substitute(
+                        orgs=",".join(sorted(missing_org_slugs))
+                    ),
+                )
+
+            relocation.want_usernames = sorted(usernames)
+            relocation.save()
+
+            # TODO(getsentry/team-ospo#203): The user's import data looks basically okay - we should
+            # use this opportunity to send a "your relocation request has been accepted and is in
+            # flight, please give it a couple hours" email.
+            preprocessing_baseline_config.delay(uuid)
+
+
+@instrumented_task(
+    name="sentry.relocation.preprocessing_baseline_config",
+    queue="relocation",
+    max_retries=MAX_FAST_TASK_RETRIES,
+    retry_backoff=RETRY_BACKOFF,
+    retry_backoff_jitter=True,
+    soft_time_limit=PREPROCESSING_TIME_LIMIT,
+    silo_mode=SiloMode.REGION,
+)
+def preprocessing_baseline_config(uuid: str) -> None:
+    """
+    Pulls down the global config data we'll need to check for collisions and global data integrity.
+
+    This function is meant to be idempotent, and should be retried with an exponential backoff.
+    """
+
+    relocation: Optional[Relocation]
+    attempts_left: int
+    (relocation, attempts_left) = start_relocation_task(
+        uuid=uuid,
+        step=Relocation.Step.PREPROCESSING,
+        task=OrderedTask.PREPROCESSING_BASELINE_CONFIG,
+        allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
+    )
+    if relocation is None:
+        return
+
+    with retry_task_or_fail_relocation(
+        relocation,
+        OrderedTask.PREPROCESSING_BASELINE_CONFIG,
+        attempts_left,
+        ERR_PREPROCESSING_INTERNAL,
+    ):
+        # TODO(getsentry/team-ospo#203): A very nice optimization here is to only pull this down
+        # once a day - if we've already done a relocation today, we should just copy that file
+        # instead of doing this (expensive!) global export again.
+        fp = BytesIO()
+        export_in_config_scope(
+            fp,
+            encrypt_with=BytesIO(get_public_key_using_gcp_kms(DEFAULT_CRYPTO_KEY_VERSION)),
+        )
+        fp.seek(0)
+        kind = RelocationFile.Kind.BASELINE_CONFIG_VALIDATION_DATA
+        file = File.objects.create(name=kind.to_filename("tar"), type=RELOCATION_FILE_TYPE)
+        file.putfile(fp, blob_size=RELOCATION_BLOB_SIZE, logger=logger)
+        RelocationFile.objects.create(
+            relocation=relocation,
+            file=file,
+            kind=kind.value,
+        )
+
+        preprocessing_colliding_users.delay(uuid)
+
+
+@instrumented_task(
+    name="sentry.relocation.preprocessing_colliding_users",
+    queue="relocation",
+    max_retries=MAX_FAST_TASK_RETRIES,
+    retry_backoff=RETRY_BACKOFF,
+    retry_backoff_jitter=True,
+    soft_time_limit=PREPROCESSING_TIME_LIMIT,
+    silo_mode=SiloMode.REGION,
+)
+def preprocessing_colliding_users(uuid: str) -> None:
+    """
+    Pulls down any already existing users whose usernames match those found in the import - we'll
+    need to validate that none of these are mutated during import.
+
+    This function is meant to be idempotent, and should be retried with an exponential backoff.
+    """
+
+    relocation: Optional[Relocation]
+    attempts_left: int
+    (relocation, attempts_left) = start_relocation_task(
+        uuid=uuid,
+        step=Relocation.Step.PREPROCESSING,
+        task=OrderedTask.PREPROCESSING_COLLIDING_USERS,
+        allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
+    )
+    if relocation is None:
+        return
+
+    with retry_task_or_fail_relocation(
+        relocation,
+        OrderedTask.PREPROCESSING_COLLIDING_USERS,
+        attempts_left,
+        ERR_PREPROCESSING_INTERNAL,
+    ):
+        fp = BytesIO()
+        export_in_user_scope(
+            fp,
+            encrypt_with=BytesIO(get_public_key_using_gcp_kms(DEFAULT_CRYPTO_KEY_VERSION)),
+            user_filter=set(relocation.want_usernames),
+        )
+        fp.seek(0)
+        kind = RelocationFile.Kind.COLLIDING_USERS_VALIDATION_DATA
+        file = File.objects.create(name=kind.to_filename("tar"), type=RELOCATION_FILE_TYPE)
+        file.putfile(fp, blob_size=RELOCATION_BLOB_SIZE, logger=logger)
+        RelocationFile.objects.create(
+            relocation=relocation,
+            file=file,
+            kind=kind.value,
+        )
+
+        preprocessing_complete.delay(uuid)
+
+
+@instrumented_task(
+    name="sentry.relocation.preprocessing_complete",
+    queue="relocation",
+    max_retries=MAX_FAST_TASK_RETRIES,
+    retry_backoff=RETRY_BACKOFF,
+    retry_backoff_jitter=True,
+    soft_time_limit=PREPROCESSING_TIME_LIMIT,
+    silo_mode=SiloMode.REGION,
+)
+def preprocessing_complete(uuid: str) -> None:
+    """
+    Creates a "composite object" from the uploaded tarball, which could have many pieces. Because
+    creating a composite object in this manner is a synchronous operation, we don't need a follow-up
+    step confirming success.
+
+    This function is meant to be idempotent, and should be retried with an exponential backoff.
+    """
+
+    relocation: Optional[Relocation]
+    attempts_left: int
+    (relocation, attempts_left) = start_relocation_task(
+        uuid=uuid,
+        step=Relocation.Step.PREPROCESSING,
+        task=OrderedTask.PREPROCESSING_COMPLETE,
+        allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
+    )
+    if relocation is None:
+        return
+
+    with retry_task_or_fail_relocation(
+        relocation,
+        OrderedTask.PREPROCESSING_COMPLETE,
+        attempts_left,
+        ERR_PREPROCESSING_INTERNAL,
+    ):
+        storage = get_storage()
+        for kind in RELOCATION_FILES_TO_BE_VALIDATED:
+            raw_relocation_file = (
+                RelocationFile.objects.filter(
+                    relocation=relocation,
+                    kind=kind.value,
+                )
+                .select_related("file")
+                .prefetch_related("file__blobs")
+                .first()
+            )
+
+            file = raw_relocation_file.file
+            path = f'relocations/runs/{uuid}/in/{kind.to_filename("tar")}'
+            if isinstance(storage, GoogleCloudStorage):
+                # If we're using GCS, rather than performing an expensive copy of the file, just
+                # create a composite object.
+                storage.client.bucket(storage.bucket_name).blob(path).compose(
+                    [b.getfile().blob for b in file.blobs.all()]
+                )
+            else:
+                # In S3 or the local filesystem, no "composite object" API exists, so we do a manual
+                # concatenation then copying instead.
+                fp = file.getfile()
+                fp.seek(0)
+                storage.save(path, fp)
+
+        relocation.step = Relocation.Step.VALIDATING.value
+        relocation.save()
+        validating_start.delay(uuid)
+
+
+@instrumented_task(
+    name="sentry.relocation.validating_start",
+    queue="relocation",
+    max_retries=MAX_FAST_TASK_RETRIES,
+    retry_backoff=RETRY_BACKOFF,
+    retry_backoff_jitter=True,
+    soft_time_limit=PREPROCESSING_TIME_LIMIT,
+    silo_mode=SiloMode.REGION,
+)
+def validating_start(uuid: str) -> None:
+    """
+    Calls into Google CloudBuild and kicks off a validation run.
+
+    This function is meant to be idempotent, and should be retried with an exponential backoff.
+    """
+
     # TODO(getsentry/team-ospo#203): Implement this.
     pass

+ 20 - 2
src/sentry/testutils/helpers/backups.py

@@ -7,6 +7,7 @@ from datetime import datetime, timedelta
 from functools import cached_property, lru_cache
 from pathlib import Path
 from typing import Tuple
+from unittest.mock import MagicMock
 from uuid import uuid4
 
 from cryptography.hazmat.backends import default_backend
@@ -26,7 +27,7 @@ from sentry.backup.exports import (
     export_in_user_scope,
 )
 from sentry.backup.findings import ComparatorFindings
-from sentry.backup.helpers import decrypt_encrypted_tarball
+from sentry.backup.helpers import KeyManagementServiceClient, decrypt_encrypted_tarball
 from sentry.backup.imports import import_in_global_scope
 from sentry.backup.scopes import ExportScope
 from sentry.backup.validate import validate
@@ -98,6 +99,21 @@ __all__ = [
 NOOP_PRINTER = lambda *args, **kwargs: None
 
 
+class FakeKeyManagementServiceClient:
+    """
+    Fake version of `KeyManagementServiceClient` that removes the two network calls we rely on: the
+    `Transport` setup on class construction, and the call to the hosted `asymmetric_decrypt`
+    endpoint.
+    """
+
+    asymmetric_decrypt = MagicMock()
+    get_public_key = MagicMock()
+
+    @staticmethod
+    def crypto_key_version_path(**kwargs) -> str:
+        return KeyManagementServiceClient.crypto_key_version_path(**kwargs)
+
+
 class ValidationError(Exception):
     def __init__(self, info: ComparatorFindings):
         super().__init__(info.pretty())
@@ -600,7 +616,9 @@ class BackupTestCase(TransactionTestCase):
 
     def create_exhaustive_instance(self, *, is_superadmin: bool = False):
         """
-        Takes an empty Sentry instance's database, and populates it with an "exhaustive" version of every model. The end result is two users, in one organization, with one full set of extensions, and all global flags set.
+        Takes an empty Sentry instance's database, and populates it with an "exhaustive" version of
+        every model. The end result is two users, in one organization, with one full set of
+        extensions, and all global flags set.
         """
 
         owner = self.create_exhaustive_user(

+ 17 - 0
src/sentry/utils/env.py

@@ -1,12 +1,29 @@
+import os
 import sys
 
 from django.conf import settings
 
+from sentry.utils import json
+
 
 def in_test_environment() -> bool:
     return "pytest" in sys.argv[0] or "vscode" in sys.argv[0]
 
 
+def gcp_project_id() -> str:
+    if in_test_environment():
+        return "__test_gcp_project__"
+
+    adc_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "")
+    if adc_path:
+        with open(adc_path) as fp:
+            adc = json.load(fp)
+            if adc.get("quota_project_id") is not None:
+                return adc.get("quota_project_id")
+
+    return "__unknown_gcp_project__"
+
+
 def is_split_db() -> bool:
     if len(settings.DATABASES) != 1:  # type: ignore
         return True

+ 6 - 2
src/sentry/utils/relocation.py

@@ -17,6 +17,10 @@ class OrderedTask(Enum):
     NONE = 0
     UPLOADING_COMPLETE = 1
     PREPROCESSING_SCAN = 2
+    PREPROCESSING_BASELINE_CONFIG = 3
+    PREPROCESSING_COLLIDING_USERS = 4
+    PREPROCESSING_COMPLETE = 5
+    VALIDATING_START = 6
 
 
 # The file type for a relocation export tarball of any kind.
@@ -35,7 +39,7 @@ RELOCATION_FILE_TYPE = "relocation.file"
 RELOCATION_BLOB_SIZE = int((2**31) / 32)
 
 
-def start_task(
+def start_relocation_task(
     uuid: str, step: Relocation.Step, task: OrderedTask, allowed_task_attempts: int
 ) -> Tuple[Optional[Relocation], int]:
     """
@@ -77,7 +81,7 @@ def start_task(
         return (None, 0)
     else:
         relocation.latest_task = task.name
-        relocation.latest_task_attempts += 1
+        relocation.latest_task_attempts = 1
 
     relocation.step = step.value
     relocation.save()

+ 6 - 17
tests/sentry/runner/commands/test_backup.py

@@ -3,14 +3,13 @@ from __future__ import annotations
 from pathlib import Path
 from tempfile import TemporaryDirectory
 from types import SimpleNamespace
-from unittest.mock import MagicMock, patch
+from unittest.mock import patch
 
 from click.testing import CliRunner
 from google_crc32c import value as crc32c
 
 from sentry.backup.helpers import (
     DecryptionError,
-    KeyManagementServiceClient,
     create_encrypted_export_tarball,
     decrypt_data_encryption_key_local,
     unwrap_encrypted_export_tarball,
@@ -21,7 +20,11 @@ from sentry.services.hybrid_cloud.import_export.model import RpcImportErrorKind
 from sentry.silo.base import SiloMode
 from sentry.testutils.cases import TestCase, TransactionTestCase
 from sentry.testutils.factories import get_fixture_path
-from sentry.testutils.helpers.backups import clear_database, generate_rsa_key_pair
+from sentry.testutils.helpers.backups import (
+    FakeKeyManagementServiceClient,
+    clear_database,
+    generate_rsa_key_pair,
+)
 from sentry.testutils.silo import assume_test_silo_mode, region_silo_test
 from sentry.utils import json
 
@@ -56,20 +59,6 @@ def create_encryption_test_files(tmp_dir: str) -> tuple[Path, Path, Path]:
     return (tmp_priv_key_path, tmp_pub_key_path, tmp_tar_path)
 
 
-class FakeKeyManagementServiceClient:
-    """
-    Fake version of `KeyManagementServiceClient` that removes the two network calls we rely on: the
-    `Transport` setup on class construction, and the call to the hosted `asymmetric_decrypt`
-    endpoint.
-    """
-
-    asymmetric_decrypt = MagicMock()
-
-    @staticmethod
-    def crypto_key_version_path(**kwargs) -> str:
-        return KeyManagementServiceClient.crypto_key_version_path(**kwargs)
-
-
 class GoodCompareCommandTests(TestCase):
     """
     Test success cases of the `sentry compare` CLI command.

+ 527 - 6
tests/sentry/tasks/test_relocation.py

@@ -2,20 +2,47 @@ from functools import cached_property
 from io import BytesIO
 from pathlib import Path
 from tempfile import TemporaryDirectory
-from unittest.mock import Mock, patch
+from types import SimpleNamespace
+from unittest.mock import MagicMock, Mock, patch
 
 import pytest
+from google_crc32c import value as crc32c
 
-from sentry.backup.helpers import create_encrypted_export_tarball
+from sentry.backup.dependencies import NormalizedModelName, get_model_name
+from sentry.backup.helpers import (
+    create_encrypted_export_tarball,
+    decrypt_data_encryption_key_local,
+    decrypt_encrypted_tarball,
+    unwrap_encrypted_export_tarball,
+)
 from sentry.models.files.file import File
+from sentry.models.files.utils import get_storage
 from sentry.models.relocation import Relocation, RelocationFile
-from sentry.tasks.relocation import ERR_FILE_UPLOAD, MAX_FAST_TASK_RETRIES, uploading_complete
+from sentry.models.user import User
+from sentry.tasks.relocation import (
+    ERR_PREPROCESSING_DECRYPTION,
+    ERR_PREPROCESSING_INTERNAL,
+    ERR_PREPROCESSING_INVALID_JSON,
+    ERR_PREPROCESSING_INVALID_TARBALL,
+    ERR_PREPROCESSING_MISSING_ORGS,
+    ERR_PREPROCESSING_NO_ORGS,
+    ERR_PREPROCESSING_NO_USERS,
+    ERR_PREPROCESSING_TOO_MANY_ORGS,
+    ERR_PREPROCESSING_TOO_MANY_USERS,
+    ERR_UPLOADING_FAILED,
+    MAX_FAST_TASK_RETRIES,
+    preprocessing_baseline_config,
+    preprocessing_colliding_users,
+    preprocessing_complete,
+    preprocessing_scan,
+    uploading_complete,
+)
 from sentry.testutils.cases import TestCase
 from sentry.testutils.factories import get_fixture_path
-from sentry.testutils.helpers.backups import generate_rsa_key_pair
+from sentry.testutils.helpers.backups import FakeKeyManagementServiceClient, generate_rsa_key_pair
 from sentry.testutils.silo import region_silo_test
 from sentry.utils import json
-from sentry.utils.relocation import RELOCATION_FILE_TYPE
+from sentry.utils.relocation import RELOCATION_BLOB_SIZE, RELOCATION_FILE_TYPE
 
 
 class RelocationTaskTestCase(TestCase):
@@ -64,6 +91,37 @@ class RelocationTaskTestCase(TestCase):
 
             return file
 
+    def swap_file(
+        self, file: File, fixture_name: str, blob_size: int = RELOCATION_BLOB_SIZE
+    ) -> None:
+        with TemporaryDirectory() as tmp_dir:
+            tmp_priv_key_path = Path(tmp_dir).joinpath("key")
+            tmp_pub_key_path = Path(tmp_dir).joinpath("key.pub")
+            with open(tmp_priv_key_path, "wb") as f:
+                f.write(self.priv_key_pem)
+            with open(tmp_pub_key_path, "wb") as f:
+                f.write(self.pub_key_pem)
+            with open(get_fixture_path("backup", fixture_name)) as f:
+                data = json.load(f)
+                with open(tmp_pub_key_path, "rb") as p:
+                    self.tarball = create_encrypted_export_tarball(data, p).getvalue()
+                    file.putfile(BytesIO(self.tarball), blob_size=blob_size)
+
+    def mock_kms_client(self, fake_kms_client: FakeKeyManagementServiceClient):
+        fake_kms_client.asymmetric_decrypt.call_count = 0
+        fake_kms_client.get_public_key.call_count = 0
+
+        unwrapped = unwrap_encrypted_export_tarball(BytesIO(self.tarball))
+        plaintext_dek = decrypt_data_encryption_key_local(unwrapped, self.priv_key_pem)
+
+        fake_kms_client.asymmetric_decrypt.return_value = SimpleNamespace(
+            plaintext=plaintext_dek,
+            plaintext_crc32c=crc32c(plaintext_dek),
+        )
+        fake_kms_client.get_public_key.return_value = SimpleNamespace(
+            pem=self.pub_key_pem.decode("utf-8")
+        )
+
 
 @patch("sentry.tasks.relocation.preprocessing_scan.delay")
 @region_silo_test
@@ -86,6 +144,7 @@ class UploadingCompleteTest(RelocationTaskTestCase):
         assert preprocessing_scan_mock.call_count == 0
 
+    def test_fail_if_no_attempts_left(self, preprocessing_scan_mock: Mock):
+        """With retries exhausted, the task marks the relocation FAILURE and
+        does not schedule `preprocessing_scan`.
+
+        NOTE(review): the actual task invocation is in the diff-elided lines
+        below the hunk boundary — presumably `uploading_complete(self.uuid)`.
+        """
+        self.relocation.latest_task = "UPLOADING_COMPLETE"
+        self.relocation.latest_task_attempts = MAX_FAST_TASK_RETRIES
+        self.relocation.save()
+        # Deleting the RelocationFile is what forces the task to fail.
+        RelocationFile.objects.filter(relocation=self.relocation).delete()
@@ -94,5 +153,467 @@ class UploadingCompleteTest(RelocationTaskTestCase):
 
         relocation = Relocation.objects.get(uuid=self.uuid)
         assert relocation.status == Relocation.Status.FAILURE.value
-        assert relocation.failure_reason == ERR_FILE_UPLOAD
+        assert relocation.failure_reason == ERR_UPLOADING_FAILED
         assert preprocessing_scan_mock.call_count == 0
+
+
+@patch(
+    "sentry.backup.helpers.KeyManagementServiceClient",
+    new_callable=lambda: FakeKeyManagementServiceClient,
+)
+@patch("sentry.tasks.relocation.preprocessing_baseline_config.delay")
+@region_silo_test
+class PreprocessingScanTest(RelocationTaskTestCase):
+    """Tests for the `preprocessing_scan` task: cheap up-front validation of
+    the uploaded encrypted tarball (decryptable, valid JSON, sane user/org
+    counts) before scheduling `preprocessing_baseline_config`.
+    """
+
+    def setUp(self):
+        # Position the relocation exactly where the scan task expects it:
+        # PREPROCESSING step, with UPLOADING_COMPLETE as the last finished task.
+        super().setUp()
+        self.relocation.step = Relocation.Step.PREPROCESSING.value
+        self.relocation.latest_task = "UPLOADING_COMPLETE"
+        self.relocation.save()
+
+    def test_success(
+        self,
+        preprocessing_baseline_config_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """Happy path: one KMS decrypt, usernames recorded, next task queued."""
+        self.mock_kms_client(fake_kms_client)
+
+        preprocessing_scan(self.uuid)
+
+        assert fake_kms_client.asymmetric_decrypt.call_count == 1
+        assert fake_kms_client.get_public_key.call_count == 0
+        assert preprocessing_baseline_config_mock.call_count == 1
+        assert Relocation.objects.get(uuid=self.uuid).want_usernames == ["testing@example.com"]
+
+    def test_retry_if_attempts_left(
+        self,
+        preprocessing_baseline_config_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """A missing relocation file raises, leaving the relocation
+        IN_PROGRESS (no failure recorded) so celery can retry."""
+        RelocationFile.objects.filter(relocation=self.relocation).delete()
+
+        # An exception being raised will trigger a retry in celery.
+        with pytest.raises(Exception):
+            self.mock_kms_client(fake_kms_client)
+            preprocessing_scan(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.IN_PROGRESS.value
+        assert not relocation.failure_reason
+        assert fake_kms_client.asymmetric_decrypt.call_count == 0
+        assert fake_kms_client.get_public_key.call_count == 0
+        assert preprocessing_baseline_config_mock.call_count == 0
+
+    def test_fail_if_no_attempts_left(
+        self,
+        preprocessing_baseline_config_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """Same failure with retries exhausted: terminal FAILURE with
+        ERR_PREPROCESSING_INTERNAL, and the pipeline stops."""
+        self.relocation.latest_task = "PREPROCESSING_SCAN"
+        self.relocation.latest_task_attempts = MAX_FAST_TASK_RETRIES
+        self.relocation.save()
+        RelocationFile.objects.filter(relocation=self.relocation).delete()
+        self.mock_kms_client(fake_kms_client)
+
+        preprocessing_scan(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_PREPROCESSING_INTERNAL
+        assert fake_kms_client.asymmetric_decrypt.call_count == 0
+        assert fake_kms_client.get_public_key.call_count == 0
+        assert preprocessing_baseline_config_mock.call_count == 0
+
+    def test_fail_invalid_tarball(
+        self,
+        preprocessing_baseline_config_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """A truncated tarball yields ERR_PREPROCESSING_INVALID_TARBALL."""
+        file = RelocationFile.objects.get(relocation=self.relocation).file
+        # Chop the first 9 bytes off the stored tarball to corrupt it.
+        corrupted_tarball_bytes = bytearray(file.getfile().read())[9:]
+        file.putfile(BytesIO(bytes(corrupted_tarball_bytes)))
+        self.mock_kms_client(fake_kms_client)
+
+        preprocessing_scan(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_PREPROCESSING_INVALID_TARBALL
+        assert preprocessing_baseline_config_mock.call_count == 0
+
+    def test_fail_decryption_failure(
+        self,
+        preprocessing_baseline_config_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """A corrupted decrypted DEK yields ERR_PREPROCESSING_DECRYPTION."""
+        # Add invalid 2-octet UTF-8 sequence to the returned plaintext.
+        self.mock_kms_client(fake_kms_client)
+        fake_kms_client.asymmetric_decrypt.return_value.plaintext += b"\xc3\x28"
+
+        preprocessing_scan(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_PREPROCESSING_DECRYPTION
+        assert preprocessing_baseline_config_mock.call_count == 0
+
+    def test_fail_invalid_json(
+        self,
+        preprocessing_baseline_config_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """A fixture with an unknown model field yields
+        ERR_PREPROCESSING_INVALID_JSON (see fixtures/backup/invalid-user.json)."""
+        file = RelocationFile.objects.get(relocation=self.relocation).file
+        self.swap_file(file, "invalid-user.json")
+        self.mock_kms_client(fake_kms_client)
+
+        preprocessing_scan(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_PREPROCESSING_INVALID_JSON
+        assert preprocessing_baseline_config_mock.call_count == 0
+
+    def test_fail_no_users(
+        self,
+        preprocessing_baseline_config_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """An export containing no user models yields ERR_PREPROCESSING_NO_USERS."""
+        file = RelocationFile.objects.get(relocation=self.relocation).file
+        self.swap_file(file, "single-option.json")
+        self.mock_kms_client(fake_kms_client)
+
+        preprocessing_scan(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_PREPROCESSING_NO_USERS
+        assert preprocessing_baseline_config_mock.call_count == 0
+
+    @patch("sentry.tasks.relocation.MAX_USERS_PER_RELOCATION", 0)
+    def test_fail_too_many_users(
+        self,
+        preprocessing_baseline_config_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """With the user cap patched to 0, the single user in the export
+        already exceeds the limit (count=1 in the error message)."""
+        self.mock_kms_client(fake_kms_client)
+
+        preprocessing_scan(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_PREPROCESSING_TOO_MANY_USERS.substitute(count=1)
+        assert preprocessing_baseline_config_mock.call_count == 0
+
+    def test_fail_no_orgs(
+        self,
+        preprocessing_baseline_config_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """An export with users but no organizations yields
+        ERR_PREPROCESSING_NO_ORGS."""
+        file = RelocationFile.objects.get(relocation=self.relocation).file
+        self.swap_file(file, "user-with-minimum-privileges.json")
+        self.mock_kms_client(fake_kms_client)
+
+        preprocessing_scan(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_PREPROCESSING_NO_ORGS
+        assert preprocessing_baseline_config_mock.call_count == 0
+
+    @patch("sentry.tasks.relocation.MAX_ORGS_PER_RELOCATION", 0)
+    def test_fail_too_many_orgs(
+        self,
+        preprocessing_baseline_config_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """With the org cap patched to 0, the single org in the export
+        already exceeds the limit (count=1 in the error message)."""
+        self.mock_kms_client(fake_kms_client)
+
+        preprocessing_scan(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_PREPROCESSING_TOO_MANY_ORGS.substitute(count=1)
+        assert preprocessing_baseline_config_mock.call_count == 0
+
+    def test_fail_missing_orgs(
+        self,
+        preprocessing_baseline_config_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """Requesting an org slug absent from the export yields
+        ERR_PREPROCESSING_MISSING_ORGS listing the missing slugs."""
+        orgs = ["does-not-exist"]
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        relocation.want_org_slugs = orgs
+        relocation.save()
+        self.mock_kms_client(fake_kms_client)
+
+        preprocessing_scan(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_PREPROCESSING_MISSING_ORGS.substitute(
+            orgs=",".join(orgs)
+        )
+        assert preprocessing_baseline_config_mock.call_count == 0
+
+
+@patch(
+    "sentry.backup.helpers.KeyManagementServiceClient",
+    new_callable=lambda: FakeKeyManagementServiceClient,
+)
+@patch("sentry.tasks.relocation.preprocessing_colliding_users.delay")
+@region_silo_test
+class PreprocessingBaselineConfigTest(RelocationTaskTestCase):
+    """Tests for `preprocessing_baseline_config`: exporting the instance's
+    baseline config into an encrypted `baseline-config.tar` RelocationFile and
+    scheduling `preprocessing_colliding_users`.
+    """
+
+    def setUp(self):
+        # This task runs after a successful scan, so start from that state.
+        super().setUp()
+        self.relocation.step = Relocation.Step.PREPROCESSING.value
+        self.relocation.latest_task = "PREPROCESSING_SCAN"
+        self.relocation.save()
+
+    def test_success(
+        self,
+        preprocessing_colliding_users_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """Happy path: one KMS public-key fetch, an encrypted
+        `baseline-config.tar` written, and the next task queued."""
+        self.mock_kms_client(fake_kms_client)
+
+        preprocessing_baseline_config(self.relocation.uuid)
+
+        # Encryption only needs the public key — no decrypts expected here.
+        assert fake_kms_client.asymmetric_decrypt.call_count == 0
+        assert fake_kms_client.get_public_key.call_count == 1
+        assert preprocessing_colliding_users_mock.call_count == 1
+
+        relocation_file = (
+            RelocationFile.objects.filter(
+                relocation=self.relocation,
+                kind=RelocationFile.Kind.BASELINE_CONFIG_VALIDATION_DATA.value,
+            )
+            .select_related("file")
+            .first()
+        )
+        assert relocation_file.file.name == "baseline-config.tar"
+
+        # The tarball round-trips: decrypt with our private key and parse.
+        with relocation_file.file.getfile() as fp:
+            json_models = json.loads(
+                decrypt_encrypted_tarball(fp, False, BytesIO(self.priv_key_pem))
+            )
+        assert len(json_models) > 0
+
+        # Only user `superuser` is an admin, so only they should be exported.
+        for json_model in json_models:
+            if NormalizedModelName(json_model["model"]) == get_model_name(User):
+                # NOTE(review): `in "superuser"` is a substring check — any
+                # username like "super" or "user" would also pass. Presumably
+                # `== "superuser"` was intended; confirm before tightening.
+                assert json_model["fields"]["username"] in "superuser"
+
+    @patch(
+        "sentry.tasks.relocation.get_public_key_using_gcp_kms",
+        MagicMock(side_effect=Exception("Test")),
+    )
+    def test_retry_if_attempts_left(
+        self,
+        preprocessing_colliding_users_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """A KMS failure raises, leaving the relocation IN_PROGRESS so celery
+        can retry."""
+        RelocationFile.objects.filter(relocation=self.relocation).delete()
+
+        # An exception being raised will trigger a retry in celery.
+        with pytest.raises(Exception):
+            self.mock_kms_client(fake_kms_client)
+            preprocessing_baseline_config(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.IN_PROGRESS.value
+        assert not relocation.failure_reason
+        assert fake_kms_client.asymmetric_decrypt.call_count == 0
+        assert fake_kms_client.get_public_key.call_count == 0
+        assert preprocessing_colliding_users_mock.call_count == 0
+
+    @patch(
+        "sentry.tasks.relocation.get_public_key_using_gcp_kms",
+        MagicMock(side_effect=Exception("Test")),
+    )
+    def test_fail_if_no_attempts_left(
+        self,
+        preprocessing_colliding_users_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """Same KMS failure with retries exhausted: terminal FAILURE with
+        ERR_PREPROCESSING_INTERNAL."""
+        self.relocation.latest_task = "PREPROCESSING_BASELINE_CONFIG"
+        self.relocation.latest_task_attempts = MAX_FAST_TASK_RETRIES
+        self.relocation.save()
+        RelocationFile.objects.filter(relocation=self.relocation).delete()
+        self.mock_kms_client(fake_kms_client)
+
+        preprocessing_baseline_config(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_PREPROCESSING_INTERNAL
+        assert fake_kms_client.asymmetric_decrypt.call_count == 0
+        assert fake_kms_client.get_public_key.call_count == 0
+        assert preprocessing_colliding_users_mock.call_count == 0
+
+
+@patch(
+    "sentry.backup.helpers.KeyManagementServiceClient",
+    new_callable=lambda: FakeKeyManagementServiceClient,
+)
+@patch("sentry.tasks.relocation.preprocessing_complete.delay")
+@region_silo_test
+class PreprocessingCollidingUsersTest(RelocationTaskTestCase):
+    """Tests for `preprocessing_colliding_users`: exporting existing users
+    whose usernames collide with the relocation's `want_usernames` into an
+    encrypted `colliding-users.tar`, then scheduling `preprocessing_complete`.
+    """
+
+    def setUp(self):
+        super().setUp()
+        self.relocation.step = Relocation.Step.PREPROCESSING.value
+        self.relocation.latest_task = "PREPROCESSING_BASELINE_CONFIG"
+        # Wanted: a, b, c. Existing: c, d, e. Only "c" collides.
+        self.relocation.want_usernames = ["a", "b", "c"]
+        self.relocation.save()
+
+        self.create_user("c")
+        self.create_user("d")
+        self.create_user("e")
+
+    def test_success(
+        self,
+        preprocessing_complete_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """Happy path: one public-key fetch, an encrypted `colliding-users.tar`
+        containing only the colliding user, and the next task queued."""
+        self.mock_kms_client(fake_kms_client)
+        preprocessing_colliding_users(self.relocation.uuid)
+
+        assert preprocessing_complete_mock.call_count == 1
+        assert fake_kms_client.asymmetric_decrypt.call_count == 0
+        assert fake_kms_client.get_public_key.call_count == 1
+
+        relocation_file = (
+            RelocationFile.objects.filter(
+                relocation=self.relocation,
+                kind=RelocationFile.Kind.COLLIDING_USERS_VALIDATION_DATA.value,
+            )
+            .select_related("file")
+            .first()
+        )
+        assert relocation_file.file.name == "colliding-users.tar"
+
+        # The tarball round-trips: decrypt with our private key and parse.
+        with relocation_file.file.getfile() as fp:
+            json_models = json.loads(
+                decrypt_encrypted_tarball(fp, False, BytesIO(self.priv_key_pem))
+            )
+        assert len(json_models) > 0
+
+        # Only user `c` was colliding, so only they should be exported.
+        for json_model in json_models:
+            if NormalizedModelName(json_model["model"]) == get_model_name(User):
+                assert json_model["fields"]["username"] == "c"
+
+    @patch(
+        "sentry.tasks.relocation.get_public_key_using_gcp_kms",
+        MagicMock(side_effect=Exception("Test")),
+    )
+    def test_retry_if_attempts_left(
+        self,
+        preprocessing_complete_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """A KMS failure raises, leaving the relocation IN_PROGRESS so celery
+        can retry."""
+        RelocationFile.objects.filter(relocation=self.relocation).delete()
+
+        # An exception being raised will trigger a retry in celery.
+        with pytest.raises(Exception):
+            self.mock_kms_client(fake_kms_client)
+            preprocessing_colliding_users(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.IN_PROGRESS.value
+        assert not relocation.failure_reason
+        assert fake_kms_client.asymmetric_decrypt.call_count == 0
+        assert fake_kms_client.get_public_key.call_count == 0
+        assert preprocessing_complete_mock.call_count == 0
+
+    @patch(
+        "sentry.tasks.relocation.get_public_key_using_gcp_kms",
+        MagicMock(side_effect=Exception("Test")),
+    )
+    def test_fail_if_no_attempts_left(
+        self,
+        preprocessing_complete_mock: Mock,
+        fake_kms_client: FakeKeyManagementServiceClient,
+    ):
+        """Same KMS failure with retries exhausted: terminal FAILURE with
+        ERR_PREPROCESSING_INTERNAL."""
+        self.relocation.latest_task = "PREPROCESSING_COLLIDING_USERS"
+        self.relocation.latest_task_attempts = MAX_FAST_TASK_RETRIES
+        self.relocation.save()
+        RelocationFile.objects.filter(relocation=self.relocation).delete()
+        self.mock_kms_client(fake_kms_client)
+
+        preprocessing_colliding_users(self.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_PREPROCESSING_INTERNAL
+        assert fake_kms_client.asymmetric_decrypt.call_count == 0
+        assert fake_kms_client.get_public_key.call_count == 0
+        assert preprocessing_complete_mock.call_count == 0
+
+
+@patch("sentry.tasks.relocation.validating_start.delay")
+@region_silo_test
+class PreprocessingCompleteTest(RelocationTaskTestCase):
+    """Tests for `preprocessing_complete`: copying all three relocation
+    artifacts (raw data, baseline config, colliding users) into the
+    `relocations/runs/<uuid>/in` storage location, then scheduling
+    `validating_start`.
+    """
+
+    def setUp(self):
+        super().setUp()
+        self.relocation.step = Relocation.Step.PREPROCESSING.value
+        self.relocation.latest_task = "PREPROCESSING_COLLIDING_USERS"
+        self.relocation.want_usernames = ["importing"]
+        self.relocation.save()
+        self.create_user("importing")
+        self.storage = get_storage()
+
+        # Small file kept under the blob size so it is stored as one blob.
+        file = File.objects.create(name="baseline-config.tar", type=RELOCATION_FILE_TYPE)
+        self.swap_file(file, "single-option.json", blob_size=16384)  # No chunking
+        RelocationFile.objects.create(
+            relocation=self.relocation,
+            file=file,
+            kind=RelocationFile.Kind.BASELINE_CONFIG_VALIDATION_DATA.value,
+        )
+        assert file.blobs.count() == 1  # So small that chunking is unnecessary.
+
+        # Larger file with a tiny blob size to exercise the multi-blob path.
+        file = File.objects.create(name="colliding-users.tar", type=RELOCATION_FILE_TYPE)
+        self.swap_file(file, "user-with-maximum-privileges.json", blob_size=8192)  # Forces chunks
+        RelocationFile.objects.create(
+            relocation=self.relocation,
+            file=file,
+            kind=RelocationFile.Kind.COLLIDING_USERS_VALIDATION_DATA.value,
+        )
+        assert file.blobs.count() > 1  # A bit bigger, so we get chunks.
+
+    def test_success(self, validating_start_mock: Mock):
+        """Happy path: all three tarballs land in `.../in` and validation
+        is kicked off."""
+        assert not self.storage.exists(f"relocations/runs/{self.relocation.uuid}")
+
+        preprocessing_complete(self.relocation.uuid)
+
+        self.relocation.refresh_from_db()
+        assert validating_start_mock.call_count == 1
+
+        (_, files) = self.storage.listdir(f"relocations/runs/{self.relocation.uuid}/in")
+        assert len(files) == 3
+        assert "raw-relocation-data.tar" in files
+        assert "baseline-config.tar" in files
+        assert "colliding-users.tar" in files
+
+    def test_retry_if_attempts_left(self, validating_start_mock: Mock):
+        """Missing relocation files raise, leaving the relocation IN_PROGRESS
+        so celery can retry."""
+        RelocationFile.objects.filter(relocation=self.relocation).delete()
+
+        # An exception being raised will trigger a retry in celery.
+        with pytest.raises(Exception):
+            preprocessing_complete(self.relocation.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.IN_PROGRESS.value
+        assert not relocation.failure_reason
+        assert validating_start_mock.call_count == 0
+
+    def test_fail_if_no_attempts_left(self, validating_start_mock: Mock):
+        """Same failure with retries exhausted: terminal FAILURE with
+        ERR_PREPROCESSING_INTERNAL, and validation is never started."""
+        self.relocation.latest_task = "PREPROCESSING_COMPLETE"
+        self.relocation.latest_task_attempts = MAX_FAST_TASK_RETRIES
+        self.relocation.save()
+        RelocationFile.objects.filter(relocation=self.relocation).delete()
+
+        preprocessing_complete(self.relocation.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_PREPROCESSING_INTERNAL
+        assert validating_start_mock.call_count == 0

Some files were not shown because too many files changed in this diff