
feat(backup): Uploading_complete relocation task (#59162)

This is the first task in the "chain" of tasks that a relocation needs
to complete. It merely checks that the upload to GCS has completed and
is resolvable - this is important, as future tasks will assume that this
information is available.

Issue: getsentry/team-ospo#203
Alex Zaslavsky, 1 year ago
commit 99de21b974
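
For context, a minimal sketch of the hand-off this commit introduces (the wrapper function below is purely illustrative; the real call site is the endpoint change further down):

    # Illustrative sketch: the upload endpoint schedules the first task in the
    # chain, and each task re-loads the Relocation by UUID before doing its work.
    from sentry.tasks.relocation import uploading_complete

    def kick_off_relocation(relocation_uuid: str) -> None:
        # Fire-and-forget: the Celery task chain owns the rest of the process.
        uploading_complete.delay(relocation_uuid)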

+ 4 - 0
.github/CODEOWNERS

@@ -94,8 +94,12 @@ Makefile                                                 @getsentry/owners-sentr
 /src/sentry/backup/                                      @getsentry/open-source
 /src/sentry/runner/commands/backup.py                    @getsentry/open-source
 /src/sentry/services/hybrid-cloud/import_export/         @getsentry/open-source
+/src/sentry/tasks/relocation.py                          @getsentry/open-source
 /src/sentry/testutils/helpers/backups.py                 @getsentry/open-source
+/src/sentry/utils/relocation.py                          @getsentry/open-source
 /tests/sentry/api/endpoints/test_relocation.py           @getsentry/open-source
+/tests/sentry/tasks/test_relocation.py                   @getsentry/open-source
+/tests/sentry/utils/test_relocation.py                   @getsentry/open-source
 /tests/sentry/backup                                     @getsentry/open-source
 /tests/sentry/runner/commands/test_backup.py             @getsentry/open-source

+ 4 - 13
src/sentry/api/endpoints/relocation.py

@@ -14,19 +14,9 @@ from sentry.models.files.file import File
 from sentry.models.relocation import Relocation, RelocationFile
 from sentry.models.user import MAX_USERNAME_LENGTH
 from sentry.services.hybrid_cloud.user.service import user_service
+from sentry.tasks.relocation import uploading_complete
 from sentry.utils.db import atomic_transaction
-
-# Relocation input files are uploaded as tarballs, and chunked and stored using the normal
-# `File`/`AbstractFile` mechanism, which has a hard limit of 2GiB, because we need to represent the
-# offset into it as a 32-bit int. This means that the largest tarball we are able to import at this
-# time is 2GiB. When validating this tarball, we will need to make a "composite object" from the
-# uploaded blobs in Google Cloud Storage, which has a limit of 32 components. Thus, we get our blob
-# size of the maximum overall file size (2GiB) divided by the maximum number of blobs (32): 64MiB
-# per blob.
-#
-# Note that the actual production file size limit, set by uwsgi, is currently 209715200 bytes, or
-# ~200MB, so we should never see more than ~4 blobs in practice.
-RELOCATION_BLOB_SIZE = int((2**31) / 32)
+from sentry.utils.relocation import RELOCATION_BLOB_SIZE, RELOCATION_FILE_TYPE
 
 ERR_DUPLICATE_RELOCATION = "An in-progress relocation already exists for this owner"
 ERR_FEATURE_DISABLED = "This feature is not yet enabled"
@@ -94,7 +84,7 @@ class RelocationEndpoint(Endpoint):
         # TODO(getsentry/team-ospo#203): check import size, and maybe do throttle based on that
         # information.
 
-        file = File.objects.create(name="raw-relocation-data.tar", type="relocation.file")
+        file = File.objects.create(name="raw-relocation-data.tar", type=RELOCATION_FILE_TYPE)
         file.putfile(fileobj, blob_size=RELOCATION_BLOB_SIZE, logger=logger)
 
         with atomic_transaction(
@@ -112,4 +102,5 @@ class RelocationEndpoint(Endpoint):
                 kind=RelocationFile.Kind.RAW_USER_DATA.value,
             )
 
+        uploading_complete.delay(relocation.uuid)
         return Response(status=201)

+ 1 - 0
src/sentry/conf/server.py

@@ -744,6 +744,7 @@ CELERY_IMPORTS = (
     "sentry.tasks.recap_servers",
     "sentry.tasks.recap_servers",
     "sentry.tasks.relay",
     "sentry.tasks.relay",
     "sentry.tasks.release_registry",
     "sentry.tasks.release_registry",
+    "sentry.tasks.relocation",
     "sentry.tasks.weekly_reports",
     "sentry.tasks.weekly_reports",
     "sentry.tasks.reprocessing",
     "sentry.tasks.reprocessing",
     "sentry.tasks.reprocessing2",
     "sentry.tasks.reprocessing2",

+ 99 - 0
src/sentry/tasks/relocation.py

@@ -0,0 +1,99 @@
+from __future__ import annotations
+
+import logging
+from typing import Optional
+
+from sentry.models.relocation import Relocation, RelocationFile
+from sentry.silo import SiloMode
+from sentry.tasks.base import instrumented_task
+from sentry.utils.relocation import OrderedTask, retry_task_or_fail_relocation, start_task
+
+logger = logging.getLogger(__name__)
+
+# Time limits for various steps in the process.
+RETRY_BACKOFF = 60  # So the 1st retry is after ~1 min, 2nd after ~2 min, 3rd after ~4 min.
+UPLOADING_TIME_LIMIT = 60  # This should be quick - we're just pinging the DB, then GCS.
+PREPROCESSING_TIME_LIMIT = 60 * 5  # 5 minutes is plenty for all preprocessing task attempts.
+
+# All pre and post processing tasks have the same number of retries.
+MAX_FAST_TASK_RETRIES = 2
+MAX_FAST_TASK_ATTEMPTS = MAX_FAST_TASK_RETRIES + 1
+
+# Some reasonable limits on the amount of data we import - we can adjust these as needed.
+MAX_ORGS_PER_RELOCATION = 20
+MAX_USERS_PER_RELOCATION = 200
+
+RELOCATION_FILES_TO_BE_VALIDATED = [
+    RelocationFile.Kind.BASELINE_CONFIG_VALIDATION_DATA,
+    RelocationFile.Kind.COLLIDING_USERS_VALIDATION_DATA,
+    RelocationFile.Kind.RAW_USER_DATA,
+]
+
+# Various error strings that we want to surface to users.
+ERR_FILE_UPLOAD = "Internal error during file upload"
+
+
+# TODO(getsentry/team-ospo#203): We should split this task in two, one for "small" imports of say
+# <=10MB, and one for large imports >10MB. Then we should limit the number of daily executions of
+# the latter.
+@instrumented_task(
+    name="sentry.relocation.uploading_complete",
+    queue="relocation",
+    max_retries=MAX_FAST_TASK_RETRIES,
+    retry_backoff=RETRY_BACKOFF,
+    retry_backoff_jitter=True,
+    soft_time_limit=UPLOADING_TIME_LIMIT,
+)
+def uploading_complete(uuid: str) -> None:
+    """
+    Just check to ensure that uploading the (potentially very large!) backup file has completed
+    before we try to do all sorts of fun stuff with it.
+    """
+
+    relocation: Optional[Relocation]
+    attempts_left: int
+    (relocation, attempts_left) = start_task(
+        uuid=uuid,
+        step=Relocation.Step.UPLOADING,
+        task=OrderedTask.UPLOADING_COMPLETE,
+        allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
+    )
+    if relocation is None:
+        return
+
+    # Pull down the `RelocationFile` associated with this `Relocation`. Fallibility is expected
+    # here: we're pushing a potentially very large file with many blobs to a cloud store, so it is
+    # possible (likely, even) that not all of the blobs are yet available. If this segment fails,
+    # we'll just allow the Exception to bubble up and retry the task if possible.
+    with retry_task_or_fail_relocation(
+        relocation,
+        OrderedTask.UPLOADING_COMPLETE,
+        attempts_left,
+        ERR_FILE_UPLOAD,
+    ):
+        raw_relocation_file = (
+            RelocationFile.objects.filter(
+                relocation=relocation,
+                kind=RelocationFile.Kind.RAW_USER_DATA.value,
+            )
+            .select_related("file")
+            .first()
+        )
+        fp = raw_relocation_file.file.getfile()
+
+        with fp:
+            preprocessing_scan.delay(uuid)
+
+
+@instrumented_task(
+    name="sentry.relocation.preprocessing_scan",
+    queue="relocation",
+    max_retries=MAX_FAST_TASK_RETRIES,
+    retry_backoff=RETRY_BACKOFF,
+    retry_backoff_jitter=True,
+    soft_time_limit=PREPROCESSING_TIME_LIMIT,
+    silo_mode=SiloMode.REGION,
+)
+def preprocessing_scan(uuid: str) -> None:
+    # TODO(getsentry/team-ospo#203): Implement this.
+    pass

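The helpers in src/sentry/utils/relocation.py (below) are meant to be the shared boilerplate for every task in this chain. A rough sketch of how the next, currently-stubbed task would likely use them; the function body, error string, and mirrored constant are assumptions, not part of this commit:

    from sentry.models.relocation import Relocation
    from sentry.utils.relocation import (
        OrderedTask,
        retry_task_or_fail_relocation,
        start_task,
    )

    MAX_FAST_TASK_ATTEMPTS = 3  # mirrors the constant in sentry.tasks.relocation
    ERR_PREPROCESSING = "Internal error during preprocessing"  # hypothetical

    def preprocessing_scan_sketch(uuid: str) -> None:
        (relocation, attempts_left) = start_task(
            uuid=uuid,
            step=Relocation.Step.PREPROCESSING,
            task=OrderedTask.PREPROCESSING_SCAN,
            allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
        )
        if relocation is None:
            return

        with retry_task_or_fail_relocation(
            relocation, OrderedTask.PREPROCESSING_SCAN, attempts_left, ERR_PREPROCESSING
        ):
            ...  # do the actual scan, then schedule the next task in the chain
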
+ 144 - 0
src/sentry/utils/relocation.py

@@ -0,0 +1,144 @@
+from __future__ import annotations
+
+import logging
+from contextlib import contextmanager
+from enum import Enum, unique
+from typing import Generator, Optional, Tuple
+
+from sentry.models.relocation import Relocation
+
+logger = logging.getLogger("sentry.relocation.tasks")
+
+
+# Relocation tasks are always performed in sequential order. We can leverage this to check for any
+# weird out-of-order executions.
+@unique
+class OrderedTask(Enum):
+    NONE = 0
+    UPLOADING_COMPLETE = 1
+    PREPROCESSING_SCAN = 2
+
+
+# The file type for a relocation export tarball of any kind.
+RELOCATION_FILE_TYPE = "relocation.file"
+
+# Relocation input files are uploaded as tarballs, and chunked and stored using the normal
+# `File`/`AbstractFile` mechanism, which has a hard limit of 2GiB, because we need to represent the
+# offset into it as a 32-bit int. This means that the largest tarball we are able to import at this
+# time is 2GiB. When validating this tarball, we will need to make a "composite object" from the
+# uploaded blobs in Google Cloud Storage, which has a limit of 32 components. Thus, we get our blob
+# size of the maximum overall file size (2GiB) divided by the maximum number of blobs (32): 64MiB
+# per blob.
+#
+# Note that the actual production file size limit, set by uwsgi, is currently 209715200 bytes, or
+# ~200MB, so we should never see more than ~4 blobs in practice.
+RELOCATION_BLOB_SIZE = int((2**31) / 32)
+
+
+def start_task(
+    uuid: str, step: Relocation.Step, task: OrderedTask, allowed_task_attempts: int
+) -> Tuple[Optional[Relocation], int]:
+    """
+    All tasks for relocation are done sequentially, and take the UUID of the `Relocation` model as
+    the input. We can leverage this information to do some common pre-task setup.
+
+    Returns a tuple of relocation model and the number of attempts remaining for this task.
+    """
+
+    logger_data = {"uuid": uuid}
+    try:
+        relocation = Relocation.objects.get(uuid=uuid)
+    except Relocation.DoesNotExist as exc:
+        logger.error(f"Could not locate Relocation model by UUID: {uuid}", exc_info=exc)
+        return (None, 0)
+    if relocation.status != Relocation.Status.IN_PROGRESS.value:
+        logger.error(
+            f"Relocation has already completed as `{Relocation.Status(relocation.status)}`",
+            extra=logger_data,
+        )
+        return (None, 0)
+
+    try:
+        prev_task_name = "" if task.value == 1 else OrderedTask(task.value - 1).name
+    except Exception:
+        logger.error("Attempted to execute unknown relocation task", extra=logger_data)
+        fail_relocation(relocation, OrderedTask.NONE)
+        return (None, 0)
+
+    logger_data["task"] = task.name
+    if relocation.latest_task == task.name:
+        relocation.latest_task_attempts += 1
+    elif relocation.latest_task not in {prev_task_name, task.name}:
+        logger.error(
+            f"Task {task.name} tried to follow {relocation.latest_task} which is the wrong order",
+            extra=logger_data,
+        )
+        fail_relocation(relocation, task)
+        return (None, 0)
+    else:
+        relocation.latest_task = task.name
+        relocation.latest_task_attempts += 1
+
+    relocation.step = step.value
+    relocation.save()
+
+    logger.info("Task started", extra=logger_data)
+    return (relocation, allowed_task_attempts - relocation.latest_task_attempts)
+
+
+def fail_relocation(relocation: Relocation, task: OrderedTask, reason: str = "") -> None:
+    """
+    Helper function that conveniently fails a relocation celery task in such a way that the failure
+    reason is recorded for the user and no further retries occur. It should be used like:
+
+    >>> relocation = Relocation.objects.get(...)
+    >>> if failure_condition:
+    >>>     fail_relocation(relocation, OrderedTask.UPLOADING_COMPLETE, "Some user-friendly reason why this failed.")
+    >>>     return  # Always exit the task immediately upon failure
+
+    This function is ideal for non-transient failures, where we know there is no need to retry
+    because the result won't change, like invalid input data or conclusive validation results. For
+    transient failures where retrying at a later time may be useful, use `retry_task_or_fail_relocation`
+    instead.
+    """
+
+    if reason:
+        relocation.failure_reason = reason
+
+    relocation.status = Relocation.Status.FAILURE.value
+    relocation.save()
+
+    logger.info("Task failed", extra={"uuid": relocation.uuid, "task": task.name, "reason": reason})
+    return
+
+
+@contextmanager
+def retry_task_or_fail_relocation(
+    relocation: Relocation, task: OrderedTask, attempts_left: int, reason: str = ""
+) -> Generator[None, None, None]:
+    """
+    Catches all exceptions, and does one of two things: calls into `fail_relocation` if there are no
+    retry attempts forthcoming, or simply bubbles them up (thereby triggering a celery retry) if
+    there are.
+
+    This function is ideal for transient failures, like networked service lag, where retrying at a
+    later time might yield a different result. For non-transient failures, use `fail_relocation`
+    instead.
+    """
+
+    logger_data = {"uuid": relocation.uuid, "task": task.name, "attempts_left": attempts_left}
+    try:
+        yield
+    except Exception as e:
+        # If this is the last attempt, fail in the manner requested instead of re-raising the
+        # exception. This ensures that the database entry for this `Relocation` correctly notes
+        # it as a `FAILURE`.
+        if attempts_left == 0:
+            fail_relocation(relocation, task, reason)
+            return
+
+        logger_data["reason"] = reason
+        logger.info("Task retried", extra=logger_data)
+        raise e
+    else:
+        logger.info("Task finished", extra=logger_data)

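As a sanity check on the blob-size comment above, the arithmetic works out as follows (standalone snippet, not part of the change):

    # 2 GiB hard limit on File offsets, split across at most 32 GCS composite parts.
    RELOCATION_BLOB_SIZE = int((2**31) / 32)
    assert RELOCATION_BLOB_SIZE == 64 * 1024 * 1024  # 64 MiB per blob

    # With the ~200 MB uwsgi upload cap, at most ceil(209715200 / blob size) blobs
    # are ever produced in practice.
    assert -(-209715200 // RELOCATION_BLOB_SIZE) == 4  # ~4 blobs
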
+ 4 - 1
tests/sentry/api/endpoints/test_relocation.py

@@ -1,6 +1,7 @@
 import tempfile
 from pathlib import Path
 from typing import Tuple
+from unittest.mock import patch
 
 from django.core.files.uploadedfile import SimpleUploadedFile
 from django.urls import reverse
@@ -42,7 +43,8 @@ class RelocationCreateTest(APITestCase):
 
         return (tmp_priv_key_path, tmp_pub_key_path)
 
-    def test_success_simple(self):
+    @patch("sentry.tasks.relocation.uploading_complete.delay")
+    def test_success_simple(self, uploading_complete_mock):
         relocation_count = Relocation.objects.count()
         relocation_file_count = RelocationFile.objects.count()
 
@@ -69,6 +71,7 @@ class RelocationCreateTest(APITestCase):
         assert response.status_code == 201
         assert Relocation.objects.count() == relocation_count + 1
         assert RelocationFile.objects.count() == relocation_file_count + 1
+        assert uploading_complete_mock.called == 1
 
     def test_success_relocation_for_same_owner_already_completed(self):
         Relocation.objects.create(

+ 98 - 0
tests/sentry/tasks/test_relocation.py

@@ -0,0 +1,98 @@
+from functools import cached_property
+from io import BytesIO
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from unittest.mock import Mock, patch
+
+import pytest
+
+from sentry.backup.helpers import create_encrypted_export_tarball
+from sentry.models.files.file import File
+from sentry.models.relocation import Relocation, RelocationFile
+from sentry.tasks.relocation import ERR_FILE_UPLOAD, MAX_FAST_TASK_RETRIES, uploading_complete
+from sentry.testutils.cases import TestCase
+from sentry.testutils.factories import get_fixture_path
+from sentry.testutils.helpers.backups import generate_rsa_key_pair
+from sentry.testutils.silo import region_silo_test
+from sentry.utils import json
+from sentry.utils.relocation import RELOCATION_FILE_TYPE
+
+
+class RelocationTaskTestCase(TestCase):
+    def setUp(self):
+        super().setUp()
+        self.owner = self.create_user(
+            email="owner", is_superuser=False, is_staff=False, is_active=True
+        )
+        self.superuser = self.create_user(
+            "superuser", is_superuser=True, is_staff=True, is_active=True
+        )
+        self.login_as(user=self.superuser, superuser=True)
+        self.relocation: Relocation = Relocation.objects.create(
+            creator=self.superuser.id,
+            owner=self.owner.id,
+            want_org_slugs=["testing"],
+            step=Relocation.Step.UPLOADING.value,
+        )
+        self.relocation_file = RelocationFile.objects.create(
+            relocation=self.relocation,
+            file=self.file,
+            kind=RelocationFile.Kind.RAW_USER_DATA.value,
+        )
+        self.uuid = self.relocation.uuid
+
+    @cached_property
+    def file(self):
+        with TemporaryDirectory() as tmp_dir:
+            (priv_key_pem, pub_key_pem) = generate_rsa_key_pair()
+            tmp_priv_key_path = Path(tmp_dir).joinpath("key")
+            self.priv_key_pem = priv_key_pem
+            with open(tmp_priv_key_path, "wb") as f:
+                f.write(priv_key_pem)
+
+            tmp_pub_key_path = Path(tmp_dir).joinpath("key.pub")
+            self.pub_key_pem = pub_key_pem
+            with open(tmp_pub_key_path, "wb") as f:
+                f.write(pub_key_pem)
+
+            with open(get_fixture_path("backup", "fresh-install.json")) as f:
+                data = json.load(f)
+                with open(tmp_pub_key_path, "rb") as p:
+                    file = File.objects.create(name="export.tar", type=RELOCATION_FILE_TYPE)
+                    self.tarball = create_encrypted_export_tarball(data, p).getvalue()
+                    file.putfile(BytesIO(self.tarball))
+
+            return file
+
+
+@patch("sentry.tasks.relocation.preprocessing_scan.delay")
+@region_silo_test
+class UploadingCompleteTest(RelocationTaskTestCase):
+    def test_success(self, preprocessing_scan_mock: Mock):
+        uploading_complete(self.relocation.uuid)
+
+        assert preprocessing_scan_mock.call_count == 1
+
+    def test_retry_if_attempts_left(self, preprocessing_scan_mock: Mock):
+        RelocationFile.objects.filter(relocation=self.relocation).delete()
+
+        # An exception being raised will trigger a retry in celery.
+        with pytest.raises(Exception):
+            uploading_complete(self.relocation.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.IN_PROGRESS.value
+        assert not relocation.failure_reason
+        assert preprocessing_scan_mock.call_count == 0
+
+    def test_fail_if_no_attempts_left(self, preprocessing_scan_mock: Mock):
+        self.relocation.latest_task_attempts = MAX_FAST_TASK_RETRIES
+        self.relocation.save()
+        RelocationFile.objects.filter(relocation=self.relocation).delete()
+
+        uploading_complete(self.relocation.uuid)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == ERR_FILE_UPLOAD
+        assert preprocessing_scan_mock.call_count == 0

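The retry/fail split in these tests hinges on the attempt bookkeeping done by `start_task`; a standalone sketch of that arithmetic (plain Python, no DB or Celery involved):

    MAX_FAST_TASK_RETRIES = 2
    MAX_FAST_TASK_ATTEMPTS = MAX_FAST_TASK_RETRIES + 1  # 3 attempts total

    def attempts_left_after_start(previous_attempts: int) -> int:
        # start_task bumps latest_task_attempts before computing the remainder.
        latest_task_attempts = previous_attempts + 1
        return MAX_FAST_TASK_ATTEMPTS - latest_task_attempts

    assert attempts_left_after_start(0) == 2  # fresh task: exceptions bubble up and Celery retries
    assert attempts_left_after_start(MAX_FAST_TASK_RETRIES) == 0  # last attempt: fail_relocation path
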
+ 167 - 0
tests/sentry/utils/test_relocation.py

@@ -0,0 +1,167 @@
+from uuid import uuid4
+
+import pytest
+
+from sentry.models.relocation import Relocation
+from sentry.testutils.cases import TestCase
+from sentry.utils.relocation import (
+    OrderedTask,
+    fail_relocation,
+    retry_task_or_fail_relocation,
+    start_task,
+)
+
+
+class RelocationUtilsTestCase(TestCase):
+    def setUp(self):
+        super().setUp()
+        self.owner = self.create_user(
+            email="owner", is_superuser=False, is_staff=False, is_active=True
+        )
+        self.superuser = self.create_user(
+            "superuser", is_superuser=True, is_staff=True, is_active=True
+        )
+        self.relocation: Relocation = Relocation.objects.create(
+            creator=self.superuser.id,
+            owner=self.owner.id,
+            want_org_slugs=["testing"],
+            step=Relocation.Step.UPLOADING.value,
+        )
+        self.uuid = self.relocation.uuid
+
+
+class RelocationStartTestCase(RelocationUtilsTestCase):
+    def test_bad_relocation_not_found(self):
+        uuid = uuid4().hex
+        (relocation, attempts_left) = start_task(
+            uuid, Relocation.Step.UPLOADING, OrderedTask.UPLOADING_COMPLETE, 3
+        )
+
+        assert relocation is None
+        assert not attempts_left
+
+    def test_bad_relocation_completed(self):
+        self.relocation.status = Relocation.Status.FAILURE.value
+        self.relocation.save()
+
+        (relocation, attempts_left) = start_task(
+            self.uuid, Relocation.Step.UPLOADING, OrderedTask.UPLOADING_COMPLETE, 3
+        )
+
+        assert relocation is None
+        assert not attempts_left
+        assert Relocation.objects.get(uuid=self.uuid).status == Relocation.Status.FAILURE.value
+
+    def test_bad_unknown_task(self):
+        (relocation, attempts_left) = start_task(
+            self.uuid, Relocation.Step.UPLOADING, OrderedTask.NONE, 3
+        )
+
+        assert relocation is None
+        assert not attempts_left
+        assert Relocation.objects.get(uuid=self.uuid).status == Relocation.Status.FAILURE.value
+
+    def test_bad_task_out_of_order(self):
+        self.relocation.latest_task = OrderedTask.PREPROCESSING_SCAN.name
+        self.relocation.save()
+
+        (relocation, attempts_left) = start_task(
+            self.uuid, Relocation.Step.UPLOADING, OrderedTask.UPLOADING_COMPLETE, 3
+        )
+
+        assert relocation is None
+        assert not attempts_left
+        assert Relocation.objects.get(uuid=self.uuid).status == Relocation.Status.FAILURE.value
+
+    def test_good_first_task(self):
+        (relocation, attempts_left) = start_task(
+            self.uuid, Relocation.Step.UPLOADING, OrderedTask.UPLOADING_COMPLETE, 3
+        )
+
+        assert relocation is not None
+        assert attempts_left == 2
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation is not None
+        assert relocation.step == Relocation.Step.UPLOADING.value
+        assert relocation.status != Relocation.Status.FAILURE.value
+
+    def test_good_next_task(self):
+        self.relocation.latest_task = OrderedTask.UPLOADING_COMPLETE.name
+        self.relocation.save()
+
+        assert self.relocation.step == Relocation.Step.UPLOADING.value
+
+        (relocation, attempts_left) = start_task(
+            self.uuid, Relocation.Step.PREPROCESSING, OrderedTask.PREPROCESSING_SCAN, 3
+        )
+
+        assert relocation is not None
+        assert attempts_left == 2
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation is not None
+        assert relocation.step == Relocation.Step.PREPROCESSING.value
+        assert relocation.status != Relocation.Status.FAILURE.value
+
+
+class RelocationFailTestCase(RelocationUtilsTestCase):
+    def test_no_reason(self):
+        fail_relocation(self.relocation, OrderedTask.UPLOADING_COMPLETE)
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert not relocation.failure_reason
+
+    def test_with_reason(self):
+        fail_relocation(self.relocation, OrderedTask.UPLOADING_COMPLETE, "foo")
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == "foo"
+
+
+class RelocationRetryOrFailTestCase(RelocationUtilsTestCase):
+    def test_no_reason_attempts_left(self):
+        with pytest.raises(ValueError):
+            with retry_task_or_fail_relocation(self.relocation, OrderedTask.UPLOADING_COMPLETE, 3):
+                raise ValueError("Some sort of failure")
+
+        assert Relocation.objects.get(uuid=self.uuid).status == Relocation.Status.IN_PROGRESS.value
+
+    def test_no_reason_last_attempt(self):
+        # Wrap in `try/except` to make mypy happy.
+        try:
+            with retry_task_or_fail_relocation(self.relocation, OrderedTask.UPLOADING_COMPLETE, 0):
+                raise ValueError("Some sort of failure")
+        except Exception:
+            pass
+
+        assert Relocation.objects.get(uuid=self.uuid).status == Relocation.Status.FAILURE.value
+
+    def test_with_reason_attempts_left(self):
+        with pytest.raises(ValueError):
+            with retry_task_or_fail_relocation(
+                self.relocation, OrderedTask.UPLOADING_COMPLETE, 3, "foo"
+            ):
+                raise ValueError("Some sort of failure")
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation is not None
+        assert relocation.status == Relocation.Status.IN_PROGRESS.value
+        assert not relocation.failure_reason
+
+    def test_with_reason_last_attempt(self):
+        # Wrap in `try/except` to make mypy happy.
+        try:
+            with retry_task_or_fail_relocation(
+                self.relocation, OrderedTask.UPLOADING_COMPLETE, 0, "foo"
+            ):
+                raise ValueError("Some sort of failure")
+        except Exception:
+            pass
+
+        relocation = Relocation.objects.get(uuid=self.uuid)
+        assert relocation is not None
+        assert relocation.status == Relocation.Status.FAILURE.value
+        assert relocation.failure_reason == "foo"