
ref(sourcemaps): Refactor and improve bundle assembling (#52433)

Riccardo Busetti 1 year ago
commit d17ffa8780

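In short, the commit replaces the handle_assemble_for_release_file / handle_assemble_for_artifact_bundle helpers with PostAssembler context managers that validate the uploaded archive and clean up the assembled File on failure. A condensed sketch of the resulting call flow, adapted from the assemble_artifacts task in the diff below (error handling omitted; all names come from this change):

    assemble_result = assemble_file(
        task=assemble_task,
        org_or_project=organization,
        name=archive_filename,
        checksum=checksum,
        chunks=chunks,
        file_type=file_type,
    )
    if assemble_result is not None:
        # Validation happens when the post assembler is constructed; any failure
        # inside the with-block deletes the assembled File row before re-raising.
        with prepare_post_assembler(
            assemble_result=assemble_result,
            organization=organization,
            release=version,
            dist=dist,
            project_ids=project_ids,
            upload_as_artifact_bundle=upload_as_artifact_bundle,
        ) as post_assembler:
            post_assembler.post_assemble()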
+ 46 - 2
src/sentry/models/artifactbundle.py

@@ -1,6 +1,6 @@
 import zipfile
 from enum import Enum
-from typing import IO, Callable, Dict, List, Mapping, Optional, Tuple
+from typing import IO, Callable, Dict, List, Mapping, Optional, Set, Tuple
 
 from django.db import models
 from django.db.models.signals import post_delete
@@ -197,9 +197,16 @@ class ArtifactBundleArchive:
         self._fileobj = fileobj
         self._zip_file = zipfile.ZipFile(self._fileobj)
         self.manifest = self._read_manifest()
+        self.artifact_count = len(self.manifest.get("files", {}))
         if build_memory_map:
             self._build_memory_maps()
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc, value, tb):
+        self.close()
+
     def close(self):
         self._zip_file.close()
         self._fileobj.close()
@@ -232,7 +239,6 @@ class ArtifactBundleArchive:
         self._entries_by_debug_id = {}
         self._entries_by_url = {}
 
-        # TODO(iambriccardo): generalize the manifest reading methods across assemble and processor.
         files = self.manifest.get("files", {})
         for file_path, info in files.items():
             # Building the map for debug_id lookup.
@@ -255,6 +261,43 @@ class ArtifactBundleArchive:
             # Building the map for url lookup.
             self._entries_by_url[info.get("url")] = (file_path, info)
 
+    def extract_debug_ids_from_manifest(
+        self,
+    ) -> Tuple[Optional[str], Set[Tuple[SourceFileType, str]]]:
+        # We use a set, since we might have the same debug_id and file_type.
+        debug_ids_with_types = set()
+
+        # We also want to extract the bundle_id which is also known as the bundle debug_id. This id is used to uniquely
+        # identify a specific ArtifactBundle in case for example of future deletion.
+        #
+        # If no id is found, it means that we must have an associated release to this ArtifactBundle, through the
+        # ReleaseArtifactBundle table.
+        bundle_id = self._extract_bundle_id()
+
+        files = self.manifest.get("files", {})
+        for file_path, info in files.items():
+            headers = self.normalize_headers(info.get("headers", {}))
+            if (debug_id := headers.get("debug-id")) is not None:
+                debug_id = self.normalize_debug_id(debug_id)
+                file_type = info.get("type")
+                if (
+                    debug_id is not None
+                    and file_type is not None
+                    and (source_file_type := SourceFileType.from_lowercase_key(file_type))
+                    is not None
+                ):
+                    debug_ids_with_types.add((source_file_type, debug_id))
+
+        return bundle_id, debug_ids_with_types
+
+    def _extract_bundle_id(self):
+        bundle_id = self.manifest.get("debug_id")
+
+        if bundle_id is not None:
+            bundle_id = self.normalize_debug_id(bundle_id)
+
+        return bundle_id
+
     def get_files(self) -> Dict[str, dict]:
         return self.manifest.get("files", {})
 
@@ -329,4 +372,5 @@ class ArtifactBundleArchive:
     def get_file_url_by_file_path(self, file_path):
         files = self.manifest.get("files", {})
         file_info = files.get(file_path, {})
+
         return file_info.get("url")

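For reference, the new __enter__/__exit__ protocol and the manifest helper moved onto ArtifactBundleArchive are meant to be used roughly as follows (illustrative call site; fileobj stands for any file-like object containing an uploaded bundle):

    with ArtifactBundleArchive(fileobj, build_memory_map=False) as archive:
        # bundle_id can be None when the manifest has no top-level debug_id;
        # debug_ids_with_types is a set of (SourceFileType, debug_id) tuples.
        bundle_id, debug_ids_with_types = archive.extract_debug_ids_from_manifest()
        artifact_count = archive.artifact_count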
+ 3 - 0
src/sentry/models/releasefile.py

@@ -198,6 +198,9 @@ class ReleaseArchive:
         return self
 
     def __exit__(self, exc, value, tb):
+        self.close()
+
+    def close(self):
         self._zip_file.close()
         self._fileobj.close()
 

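ReleaseArchive keeps its with-statement behaviour, but callers can now also close it explicitly, since __exit__ simply delegates to the new close() method. A minimal sketch (fileobj is assumed to be a seekable file-like object):

    archive = ReleaseArchive(fileobj)
    try:
        manifest = archive.manifest
    finally:
        # Equivalent to leaving a `with ReleaseArchive(fileobj) as archive:` block.
        archive.close()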
+ 499 - 466
src/sentry/tasks/assemble.py

@@ -3,16 +3,15 @@ from __future__ import annotations
 import hashlib
 import logging
 import uuid
+from abc import ABC, abstractmethod
 from datetime import datetime
 from os import path
-from typing import List, Optional, Set, Tuple
+from typing import IO, List, NamedTuple, Optional, Tuple
 
 import sentry_sdk
 from django.db import IntegrityError, router
 from django.db.models import Q
 from django.utils import timezone
-from symbolic.debuginfo import normalize_debug_id
-from symbolic.exceptions import SymbolicError
 
 from sentry import analytics, features, options
 from sentry.api.serializers import serialize
@@ -23,11 +22,11 @@ from sentry.models.artifactbundle import (
     INDEXING_THRESHOLD,
     NULL_STRING,
     ArtifactBundle,
+    ArtifactBundleArchive,
     ArtifactBundleIndexingState,
     DebugIdArtifactBundle,
     ProjectArtifactBundle,
     ReleaseArtifactBundle,
-    SourceFileType,
 )
 from sentry.models.releasefile import ReleaseArchive, update_artifact_index
 from sentry.tasks.base import instrumented_task
@@ -53,6 +52,86 @@ class AssembleTask:
     ARTIFACT_BUNDLE = "organization.artifact_bundle"  # Artifact bundle upload
 
 
+class AssembleResult(NamedTuple):
+    # File object stored in the database.
+    bundle: File
+    # Temporary in-memory object representing the file used for efficiency.
+    bundle_temp_file: IO
+
+    def delete_bundle(self):
+        self.bundle.delete()
+
+
+def assemble_file(
+    task, org_or_project, name, checksum, chunks, file_type
+) -> Optional[AssembleResult]:
+    """
+    Verifies and assembles a file model from chunks.
+
+    This downloads all chunks from blob store to verify their integrity and
+    associates them with a created file model. Additionally, it assembles the
+    full file in a temporary location and verifies the complete content hash.
+
+    Returns a tuple ``(File, TempFile)`` on success, or ``None`` on error.
+    """
+    from sentry.models import AssembleChecksumMismatch, File, FileBlob, Project
+
+    if isinstance(org_or_project, Project):
+        organization = org_or_project.organization
+    else:
+        organization = org_or_project
+
+    # Load all FileBlobs from db since we can be sure here we already own all chunks need to build the file.
+    file_blobs = FileBlob.objects.filter(checksum__in=chunks).values_list("id", "checksum", "size")
+
+    # Reject all files that exceed the maximum allowed size for this organization.
+    file_size = sum(x[2] for x in file_blobs)
+    if file_size > get_max_file_size(organization):
+        set_assemble_status(
+            task,
+            org_or_project.id,
+            checksum,
+            ChunkFileState.ERROR,
+            detail="File exceeds maximum size",
+        )
+
+        return None
+
+    # Sanity check. In case not all blobs exist at this point we have a race condition.
+    if {x[1] for x in file_blobs} != set(chunks):
+        set_assemble_status(
+            task,
+            org_or_project.id,
+            checksum,
+            ChunkFileState.ERROR,
+            detail="Not all chunks available for assembling",
+        )
+
+        return None
+
+    # Ensure blobs are in the order and duplication in which they were
+    # transmitted. Otherwise, we would assemble the file in the wrong order.
+    ids_by_checksum = {chks: id for id, chks, _ in file_blobs}
+    file_blob_ids = [ids_by_checksum[c] for c in chunks]
+
+    file = File.objects.create(name=name, checksum=checksum, type=file_type)
+    try:
+        temp_file = file.assemble_from_file_blob_ids(file_blob_ids, checksum)
+    except AssembleChecksumMismatch:
+        file.delete()
+        set_assemble_status(
+            task,
+            org_or_project.id,
+            checksum,
+            ChunkFileState.ERROR,
+            detail="Reported checksum mismatch",
+        )
+    else:
+        file.save()
+
+        return AssembleResult(bundle=file, bundle_temp_file=temp_file)
+
+
 def _get_cache_key(task, scope, checksum):
     """Computes the cache key for assemble status.
 
@@ -181,420 +260,449 @@ class AssembleArtifactsError(Exception):
     pass
 
 
-def _simple_update(
-    release_file: ReleaseFile, new_file: File, new_archive: ReleaseArchive, additional_fields: dict
-) -> bool:
-    """Update function used in _upsert_release_file"""
-    old_file = release_file.file
-    release_file.update(file=new_file, **additional_fields)
-    old_file.delete()
+class PostAssembler(ABC):
+    def __init__(self, assemble_result: AssembleResult):
+        self.assemble_result = assemble_result
+        self._validate_bundle_guarded()
 
-    return True
+    def __enter__(self):
+        return self
 
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        # In case any exception happens in the `with` block, we will capture it, and we want to delete the actual `File`
+        # object created in the database, to avoid orphan entries.
+        if exc_type is not None:
+            self._delete_bundle_file_object()
 
-def _upsert_release_file(
-    file: File, archive: ReleaseArchive, update_fn, key_fields, additional_fields
-) -> bool:
-    success = False
-    release_file = None
+        self.close()
 
-    # Release files must have unique names within their release
-    # and dist. If a matching file already exists, replace its
-    # file with the new one; otherwise create it.
-    try:
-        release_file = ReleaseFile.objects.get(**key_fields)
-    except ReleaseFile.DoesNotExist:
-        try:
-            with atomic_transaction(using=router.db_for_write(ReleaseFile)):
-                release_file = ReleaseFile.objects.create(
-                    file=file, **dict(key_fields, **additional_fields)
-                )
-        except IntegrityError:
-            # NB: This indicates a race, where another assemble task or
-            # file upload job has just created a conflicting file. Since
-            # we're upserting here anyway, yield to the faster actor and
-            # do not try again.
-            file.delete()
-        else:
-            success = True
-    else:
-        success = update_fn(release_file, file, archive, additional_fields)
+    def _delete_bundle_file_object(self):
+        self.assemble_result.delete_bundle()
 
-    return success
+    def _validate_bundle_guarded(self):
+        try:
+            self._validate_bundle()
+        except Exception:
+            metrics.incr("tasks.assemble.invalid_bundle")
+            # In case the bundle is invalid, we want to delete the actual `File` object created in the database, to
+            # avoid orphan entries.
+            self._delete_bundle_file_object()
+            raise AssembleArtifactsError("the bundle is invalid")
+
+    @abstractmethod
+    def _validate_bundle(self):
+        pass
+
+    @abstractmethod
+    def close(self):
+        pass
+
+    @abstractmethod
+    def post_assemble(self):
+        pass
+
+
+class ReleaseBundlePostAssembler(PostAssembler):
+    def __init__(self, assemble_result: AssembleResult, organization: Organization, version: str):
+        super().__init__(assemble_result)
+        self.organization = organization
+        self.version = version
+
+    def _validate_bundle(self):
+        self.archive = ReleaseArchive(self.assemble_result.bundle_temp_file)
+        metrics.incr(
+            "tasks.assemble.release_bundle.artifact_count", amount=self.archive.artifact_count
+        )
 
+    def close(self):
+        self.archive.close()
 
-def get_artifact_basename(url):
-    return url.rsplit("/", 1)[-1]
+    def post_assemble(self):
+        with metrics.timer("tasks.assemble.release_bundle"):
+            self._create_release_file()
 
+    def _create_release_file(self):
+        manifest = self.archive.manifest
 
-def _store_single_files(archive: ReleaseArchive, meta: dict, count_as_artifacts: bool):
-    try:
-        temp_dir = archive.extract()
-    except Exception:
-        raise AssembleArtifactsError("failed to extract bundle")
+        if manifest.get("org") != self.organization.slug:
+            raise AssembleArtifactsError("organization does not match uploaded bundle")
 
-    with temp_dir:
-        artifacts = archive.manifest.get("files", {})
-        for rel_path, artifact in artifacts.items():
-            artifact_url = artifact.get("url", rel_path)
-            artifact_basename = get_artifact_basename(artifact_url)
+        if manifest.get("release") != self.version:
+            raise AssembleArtifactsError("release does not match uploaded bundle")
 
-            file = File.objects.create(
-                name=artifact_basename, type="release.file", headers=artifact.get("headers", {})
+        try:
+            release = Release.objects.get(
+                organization_id=self.organization.id, version=self.version
             )
+        except Release.DoesNotExist:
+            raise AssembleArtifactsError("release does not exist")
 
-            full_path = path.join(temp_dir.name, rel_path)
-            with open(full_path, "rb") as fp:
-                file.putfile(fp, logger=logger)
+        dist_name = manifest.get("dist")
+        dist = release.add_dist(dist_name) if dist_name else None
 
-            kwargs = dict(meta, name=artifact_url)
-            extra_fields = {"artifact_count": 1 if count_as_artifacts else 0}
-            _upsert_release_file(file, None, _simple_update, kwargs, extra_fields)
+        min_artifact_count = options.get("processing.release-archive-min-files")
+        saved_as_archive = False
 
+        if self.archive.artifact_count >= min_artifact_count:
+            try:
+                update_artifact_index(release, dist, self.assemble_result.bundle)
+                saved_as_archive = True
+            except Exception as exc:
+                logger.error("Unable to update artifact index", exc_info=exc)
+
+        if not saved_as_archive:
+            meta = {
+                "organization_id": self.organization.id,
+                "release_id": release.id,
+                "dist_id": dist.id if dist else dist,
+            }
+            self._store_single_files(meta, True)
+
+    def _store_single_files(self, meta: dict, count_as_artifacts: bool):
+        try:
+            temp_dir = self.archive.extract()
+        except Exception:
+            raise AssembleArtifactsError("failed to extract bundle")
 
-def _normalize_headers(headers: dict) -> dict:
-    return {k.lower(): v for k, v in headers.items()}
+        with temp_dir:
+            artifacts = self.archive.manifest.get("files", {})
+            for rel_path, artifact in artifacts.items():
+                artifact_url = artifact.get("url", rel_path)
+                artifact_basename = self._get_artifact_basename(artifact_url)
 
+                file = File.objects.create(
+                    name=artifact_basename, type="release.file", headers=artifact.get("headers", {})
+                )
 
-def _normalize_debug_id(debug_id: Optional[str]) -> Optional[str]:
-    try:
-        return normalize_debug_id(debug_id)
-    except SymbolicError:
-        return None
+                full_path = path.join(temp_dir.name, rel_path)
+                with open(full_path, "rb") as fp:
+                    file.putfile(fp, logger=logger)
 
+                kwargs = dict(meta, name=artifact_url)
+                extra_fields = {"artifact_count": 1 if count_as_artifacts else 0}
+                self._upsert_release_file(file, self._simple_update, kwargs, extra_fields)
 
-def _extract_debug_ids_from_manifest(
-    manifest: dict,
-) -> Tuple[Optional[str], Set[Tuple[SourceFileType, str]]]:
-    # We use a set, since we might have the same debug_id and file_type.
-    debug_ids_with_types = set()
-
-    # We also want to extract the bundle_id which is also known as the bundle debug_id. This id is used to uniquely
-    # identify a specific ArtifactBundle in case for example of future deletion.
-    #
-    # If no id is found, it means that we must have an associated release to this ArtifactBundle, through the
-    # ReleaseArtifactBundle table.
-    bundle_id = manifest.get("debug_id")
-    if bundle_id is not None:
-        bundle_id = _normalize_debug_id(bundle_id)
-
-    files = manifest.get("files", {})
-    for file_path, info in files.items():
-        headers = _normalize_headers(info.get("headers", {}))
-        if (debug_id := headers.get("debug-id")) is not None:
-            debug_id = _normalize_debug_id(debug_id)
-            file_type = info.get("type")
-            if (
-                debug_id is not None
-                and file_type is not None
-                and (source_file_type := SourceFileType.from_lowercase_key(file_type)) is not None
-            ):
-                debug_ids_with_types.add((source_file_type, debug_id))
-
-    return bundle_id, debug_ids_with_types
-
-
-def _remove_duplicate_artifact_bundles(org_id: int, ids: List[int]):
-    # In case there are no ids to delete, we don't want to run the query, otherwise it will result in a deletion of
-    # all ArtifactBundle(s) with the specific bundle_id.
-    if not ids:
-        return
-
-    # Even though we delete via a QuerySet the associated file is also deleted, because django will still
-    # fire the on_delete signal.
-    ArtifactBundle.objects.filter(Q(id__in=ids), organization_id=org_id).delete()
-
-
-def _bind_or_create_artifact_bundle(
-    bundle_id: str | None,
-    date_added: datetime,
-    org_id: int,
-    archive_file: File,
-    artifact_count: int,
-) -> Tuple[ArtifactBundle, bool]:
-    existing_artifact_bundles = list(
-        ArtifactBundle.objects.filter(organization_id=org_id, bundle_id=bundle_id)
-    )
+    @staticmethod
+    def _get_artifact_basename(url):
+        return url.rsplit("/", 1)[-1]
 
-    if len(existing_artifact_bundles) == 0:
-        existing_artifact_bundle = None
-    else:
-        existing_artifact_bundle = existing_artifact_bundles.pop()
-        # We want to remove all the duplicate artifact bundles that have the same bundle_id.
-        _remove_duplicate_artifact_bundles(
-            org_id=org_id, ids=list(map(lambda value: value.id, existing_artifact_bundles))
-        )
+    @staticmethod
+    def _upsert_release_file(file: File, update_fn, key_fields, additional_fields) -> bool:
+        success = False
+        release_file = None
 
-    # In case there is not ArtifactBundle with a specific bundle_id, we just create it and return.
-    if existing_artifact_bundle is None:
-        artifact_bundle = ArtifactBundle.objects.create(
-            organization_id=org_id,
-            # In case we didn't find the bundle_id in the manifest, we will just generate our own.
-            bundle_id=bundle_id or uuid.uuid4().hex,
-            file=archive_file,
-            artifact_count=artifact_count,
-            # By default, a bundle is not indexed.
-            indexing_state=ArtifactBundleIndexingState.NOT_INDEXED.value,
-            # "date_added" and "date_uploaded" will have the same value, but they will diverge once renewal is performed
-            # by other parts of Sentry. Renewal is required since we want to expire unused bundles after ~90 days.
-            date_added=date_added,
-            date_uploaded=date_added,
-            # When creating a new bundle by default its last modified date corresponds to the creation date.
-            date_last_modified=date_added,
+        # Release files must have unique names within their release
+        # and dist. If a matching file already exists, replace its
+        # file with the new one; otherwise create it.
+        try:
+            release_file = ReleaseFile.objects.get(**key_fields)
+        except ReleaseFile.DoesNotExist:
+            try:
+                with atomic_transaction(using=router.db_for_write(ReleaseFile)):
+                    release_file = ReleaseFile.objects.create(
+                        file=file, **dict(key_fields, **additional_fields)
+                    )
+            except IntegrityError:
+                # NB: This indicates a race, where another assemble task or
+                # file upload job has just created a conflicting file. Since
+                # we're upserting here anyway, yield to the faster actor and
+                # do not try again.
+                file.delete()
+            else:
+                success = True
+        else:
+            success = update_fn(release_file, file, additional_fields)
+
+        return success
+
+    @staticmethod
+    def _simple_update(release_file: ReleaseFile, new_file: File, additional_fields: dict) -> bool:
+        """Update function used in _upsert_release_file"""
+        old_file = release_file.file
+        release_file.update(file=new_file, **additional_fields)
+        old_file.delete()
+
+        return True
+
+
+class ArtifactBundlePostAssembler(PostAssembler):
+    def __init__(
+        self,
+        assemble_result: AssembleResult,
+        organization: Organization,
+        release: Optional[str],
+        dist: Optional[str],
+        project_ids: Optional[List[int]],
+    ):
+        super().__init__(assemble_result)
+        self.organization = organization
+        self.release = release
+        self.dist = dist
+        self.projects_ids = project_ids
+
+    def _validate_bundle(self):
+        self.archive = ArtifactBundleArchive(self.assemble_result.bundle_temp_file)
+        metrics.incr(
+            "tasks.assemble.artifact_bundle.artifact_count", amount=self.archive.artifact_count
         )
 
-        return artifact_bundle, True
-    else:
-        # We store a reference to the previous file to which the bundle was pointing to.
-        existing_file = existing_artifact_bundle.file
-
-        # Only if the file objects are different we want to update the database, otherwise we will end up deleting
-        # a newly bound file.
-        if existing_file != archive_file:
-            # In case there is an ArtifactBundle with a specific bundle_id, we want to change its underlying File model
-            # with its corresponding artifact count and also update the dates.
-            existing_artifact_bundle.update(
-                file=archive_file,
-                artifact_count=artifact_count,
-                date_added=date_added,
-                # If you upload a bundle which already exists, we track this as a modification since our goal is to show
-                # first all the bundles that have had the most recent activity.
-                date_last_modified=date_added,
-            )
-
-            # We now delete that file, in order to avoid orphan files in the database.
-            existing_file.delete()
-
-        return existing_artifact_bundle, False
-
-
-def _index_bundle_if_needed(org_id: int, release: str, dist: str, date_snapshot: datetime):
-    # We collect how many times we tried to perform indexing.
-    metrics.incr("tasks.assemble.artifact_bundle.try_indexing")
-
-    # We get the number of associations by upper bounding the query to the "date_snapshot", which is done to prevent
-    # the case in which concurrent updates on the database will lead to problems. For example if we have a threshold
-    # of 1, and we have two uploads happening concurrently and the database will contain two associations even when
-    # the assembling of the first upload is running this query, we will have the first upload task see 2 associations
-    # , thus it will trigger the indexing. The same will also happen for the second upload but in reality we just
-    # want the second upload to perform indexing.
-    #
-    # This date implementation might still lead to issues, more specifically in the case in which the
-    # "date_last_modified" is the same but the probability of that happening is so low that it's a negligible
-    # detail for now, as long as the indexing is idempotent.
-    associated_bundles = list(
-        ArtifactBundle.objects.filter(
-            organization_id=org_id,
-            # Since the `date_snapshot` will be the same as `date_last_modified` of the last bundle uploaded in this
-            # async job, we want to use the `<=` condition for time, effectively saying give me all the bundles that
-            # were created now or in the past.
-            date_last_modified__lte=date_snapshot,
-            releaseartifactbundle__release_name=release,
-            releaseartifactbundle__dist_name=dist,
-        )
-    )
+    def close(self):
+        self.archive.close()
 
-    # In case we didn't surpass the threshold, indexing will not happen.
-    if len(associated_bundles) <= INDEXING_THRESHOLD:
-        return
+    def post_assemble(self):
+        with metrics.timer("tasks.assemble.artifact_bundle"):
+            self._create_artifact_bundle()
 
-    # We collect how many times we run indexing.
-    metrics.incr("tasks.assemble.artifact_bundle.start_indexing")
+    def _create_artifact_bundle(self) -> None:
+        # We want to give precedence to the request fields and only if they are unset fallback to the manifest's
+        # contents.
+        self.release = self.release or self.archive.manifest.get("release")
+        self.dist = self.dist or self.archive.manifest.get("dist")
 
-    # We want to measure how much time it takes to perform indexing.
-    with metrics.timer("tasks.assemble.artifact_bundle.index_bundles"):
-        # We now call the indexing logic with all the bundles that require indexing. We might need to make this call
-        # async if we see a performance degradation of assembling.
-        try:
-            # We only want to get the bundles that are not indexed. Keep in mind that this query is concurrency unsafe
-            # since in the meanwhile the bundles might be modified and the modification will not be reflected in the
-            # objects that we are iterating here.
-            #
-            # In case of concurrency issues, we might do extra work but due to the idempotency of the indexing function
-            # no consistency issues should arise.
-            bundles_to_index = [
-                associated_bundle
-                for associated_bundle in associated_bundles
-                if associated_bundle.indexing_state == ArtifactBundleIndexingState.NOT_INDEXED.value
-            ]
-
-            # We want to index only if we have bundles to index.
-            if len(bundles_to_index) > 0:
-                index_artifact_bundles_for_release(
-                    organization_id=org_id,
-                    artifact_bundles=bundles_to_index,
-                    release=release,
-                    dist=dist,
-                )
-        except Exception as e:
-            # We want to capture any exception happening during indexing, since it's crucial to understand if
-            # the system is behaving well because the database can easily end up in an inconsistent state.
-            metrics.incr("tasks.assemble.artifact_bundle.index_artifact_bundles_error")
-            sentry_sdk.capture_exception(e)
-
-
-def _create_artifact_bundle(
-    release: Optional[str],
-    dist: Optional[str],
-    org_id: int,
-    project_ids: Optional[List[int]],
-    archive_file: File,
-    artifact_count: int,
-) -> None:
-    with ReleaseArchive(archive_file.getfile()) as archive:
         # We want to measure how much time it takes to extract debug ids from manifest.
         with metrics.timer("tasks.assemble.artifact_bundle.extract_debug_ids"):
-            bundle_id, debug_ids_with_types = _extract_debug_ids_from_manifest(archive.manifest)
+            bundle_id, debug_ids_with_types = self.archive.extract_debug_ids_from_manifest()
 
         analytics.record(
             "artifactbundle.manifest_extracted",
-            organization_id=org_id,
-            project_ids=project_ids,
+            organization_id=self.organization.id,
+            project_ids=self.projects_ids,
             has_debug_ids=len(debug_ids_with_types) > 0,
         )
 
-        # We want to save an artifact bundle only if we have found debug ids in the manifest or if the user specified
-        # a release for the upload.
-        if len(debug_ids_with_types) > 0 or release:
-            # We take a snapshot in time in order to have consistent values in the database.
-            date_snapshot = timezone.now()
-
-            # We have to add this dictionary to both `values` and `defaults` since we want to update the date_added in
-            # case of a re-upload because the `date_added` of the ArtifactBundle is also updated.
-            new_date_added = {"date_added": date_snapshot}
-
-            # Since dist is non-nullable in the db, but we actually use a sentinel value to represent nullability, here
-            # we have to do the conversion in case it is "None".
-            dist = dist or NULL_STRING
-            release = release or NULL_STRING
-
-            # We want to run everything in a transaction, since we don't want the database to be in an inconsistent
-            # state after all of these updates.
-            with atomic_transaction(
-                using=(
-                    router.db_for_write(ArtifactBundle),
-                    router.db_for_write(File),
-                    router.db_for_write(ReleaseArtifactBundle),
-                    router.db_for_write(ProjectArtifactBundle),
-                    router.db_for_write(DebugIdArtifactBundle),
+        # We don't allow the creation of a bundle if no debug ids and release are present, since we are not able to
+        # efficiently index
+        if len(debug_ids_with_types) == 0 and not self.release:
+            raise AssembleArtifactsError(
+                "uploading a bundle without debug ids or release is prohibited"
+            )
+
+        # We take a snapshot in time in order to have consistent values in the database.
+        date_snapshot = timezone.now()
+
+        # We have to add this dictionary to both `values` and `defaults` since we want to update the date_added in
+        # case of a re-upload because the `date_added` of the ArtifactBundle is also updated.
+        new_date_added = {"date_added": date_snapshot}
+
+        # We want to run everything in a transaction, since we don't want the database to be in an inconsistent
+        # state after all of these updates.
+        with atomic_transaction(
+            using=(
+                router.db_for_write(ArtifactBundle),
+                router.db_for_write(File),
+                router.db_for_write(ReleaseArtifactBundle),
+                router.db_for_write(ProjectArtifactBundle),
+                router.db_for_write(DebugIdArtifactBundle),
+            )
+        ):
+            artifact_bundle, created = self._bind_or_create_artifact_bundle(
+                bundle_id=bundle_id, date_added=date_snapshot
+            )
+
+            # If a release version is passed, we want to create the weak association between a bundle and a release.
+            if self.release:
+                ReleaseArtifactBundle.objects.create_or_update(
+                    organization_id=self.organization.id,
+                    release_name=self.release,
+                    # In case no dist is provided, we will fall back to "" which is the NULL equivalent for our
+                    # tables.
+                    dist_name=self.dist or NULL_STRING,
+                    artifact_bundle=artifact_bundle,
+                    values=new_date_added,
+                    defaults=new_date_added,
                 )
-            ):
-                artifact_bundle, created = _bind_or_create_artifact_bundle(
-                    bundle_id=bundle_id,
-                    date_added=date_snapshot,
-                    org_id=org_id,
-                    archive_file=archive_file,
-                    artifact_count=artifact_count,
+
+            for project_id in self.projects_ids or ():
+                ProjectArtifactBundle.objects.create_or_update(
+                    organization_id=self.organization.id,
+                    project_id=project_id,
+                    artifact_bundle=artifact_bundle,
+                    values=new_date_added,
+                    defaults=new_date_added,
                 )
 
-                # If a release version is passed, we want to create the weak association between a bundle and a release.
-                if release:
-                    ReleaseArtifactBundle.objects.create_or_update(
-                        organization_id=org_id,
-                        release_name=release,
-                        # In case no dist is provided, we will fall back to "" which is the NULL equivalent for our
-                        # tables.
-                        dist_name=dist,
-                        artifact_bundle=artifact_bundle,
-                        values=new_date_added,
-                        defaults=new_date_added,
-                    )
+            for source_file_type, debug_id in debug_ids_with_types:
+                DebugIdArtifactBundle.objects.create_or_update(
+                    organization_id=self.organization.id,
+                    debug_id=debug_id,
+                    artifact_bundle=artifact_bundle,
+                    source_file_type=source_file_type.value,
+                    values=new_date_added,
+                    defaults=new_date_added,
+                )
 
-                for project_id in project_ids or ():
-                    ProjectArtifactBundle.objects.create_or_update(
-                        organization_id=org_id,
-                        project_id=project_id,
-                        artifact_bundle=artifact_bundle,
-                        values=new_date_added,
-                        defaults=new_date_added,
-                    )
+        try:
+            organization = Organization.objects.get_from_cache(id=self.organization.id)
+        except Organization.DoesNotExist:
+            organization = None
+
+        # If we don't have a release set, we don't want to run indexing, since we need at least the release for
+        # fast indexing performance. We might though run indexing if a customer has debug ids in the manifest, since
+        # we want to have a fallback mechanism in case they have problems setting them up (e.g., SDK version does
+        # not support them, some files were not injected...).
+        if (
+            organization is not None
+            and self.release
+            and features.has("organizations:sourcemaps-bundle-indexing", organization, actor=None)
+        ):
+            # After we committed the transaction we want to try and run indexing by passing non-null release and
+            # dist. The dist here can be "" since it will be the equivalent of NULL for the db query.
+            self._index_bundle_if_needed(
+                release=self.release,
+                dist=(self.dist or NULL_STRING),
+                date_snapshot=date_snapshot,
+            )
 
-                for source_file_type, debug_id in debug_ids_with_types:
-                    DebugIdArtifactBundle.objects.create_or_update(
-                        organization_id=org_id,
-                        debug_id=debug_id,
-                        artifact_bundle=artifact_bundle,
-                        source_file_type=source_file_type.value,
-                        values=new_date_added,
-                        defaults=new_date_added,
-                    )
+    def _bind_or_create_artifact_bundle(
+        self, bundle_id: Optional[str], date_added: datetime
+    ) -> Tuple[ArtifactBundle, bool]:
+        existing_artifact_bundles = list(
+            ArtifactBundle.objects.filter(organization_id=self.organization.id, bundle_id=bundle_id)
+        )
 
 
-                organization = Organization.objects.get_from_cache(id=org_id)
-            except Organization.DoesNotExist:
-                organization = None
-
-            # If we don't have a release set, we don't want to run indexing, since we need at least the release for
-            # fast indexing performance. We might though run indexing if a customer has debug ids in the manifest, since
-            # we want to have a fallback mechanism in case they have problems setting them up (e.g., SDK version does
-            # not support them, some files were not injected...).
-            if (
-                organization is not None
-                and release
-                and features.has(
-                    "organizations:sourcemaps-bundle-indexing", organization, actor=None
-                )
-            ):
-                # After we committed the transaction we want to try and run indexing.
-                _index_bundle_if_needed(
-                    org_id=org_id, release=release, dist=dist, date_snapshot=date_snapshot
-                )
+        if len(existing_artifact_bundles) == 0:
+            existing_artifact_bundle = None
         else:
-            raise AssembleArtifactsError(
-                "uploading a bundle without debug ids or release is prohibited"
+            existing_artifact_bundle = existing_artifact_bundles.pop()
+            # We want to remove all the duplicate artifact bundles that have the same bundle_id.
+            self._remove_duplicate_artifact_bundles(
+                ids=list(map(lambda value: value.id, existing_artifact_bundles))
             )
             )
 
 
+        # In case there is not ArtifactBundle with a specific bundle_id, we just create it and return.
+        if existing_artifact_bundle is None:
+            artifact_bundle = ArtifactBundle.objects.create(
+                organization_id=self.organization.id,
+                # In case we didn't find the bundle_id in the manifest, we will just generate our own.
+                bundle_id=bundle_id or uuid.uuid4().hex,
+                file=self.assemble_result.bundle,
+                artifact_count=self.archive.artifact_count,
+                # By default, a bundle is not indexed.
+                indexing_state=ArtifactBundleIndexingState.NOT_INDEXED.value,
+                # "date_added" and "date_uploaded" will have the same value, but they will diverge once renewal is
+                # performed by other parts of Sentry. Renewal is required since we want to expire unused bundles
+                # after ~90 days.
+                date_added=date_added,
+                date_uploaded=date_added,
+                # When creating a new bundle by default its last modified date corresponds to the creation date.
+                date_last_modified=date_added,
+            )
 
-def handle_assemble_for_release_file(bundle, archive, organization, version):
-    manifest = archive.manifest
+            return artifact_bundle, True
+        else:
+            # We store a reference to the previous file to which the bundle was pointing to.
+            existing_file = existing_artifact_bundle.file
+
+            # Only if the file objects are different we want to update the database, otherwise we will end up deleting
+            # a newly bound file.
+            if existing_file != self.assemble_result.bundle:
+                # In case there is an ArtifactBundle with a specific bundle_id, we want to change its underlying File
+                # model with its corresponding artifact count and also update the dates.
+                existing_artifact_bundle.update(
+                    file=self.assemble_result.bundle,
+                    artifact_count=self.archive.artifact_count,
+                    date_added=date_added,
+                    # If you upload a bundle which already exists, we track this as a modification since our goal is
+                    # to show first all the bundles that have had the most recent activity.
+                    date_last_modified=date_added,
+                )
 
-    if manifest.get("org") != organization.slug:
-        raise AssembleArtifactsError("organization does not match uploaded bundle")
+                # We now delete that file, in order to avoid orphan files in the database.
+                existing_file.delete()
 
-    if manifest.get("release") != version:
-        raise AssembleArtifactsError("release does not match uploaded bundle")
+            return existing_artifact_bundle, False
 
-    try:
-        release = Release.objects.get(organization_id=organization.id, version=version)
-    except Release.DoesNotExist:
-        raise AssembleArtifactsError("release does not exist")
+    def _remove_duplicate_artifact_bundles(self, ids: List[int]):
+        # In case there are no ids to delete, we don't want to run the query, otherwise it will result in a deletion of
+        # all ArtifactBundle(s) with the specific bundle_id.
+        if not ids:
+            return
 
-    dist_name = manifest.get("dist")
-    dist = release.add_dist(dist_name) if dist_name else None
+        # Even though we delete via a QuerySet the associated file is also deleted, because django will still
+        # fire the on_delete signal.
+        ArtifactBundle.objects.filter(Q(id__in=ids), organization_id=self.organization.id).delete()
+
+    def _index_bundle_if_needed(self, release: str, dist: str, date_snapshot: datetime):
+        # We collect how many times we tried to perform indexing.
+        metrics.incr("tasks.assemble.artifact_bundle.try_indexing")
+
+        # We get the number of associations by upper bounding the query to the "date_snapshot", which is done to
+        # prevent the case in which concurrent updates on the database will lead to problems. For example if we have
+        # a threshold of 1, and we have two uploads happening concurrently and the database will contain two
+        # associations even when the assembling of the first upload is running this query, we will have the first
+        # upload task see 2 associations , thus it will trigger the indexing. The same will also happen for the
+        # second upload but in reality we just want the second upload to perform indexing.
+        #
+        # This date implementation might still lead to issues, more specifically in the case in which the
+        # "date_last_modified" is the same but the probability of that happening is so low that it's a negligible
+        # detail for now, as long as the indexing is idempotent.
+        associated_bundles = list(
+            ArtifactBundle.objects.filter(
+                organization_id=self.organization.id,
+                # Since the `date_snapshot` will be the same as `date_last_modified` of the last bundle uploaded in this
+                # async job, we want to use the `<=` condition for time, effectively saying give me all the bundles that
+                # were created now or in the past.
+                date_last_modified__lte=date_snapshot,
+                releaseartifactbundle__release_name=release,
+                releaseartifactbundle__dist_name=dist,
+            )
+        )
+
+        # In case we didn't surpass the threshold, indexing will not happen.
+        if len(associated_bundles) <= INDEXING_THRESHOLD:
+            return
 
 
-    saved_as_archive = False
+        # We collect how many times we run indexing.
+        metrics.incr("tasks.assemble.artifact_bundle.start_indexing")
 
 
-        try:
-            update_artifact_index(release, dist, bundle)
-            saved_as_archive = True
-        except Exception as exc:
-            logger.error("Unable to update artifact index", exc_info=exc)
-
-    if not saved_as_archive:
-        meta = {
-            "organization_id": organization.id,
-            "release_id": release.id,
-            "dist_id": dist.id if dist else dist,
-        }
-        _store_single_files(archive, meta, True)
-
-
-def handle_assemble_for_artifact_bundle(bundle, archive, organization, version, dist, project_ids):
-    # We want to give precedence to the request fields and only if they are unset fallback to the manifest's
-    # contents.
-    version = version or archive.manifest.get("release")
-    dist = dist or archive.manifest.get("dist")
-
-    _create_artifact_bundle(
-        release=version,
-        dist=dist,
-        org_id=organization.id,
-        project_ids=project_ids,
-        archive_file=bundle,
-        artifact_count=archive.artifact_count,
-    )
+        # We want to measure how much time it takes to perform indexing.
+        with metrics.timer("tasks.assemble.artifact_bundle.index_bundles"):
+            # We now call the indexing logic with all the bundles that require indexing. We might need to make this call
+            # async if we see a performance degradation of assembling.
+            try:
+                # We only want to get the bundles that are not indexed. Keep in mind that this query is concurrency
+                # unsafe since in the meanwhile the bundles might be modified and the modification will not be
+                # reflected in the objects that we are iterating here.
+                #
+                # In case of concurrency issues, we might do extra work but due to the idempotency of the indexing
+                # function no consistency issues should arise.
+                bundles_to_index = [
+                    associated_bundle
+                    for associated_bundle in associated_bundles
+                    if associated_bundle.indexing_state
+                    == ArtifactBundleIndexingState.NOT_INDEXED.value
+                ]
+
+                # We want to index only if we have bundles to index.
+                if len(bundles_to_index) > 0:
+                    index_artifact_bundles_for_release(
+                        organization_id=self.organization.id,
+                        artifact_bundles=bundles_to_index,
+                        release=release,
+                        dist=dist,
+                    )
+            except Exception as e:
+                # We want to capture any exception happening during indexing, since it's crucial to understand if
+                # the system is behaving well because the database can easily end up in an inconsistent state.
+                metrics.incr("tasks.assemble.artifact_bundle.index_artifact_bundles_error")
+                sentry_sdk.capture_exception(e)
+
+
+def prepare_post_assembler(
+    assemble_result, organization, release, dist, project_ids, upload_as_artifact_bundle
+) -> PostAssembler:
+    if upload_as_artifact_bundle:
+        return ArtifactBundlePostAssembler(
+            assemble_result=assemble_result,
+            organization=organization,
+            release=release,
+            dist=dist,
+            project_ids=project_ids,
+        )
+    else:
+        return ReleaseBundlePostAssembler(
+            assemble_result=assemble_result, organization=organization, version=release
+        )
 
 
 @instrumented_task(name="sentry.tasks.assemble.assemble_artifacts", queue="assemble")
@@ -631,43 +739,36 @@ def assemble_artifacts(
         file_type = "artifact.bundle" if upload_as_artifact_bundle else "release.bundle"
 
         # Assemble the chunks into a temporary file
-        rv = assemble_file(
-            assemble_task,
-            organization,
-            archive_filename,
-            checksum,
-            chunks,
-            file_type,
+        assemble_result = assemble_file(
+            task=assemble_task,
+            org_or_project=organization,
+            name=archive_filename,
+            checksum=checksum,
+            chunks=chunks,
+            file_type=file_type,
         )
 
-        # If not file has been created this means that the file failed to
-        # assemble because of bad input data. In this case, assemble_file
-        # has set the assemble status already.
-        if rv is None:
+        # If not file has been created this means that the file failed to assemble because of bad input data.
+        # In this case, assemble_file has set the assemble status already.
+        if assemble_result is None:
             return
 
-        bundle, temp_file = rv
-
-        try:
-            archive = ReleaseArchive(temp_file)
-        except Exception:
-            raise AssembleArtifactsError("failed to open release manifest")
-
-        with archive:
-            if upload_as_artifact_bundle:
-                with metrics.timer("tasks.assemble.artifact_bundle"):
-                    handle_assemble_for_artifact_bundle(
-                        bundle, archive, organization, version, dist, project_ids
-                    )
-            else:
-                with metrics.timer("tasks.assemble.release_bundle"):
-                    handle_assemble_for_release_file(bundle, archive, organization, version)
-
-            metrics.incr("tasks.assemble.extracted_files", amount=archive.artifact_count)
+        # We first want to prepare the post assembler which will take care of validating the archive.
+        with prepare_post_assembler(
+            assemble_result=assemble_result,
+            organization=organization,
+            release=version,
+            dist=dist,
+            project_ids=project_ids,
+            upload_as_artifact_bundle=upload_as_artifact_bundle,
+        ) as post_assembler:
+            # Once the archive is valid, the post assembler can run the post assembling job.
+            post_assembler.post_assemble()
     except AssembleArtifactsError as e:
         set_assemble_status(assemble_task, org_id, checksum, ChunkFileState.ERROR, detail=str(e))
-    except Exception:
-        logger.error("failed to assemble release bundle", exc_info=True)
+    except Exception as e:
+        logger.error("failed to assemble bundle", exc_info=True)
+        sentry_sdk.capture_exception(e)
         set_assemble_status(
             assemble_task,
             org_id,
@@ -677,71 +778,3 @@ def assemble_artifacts(
         )
     else:
         set_assemble_status(assemble_task, org_id, checksum, ChunkFileState.OK)
-
-
-def assemble_file(task, org_or_project, name, checksum, chunks, file_type):
-    """
-    Verifies and assembles a file model from chunks.
-
-    This downloads all chunks from blob store to verify their integrity and
-    associates them with a created file model. Additionally, it assembles the
-    full file in a temporary location and verifies the complete content hash.
-
-    Returns a tuple ``(File, TempFile)`` on success, or ``None`` on error.
-    """
-    from sentry.models import AssembleChecksumMismatch, File, FileBlob, Project
-
-    if isinstance(org_or_project, Project):
-        organization = org_or_project.organization
-    else:
-        organization = org_or_project
-
-    # Load all FileBlobs from db since we can be sure here we already own all
-    # chunks need to build the file
-    file_blobs = FileBlob.objects.filter(checksum__in=chunks).values_list("id", "checksum", "size")
-
-    # Reject all files that exceed the maximum allowed size for this
-    # organization. This value cannot be
-    file_size = sum(x[2] for x in file_blobs)
-    if file_size > get_max_file_size(organization):
-        set_assemble_status(
-            task,
-            org_or_project.id,
-            checksum,
-            ChunkFileState.ERROR,
-            detail="File exceeds maximum size",
-        )
-        return
-
-    # Sanity check.  In case not all blobs exist at this point we have a
-    # race condition.
-    if {x[1] for x in file_blobs} != set(chunks):
-        set_assemble_status(
-            task,
-            org_or_project.id,
-            checksum,
-            ChunkFileState.ERROR,
-            detail="Not all chunks available for assembling",
-        )
-        return
-
-    # Ensure blobs are in the order and duplication in which they were
-    # transmitted. Otherwise, we would assemble the file in the wrong order.
-    ids_by_checksum = {chks: id for id, chks, _ in file_blobs}
-    file_blob_ids = [ids_by_checksum[c] for c in chunks]
-
-    file = File.objects.create(name=name, checksum=checksum, type=file_type)
-    try:
-        temp_file = file.assemble_from_file_blob_ids(file_blob_ids, checksum)
-    except AssembleChecksumMismatch:
-        file.delete()
-        set_assemble_status(
-            task,
-            org_or_project.id,
-            checksum,
-            ChunkFileState.ERROR,
-            detail="Reported checksum mismatch",
-        )
-    else:
-        file.save()
-        return file, temp_file

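The tests added below depend on the cleanup contract of PostAssembler: when validation or post-assembly fails, the assembled File row must be deleted so no orphan entries remain. A hypothetical subclass sketches the hooks a concrete post assembler implements (only PostAssembler and ArtifactBundleArchive come from this diff; the subclass name is made up):

    class MyBundlePostAssembler(PostAssembler):
        def _validate_bundle(self):
            # Raising here makes _validate_bundle_guarded delete the File and
            # re-raise as AssembleArtifactsError("the bundle is invalid").
            self.archive = ArtifactBundleArchive(self.assemble_result.bundle_temp_file)

        def close(self):
            self.archive.close()

        def post_assemble(self):
            # Any exception raised in here reaches __exit__, which also deletes the File.
            ...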
+ 6 - 2
tests/sentry/api/endpoints/test_dif_assemble.py

@@ -191,9 +191,13 @@ class DifAssembleEndpoint(APITestCase):
             }
         )
 
-        file = assemble_file(
+        assemble_result = assemble_file(
             AssembleTask.DIF, self.project, "test", total_checksum, chunks, "project.dif"
-        )[0]
+        )
+
+        assert assemble_result is not None
+
+        file = assemble_result.bundle
         status, _ = get_assemble_status(AssembleTask.DIF, self.project.id, total_checksum)
         assert status != ChunkFileState.ERROR
         assert file.checksum == total_checksum

+ 116 - 16
tests/sentry/tasks/test_assemble.py

@@ -19,9 +19,10 @@ from sentry.models.artifactbundle import (
 from sentry.models.debugfile import ProjectDebugFile
 from sentry.models.releasefile import read_artifact_index
 from sentry.tasks.assemble import (
+    ArtifactBundlePostAssembler,
+    AssembleResult,
     AssembleTask,
     ChunkFileState,
-    _index_bundle_if_needed,
     assemble_artifacts,
     assemble_dif,
     assemble_file,
@@ -271,6 +272,52 @@ class AssembleArtifactsTest(BaseAssembleTest):
             ReleaseArtifactBundle.objects.all().delete()
             ProjectArtifactBundle.objects.all().delete()

+    @patch("sentry.tasks.assemble.ArtifactBundlePostAssembler.post_assemble")
+    def test_assembled_bundle_is_deleted_if_post_assembler_error_occurs(self, post_assemble):
+        post_assemble.side_effect = Exception
+
+        bundle_file = self.create_artifact_bundle_zip(
+            fixture_path="artifact_bundle_debug_ids", project=self.project.id
+        )
+        blob1 = FileBlob.from_file(ContentFile(bundle_file))
+        total_checksum = sha1(bundle_file).hexdigest()
+
+        assemble_artifacts(
+            org_id=self.organization.id,
+            project_ids=[self.project.id],
+            version="1.0",
+            dist="android",
+            checksum=total_checksum,
+            chunks=[blob1.checksum],
+            upload_as_artifact_bundle=True,
+        )
+
+        files = File.objects.filter()
+        assert len(files) == 0
+
+    @patch("sentry.tasks.assemble.ArtifactBundleArchive")
+    def test_assembled_bundle_is_deleted_if_archive_is_invalid(self, artifact_bundle_archive):
+        artifact_bundle_archive.side_effect = Exception
+
+        bundle_file = self.create_artifact_bundle_zip(
+            fixture_path="artifact_bundle_debug_ids", project=self.project.id
+        )
+        blob1 = FileBlob.from_file(ContentFile(bundle_file))
+        total_checksum = sha1(bundle_file).hexdigest()
+
+        assemble_artifacts(
+            org_id=self.organization.id,
+            project_ids=[self.project.id],
+            version="1.0",
+            dist="android",
+            checksum=total_checksum,
+            chunks=[blob1.checksum],
+            upload_as_artifact_bundle=True,
+        )
+
+        files = File.objects.filter()
+        assert len(files) == 0
+
     def test_upload_artifacts_with_duplicated_debug_ids(self):
         bundle_file = self.create_artifact_bundle_zip(
             fixture_path="artifact_bundle_duplicated_debug_ids", project=self.project.id
@@ -756,16 +803,39 @@ class ArtifactBundleIndexingTest(TestCase):
 
 
         return artifact_bundle

+    def mock_assemble_result(self) -> AssembleResult:
+        bundle_file = self.create_artifact_bundle_zip(
+            fixture_path="artifact_bundle_debug_ids", project=self.project.id
+        )
+        blob1 = FileBlob.from_file(ContentFile(bundle_file))
+        total_checksum = sha1(bundle_file).hexdigest()
+        return assemble_file(
+            task=AssembleTask.ARTIFACT_BUNDLE,
+            org_or_project=self.organization,
+            name="bundle.zip",
+            checksum=total_checksum,
+            chunks=[blob1.checksum],
+            file_type="artifact.bundle",
+        )
+
     @patch("sentry.tasks.assemble.index_artifact_bundles_for_release")
     @patch("sentry.tasks.assemble.index_artifact_bundles_for_release")
     def test_index_if_needed_with_no_bundles(self, index_artifact_bundles_for_release):
     def test_index_if_needed_with_no_bundles(self, index_artifact_bundles_for_release):
         release = "1.0"
         release = "1.0"
         dist = "android"
         dist = "android"
 
 
-        _index_bundle_if_needed(
-            org_id=self.organization.id, release=release, dist=dist, date_snapshot=datetime.now()
-        )
+        with ArtifactBundlePostAssembler(
+            assemble_result=self.mock_assemble_result(),
+            organization=self.organization,
+            release=release,
+            dist=dist,
+            project_ids=[],
+        ) as post_assembler:
+            post_assembler._index_bundle_if_needed(
+                release=release, dist=dist, date_snapshot=datetime.now()
+            )
 
 
         index_artifact_bundles_for_release.assert_not_called()
+        post_assembler.close()

     @patch("sentry.tasks.assemble.index_artifact_bundles_for_release")
     def test_index_if_needed_with_lower_bundles_than_threshold(
@@ -782,11 +852,19 @@ class ArtifactBundleIndexingTest(TestCase):
             date=datetime.now() - timedelta(hours=1),
         )

-        _index_bundle_if_needed(
-            org_id=self.organization.id, release=release, dist=dist, date_snapshot=datetime.now()
-        )
+        with ArtifactBundlePostAssembler(
+            assemble_result=self.mock_assemble_result(),
+            organization=self.organization,
+            release=release,
+            dist=dist,
+            project_ids=[],
+        ) as post_assembler:
+            post_assembler._index_bundle_if_needed(
+                release=release, dist=dist, date_snapshot=datetime.now()
+            )
 
 
         index_artifact_bundles_for_release.assert_not_called()
+        post_assembler.close()

     @patch("sentry.tasks.assemble.index_artifact_bundles_for_release")
     def test_index_if_needed_with_higher_bundles_than_threshold(
@@ -811,9 +889,16 @@ class ArtifactBundleIndexingTest(TestCase):
             date=datetime.now() - timedelta(hours=1),
         )

-        _index_bundle_if_needed(
-            org_id=self.organization.id, release=release, dist=dist, date_snapshot=datetime.now()
-        )
+        with ArtifactBundlePostAssembler(
+            assemble_result=self.mock_assemble_result(),
+            organization=self.organization,
+            release=release,
+            dist=dist,
+            project_ids=[],
+        ) as post_assembler:
+            post_assembler._index_bundle_if_needed(
+                release=release, dist=dist, date_snapshot=datetime.now()
+            )
 
 
         index_artifact_bundles_for_release.assert_called_with(
             organization_id=self.organization.id,
@@ -821,6 +906,7 @@ class ArtifactBundleIndexingTest(TestCase):
             release=release,
             dist=dist,
         )
+        post_assembler.close()

     @patch("sentry.tasks.assemble.index_artifact_bundles_for_release")
     def test_index_if_needed_with_bundles_already_indexed(self, index_artifact_bundles_for_release):
@@ -843,9 +929,16 @@ class ArtifactBundleIndexingTest(TestCase):
             date=datetime.now() - timedelta(hours=1),
         )

-        _index_bundle_if_needed(
-            org_id=self.organization.id, release=release, dist=dist, date_snapshot=datetime.now()
-        )
+        with ArtifactBundlePostAssembler(
+            assemble_result=self.mock_assemble_result(),
+            organization=self.organization,
+            release=release,
+            dist=dist,
+            project_ids=[],
+        ) as post_assembler:
+            post_assembler._index_bundle_if_needed(
+                release=release, dist=dist, date_snapshot=datetime.now()
+            )
 
 
         index_artifact_bundles_for_release.assert_not_called()

@@ -882,9 +975,16 @@ class ArtifactBundleIndexingTest(TestCase):
             date=datetime.now() + timedelta(hours=1),
         )

-        _index_bundle_if_needed(
-            org_id=self.organization.id, release=release, dist=dist, date_snapshot=datetime.now()
-        )
+        with ArtifactBundlePostAssembler(
+            assemble_result=self.mock_assemble_result(),
+            organization=self.organization,
+            release=release,
+            dist=dist,
+            project_ids=[],
+        ) as post_assembler:
+            post_assembler._index_bundle_if_needed(
+                release=release, dist=dist, date_snapshot=datetime.now()
+            )
 
 
         index_artifact_bundles_for_release.assert_called_with(
             organization_id=self.organization.id,
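
Taken together, the updated tests outline the new post-assembly flow: assembly yields an AssembleResult, which is handed to an ArtifactBundlePostAssembler used as a context manager. A rough reconstruction from the tests above, not the actual implementation in sentry.tasks.assemble, which may differ in detail:

    # Illustrative reconstruction only, not part of this diff.
    with ArtifactBundlePostAssembler(
        assemble_result=assemble_result,
        organization=organization,
        release=version,
        dist=dist,
        project_ids=project_ids,
    ) as post_assembler:
        # Performs the post-assembly bookkeeping (debug-id extraction, release
        # association, indexing). If it raises, the assembled File is deleted,
        # as asserted by test_assembled_bundle_is_deleted_if_post_assembler_error_occurs.
        post_assembler.post_assemble()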