
feat(backup): Validation steps for relocation tasks (#59382)

These steps start, poll, and resolve a single Google CloudBuild
validation run.

Issue: getsentry/team-ospo#203
Alex Zaslavsky 1 year ago
Parent
Commit
1b2c7887df
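
The three new tasks form a small retrying state machine: `validating_start` kicks off a CloudBuild run, `validating_poll` checks its status roughly once a minute, and `validating_complete` resolves the attempt by inspecting the findings it produced. Below is a minimal, purely illustrative sketch of that control flow, with Celery, the Sentry models, and the real CloudBuild client replaced by stand-ins; every name in it (`FakeBuildService`, `run_validation`, `BuildStatus`) is hypothetical.

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class BuildStatus(Enum):
    WORKING = "WORKING"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    TIMEOUT = "TIMEOUT"


@dataclass
class FakeBuildService:
    """Stand-in for CloudBuildClient: replays a scripted sequence of statuses."""

    statuses: list[BuildStatus]
    created: int = 0

    def create_build(self) -> str:
        self.created += 1
        return f"build-{self.created}"

    def get_build(self, build_id: str) -> BuildStatus:
        # Pop the next scripted status; hold the final one once exhausted.
        return self.statuses.pop(0) if len(self.statuses) > 1 else self.statuses[0]


def run_validation(service: FakeBuildService, max_runs: int = 3, max_polls: int = 60) -> str:
    """start -> poll -> resolve, retrying the whole run after a failure or timeout."""
    for _ in range(max_runs):  # validating_start, bounded by MAX_VALIDATION_RUNS
        build_id = service.create_build()
        for _ in range(max_polls):  # validating_poll, bounded by MAX_VALIDATION_POLLS
            status = service.get_build(build_id)
            if status is BuildStatus.SUCCESS:
                return "valid"  # validating_complete would now inspect the findings files
            if status in {BuildStatus.FAILURE, BuildStatus.TIMEOUT}:
                break  # record the failed attempt and kick off a fresh run
    return "failed"  # all runs exhausted: the ERR_VALIDATING_MAX_RUNS case


if __name__ == "__main__":
    svc = FakeBuildService([BuildStatus.WORKING, BuildStatus.TIMEOUT, BuildStatus.SUCCESS])
    print(run_validation(svc))  # -> "valid", on the second run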

+ 1 - 0
requirements-base.txt

@@ -20,6 +20,7 @@ email-reply-parser>=0.5.12
 google-api-core>=2.12.0
 google-auth>=2.4.0
 google-cloud-bigtable>=2.11.3
+google-cloud-build>=3.20.1
 google-cloud-core>=2.3.2
 google-cloud-functions>=1.8.1
 google-cloud-kms>=2.19.1

+ 1 - 0
requirements-dev-frozen.txt

@@ -60,6 +60,7 @@ frozenlist==1.3.3
 google-api-core==2.12.0
 google-auth==2.16.0
 google-cloud-bigtable==2.11.3
+google-cloud-build==3.20.1
 google-cloud-core==2.3.2
 google-cloud-functions==1.8.1
 google-cloud-kms==2.19.1

+ 1 - 0
requirements-frozen.txt

@@ -44,6 +44,7 @@ frozenlist==1.3.3
 google-api-core==2.12.0
 google-auth==2.16.0
 google-cloud-bigtable==2.11.3
+google-cloud-build==3.20.1
 google-cloud-core==2.3.2
 google-cloud-functions==1.8.1
 google-cloud-kms==2.19.1

+ 415 - 15
src/sentry/tasks/relocation.py

@@ -1,12 +1,20 @@
 from __future__ import annotations
 
 import logging
+import re
+from datetime import datetime, timedelta, timezone
 from io import BytesIO
 from string import Template
 from typing import Optional
+from zipfile import ZipFile
 
+import yaml
 from cryptography.fernet import Fernet
+from django.db import router
+from google.cloud.devtools.cloudbuild_v1 import Build
+from google.cloud.devtools.cloudbuild_v1 import CloudBuildClient as CloudBuildClient
 
+from sentry.api.serializers.rest_framework.base import camel_to_snake_case, convert_dict_key_case
 from sentry.backup.dependencies import NormalizedModelName, get_model
 from sentry.backup.exports import export_in_config_scope, export_in_user_scope
 from sentry.backup.helpers import (
@@ -19,17 +27,26 @@ from sentry.filestore.gcs import GoogleCloudStorage
 from sentry.models.files.file import File
 from sentry.models.files.utils import get_storage
 from sentry.models.organization import Organization
-from sentry.models.relocation import Relocation, RelocationFile
+from sentry.models.relocation import (
+    Relocation,
+    RelocationFile,
+    RelocationValidation,
+    RelocationValidationAttempt,
+    ValidationStatus,
+)
 from sentry.models.user import User
 from sentry.silo import SiloMode
 from sentry.tasks.base import instrumented_task
 from sentry.utils import json
+from sentry.utils.db import atomic_transaction
+from sentry.utils.env import gcp_project_id
 from sentry.utils.relocation import (
     RELOCATION_BLOB_SIZE,
     RELOCATION_FILE_TYPE,
     OrderedTask,
     create_cloudbuild_yaml,
     fail_relocation,
+    get_bucket_name,
     retry_task_or_fail_relocation,
     start_relocation_task,
 )
@@ -37,13 +54,18 @@ from sentry.utils.relocation import (
 logger = logging.getLogger(__name__)
 
 # Time limits for various steps in the process.
-RETRY_BACKOFF = 60  # So the 1st retry is after ~1 min, 2nd after ~2 min, 3rd after ~4 min.
-UPLOADING_TIME_LIMIT = 60  # This should be quick - we're just pinging the DB, then GCS.
-PREPROCESSING_TIME_LIMIT = 60 * 5  # 5 minutes is plenty for all preprocessing task attempts.
+RETRY_BACKOFF = 60  # So the 1st retry is after ~1 min, 2nd after ~2 min, 3rd after ~4 min, etc.
+FAST_TIME_LIMIT = 60
+MEDIUM_TIME_LIMIT = 60 * 5
+SLOW_TIME_LIMIT = 60 * 30
+DEFAULT_VALIDATION_TIMEOUT = timedelta(minutes=60)
 
 # All pre and post processing tasks have the same number of retries.
 MAX_FAST_TASK_RETRIES = 2
 MAX_FAST_TASK_ATTEMPTS = MAX_FAST_TASK_RETRIES + 1
+MAX_VALIDATION_POLLS = 60
+MAX_VALIDATION_POLL_ATTEMPTS = MAX_VALIDATION_POLLS + 1
+MAX_VALIDATION_RUNS = 3
 
 # Some reasonable limits on the amount of data we import - we can adjust these as needed.
 MAX_ORGS_PER_RELOCATION = 20
@@ -75,6 +97,11 @@ ERR_PREPROCESSING_MISSING_ORGS = Template(
     "The following organization slug imports were requested, but could not be found in your submitted JSON: $orgs."
 )
 
+ERR_VALIDATING_ATTEMPT_MISSING = "Internal error during validating - validation attempt missing."
+ERR_VALIDATING_INSTANCE_MISSING = "Internal error during validating - validation instance missing."
+ERR_VALIDATING_INTERNAL = "Internal error during validating."
+ERR_VALIDATING_MAX_RUNS = "All validation attempts timed out."
+
 
 # TODO(getsentry/team-ospo#203): We should split this task in two, one for "small" imports of say
 # <=10MB, and one for large imports >10MB. Then we should limit the number of daily executions of
@@ -85,7 +112,7 @@ ERR_PREPROCESSING_MISSING_ORGS = Template(
     max_retries=MAX_FAST_TASK_RETRIES,
     retry_backoff=RETRY_BACKOFF,
     retry_backoff_jitter=True,
-    soft_time_limit=UPLOADING_TIME_LIMIT,
+    soft_time_limit=FAST_TIME_LIMIT,
 )
 def uploading_complete(uuid: str) -> None:
     """
@@ -134,7 +161,7 @@ def uploading_complete(uuid: str) -> None:
     max_retries=MAX_FAST_TASK_RETRIES,
     retry_backoff=RETRY_BACKOFF,
     retry_backoff_jitter=True,
-    soft_time_limit=PREPROCESSING_TIME_LIMIT,
+    soft_time_limit=MEDIUM_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
 def preprocessing_scan(uuid: str) -> None:
@@ -282,7 +309,7 @@ def preprocessing_scan(uuid: str) -> None:
     max_retries=MAX_FAST_TASK_RETRIES,
     retry_backoff=RETRY_BACKOFF,
     retry_backoff_jitter=True,
-    soft_time_limit=PREPROCESSING_TIME_LIMIT,
+    soft_time_limit=MEDIUM_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
 def preprocessing_baseline_config(uuid: str) -> None:
@@ -336,7 +363,7 @@ def preprocessing_baseline_config(uuid: str) -> None:
     max_retries=MAX_FAST_TASK_RETRIES,
     retry_backoff=RETRY_BACKOFF,
     retry_backoff_jitter=True,
-    soft_time_limit=PREPROCESSING_TIME_LIMIT,
+    soft_time_limit=MEDIUM_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
 def preprocessing_colliding_users(uuid: str) -> None:
@@ -389,7 +416,7 @@ def preprocessing_colliding_users(uuid: str) -> None:
     max_retries=MAX_FAST_TASK_RETRIES,
     retry_backoff=RETRY_BACKOFF,
     retry_backoff_jitter=True,
-    soft_time_limit=PREPROCESSING_TIME_LIMIT,
+    soft_time_limit=MEDIUM_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
 def preprocessing_complete(uuid: str) -> None:
@@ -420,9 +447,18 @@ def preprocessing_complete(uuid: str) -> None:
     ):
         storage = get_storage()
 
-        # Build the `cloudbuild.yaml` file we'll use for validation.
-        cloudbuild_yaml = create_cloudbuild_yaml(relocation)
-        storage.save(f"relocations/runs/{uuid}/conf/cloudbuild.yaml", BytesIO(cloudbuild_yaml))
+        # Build the `cloudbuild.yaml` file we'll use for validation. CloudBuild requires the storage
+        # source to be zipped, even if it is only a single yaml file.
+        cloudbuild_yaml = BytesIO(create_cloudbuild_yaml(relocation))
+        cloudbuild_zip = BytesIO()
+        with ZipFile(cloudbuild_zip, "w") as zf:
+            zf.writestr("cloudbuild.yaml", cloudbuild_yaml.read())
+
+        # Save the ZIP archive to remote storage, so that we may build from it.
+        cloudbuild_yaml.seek(0)
+        cloudbuild_zip.seek(0)
+        storage.save(f"relocations/runs/{uuid}/conf/cloudbuild.yaml", cloudbuild_yaml)
+        storage.save(f"relocations/runs/{uuid}/conf/cloudbuild.zip", cloudbuild_zip)
 
         # Upload the `key-config.json` file we'll use to identify the correct KMS resource to use
         # during validation.
@@ -456,18 +492,143 @@ def preprocessing_complete(uuid: str) -> None:
                 fp.seek(0)
                 storage.save(path, fp)
 
-        relocation.step = Relocation.Step.VALIDATING.value
-        relocation.save()
+        with atomic_transaction(
+            using=(router.db_for_write(Relocation), router.db_for_write(RelocationValidation))
+        ):
+            relocation.step = Relocation.Step.VALIDATING.value
+            relocation.save()
+            RelocationValidation.objects.create(relocation=relocation)
+
         validating_start.delay(uuid)
 
 
+def _get_relocation_validation(
+    relocation: Relocation, task: OrderedTask
+) -> RelocationValidation | None:
+    try:
+        return RelocationValidation.objects.get(relocation=relocation)
+    except RelocationValidation.DoesNotExist:
+        fail_relocation(
+            relocation,
+            task,
+            ERR_VALIDATING_INSTANCE_MISSING,
+        )
+        return None
+
+
+def _get_relocation_validation_attempt(
+    relocation: Relocation,
+    relocation_validation: RelocationValidation,
+    build_id: str,
+    task: OrderedTask,
+) -> RelocationValidationAttempt | None:
+    try:
+        return RelocationValidationAttempt.objects.get(
+            relocation=relocation, relocation_validation=relocation_validation, build_id=build_id
+        )
+    except RelocationValidationAttempt.DoesNotExist:
+        fail_relocation(
+            relocation,
+            task,
+            ERR_VALIDATING_ATTEMPT_MISSING,
+        )
+        return None
+
+
+def _update_relocation_validation_attempt(
+    task: OrderedTask,
+    relocation: Relocation,
+    relocation_validation: RelocationValidation,
+    relocation_validation_attempt: RelocationValidationAttempt,
+    status: ValidationStatus,
+) -> None:
+    """
+    After a `RelocationValidationAttempt` resolves, make sure to update the owning
+    `RelocationValidation` and `Relocation` as well.
+    """
+
+    with atomic_transaction(
+        using=(
+            router.db_for_write(Relocation),
+            router.db_for_write(RelocationValidation),
+            router.db_for_write(RelocationValidationAttempt),
+        )
+    ):
+        # If no interesting status updates occurred, check again in a minute.
+        if status == ValidationStatus.IN_PROGRESS:
+            logger.info(
+                "Validation polling: scheduled",
+                extra={"uuid": relocation.uuid, "task": task.name},
+            )
+            validating_poll.apply_async(
+                args=[relocation.uuid, str(relocation_validation_attempt.build_id)], countdown=60
+            )
+            return
+
+        relocation_validation_attempt.status = status.value
+
+        # These statuses merit failing this attempt and kicking off a new
+        # `RelocationValidationAttempt`, if possible.
+        if status in {ValidationStatus.TIMEOUT, ValidationStatus.FAILURE}:
+            if relocation_validation.attempts < MAX_VALIDATION_RUNS:
+                relocation_validation_attempt.status = status.value
+                relocation_validation_attempt.save()
+
+                relocation.latest_task = OrderedTask.VALIDATING_START.name
+                relocation.save()
+
+                logger.info(
+                    "Validation timed out",
+                    extra={"uuid": relocation.uuid, "task": task.name},
+                )
+                validating_start.delay(relocation.uuid)
+                return
+
+            # Always accept the numerically higher `ValidationStatus`, since that is a more definite
+            # result.
+            relocation_validation_attempt.save()
+            if relocation_validation.status < status.value:
+                relocation_validation.status = status.value
+                relocation_validation.save()
+            return fail_relocation(
+                relocation, task, "Validation could not be completed. Please contact support."
+            )
+
+        # All remaining statuses are final, so we can update the owning `RelocationValidation` now.
+        assert status in {ValidationStatus.INVALID, ValidationStatus.VALID}
+        relocation_validation_attempt.status = status.value
+        relocation_validation_attempt.save()
+        relocation_validation.status = status.value
+        relocation_validation.save()
+
+        # If we've reached a definite status, resolve both the `RelocationValidation` and this
+        # constituent `RelocationValidationAttempt`.
+        if status == ValidationStatus.INVALID:
+            logger.info(
+                "Validation result: invalid",
+                extra={"uuid": relocation.uuid, "task": task.name},
+            )
+            return fail_relocation(
+                relocation, task, "The data you provided failed validation. Please contact support."
+            )
+
+        assert status == ValidationStatus.VALID
+        relocation.step = Relocation.Step.IMPORTING.value
+        relocation.save()
+
+        logger.info(
+            "Validation result: valid",
+            extra={"uuid": relocation.uuid, "task": task.name},
+        )
+        importing.delay(relocation.uuid)
+
+
 @instrumented_task(
     name="sentry.relocation.validating_start",
     queue="relocation",
     max_retries=MAX_FAST_TASK_RETRIES,
     retry_backoff=RETRY_BACKOFF,
     retry_backoff_jitter=True,
-    soft_time_limit=PREPROCESSING_TIME_LIMIT,
+    soft_time_limit=FAST_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
 def validating_start(uuid: str) -> None:
@@ -477,5 +638,244 @@ def validating_start(uuid: str) -> None:
     This function is meant to be idempotent, and should be retried with an exponential backoff.
     """
 
+    relocation: Optional[Relocation]
+    attempts_left: int
+    (relocation, attempts_left) = start_relocation_task(
+        uuid=uuid,
+        step=Relocation.Step.VALIDATING,
+        task=OrderedTask.VALIDATING_START,
+        allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
+    )
+    if relocation is None:
+        return
+
+    relocation_validation = _get_relocation_validation(relocation, OrderedTask.VALIDATING_START)
+    if relocation_validation is None:
+        return
+    if relocation_validation.attempts >= MAX_VALIDATION_RUNS:
+        fail_relocation(relocation, OrderedTask.VALIDATING_START, ERR_VALIDATING_MAX_RUNS)
+        return
+
+    with retry_task_or_fail_relocation(
+        relocation, OrderedTask.VALIDATING_START, attempts_left, ERR_VALIDATING_INTERNAL
+    ):
+        cb_client = CloudBuildClient()
+
+        def camel_to_snake_keep_underscores(value):
+            match = re.search(r"(_+)$", value)
+            converted = camel_to_snake_case(value)
+            return converted + (match.group(0) if match else "")
+
+        cb_yaml = create_cloudbuild_yaml(relocation)
+        cb_conf = yaml.safe_load(cb_yaml)
+        build = Build(
+            source={
+                "storage_source": {
+                    "bucket": get_bucket_name(),
+                    "object_": f"relocations/runs/{uuid}/conf/cloudbuild.zip",
+                }
+            },
+            steps=convert_dict_key_case(cb_conf["steps"], camel_to_snake_keep_underscores),
+            artifacts=convert_dict_key_case(cb_conf["artifacts"], camel_to_snake_keep_underscores),
+            timeout=convert_dict_key_case(cb_conf["timeout"], camel_to_snake_keep_underscores),
+            options=convert_dict_key_case(cb_conf["options"], camel_to_snake_keep_underscores),
+        )
+        response = cb_client.create_build(project_id=gcp_project_id(), build=build)
+
+        with atomic_transaction(
+            using=(
+                router.db_for_write(RelocationValidation),
+                router.db_for_write(RelocationValidationAttempt),
+            )
+        ):
+            relocation_validation.attempts += 1
+            relocation_validation.save()
+            RelocationValidationAttempt.objects.create(
+                relocation=relocation,
+                relocation_validation=relocation_validation,
+                build_id=response.metadata.build.id,
+            )
+
+        validating_poll.delay(uuid, response.metadata.build.id)
+
+
+@instrumented_task(
+    name="sentry.relocation.validating_poll",
+    queue="relocation",
+    max_retries=MAX_VALIDATION_POLLS,
+    retry_backoff=RETRY_BACKOFF,
+    retry_backoff_jitter=True,
+    soft_time_limit=FAST_TIME_LIMIT,
+    silo_mode=SiloMode.REGION,
+)
+def validating_poll(uuid: str, build_id: str) -> None:
+    """
+    Checks the progress of a Google CloudBuild validation run.
+
+    This function is meant to be idempotent, and should be retried with an exponential backoff.
+    """
+
+    relocation: Optional[Relocation]
+    attempts_left: int
+    (relocation, attempts_left) = start_relocation_task(
+        uuid=uuid,
+        step=Relocation.Step.VALIDATING,
+        task=OrderedTask.VALIDATING_POLL,
+        allowed_task_attempts=MAX_VALIDATION_POLL_ATTEMPTS,
+    )
+    if relocation is None:
+        return
+
+    relocation_validation = _get_relocation_validation(relocation, OrderedTask.VALIDATING_POLL)
+    if relocation_validation is None:
+        return
+
+    relocation_validation_attempt = _get_relocation_validation_attempt(
+        relocation, relocation_validation, build_id, OrderedTask.VALIDATING_POLL
+    )
+    if relocation_validation_attempt is None:
+        return
+
+    logger.info(
+        "Validation polling: active",
+        extra={
+            "uuid": relocation.uuid,
+            "task": OrderedTask.VALIDATING_POLL.name,
+            "build_id": build_id,
+        },
+    )
+    with retry_task_or_fail_relocation(
+        relocation, OrderedTask.VALIDATING_POLL, attempts_left, ERR_VALIDATING_INTERNAL
+    ):
+        cb_client = CloudBuildClient()
+        build = cb_client.get_build(project_id=gcp_project_id(), id=str(build_id))
+        if build.status == Build.Status.SUCCESS:
+            validating_complete.delay(uuid, str(build_id))
+            return
+
+        if build.status in {
+            Build.Status.FAILURE,
+            Build.Status.INTERNAL_ERROR,
+            Build.Status.CANCELLED,
+        }:
+            return _update_relocation_validation_attempt(
+                OrderedTask.VALIDATING_POLL,
+                relocation,
+                relocation_validation,
+                relocation_validation_attempt,
+                ValidationStatus.FAILURE,
+            )
+
+        date_added = (
+            relocation_validation_attempt.date_added
+            if relocation_validation_attempt.date_added is not None
+            else datetime.fromtimestamp(0)
+        )
+        timeout_limit = datetime.now(tz=timezone.utc) - DEFAULT_VALIDATION_TIMEOUT
+        if (
+            build.status in {Build.Status.TIMEOUT, Build.Status.EXPIRED}
+            or date_added < timeout_limit
+        ):
+            return _update_relocation_validation_attempt(
+                OrderedTask.VALIDATING_POLL,
+                relocation,
+                relocation_validation,
+                relocation_validation_attempt,
+                ValidationStatus.TIMEOUT,
+            )
+
+        return _update_relocation_validation_attempt(
+            OrderedTask.VALIDATING_POLL,
+            relocation,
+            relocation_validation,
+            relocation_validation_attempt,
+            ValidationStatus.IN_PROGRESS,
+        )
+
+
+@instrumented_task(
+    name="sentry.relocation.validating_complete",
+    queue="relocation",
+    max_retries=MAX_FAST_TASK_RETRIES,
+    retry_backoff=RETRY_BACKOFF,
+    retry_backoff_jitter=True,
+    soft_time_limit=FAST_TIME_LIMIT,
+    silo_mode=SiloMode.REGION,
+)
+def validating_complete(uuid: str, build_id: str) -> None:
+    """
+    Wraps up a validation run, and reports on what we found. If this task is being called, the
+    CloudBuild run has completed successfully, so we just need to figure out if there were any
+    findings (failure) or not (success).
+
+    This function is meant to be idempotent, and should be retried with an exponential backoff.
+    """
+
+    relocation: Optional[Relocation]
+    attempts_left: int
+    (relocation, attempts_left) = start_relocation_task(
+        uuid=uuid,
+        step=Relocation.Step.VALIDATING,
+        task=OrderedTask.VALIDATING_COMPLETE,
+        allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
+    )
+    if relocation is None:
+        return
+
+    relocation_validation = _get_relocation_validation(relocation, OrderedTask.VALIDATING_COMPLETE)
+    if relocation_validation is None:
+        return
+
+    relocation_validation_attempt = _get_relocation_validation_attempt(
+        relocation, relocation_validation, build_id, OrderedTask.VALIDATING_COMPLETE
+    )
+    if relocation_validation_attempt is None:
+        return
+
+    with retry_task_or_fail_relocation(
+        relocation,
+        OrderedTask.VALIDATING_COMPLETE,
+        attempts_left,
+        ERR_VALIDATING_INTERNAL,
+    ):
+        storage = get_storage()
+        final_status = ValidationStatus.VALID
+        (_, findings_files) = storage.listdir(f"relocations/runs/{uuid}/findings")
+        for file in sorted(findings_files, reverse=True):
+            # Ignore files prefixed with `artifacts-`, as these are generated by CloudBuild.
+            if file.startswith("artifacts-"):
+                continue
+
+            findings_file = storage.open(f"relocations/runs/{uuid}/findings/{file}")
+            with findings_file:
+                findings = json.load(findings_file)
+                if len(findings) > 0:
+                    final_status = ValidationStatus.INVALID
+                    break
+
+        return _update_relocation_validation_attempt(
+            OrderedTask.VALIDATING_COMPLETE,
+            relocation,
+            relocation_validation,
+            relocation_validation_attempt,
+            final_status,
+        )
+
+
+@instrumented_task(
+    name="sentry.relocation.importing",
+    queue="relocation",
+    max_retries=0,
+    soft_time_limit=SLOW_TIME_LIMIT,
+    silo_mode=SiloMode.REGION,
+)
+def importing(uuid: str) -> None:
+    """
+    Perform the import on the actual live instance we are targeting.
+
+    This function is NOT idempotent - if an import breaks, we should just abandon it rather than
+    trying it again!
+    """
+
     # TODO(getsentry/team-ospo#203): Implement this.
     pass

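The branch logic in `validating_poll` above reduces to classifying the CloudBuild status, together with the attempt's age, into one of four outcomes. The standalone restatement below assumes only that the `google-cloud-build` package added by this commit is installed; `classify_build` and `PollOutcome` are made-up names, not part of the change.

from datetime import datetime, timedelta, timezone
from enum import Enum

from google.cloud.devtools.cloudbuild_v1 import Build

DEFAULT_VALIDATION_TIMEOUT = timedelta(minutes=60)


class PollOutcome(Enum):
    COMPLETE = "complete"        # hand off to validating_complete
    FAILURE = "failure"          # record a failed attempt; maybe start a new run
    TIMEOUT = "timeout"          # same, but attributed to a timeout
    IN_PROGRESS = "in_progress"  # poll again in roughly a minute


def classify_build(status: Build.Status, attempt_started_at: datetime) -> PollOutcome:
    if status == Build.Status.SUCCESS:
        return PollOutcome.COMPLETE
    if status in {Build.Status.FAILURE, Build.Status.INTERNAL_ERROR, Build.Status.CANCELLED}:
        return PollOutcome.FAILURE
    too_old = attempt_started_at < datetime.now(tz=timezone.utc) - DEFAULT_VALIDATION_TIMEOUT
    if status in {Build.Status.TIMEOUT, Build.Status.EXPIRED} or too_old:
        return PollOutcome.TIMEOUT
    return PollOutcome.IN_PROGRESS


if __name__ == "__main__":
    started = datetime.now(tz=timezone.utc) - timedelta(minutes=5)
    assert classify_build(Build.Status.WORKING, started) is PollOutcome.IN_PROGRESS
    assert classify_build(Build.Status.EXPIRED, started) is PollOutcome.TIMEOUT
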
+ 15 - 6
src/sentry/utils/relocation.py

@@ -27,6 +27,9 @@ class OrderedTask(Enum):
     PREPROCESSING_COLLIDING_USERS = 4
     PREPROCESSING_COMPLETE = 5
     VALIDATING_START = 6
+    VALIDATING_POLL = 7
+    VALIDATING_COMPLETE = 8
+    IMPORTING = 9
 
 
 # The file type for a relocation export tarball of any kind.
@@ -387,6 +390,17 @@ def get_docker_compose_run():
     )
 
 
+@lru_cache(maxsize=1)
+def get_bucket_name():
+    """
+    When using the local FileSystemStorage (i.e., in tests), we use a contrived bucket name, since
+    this is really just an alias for a bespoke local directory in that case.
+    """
+
+    storage = get_storage()
+    return "default" if getattr(storage, "bucket_name", None) is None else storage.bucket_name
+
+
 def create_cloudbuild_yaml(relocation: Relocation) -> bytes:
     # Only test existing users for collision and mutation.
     existing_usernames = User.objects.filter(username__in=relocation.want_usernames).values_list(
@@ -397,12 +411,7 @@ def create_cloudbuild_yaml(relocation: Relocation) -> bytes:
         ",".join(existing_usernames) if existing_usernames else ",",
     ]
     filter_org_slugs_args = ["--filter-org-slugs", ",".join(relocation.want_org_slugs)]
-    storage = get_storage()
-    bucket_root = (
-        "gs://default"
-        if getattr(storage, "bucket_name", None) is None
-        else (f"gs://{storage.bucket_name}")
-    )
+    bucket_root = f"gs://{get_bucket_name()}"
 
     validation_steps = [
         create_cloudbuild_validation_step(

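`create_cloudbuild_yaml` emits CloudBuild's usual camelCase keys, while the `Build` proto that `validating_start` assembles from the same config expects snake_case field names, with any trailing underscore preserved (presumably because proto fields that collide with Python names, like the `object_` key in the storage source above, carry one). A minimal, self-contained sketch of that key conversion follows; `camel_to_snake` and `snake_keys` are hypothetical helpers, not the ones used in the diff.

import re

import yaml


def camel_to_snake(key: str) -> str:
    # machineType -> machine_type; a trailing underscore (e.g. "object_") is kept as-is.
    trailing = re.search(r"_+$", key)
    stem = key[: trailing.start()] if trailing else key
    converted = re.sub(r"(?<!^)(?=[A-Z])", "_", stem).lower()
    return converted + (trailing.group(0) if trailing else "")


def snake_keys(value):
    # Recursively rewrite mapping keys; leave everything else untouched.
    if isinstance(value, dict):
        return {camel_to_snake(k): snake_keys(v) for k, v in value.items()}
    if isinstance(value, list):
        return [snake_keys(v) for v in value]
    return value


CLOUDBUILD_YAML = """\
steps:
- name: gcr.io/cloud-builders/gsutil
  waitFor: ['-']
options:
  machineType: N1_HIGHCPU_32
"""

if __name__ == "__main__":
    conf = yaml.safe_load(CLOUDBUILD_YAML)
    print(snake_keys(conf))
    # {'steps': [{'name': 'gcr.io/cloud-builders/gsutil', 'wait_for': ['-']}],
    #  'options': {'machine_type': 'N1_HIGHCPU_32'}}
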
+ 2 - 2
tests/sentry/tasks/snapshots/PreprocessingCompleteTest/test_success.pysnap

@@ -5,7 +5,7 @@ source: tests/sentry/tasks/test_relocation.py
 ---
 artifacts:
   objects:
-    location: gs://<BUCKET>/runs/<UUID>/findings/
+    location: gs://<BUCKET>/relocations/runs/<UUID>/findings/
     paths:
     - /workspace/findings/**
 options:
@@ -16,7 +16,7 @@ steps:
 - args:
   - cp
   - -r
-  - gs://<BUCKET>/runs/<UUID>/in
+  - gs://<BUCKET>/relocations/runs/<UUID>/in
   - .
   id: copy-inputs-being-validated
   name: gcr.io/cloud-builders/gsutil

+ 430 - 4
tests/sentry/tasks/test_relocation.py

@@ -4,9 +4,11 @@ from pathlib import Path
 from tempfile import TemporaryDirectory
 from types import SimpleNamespace
 from unittest.mock import MagicMock, Mock, patch
+from uuid import uuid4
 
 import pytest
 import yaml
+from google.cloud.devtools.cloudbuild_v1 import Build
 from google_crc32c import value as crc32c
 
 from sentry.backup.dependencies import NormalizedModelName, get_model_name
@@ -18,7 +20,13 @@ from sentry.backup.helpers import (
 )
 from sentry.models.files.file import File
 from sentry.models.files.utils import get_storage
-from sentry.models.relocation import Relocation, RelocationFile
+from sentry.models.relocation import (
+    Relocation,
+    RelocationFile,
+    RelocationValidation,
+    RelocationValidationAttempt,
+    ValidationStatus,
+)
 from sentry.models.user import User
 from sentry.tasks.relocation import (
     ERR_PREPROCESSING_DECRYPTION,
@@ -31,12 +39,18 @@ from sentry.tasks.relocation import (
     ERR_PREPROCESSING_TOO_MANY_ORGS,
     ERR_PREPROCESSING_TOO_MANY_USERS,
     ERR_UPLOADING_FAILED,
+    ERR_VALIDATING_INTERNAL,
+    ERR_VALIDATING_MAX_RUNS,
     MAX_FAST_TASK_RETRIES,
+    MAX_VALIDATION_POLLS,
     preprocessing_baseline_config,
     preprocessing_colliding_users,
     preprocessing_complete,
     preprocessing_scan,
     uploading_complete,
+    validating_complete,
+    validating_poll,
+    validating_start,
 )
 from sentry.testutils.cases import TestCase
 from sentry.testutils.factories import get_fixture_path
@@ -46,6 +60,15 @@ from sentry.utils import json
 from sentry.utils.relocation import RELOCATION_BLOB_SIZE, RELOCATION_FILE_TYPE
 
 
+class FakeCloudBuildClient:
+    """
+    Fake version of `CloudBuildClient` that stubs out the two network calls we rely on.
+    """
+
+    create_build = MagicMock()
+    get_build = MagicMock()
+
+
 class RelocationTaskTestCase(TestCase):
     def setUp(self):
         super().setUp()
@@ -119,9 +142,26 @@ class RelocationTaskTestCase(TestCase):
             plaintext=plaintext_dek,
             plaintext_crc32c=crc32c(plaintext_dek),
         )
+        fake_kms_client.asymmetric_decrypt.side_effect = None
+
         fake_kms_client.get_public_key.return_value = SimpleNamespace(
             pem=self.pub_key_pem.decode("utf-8")
         )
+        fake_kms_client.get_public_key.side_effect = None
+
+    def mock_cloudbuild_client(
+        self, fake_cloudbuild_client: FakeCloudBuildClient, status: Build.Status
+    ):
+        fake_cloudbuild_client.create_build.call_count = 0
+        fake_cloudbuild_client.get_build.call_count = 0
+
+        fake_cloudbuild_client.create_build.return_value = SimpleNamespace(
+            metadata=SimpleNamespace(build=SimpleNamespace(id=uuid4().hex))
+        )
+        fake_cloudbuild_client.create_build.side_effect = None
+
+        fake_cloudbuild_client.get_build.return_value = SimpleNamespace(status=status)
+        fake_cloudbuild_client.get_build.side_effect = None
 
 
 @patch("sentry.tasks.relocation.preprocessing_scan.delay")
@@ -589,8 +629,9 @@ class PreprocessingCompleteTest(RelocationTaskTestCase):
         assert validating_start_mock.call_count == 1
 
         (_, files) = self.storage.listdir(f"relocations/runs/{self.relocation.uuid}/conf")
-        assert len(files) == 1
+        assert len(files) == 2
         assert "cloudbuild.yaml" in files
+        assert "cloudbuild.zip" in files
 
         cb_yaml_file = self.storage.open(
             f"relocations/runs/{self.relocation.uuid}/conf/cloudbuild.yaml"
@@ -607,8 +648,10 @@ class PreprocessingCompleteTest(RelocationTaskTestCase):
         assert findings_path == f"gs://default/relocations/runs/{self.relocation.uuid}/findings/"
 
         # Do a snapshot test of the cloudbuild config.
-        cb_conf["steps"][0]["args"][2] = "gs://<BUCKET>/runs/<UUID>/in"
-        cb_conf["artifacts"]["objects"]["location"] = "gs://<BUCKET>/runs/<UUID>/findings/"
+        cb_conf["steps"][0]["args"][2] = "gs://<BUCKET>/relocations/runs/<UUID>/in"
+        cb_conf["artifacts"]["objects"][
+            "location"
+        ] = "gs://<BUCKET>/relocations/runs/<UUID>/findings/"
         self.insta_snapshot(cb_conf)
 
         (_, files) = self.storage.listdir(f"relocations/runs/{self.relocation.uuid}/in")
@@ -622,6 +665,9 @@ class PreprocessingCompleteTest(RelocationTaskTestCase):
         with kms_file:
             json.load(kms_file)
 
+        assert self.relocation.step == Relocation.Step.VALIDATING.value
+        assert RelocationValidation.objects.filter(relocation=self.relocation).count() == 1
+
     def test_retry_if_attempts_left(self, validating_start_mock: Mock):
         RelocationFile.objects.filter(relocation=self.relocation).delete()
 
@@ -646,3 +692,383 @@ class PreprocessingCompleteTest(RelocationTaskTestCase):
         assert relocation.status == Relocation.Status.FAILURE.value
         assert relocation.failure_reason == ERR_PREPROCESSING_INTERNAL
         assert validating_start_mock.call_count == 0
+
+
+@patch(
+    "sentry.tasks.relocation.CloudBuildClient",
+    new_callable=lambda: FakeCloudBuildClient,
+)
+@patch("sentry.tasks.relocation.validating_poll.delay")
+@region_silo_test
+class ValidatingStartTest(RelocationTaskTestCase):
+    def setUp(self):
+        super().setUp()
+        self.relocation.step = Relocation.Step.VALIDATING.value
+        self.relocation.latest_task = "PREPROCESSING_COMPLETE"
+        self.relocation.want_usernames = ["testuser"]
+        self.relocation.want_org_slugs = ["test-slug"]
+        self.relocation.save()
+
+        self.relocation_validation: RelocationValidation = RelocationValidation.objects.create(
+            relocation=self.relocation
+        )
+
+    def test_success(
+        self,
+        validating_poll_mock: Mock,
+        fake_cloudbuild_client: FakeCloudBuildClient,
+    ):
+        self.mock_cloudbuild_client(fake_cloudbuild_client, Build.Status(Build.Status.QUEUED))
+
+        validating_start(self.relocation.uuid)
+
+        self.relocation.refresh_from_db()
+        self.relocation_validation.refresh_from_db()
+        assert validating_poll_mock.call_count == 1
+        assert fake_cloudbuild_client.create_build.call_count == 1
+        assert self.relocation_validation.attempts == 1
+        assert self.relocation_validation.status == ValidationStatus.IN_PROGRESS.value
+
+        relocation_validation_attempt = RelocationValidationAttempt.objects.get(
+            relocation_validation=self.relocation_validation
+        )
+        assert relocation_validation_attempt.status == ValidationStatus.IN_PROGRESS.value
+
+    def test_retry_if_attempts_left(
+        self,
+        validating_poll_mock: Mock,
+        fake_cloudbuild_client: FakeCloudBuildClient,
+    ):
+        # An exception being raised will trigger a retry in celery.
+        with pytest.raises(Exception):
+            self.mock_cloudbuild_client(fake_cloudbuild_client, Build.Status(Build.Status.QUEUED))
+            fake_cloudbuild_client.create_build.side_effect = Exception("Test")
+
+            validating_start(self.relocation.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.IN_PROGRESS.value
+        assert not relocation.failure_reason
+        assert validating_poll_mock.call_count == 0
+
+    def test_fail_if_no_attempts_left(
+        self,
+        validating_poll_mock: Mock,
+        fake_cloudbuild_client: FakeCloudBuildClient,
+    ):
+        self.relocation.latest_task = "VALIDATING_START"
+        self.relocation.latest_task_attempts = MAX_FAST_TASK_RETRIES
+        self.relocation.save()
+        self.mock_cloudbuild_client(fake_cloudbuild_client, Build.Status(Build.Status.QUEUED))
+        fake_cloudbuild_client.create_build.side_effect = Exception("Test")
+
+        validating_start(self.relocation.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_VALIDATING_INTERNAL
+        assert validating_poll_mock.call_count == 0
+
+    def test_fail_if_max_runs_attempted(
+        self,
+        validating_poll_mock: Mock,
+        fake_cloudbuild_client: FakeCloudBuildClient,
+    ):
+        for _ in range(3):
+            RelocationValidationAttempt.objects.create(
+                relocation=self.relocation,
+                relocation_validation=self.relocation_validation,
+                build_id=uuid4().hex,
+            )
+
+        self.relocation_validation.attempts = 3
+        self.relocation_validation.save()
+
+        self.relocation.latest_task_attempts = MAX_FAST_TASK_RETRIES
+        self.relocation.save()
+
+        self.mock_cloudbuild_client(fake_cloudbuild_client, Build.Status(Build.Status.QUEUED))
+        fake_cloudbuild_client.create_build.side_effect = Exception("Test")
+
+        validating_start(self.relocation.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_VALIDATING_MAX_RUNS
+        assert validating_poll_mock.call_count == 0
+
+
+@patch(
+    "sentry.tasks.relocation.CloudBuildClient",
+    new_callable=lambda: FakeCloudBuildClient,
+)
+@region_silo_test
+class ValidatingPollTest(RelocationTaskTestCase):
+    def setUp(self):
+        super().setUp()
+        self.relocation.step = Relocation.Step.VALIDATING.value
+        self.relocation.latest_task = "VALIDATING_START"
+        self.relocation.want_usernames = ["testuser"]
+        self.relocation.want_org_slugs = ["test-slug"]
+        self.relocation.save()
+
+        self.relocation_validation: RelocationValidation = RelocationValidation.objects.create(
+            relocation=self.relocation, attempts=1
+        )
+
+        self.relocation_validation_attempt: RelocationValidationAttempt = (
+            RelocationValidationAttempt.objects.create(
+                relocation=self.relocation,
+                relocation_validation=self.relocation_validation,
+                build_id=uuid4().hex,
+            )
+        )
+
+    @patch("sentry.tasks.relocation.validating_complete.delay")
+    def test_success(
+        self,
+        validating_complete_mock: Mock,
+        fake_cloudbuild_client: FakeCloudBuildClient,
+    ):
+        self.mock_cloudbuild_client(fake_cloudbuild_client, Build.Status(Build.Status.SUCCESS))
+
+        validating_poll(self.relocation.uuid, self.relocation_validation_attempt.build_id)
+        self.relocation.refresh_from_db()
+        self.relocation_validation.refresh_from_db()
+        self.relocation_validation_attempt.refresh_from_db()
+
+        assert validating_complete_mock.call_count == 1
+        assert fake_cloudbuild_client.get_build.call_count == 1
+        assert self.relocation.latest_task == "VALIDATING_POLL"
+        assert self.relocation_validation.status == ValidationStatus.IN_PROGRESS.value
+
+    @patch("sentry.tasks.relocation.validating_start.delay")
+    def test_timeout_starts_new_validation_attempt(
+        self,
+        validating_start_mock: Mock,
+        fake_cloudbuild_client: FakeCloudBuildClient,
+    ):
+        for stat in {Build.Status.TIMEOUT, Build.Status.EXPIRED}:
+            self.mock_cloudbuild_client(fake_cloudbuild_client, Build.Status(stat))
+            validating_start_mock.call_count = 0
+
+            validating_poll(self.relocation.uuid, self.relocation_validation_attempt.build_id)
+            self.relocation.refresh_from_db()
+            self.relocation_validation.refresh_from_db()
+            self.relocation_validation_attempt.refresh_from_db()
+
+            assert validating_start_mock.call_count == 1
+            assert fake_cloudbuild_client.get_build.call_count == 1
+            assert self.relocation.latest_task == "VALIDATING_START"
+            assert self.relocation_validation.status == ValidationStatus.IN_PROGRESS.value
+            assert self.relocation_validation_attempt.status == ValidationStatus.TIMEOUT.value
+
+    @patch("sentry.tasks.relocation.validating_start.delay")
+    def test_failure_starts_new_validation_attempt(
+        self,
+        validating_start_mock: Mock,
+        fake_cloudbuild_client: FakeCloudBuildClient,
+    ):
+        for stat in {
+            Build.Status.FAILURE,
+            Build.Status.INTERNAL_ERROR,
+            Build.Status.CANCELLED,
+        }:
+            self.mock_cloudbuild_client(fake_cloudbuild_client, Build.Status(stat))
+            validating_start_mock.call_count = 0
+
+            validating_poll(self.relocation.uuid, self.relocation_validation_attempt.build_id)
+
+            self.relocation.refresh_from_db()
+            self.relocation_validation.refresh_from_db()
+            self.relocation_validation_attempt.refresh_from_db()
+
+            assert validating_start_mock.call_count == 1
+            assert fake_cloudbuild_client.get_build.call_count == 1
+            assert self.relocation.latest_task == "VALIDATING_START"
+            assert self.relocation_validation.status == ValidationStatus.IN_PROGRESS.value
+            assert self.relocation_validation_attempt.status == ValidationStatus.FAILURE.value
+
+    @patch("sentry.tasks.relocation.validating_poll.apply_async")
+    def test_in_progress_retries_poll(
+        self,
+        validating_poll_mock: Mock,
+        fake_cloudbuild_client: FakeCloudBuildClient,
+    ):
+        for stat in {
+            Build.Status.QUEUED,
+            Build.Status.PENDING,
+            Build.Status.WORKING,
+        }:
+            self.mock_cloudbuild_client(fake_cloudbuild_client, Build.Status(stat))
+            validating_poll_mock.call_count = 0
+
+            validating_poll(self.relocation.uuid, self.relocation_validation_attempt.build_id)
+
+            self.relocation.refresh_from_db()
+            self.relocation_validation.refresh_from_db()
+            self.relocation_validation_attempt.refresh_from_db()
+
+            assert validating_poll_mock.call_count == 1
+            assert fake_cloudbuild_client.get_build.call_count == 1
+            assert self.relocation.latest_task == "VALIDATING_POLL"
+            assert self.relocation_validation.status == ValidationStatus.IN_PROGRESS.value
+            assert self.relocation_validation_attempt.status == ValidationStatus.IN_PROGRESS.value
+            assert (
+                RelocationValidationAttempt.objects.filter(
+                    relocation_validation=self.relocation_validation
+                ).count()
+                == 1
+            )
+
+    def test_retry_if_attempts_left(
+        self,
+        fake_cloudbuild_client: FakeCloudBuildClient,
+    ):
+        # An exception being raised will trigger a retry in celery.
+        with pytest.raises(Exception):
+            self.mock_cloudbuild_client(fake_cloudbuild_client, Build.Status(Build.Status.QUEUED))
+            fake_cloudbuild_client.get_build.side_effect = Exception("Test")
+
+            validating_poll(self.relocation.uuid, self.relocation_validation_attempt.build_id)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.IN_PROGRESS.value
+        assert not relocation.failure_reason
+
+    def test_fail_if_no_attempts_left(
+        self,
+        fake_cloudbuild_client: FakeCloudBuildClient,
+    ):
+        self.relocation.latest_task = "VALIDATING_POLL"
+        self.relocation.latest_task_attempts = MAX_VALIDATION_POLLS
+        self.relocation.save()
+        self.mock_cloudbuild_client(fake_cloudbuild_client, Build.Status(Build.Status.QUEUED))
+        fake_cloudbuild_client.get_build.side_effect = Exception("Test")
+
+        validating_poll(self.relocation.uuid, self.relocation_validation_attempt.build_id)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_VALIDATING_INTERNAL
+
+
+@patch("sentry.tasks.relocation.importing.delay")
+@region_silo_test
+class ValidatingCompleteTest(RelocationTaskTestCase):
+    def setUp(self):
+        super().setUp()
+        self.relocation.step = Relocation.Step.VALIDATING.value
+        self.relocation.latest_task = "VALIDATING_POLL"
+        self.relocation.want_usernames = ["testuser"]
+        self.relocation.want_org_slugs = ["test-slug"]
+        self.relocation.save()
+
+        self.relocation_validation: RelocationValidation = RelocationValidation.objects.create(
+            relocation=self.relocation, attempts=1
+        )
+
+        self.relocation_validation_attempt: RelocationValidationAttempt = (
+            RelocationValidationAttempt.objects.create(
+                relocation=self.relocation,
+                relocation_validation=self.relocation_validation,
+                build_id=uuid4().hex,
+            )
+        )
+
+        self.storage = get_storage()
+        self.storage.save(
+            f"relocations/runs/{self.relocation.uuid}/findings/artifacts-prefix-should-be-ignored.json",
+            BytesIO(b"invalid-json"),
+        )
+        files = [
+            "null.json",
+            "import-baseline-config.json",
+            "import-colliding-users.json",
+            "import-raw-relocation-data.json",
+            "export-baseline-config.json",
+            "export-colliding-users.json",
+            "export-raw-relocation-data.json",
+            "compare-baseline-config.json",
+            "compare-colliding-users.json",
+        ]
+        for file in files:
+            self.storage.save(
+                f"relocations/runs/{self.relocation.uuid}/findings/{file}", BytesIO(b"[]")
+            )
+
+    def test_valid(self, importing_mock: Mock):
+        validating_complete(self.relocation.uuid, self.relocation_validation_attempt.build_id)
+
+        self.relocation.refresh_from_db()
+        self.relocation_validation.refresh_from_db()
+        self.relocation_validation_attempt.refresh_from_db()
+
+        assert self.relocation.latest_task == "VALIDATING_COMPLETE"
+        assert self.relocation.step == Relocation.Step.IMPORTING.value
+        assert self.relocation_validation.status == ValidationStatus.VALID.value
+        assert self.relocation_validation_attempt.status == ValidationStatus.VALID.value
+        assert importing_mock.call_count == 1
+
+    def test_invalid(self, importing_mock: Mock):
+        self.storage.save(
+            f"relocations/runs/{self.relocation.uuid}/findings/import-baseline-config.json",
+            BytesIO(
+                b"""
+[
+    {
+        "finding": "RpcImportError",
+        "kind": "Unknown",
+        "left_pk": 2,
+        "on": {
+            "model": "sentry.email",
+            "ordinal": 1
+        },
+        "reason": "test reason",
+        "right_pk": 3
+    }
+]
+            """
+            ),
+        )
+
+        validating_complete(self.relocation.uuid, self.relocation_validation_attempt.build_id)
+
+        self.relocation.refresh_from_db()
+        self.relocation_validation.refresh_from_db()
+        self.relocation_validation_attempt.refresh_from_db()
+
+        assert self.relocation.latest_task == "VALIDATING_COMPLETE"
+        assert self.relocation.step == Relocation.Step.VALIDATING.value
+        assert self.relocation.failure_reason is not None
+        assert self.relocation_validation.status == ValidationStatus.INVALID.value
+        assert self.relocation_validation_attempt.status == ValidationStatus.INVALID.value
+        assert importing_mock.call_count == 0
+
+    def test_retry_if_attempts_left(self, _: Mock):
+        # An exception being raised will trigger a retry in celery.
+        with pytest.raises(Exception):
+            self.storage.save(
+                f"relocations/runs/{self.relocation.uuid}/findings/null.json",
+                BytesIO(b"invalid-json"),
+            )
+
+            validating_complete(self.relocation.uuid, self.relocation_validation_attempt.build_id)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.IN_PROGRESS.value
+        assert not relocation.failure_reason
+
+    def test_fail_if_no_attempts_left(self, _: Mock):
+        self.relocation.latest_task = "VALIDATING_COMPLETE"
+        self.relocation.latest_task_attempts = MAX_FAST_TASK_RETRIES
+        self.relocation.save()
+        self.storage.save(
+            f"relocations/runs/{self.relocation.uuid}/findings/null.json", BytesIO(b"invalid-json")
+        )
+
+        validating_complete(self.relocation.uuid, self.relocation_validation_attempt.build_id)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_VALIDATING_INTERNAL