|
@@ -290,7 +290,7 @@ def uploading_start(uuid: UUID, replying_region_name: str | None, org_slug: str
|
|
|
# reasonable amount of time, go ahead and fail the relocation.
|
|
|
cross_region_export_timeout_check.apply_async(
|
|
|
args=[uuid],
|
|
|
- countdown=CROSS_REGION_EXPORT_TIMEOUT * 60,
|
|
|
+ countdown=int(CROSS_REGION_EXPORT_TIMEOUT.total_seconds()),
|
|
|
)
|
|
|
return
|
|
|
|
|
@@ -332,6 +332,20 @@ def fulfill_cross_region_export_request(
|
|
|
`SAAS_TO_SAAS` relocation's pipeline, namely `uploading_complete`.
|
|
|
"""
|
|
|
|
|
|
+ logger_data = {
|
|
|
+ "uuid": uuid_str,
|
|
|
+ "task": "fulfill_cross_region_export_request",
|
|
|
+ "requesting_region_name": requesting_region_name,
|
|
|
+ "replying_region_name": replying_region_name,
|
|
|
+ "org_slug": org_slug,
|
|
|
+ "encrypted_public_key_size": len(encrypt_with_public_key),
|
|
|
+ "scheduled_at": scheduled_at,
|
|
|
+ }
|
|
|
+ logger.info(
|
|
|
+ "fulfill_cross_region_export_request: started",
|
|
|
+ extra=logger_data,
|
|
|
+ )
|
|
|
+
|
|
|
# Because we use `acks_late`, we need to be careful to prevent infinite scheduling due to some
|
|
|
# persistent bug, like an error in the export logic. So, if `CROSS_REGION_EXPORT_TIMEOUT` time
|
|
|
# has elapsed, always fail this task. Note that we don't report proactively back this failure,
|
|
@@ -339,15 +353,8 @@ def fulfill_cross_region_export_request(
|
|
|
scheduled_at_dt = datetime.fromtimestamp(scheduled_at, tz=UTC)
|
|
|
if scheduled_at_dt + CROSS_REGION_EXPORT_TIMEOUT < datetime.now(tz=UTC):
|
|
|
logger.error(
|
|
|
- "Cross region relocation fulfillment timeout",
|
|
|
- extra={
|
|
|
- "uuid": uuid_str,
|
|
|
- "requesting_region_name": requesting_region_name,
|
|
|
- "replying_region_name": replying_region_name,
|
|
|
- "org_slug": org_slug,
|
|
|
- "encrypted_contents_size": len(encrypt_with_public_key),
|
|
|
- "scheduled_at": scheduled_at,
|
|
|
- },
|
|
|
+ "fulfill_cross_region_export_request: timeout",
|
|
|
+ extra=logger_data,
|
|
|
)
|
|
|
return
|
|
|
|
|
@@ -356,14 +363,29 @@ def fulfill_cross_region_export_request(
|
|
|
path = f"runs/{uuid}/saas_to_saas_export/{org_slug}.tar"
|
|
|
relocation_storage = get_relocation_storage()
|
|
|
fp = BytesIO()
|
|
|
+ logger.error(
|
|
|
+ "fulfill_cross_region_export_request: exporting",
|
|
|
+ extra=logger_data,
|
|
|
+ )
|
|
|
+
|
|
|
export_in_organization_scope(
|
|
|
fp,
|
|
|
encryptor=LocalFileEncryptor(BytesIO(encrypt_with_public_key)),
|
|
|
org_filter={org_slug},
|
|
|
printer=LoggingPrinter(uuid),
|
|
|
)
|
|
|
+ logger.error(
|
|
|
+ "fulfill_cross_region_export_request: exported",
|
|
|
+ extra=logger_data,
|
|
|
+ )
|
|
|
+
|
|
|
fp.seek(0)
|
|
|
relocation_storage.save(path, fp)
|
|
|
+ logger_data["encrypted_contents_size"] = fp.tell()
|
|
|
+ logger.error(
|
|
|
+ "fulfill_cross_region_export_request: saved",
|
|
|
+ extra=logger_data,
|
|
|
+ )
|
|
|
|
|
|
identifier = uuid_to_identifier(uuid)
|
|
|
RegionOutbox(
|
|
@@ -378,6 +400,10 @@ def fulfill_cross_region_export_request(
|
|
|
org_slug=org_slug,
|
|
|
).dict(),
|
|
|
).save()
|
|
|
+ logger.error(
|
|
|
+ "fulfill_cross_region_export_request: scheduled",
|
|
|
+ extra=logger_data,
|
|
|
+ )
|
|
|
|
|
|
|
|
|
@instrumented_task(
|