Browse Source

Improve ArtifactBundle (DB) Indexing (#61045)

This primarily increases the threshold at which we start indexing
ArtifactBundles from 2 to 3 bundles per release/dist pair.

While doing so, it also moves backfilling to a background task, so as
not to regress tail latencies of the main assemble job.

So in summary, this should lower the total number of bundles indexed,
and also reduce tail latencies of `assemble_artifact`.
Arpad Borsos 1 year ago
parent
commit
da22cec2a8

+ 1 - 1
fixtures/artifact_bundle_duplicated_debug_ids/manifest.json

@@ -1,5 +1,5 @@
 {
-  "debug_id": "67429b2f-1d9e-43bb-a626-771a1e37555c",
+  "debug_id": "67429b2f-1d9e-43bb-a626-771a1e37555d",
   "files": {
     "files/_/_/index.js": {
       "url": "~/index.js",

+ 42 - 18
src/sentry/debug_files/artifact_bundles.py

@@ -12,7 +12,6 @@ from django.utils import timezone
 
 from sentry import options
 from sentry.models.artifactbundle import (
-    INDEXING_THRESHOLD,
     ArtifactBundle,
     ArtifactBundleArchive,
     ArtifactBundleIndex,
@@ -22,6 +21,7 @@ from sentry.models.artifactbundle import (
     ProjectArtifactBundle,
     ReleaseArtifactBundle,
 )
+from sentry.models.organization import Organization
 from sentry.models.project import Project
 from sentry.utils import metrics, redis
 from sentry.utils.db import atomic_transaction
@@ -29,6 +29,10 @@ from sentry.utils.db import atomic_transaction
 # The number of Artifact Bundles that we return in case of incomplete indexes.
 MAX_BUNDLES_QUERY = 5
 
+# Number of bundles that have to be associated to a release/dist pair before indexing takes place.
+# A value of 3 means that the third upload will trigger indexing and backfill.
+INDEXING_THRESHOLD = 3
+
 # Number of days that determine whether an artifact bundle is ready for being renewed.
 AVAILABLE_FOR_RENEWAL_DAYS = 30
 
@@ -36,7 +40,6 @@ AVAILABLE_FOR_RENEWAL_DAYS = 30
 # optimize it based on the time taken to perform the indexing (on average).
 INDEXING_CACHE_TIMEOUT = 600
 
-
 # ===== Indexing of Artifact Bundles =====
 
 
@@ -96,7 +99,7 @@ def index_artifact_bundles_for_release(
                 metrics.incr("artifact_bundle_indexing.bundle_already_being_indexed")
                 continue
 
-            _index_urls_in_bundle(organization_id, artifact_bundle, archive)
+            index_urls_in_bundle(organization_id, artifact_bundle, archive)
         except Exception as e:
             # We want to catch the error and continue execution, since we can try to index the other bundles.
             metrics.incr("artifact_bundle_indexing.index_single_artifact_bundle_error")
@@ -107,8 +110,20 @@ def index_artifact_bundles_for_release(
             # debounce this in case there is a persistent error?
 
 
+def backfill_artifact_bundle_db_indexing(organization_id: int, release: str, dist: str):
+    artifact_bundles = ArtifactBundle.objects.filter(
+        releaseartifactbundle__organization_id=organization_id,
+        releaseartifactbundle__release_name=release,
+        releaseartifactbundle__dist_name=dist,
+        indexing_state=ArtifactBundleIndexingState.NOT_INDEXED.value,
+    )
+    artifact_bundles = [(ab, None) for ab in artifact_bundles]
+
+    index_artifact_bundles_for_release(organization_id, artifact_bundles)
+
+
 @sentry_sdk.tracing.trace
-def _index_urls_in_bundle(
+def index_urls_in_bundle(
     organization_id: int,
     artifact_bundle: ArtifactBundle,
     existing_archive: ArtifactBundleArchive | None,
@@ -231,7 +246,7 @@ def maybe_renew_artifact_bundles(used_artifact_bundles: Dict[int, datetime]):
     # We compute the threshold used to determine whether we want to renew the specific bundle.
     threshold_date = now - timedelta(days=AVAILABLE_FOR_RENEWAL_DAYS)
 
-    for (artifact_bundle_id, date_added) in used_artifact_bundles.items():
+    for artifact_bundle_id, date_added in used_artifact_bundles.items():
         # We perform the condition check also before running the query, in order to reduce the amount of queries to the database.
         if date_added > threshold_date:
             continue
@@ -333,12 +348,14 @@ def query_artifact_bundles_containing_file(
     # want to return the N most recent bundles associated with the release,
     # under the assumption that one of those should ideally contain the file we
     # are looking for.
-    is_fully_indexed = total_bundles > INDEXING_THRESHOLD and indexed_bundles == total_bundles
+    is_fully_indexed = total_bundles >= INDEXING_THRESHOLD and indexed_bundles == total_bundles
 
-    if total_bundles > INDEXING_THRESHOLD and indexed_bundles < total_bundles:
+    if total_bundles >= INDEXING_THRESHOLD and indexed_bundles < total_bundles:
         metrics.incr("artifact_bundle_indexing.query_partial_index")
         # TODO: spawn an async task to backfill non-indexed bundles
         # lets do this in a different PR though :-)
+        # ^ we would want to use a Redis SET to not spawn a ton of duplicated
+        # celery tasks here.
 
     # We keep track of all the discovered artifact bundles, by the various means of lookup.
     # We are intentionally overwriting the `resolved` flag, as we want to rank these from
@@ -346,7 +363,7 @@ def query_artifact_bundles_containing_file(
     artifact_bundles: Dict[int, Tuple[datetime, str]] = dict()
 
     def update_bundles(bundles: Set[Tuple[int, datetime]], resolved: str):
-        for (bundle_id, date_added) in bundles:
+        for bundle_id, date_added in bundles:
             artifact_bundles[bundle_id] = (date_added, resolved)
 
     # First, get the N most recently uploaded bundles for the release,
@@ -380,7 +397,9 @@ def query_artifact_bundles_containing_file(
 # multiple tables in a single query.
 
 
-def get_bundles_indexing_state(project: Project, release_name: str, dist_name: str):
+def get_bundles_indexing_state(
+    org_or_project: Project | Organization, release_name: str, dist_name: str
+) -> Tuple[int, int]:
     """
     Returns the number of total bundles, and the number of fully indexed bundles
     associated with the given `release` / `dist`.
@@ -388,16 +407,21 @@ def get_bundles_indexing_state(project: Project, release_name: str, dist_name: s
     total_bundles = 0
     indexed_bundles = 0
 
-    for state, count in (
-        ArtifactBundle.objects.filter(
-            releaseartifactbundle__organization_id=project.organization.id,
-            releaseartifactbundle__release_name=release_name,
-            releaseartifactbundle__dist_name=dist_name,
-            projectartifactbundle__project_id=project.id,
+    query = ArtifactBundle.objects.filter(
+        releaseartifactbundle__release_name=release_name,
+        releaseartifactbundle__dist_name=dist_name,
+    )
+    if isinstance(org_or_project, Project):
+        query = query.filter(
+            releaseartifactbundle__organization_id=org_or_project.organization.id,
+            projectartifactbundle__project_id=org_or_project.id,
         )
-        .values_list("indexing_state")
-        .annotate(count=Count("*"))
-    ):
+    else:
+        query = query.filter(
+            releaseartifactbundle__organization_id=org_or_project.id,
+        )
+
+    for state, count in query.values_list("indexing_state").annotate(count=Count("*")):
         if state == ArtifactBundleIndexingState.WAS_INDEXED.value:
             indexed_bundles = count
         total_bundles += count

+ 10 - 0
src/sentry/debug_files/tasks.py

@@ -23,3 +23,13 @@ def refresh_artifact_bundles_in_use():
     from .artifact_bundles import refresh_artifact_bundles_in_use as do_refresh
 
     do_refresh()
+
+
+@instrumented_task(
+    name="sentry.debug_files.tasks.backfill_artifact_bundle_db_indexing",
+    queue="assemble",
+)
+def backfill_artifact_bundle_db_indexing(organization_id: int, release: str, dist: str):
+    from .artifact_bundles import backfill_artifact_bundle_db_indexing as do_backfill
+
+    do_backfill(organization_id, release, dist)

+ 0 - 2
src/sentry/models/artifactbundle.py

@@ -29,8 +29,6 @@ from sentry.utils.services import LazyServiceWrapper
 # always different from `NULL`.
 NULL_UUID = "00000000-00000000-00000000-00000000"
 NULL_STRING = ""
-# Number of bundles that have to be associated to a release/dist pair before indexing takes place.
-INDEXING_THRESHOLD = 1
 
 
 class SourceFileType(Enum):

+ 19 - 57
src/sentry/tasks/assemble.py

@@ -22,9 +22,13 @@ from sentry.debug_files.artifact_bundle_indexing import (
     mark_bundle_for_flat_file_indexing,
     update_artifact_bundle_index,
 )
-from sentry.debug_files.artifact_bundles import index_artifact_bundles_for_release
-from sentry.models.artifactbundle import (
+from sentry.debug_files.artifact_bundles import (
     INDEXING_THRESHOLD,
+    get_bundles_indexing_state,
+    index_artifact_bundles_for_release,
+)
+from sentry.debug_files.tasks import backfill_artifact_bundle_db_indexing
+from sentry.models.artifactbundle import (
     NULL_STRING,
     ArtifactBundle,
     ArtifactBundleArchive,
@@ -623,14 +627,13 @@ class ArtifactBundlePostAssembler(PostAssembler[ArtifactBundleArchive]):
         # fast indexing performance. We might though run indexing if a customer has debug ids in the manifest, since
         # we want to have a fallback mechanism in case they have problems setting them up (e.g., SDK version does
         # not support them, some files were not injected...).
-        if self.release:
+        if created and self.release:
             # After we committed the transaction we want to try and run indexing by passing non-null release and
             # dist. The dist here can be "" since it will be the equivalent of NULL for the db query.
             self._index_bundle_if_needed(
                 artifact_bundle,
                 release=self.release,
                 dist=(self.dist or NULL_STRING),
-                date_snapshot=date_snapshot,
             )
 
         if features.has("organizations:sourcemaps-bundle-flat-file-indexing", self.organization):
@@ -714,36 +717,16 @@ class ArtifactBundlePostAssembler(PostAssembler[ArtifactBundleArchive]):
         ArtifactBundle.objects.filter(Q(id__in=ids), organization_id=self.organization.id).delete()
 
     @sentry_sdk.tracing.trace
-    def _index_bundle_if_needed(
-        self, artifact_bundle: ArtifactBundle, release: str, dist: str, date_snapshot: datetime
-    ):
+    def _index_bundle_if_needed(self, artifact_bundle: ArtifactBundle, release: str, dist: str):
         # We collect how many times we tried to perform indexing.
         metrics.incr("tasks.assemble.artifact_bundle.try_indexing")
 
-        # We get the number of associations by upper bounding the query to the "date_snapshot", which is done to
-        # prevent the case in which concurrent updates on the database will lead to problems. For example if we have
-        # a threshold of 1, and we have two uploads happening concurrently and the database will contain two
-        # associations even when the assembling of the first upload is running this query, we will have the first
-        # upload task see 2 associations , thus it will trigger the indexing. The same will also happen for the
-        # second upload but in reality we just want the second upload to perform indexing.
-        #
-        # This date implementation might still lead to issues, more specifically in the case in which the
-        # "date_last_modified" is the same but the probability of that happening is so low that it's a negligible
-        # detail for now, as long as the indexing is idempotent.
-        associated_bundles = list(
-            ArtifactBundle.objects.filter(
-                releaseartifactbundle__organization_id=self.organization.id,
-                releaseartifactbundle__release_name=release,
-                releaseartifactbundle__dist_name=dist,
-                # Since the `date_snapshot` will be the same as `date_last_modified` of the last bundle uploaded in this
-                # async job, we want to use the `<=` condition for time, effectively saying give me all the bundles that
-                # were created now or in the past.
-                date_last_modified__lte=date_snapshot,
-            )
+        (total_bundles, indexed_bundles) = get_bundles_indexing_state(
+            self.organization, release, dist
         )
 
         # In case we didn't surpass the threshold, indexing will not happen.
-        if len(associated_bundles) <= INDEXING_THRESHOLD:
+        if total_bundles < INDEXING_THRESHOLD:
             return
 
         # We collect how many times we run indexing.
@@ -751,36 +734,15 @@ class ArtifactBundlePostAssembler(PostAssembler[ArtifactBundleArchive]):
 
         # We want to measure how much time it takes to perform indexing.
         with metrics.timer("tasks.assemble.artifact_bundle.index_bundles"):
-            # We now call the indexing logic with all the bundles that require indexing. We might need to make this call
-            # async if we see a performance degradation of assembling.
-            try:
-                # We only want to get the bundles that are not indexed. Keep in mind that this query is concurrency
-                # unsafe since in the meanwhile the bundles might be modified and the modification will not be
-                # reflected in the objects that we are iterating here.
-                #
-                # In case of concurrency issues, we might do extra work but due to the idempotency of the indexing
-                # function no consistency issues should arise.
-                bundles_to_index = [
-                    (
-                        associated_bundle,
-                        self.archive if associated_bundle.id == artifact_bundle.id else None,
-                    )
-                    for associated_bundle in associated_bundles
-                    if associated_bundle.indexing_state
-                    == ArtifactBundleIndexingState.NOT_INDEXED.value
-                ]
+            # NOTE: this is doing a try/catch internally
+            index_artifact_bundles_for_release(
+                organization_id=self.organization.id,
+                artifact_bundles=[(artifact_bundle, self.archive)],
+            )
 
-                # We want to index only if we have bundles to index.
-                if len(bundles_to_index) > 0:
-                    index_artifact_bundles_for_release(
-                        organization_id=self.organization.id,
-                        artifact_bundles=bundles_to_index,
-                    )
-            except Exception as e:
-                # We want to capture any exception happening during indexing, since it's crucial to understand if
-                # the system is behaving well because the database can easily end up in an inconsistent state.
-                metrics.incr("tasks.assemble.artifact_bundle.index_artifact_bundles_error")
-                sentry_sdk.capture_exception(e)
+        # Backfill older bundles we did not index yet if any are missing
+        if indexed_bundles + 1 < total_bundles:
+            backfill_artifact_bundle_db_indexing.delay(self.organization.id, release, dist)
 
     @sentry_sdk.tracing.trace
     def _index_bundle_into_flat_file(self, artifact_bundle: ArtifactBundle):

+ 8 - 12
tests/sentry/debug_files/test_artifact_bundles.py

@@ -61,7 +61,7 @@ def get_artifact_bundles(project, release_name="", dist_name=""):
             projectartifactbundle__project_id=project.id,
             releaseartifactbundle__release_name=release_name,
             releaseartifactbundle__dist_name=dist_name,
-        )
+        ).order_by("date_last_modified")
     )
 
 
@@ -101,7 +101,8 @@ class ArtifactLookupTest(TestCase):
                 },
             }
         )
-        upload_bundle(bundle, self.project, "1.0.0")
+        with self.tasks():
+            upload_bundle(bundle, self.project, "1.0.0")
 
         # the first upload will not index anything
         bundles = get_artifact_bundles(self.project, "1.0.0")
@@ -121,7 +122,8 @@ class ArtifactLookupTest(TestCase):
                 },
             }
         )
-        upload_bundle(bundle, self.project, "1.0.0")
+        with self.tasks():
+            upload_bundle(bundle, self.project, "1.0.0")
 
         # Uploading the same bundle a second time which internally still creates two artifact bundles, which both
         # cover the same set of files.
@@ -129,14 +131,7 @@ class ArtifactLookupTest(TestCase):
         bundles = get_artifact_bundles(self.project, "1.0.0")
         assert len(bundles) == 2
         indexed = get_indexed_files(self.project, "1.0.0", distinct=True)
-        assert len(indexed) == 3
-
-        assert indexed[0].url == "~/path/to/app.js"
-        assert indexed[0].artifact_bundle == bundles[1]
-        assert indexed[1].url == "~/path/to/only1.js"
-        assert indexed[1].artifact_bundle == bundles[0]
-        assert indexed[2].url == "~/path/to/other1.js"
-        assert indexed[2].artifact_bundle == bundles[1]
+        assert len(indexed) == 0
 
         bundle = make_compressed_zip_file(
             {
@@ -150,7 +145,8 @@ class ArtifactLookupTest(TestCase):
                 },
             }
         )
-        upload_bundle(bundle, self.project, "1.0.0")
+        with self.tasks():
+            upload_bundle(bundle, self.project, "1.0.0")
 
         # the second upload will backfill everything that needs indexing
         bundles = get_artifact_bundles(self.project, "1.0.0")

+ 42 - 14
tests/sentry/tasks/test_assemble.py

@@ -665,12 +665,32 @@ class AssembleArtifactsTest(BaseAssembleTest):
             upload_as_artifact_bundle=True,
         )
 
+        # Since the threshold is not surpassed we expect the system to not perform indexing.
+        index_artifact_bundles_for_release.assert_not_called()
+
+        bundle_file_3 = self.create_artifact_bundle_zip(
+            fixture_path="artifact_bundle_duplicated_debug_ids", project=self.project.id
+        )
+        blob1_3 = FileBlob.from_file(ContentFile(bundle_file_3))
+        total_checksum_3 = sha1(bundle_file_3).hexdigest()
+
+        # We try to upload the first bundle.
+        assemble_artifacts(
+            org_id=self.organization.id,
+            project_ids=[self.project.id],
+            version=release,
+            dist=dist,
+            checksum=total_checksum_3,
+            chunks=[blob1_3.checksum],
+            upload_as_artifact_bundle=True,
+        )
+
         bundles = ArtifactBundle.objects.all()
 
         # Since the threshold is now passed, we expect the system to perform indexing.
         index_artifact_bundles_for_release.assert_called_with(
             organization_id=self.organization.id,
-            artifact_bundles=[(bundles[0], None), (bundles[1], mock.ANY)],
+            artifact_bundles=[(bundles[2], mock.ANY)],
         )
 
     def test_bundle_flat_file_indexing(self):
@@ -889,7 +909,9 @@ class ArtifactBundleIndexingTest(TestCase):
             project_ids=[],
         ) as post_assembler:
             post_assembler._index_bundle_if_needed(
-                artifact_bundle=None, release=release, dist=dist, date_snapshot=datetime.now()
+                artifact_bundle=None,
+                release=release,
+                dist=dist,
             )
 
         index_artifact_bundles_for_release.assert_not_called()
@@ -917,7 +939,9 @@ class ArtifactBundleIndexingTest(TestCase):
             project_ids=[],
         ) as post_assembler:
             post_assembler._index_bundle_if_needed(
-                artifact_bundle=None, release=release, dist=dist, date_snapshot=datetime.now()
+                artifact_bundle=None,
+                release=release,
+                dist=dist,
             )
 
         index_artifact_bundles_for_release.assert_not_called()
@@ -929,7 +953,7 @@ class ArtifactBundleIndexingTest(TestCase):
         release = "1.0"
         dist = "android"
 
-        artifact_bundle_1 = self._create_bundle_and_bind_to_release(
+        self._create_bundle_and_bind_to_release(
             release=release,
             dist=dist,
             bundle_id="2c5b367b-4fef-4db8-849d-b9e79607d630",
@@ -937,7 +961,15 @@ class ArtifactBundleIndexingTest(TestCase):
             date=datetime.now() - timedelta(hours=2),
         )
 
-        artifact_bundle_2 = self._create_bundle_and_bind_to_release(
+        self._create_bundle_and_bind_to_release(
+            release=release,
+            dist=dist,
+            bundle_id="0cf678f2-0771-4e2f-8ace-d6cea8493f0c",
+            indexing_state=ArtifactBundleIndexingState.NOT_INDEXED.value,
+            date=datetime.now() - timedelta(hours=1),
+        )
+
+        artifact_bundle_3 = self._create_bundle_and_bind_to_release(
             release=release,
             dist=dist,
             bundle_id="0cf678f2-0771-4e2f-8ace-d6cea8493f0d",
@@ -953,15 +985,14 @@ class ArtifactBundleIndexingTest(TestCase):
             project_ids=[],
         ) as post_assembler:
             post_assembler._index_bundle_if_needed(
-                artifact_bundle=artifact_bundle_2,
+                artifact_bundle=artifact_bundle_3,
                 release=release,
                 dist=dist,
-                date_snapshot=datetime.now(),
             )
 
         index_artifact_bundles_for_release.assert_called_with(
             organization_id=self.organization.id,
-            artifact_bundles=[(artifact_bundle_1, None), (artifact_bundle_2, mock.ANY)],
+            artifact_bundles=[(artifact_bundle_3, mock.ANY)],
         )
 
     @patch("sentry.tasks.assemble.index_artifact_bundles_for_release")
@@ -992,9 +1023,7 @@ class ArtifactBundleIndexingTest(TestCase):
             dist=dist,
             project_ids=[],
         ) as post_assembler:
-            post_assembler._index_bundle_if_needed(
-                artifact_bundle=None, release=release, dist=dist, date_snapshot=datetime.now()
-            )
+            post_assembler._index_bundle_if_needed(artifact_bundle=None, release=release, dist=dist)
 
         index_artifact_bundles_for_release.assert_not_called()
 
@@ -1013,7 +1042,7 @@ class ArtifactBundleIndexingTest(TestCase):
             date=datetime.now() - timedelta(hours=1),
         )
 
-        artifact_bundle_2 = self._create_bundle_and_bind_to_release(
+        self._create_bundle_and_bind_to_release(
             release=release,
             dist=dist,
             bundle_id="2c5b367b-4fef-4db8-849d-b9e79607d630",
@@ -1042,10 +1071,9 @@ class ArtifactBundleIndexingTest(TestCase):
                 artifact_bundle=artifact_bundle_1,
                 release=release,
                 dist=dist,
-                date_snapshot=datetime.now(),
             )
 
         index_artifact_bundles_for_release.assert_called_with(
             organization_id=self.organization.id,
-            artifact_bundles=[(artifact_bundle_1, mock.ANY), (artifact_bundle_2, None)],
+            artifact_bundles=[(artifact_bundle_1, mock.ANY)],
         )