
fix(relocation): Better UUID typing for tasks (#74971)

Fixes: SENTRY-3CDC

There are three reasonable ways to represent a UUID in the relocation
code: as a plain `str`, as a stdlib `UUID`, and as an instance of our
custom `UUIDField`. We were being pretty loose with our casting between
these three. In particular, we were testing with `str` UUIDs, but
actually sending `UUID` objects around in prod. Since this has now
caused a bug, I've gone through and made everything more correct, and
more in line with how our production task schedulers (outboxes, Celery)
actually pass these objects around.
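
For reference, a minimal sketch of the three representations and the round-trips between them (the literal value is illustrative):

    from uuid import UUID

    raw = "3cdc0b45-6a63-4f89-9ad1-9d6937b45cdc"  # plain str, e.g. from an HTTP payload
    parsed = UUID(raw)                            # stdlib uuid.UUID
    # relocation.uuid                             # value of our custom UUIDField on the model

    # The round-trips are lossless, but the types are not interchangeable:
    assert str(parsed) == raw
    assert UUID(str(parsed)) == parsed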
Alex Zaslavsky · 7 months ago
commit 96ad1cf321

src/sentry/tasks/relocation.py (+46, -41)

@@ -158,7 +158,7 @@ ERR_COMPLETED_INTERNAL = "Internal error during relocation wrap-up."
     retry_backoff_jitter=True,
     soft_time_limit=FAST_TIME_LIMIT,
 )
-def uploading_start(uuid: str, replying_region_name: str | None, org_slug: str | None) -> None:
+def uploading_start(uuid: UUID, replying_region_name: str | None, org_slug: str | None) -> None:
     """
     The very first action in the relocation pipeline. In the case of a `SAAS_TO_SAAS` relocation, it
     will trigger the export of the requested organization from the region it currently lives in. If
@@ -279,7 +279,7 @@ def uploading_start(uuid: str, replying_region_name: str | None, org_slug: str |
 
             # Send out the cross-region request.
             control_relocation_export_service.request_new_export(
-                relocation_uuid=uuid,
+                relocation_uuid=str(uuid),
                 requesting_region_name=get_local_region().name,
                 replying_region_name=replying_region_name,
                 org_slug=org_slug,
@@ -314,7 +314,7 @@ def uploading_start(uuid: str, replying_region_name: str | None, org_slug: str |
     silo_mode=SiloMode.REGION,
 )
 def fulfill_cross_region_export_request(
-    uuid: str,
+    uuid_str: str,
     requesting_region_name: str,
     replying_region_name: str,
     org_slug: str,
@@ -341,7 +341,7 @@ def fulfill_cross_region_export_request(
         logger.error(
             "Cross region relocation fulfillment timeout",
             extra={
-                "uuid": uuid,
+                "uuid": uuid_str,
                 "requesting_region_name": requesting_region_name,
                 "replying_region_name": replying_region_name,
                 "org_slug": org_slug,
@@ -352,6 +352,7 @@ def fulfill_cross_region_export_request(
         return
 
     log_gcp_credentials_details(logger)
+    uuid = UUID(uuid_str)
     path = f"runs/{uuid}/saas_to_saas_export/{org_slug}.tar"
     relocation_storage = get_relocation_storage()
     fp = BytesIO()
@@ -364,14 +365,14 @@ def fulfill_cross_region_export_request(
     fp.seek(0)
     relocation_storage.save(path, fp)
 
-    identifier = uuid_to_identifier(UUID(uuid))
+    identifier = uuid_to_identifier(uuid)
     RegionOutbox(
         shard_scope=OutboxScope.RELOCATION_SCOPE,
         category=OutboxCategory.RELOCATION_EXPORT_REPLY,
         shard_identifier=identifier,
         object_identifier=identifier,
         payload=RelocationExportReplyWithExportParameters(
-            relocation_uuid=uuid,
+            relocation_uuid=uuid_str,
             requesting_region_name=requesting_region_name,
             replying_region_name=replying_region_name,
             org_slug=org_slug,
@@ -390,7 +391,7 @@ def fulfill_cross_region_export_request(
     silo_mode=SiloMode.REGION,
 )
 def cross_region_export_timeout_check(
-    uuid: str,
+    uuid: UUID,
 ) -> None:
     """
     Not part of the primary `OrderedTask` queue. This task is only used to ensure that cross-region
@@ -403,7 +404,7 @@ def cross_region_export_timeout_check(
         logger.exception("Could not locate Relocation model by UUID: %s", uuid)
         return
 
-    logger_data = {"uuid": relocation.uuid, "task": "cross_region_export_timeout_check"}
+    logger_data = {"uuid": str(relocation.uuid), "task": "cross_region_export_timeout_check"}
     logger.info(
         "Cross region timeout check: started",
         extra=logger_data,
@@ -446,7 +447,7 @@ def cross_region_export_timeout_check(
     retry_backoff_jitter=True,
     soft_time_limit=FAST_TIME_LIMIT,
 )
-def uploading_complete(uuid: str) -> None:
+def uploading_complete(uuid: UUID) -> None:
     """
     Just check to ensure that uploading the (potentially very large!) backup file has completed
     before we try to do all sorts of fun stuff with it.
@@ -495,7 +496,7 @@ def uploading_complete(uuid: str) -> None:
     soft_time_limit=MEDIUM_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def preprocessing_scan(uuid: str) -> None:
+def preprocessing_scan(uuid: UUID) -> None:
     """
     Performs the very first part of the `PREPROCESSING` step of a `Relocation`, which involves
     decrypting the user-supplied tarball and picking out some useful information for it. This lets
@@ -671,7 +672,7 @@ def preprocessing_scan(uuid: str) -> None:
     soft_time_limit=MEDIUM_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def preprocessing_transfer(uuid: str) -> None:
+def preprocessing_transfer(uuid: UUID) -> None:
     """
     We currently have the user's relocation data stored in the main filestore bucket, but we need to
     move it to the relocation bucket. This task handles that transfer.
@@ -761,7 +762,7 @@ def preprocessing_transfer(uuid: str) -> None:
     soft_time_limit=MEDIUM_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def preprocessing_baseline_config(uuid: str) -> None:
+def preprocessing_baseline_config(uuid: UUID) -> None:
     """
     Pulls down the global config data we'll need to check for collisions and global data integrity.
 
@@ -814,7 +815,7 @@ def preprocessing_baseline_config(uuid: str) -> None:
     soft_time_limit=MEDIUM_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def preprocessing_colliding_users(uuid: str) -> None:
+def preprocessing_colliding_users(uuid: UUID) -> None:
     """
     Pulls down any already existing users whose usernames match those found in the import - we'll
     need to validate that none of these are mutated during import.
@@ -865,7 +866,7 @@ def preprocessing_colliding_users(uuid: str) -> None:
     soft_time_limit=MEDIUM_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def preprocessing_complete(uuid: str) -> None:
+def preprocessing_complete(uuid: UUID) -> None:
     """
     This task ensures that every file CloudBuild will need to do its work is actually present and
     available. Even if we've "finished" our uploads from the previous step, they may still not (yet)
@@ -988,15 +989,18 @@ def _update_relocation_validation_attempt(
             router.db_for_write(RelocationValidationAttempt),
         )
     ):
+        uuid_str = str(relocation.uuid)
+        uuid = UUID(uuid_str)
+
         # If no interesting status updates occurred, check again in a minute.
         if status == ValidationStatus.IN_PROGRESS:
             logger.info(
                 "Validation polling: scheduled",
-                extra={"uuid": relocation.uuid, "task": task.name},
+                extra={"uuid": uuid_str, "task": task.name},
             )
             return NextTask(
                 task=validating_poll,
-                args=[relocation.uuid, str(relocation_validation_attempt.build_id)],
+                args=[uuid, str(relocation_validation_attempt.build_id)],
                 countdown=60,
             )
 
@@ -1017,10 +1021,10 @@ def _update_relocation_validation_attempt(
 
                 logger.info(
                     "Validation timed out",
-                    extra={"uuid": relocation.uuid, "task": task.name},
+                    extra={"uuid": uuid_str, "task": task.name},
                 )
 
-                return NextTask(task=validating_start, args=[relocation.uuid])
+                return NextTask(task=validating_start, args=[uuid])
 
             # Always accept the numerically higher `ValidationStatus`, since that is a more definite
             # result.
@@ -1048,7 +1052,7 @@ def _update_relocation_validation_attempt(
         if status == ValidationStatus.INVALID:
             logger.info(
                 "Validation result: invalid",
-                extra={"uuid": relocation.uuid, "task": task.name},
+                extra={"uuid": uuid_str, "task": task.name},
             )
             transaction.on_commit(
                 lambda: fail_relocation(
@@ -1066,10 +1070,10 @@ def _update_relocation_validation_attempt(
 
         logger.info(
             "Validation result: valid",
-            extra={"uuid": relocation.uuid, "task": task.name},
+            extra={"uuid": uuid_str, "task": task.name},
         )
 
-        return NextTask(task=importing, args=[relocation.uuid])
+        return NextTask(task=importing, args=[uuid])
 
 
 @instrumented_task(
@@ -1082,7 +1086,7 @@ def _update_relocation_validation_attempt(
     soft_time_limit=FAST_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def validating_start(uuid: str) -> None:
+def validating_start(uuid: UUID) -> None:
     """
     Calls into Google CloudBuild and kicks off a validation run.
 
@@ -1159,7 +1163,7 @@ def validating_start(uuid: str) -> None:
     soft_time_limit=FAST_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def validating_poll(uuid: str, build_id: str) -> None:
+def validating_poll(uuid: UUID, build_id: str) -> None:
     """
     Checks the progress of a Google CloudBuild validation run.
 
@@ -1189,7 +1193,7 @@ def validating_poll(uuid: str, build_id: str) -> None:
     logger.info(
         "Validation polling: active",
         extra={
-            "uuid": relocation.uuid,
+            "uuid": str(relocation.uuid),
             "task": OrderedTask.VALIDATING_POLL.name,
             "build_id": build_id,
         },
@@ -1259,7 +1263,7 @@ def validating_poll(uuid: str, build_id: str) -> None:
     soft_time_limit=FAST_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def validating_complete(uuid: str, build_id: str) -> None:
+def validating_complete(uuid: UUID, build_id: str) -> None:
     """
     Wraps up a validation run, and reports on what we found. If this task is being called, the
     CloudBuild run has completed successfully, so we just need to figure out if there were any
@@ -1349,7 +1353,7 @@ def validating_complete(uuid: str, build_id: str) -> None:
     soft_time_limit=SLOW_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def importing(uuid: str) -> None:
+def importing(uuid: UUID) -> None:
     """
     Perform the import on the actual live instance we are targeting.
 
@@ -1413,7 +1417,7 @@ def importing(uuid: str) -> None:
     soft_time_limit=FAST_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def postprocessing(uuid: str) -> None:
+def postprocessing(uuid: UUID) -> None:
     """
     Make the owner of this relocation an owner of all of the organizations we just imported.
     """
@@ -1434,9 +1438,10 @@ def postprocessing(uuid: str) -> None:
         attempts_left,
         ERR_POSTPROCESSING_INTERNAL,
     ):
+        uuid_str = str(uuid)
         imported_org_ids: set[int] = set()
         for chunk in RegionImportChunk.objects.filter(
-            import_uuid=str(uuid), model="sentry.organization"
+            import_uuid=uuid_str, model="sentry.organization"
         ):
             imported_org_ids = imported_org_ids.union(set(chunk.inserted_map.values()))
 
@@ -1468,7 +1473,7 @@ def postprocessing(uuid: str) -> None:
         # Last, but certainly not least: trigger signals, so that interested subscribers in eg:
         # getsentry can do whatever postprocessing they need to. If even a single one fails, we fail
         # the entire task.
-        for _, result in relocated.send_robust(sender=postprocessing, relocation_uuid=uuid):
+        for _, result in relocated.send_robust(sender=postprocessing, relocation_uuid=uuid_str):
             if isinstance(result, Exception):
                 raise result
 
@@ -1477,7 +1482,7 @@ def postprocessing(uuid: str) -> None:
         relocation_redeem_promo_code.send_robust(
             sender=postprocessing,
             user_id=relocation.owner_id,
-            relocation_uuid=uuid,
+            relocation_uuid=uuid_str,
             orgs=list(imported_orgs),
         )
 
@@ -1486,7 +1491,7 @@ def postprocessing(uuid: str) -> None:
                 analytics.record(
                     "relocation.organization_imported",
                     organization_id=org.id,
-                    relocation_uuid=str(relocation.uuid),
+                    relocation_uuid=uuid_str,
                     slug=org.slug,
                     owner_id=relocation.owner_id,
                 )
@@ -1506,7 +1511,7 @@ def postprocessing(uuid: str) -> None:
     soft_time_limit=FAST_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def notifying_unhide(uuid: str) -> None:
+def notifying_unhide(uuid: UUID) -> None:
     """
     Un-hide the just-imported organizations, making them visible to users in the UI.
     """
@@ -1554,7 +1559,7 @@ def notifying_unhide(uuid: str) -> None:
     soft_time_limit=FAST_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def notifying_users(uuid: str) -> None:
+def notifying_users(uuid: UUID) -> None:
     """
     Send an email to all users that have been imported, telling them to claim their accounts.
     """
@@ -1575,16 +1580,15 @@ def notifying_users(uuid: str) -> None:
         attempts_left,
         ERR_NOTIFYING_INTERNAL,
     ):
+        uuid_str = str(uuid)
         imported_user_ids: set[int] = set()
-        chunks = ControlImportChunkReplica.objects.filter(
-            import_uuid=str(uuid), model="sentry.user"
-        )
+        chunks = ControlImportChunkReplica.objects.filter(import_uuid=uuid_str, model="sentry.user")
         for control_chunk in chunks:
             imported_user_ids = imported_user_ids.union(set(control_chunk.inserted_map.values()))
 
         imported_org_slugs: set[int] = set()
         for region_chunk in RegionImportChunk.objects.filter(
-            import_uuid=str(uuid), model="sentry.organization"
+            import_uuid=uuid_str, model="sentry.organization"
         ):
             imported_org_slugs = imported_org_slugs.union(
                 set(region_chunk.inserted_identifiers.values())
@@ -1631,7 +1635,7 @@ def notifying_users(uuid: str) -> None:
     soft_time_limit=FAST_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def notifying_owner(uuid: str) -> None:
+def notifying_owner(uuid: UUID) -> None:
     """
     Send an email to the creator and owner, telling them that their relocation was successful.
     """
@@ -1652,9 +1656,10 @@ def notifying_owner(uuid: str) -> None:
         attempts_left,
         ERR_NOTIFYING_INTERNAL,
     ):
+        uuid_str = str(uuid)
         imported_org_slugs: set[int] = set()
         for chunk in RegionImportChunk.objects.filter(
-            import_uuid=str(uuid), model="sentry.organization"
+            import_uuid=uuid_str, model="sentry.organization"
         ):
             imported_org_slugs = imported_org_slugs.union(set(chunk.inserted_identifiers.values()))
 
@@ -1662,7 +1667,7 @@ def notifying_owner(uuid: str) -> None:
             relocation,
             Relocation.EmailKind.SUCCEEDED,
             {
-                "uuid": str(relocation.uuid),
+                "uuid": uuid_str,
                 "orgs": list(imported_org_slugs),
             },
         )
@@ -1680,7 +1685,7 @@ def notifying_owner(uuid: str) -> None:
     soft_time_limit=FAST_TIME_LIMIT,
     silo_mode=SiloMode.REGION,
 )
-def completed(uuid: str) -> None:
+def completed(uuid: UUID) -> None:
     """
     Finish up a relocation by marking it a success.
     """

src/sentry/utils/relocation.py (+9, -7)

@@ -311,7 +311,7 @@ COMPARE_VALIDATION_STEP_TEMPLATE = Template(
 # A custom logger that roughly matches the parts of the `click.echo` interface that the
 # `import_*` methods rely on.
 class LoggingPrinter(Printer):
-    def __init__(self, uuid: str):
+    def __init__(self, uuid: UUID):
         self.uuid = uuid
         super().__init__()
 
@@ -326,13 +326,13 @@ class LoggingPrinter(Printer):
             logger.error(
                 "Import failed: %s",
                 text,
-                extra={"uuid": self.uuid, "task": OrderedTask.IMPORTING.name},
+                extra={"uuid": str(self.uuid), "task": OrderedTask.IMPORTING.name},
             )
         else:
             logger.info(
                 "Import info: %s",
                 text,
-                extra={"uuid": self.uuid, "task": OrderedTask.IMPORTING.name},
+                extra={"uuid": str(self.uuid), "task": OrderedTask.IMPORTING.name},
             )
 
 
@@ -365,7 +365,7 @@ def send_relocation_update_email(
 
 
 def start_relocation_task(
-    uuid: str, task: OrderedTask, allowed_task_attempts: int
+    uuid: UUID, task: OrderedTask, allowed_task_attempts: int
 ) -> tuple[Relocation | None, int]:
     """
     All tasks for relocation are done sequentially, and take the UUID of the `Relocation` model as
@@ -374,7 +374,7 @@ def start_relocation_task(
     Returns a tuple of relocation model and the number of attempts remaining for this task.
     """
 
-    logger_data = {"uuid": uuid}
+    logger_data = {"uuid": str(uuid)}
     try:
         relocation: Relocation = Relocation.objects.get(uuid=uuid)
     except Relocation.DoesNotExist:
@@ -489,7 +489,9 @@ def fail_relocation(relocation: Relocation, task: OrderedTask, reason: str = "")
     relocation.status = Relocation.Status.FAILURE.value
     relocation.save()
 
-    logger.info("Task failed", extra={"uuid": relocation.uuid, "task": task.name, "reason": reason})
+    logger.info(
+        "Task failed", extra={"uuid": str(relocation.uuid), "task": task.name, "reason": reason}
+    )
     send_relocation_update_email(
         relocation,
         Relocation.EmailKind.FAILED,
@@ -514,7 +516,7 @@ def retry_task_or_fail_relocation(
     instead.
     """
 
-    logger_data = {"uuid": relocation.uuid, "task": task.name, "attempts_left": attempts_left}
+    logger_data = {"uuid": str(relocation.uuid), "task": task.name, "attempts_left": attempts_left}
     try:
         yield
     except Exception:
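
A plausible motivation for stringifying UUIDs in log `extra` dicts, though the commit does not spell it out: `uuid.UUID` is not JSON-serializable, so any structured-logging handler that runs the extras through `json.dumps` would raise.

    import json
    from uuid import uuid4

    json.dumps({"uuid": str(uuid4())})  # fine
    json.dumps({"uuid": uuid4()})       # TypeError: Object of type UUID is not JSON serializable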

tests/sentry/tasks/test_relocation.py (+4, -4)

@@ -5,7 +5,7 @@ from pathlib import Path
 from tempfile import TemporaryDirectory
 from types import SimpleNamespace
 from unittest.mock import MagicMock, Mock, patch
-from uuid import uuid4
+from uuid import UUID, uuid4
 
 import pytest
 import yaml
@@ -145,7 +145,7 @@ class RelocationTaskTestCase(TestCase):
             file=self.file,
             kind=RelocationFile.Kind.RAW_USER_DATA.value,
         )
-        self.uuid = str(self.relocation.uuid)
+        self.uuid = UUID(str(self.relocation.uuid))
 
     @cached_property
     def file(self):
@@ -268,7 +268,7 @@ class UploadingStartTest(RelocationTaskTestCase):
                 latest_task=OrderedTask.UPLOADING_START.name,
                 provenance=Relocation.Provenance.SAAS_TO_SAAS,
             )
-            self.uuid = str(self.relocation.uuid)
+            self.uuid = UUID(str(self.relocation.uuid))
 
     @override_settings(
         SENTRY_MONOLITH_REGION=REQUESTING_TEST_REGION, SENTRY_REGION=REQUESTING_TEST_REGION
@@ -1807,7 +1807,7 @@ class ValidatingPollTest(RelocationTaskTestCase):
         assert relocation.failure_reason == ERR_VALIDATING_INTERNAL
 
 
-def mock_invalid_finding(storage: Storage, uuid: str):
+def mock_invalid_finding(storage: Storage, uuid: UUID):
     storage.save(
         f"runs/{uuid}/findings/import-baseline-config.json",
         BytesIO(
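
`UUID(str(self.relocation.uuid))` normalizes whatever the custom `UUIDField` yields into a stdlib `UUID`, so the tests now hand tasks the same type production does. The round-trip is robust because the `UUID` constructor accepts both dashed and undashed hex:

    from uuid import UUID

    assert UUID("3cdc0b456a634f899ad19d6937b45cdc") == UUID("3cdc0b45-6a63-4f89-9ad1-9d6937b45cdc")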

tests/sentry/utils/test_relocation.py (+1, -1)

@@ -39,7 +39,7 @@ class RelocationStartTestCase(RelocationUtilsTestCase):
     def test_bad_relocation_not_found(self, fake_message_builder: Mock):
         self.mock_message_builder(fake_message_builder)
 
-        uuid = uuid4().hex
+        uuid = uuid4()
         (rel, attempts_left) = start_relocation_task(uuid, OrderedTask.UPLOADING_COMPLETE, 3)
 
         assert fake_message_builder.call_count == 0
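
Note the change in this last hunk: the old test passed `uuid4().hex`, a bare 32-character string, while the new one passes the `UUID` object itself, matching the updated `start_relocation_task` signature. For reference (values illustrative):

    from uuid import uuid4

    u = uuid4()
    u.hex   # '3cdc0b456a634f899ad19d6937b45cdc' (undashed str)
    str(u)  # '3cdc0b45-6a63-4f89-9ad1-9d6937b45cdc' (canonical dashed str)
    u       # UUID('3cdc0b45-6a63-4f89-9ad1-9d6937b45cdc'), what the task now receives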