@@ -7,12 +7,10 @@ from datetime import datetime
 from typing import Any, Dict, List, NamedTuple, Optional, Tuple, TypeVar

 import sentry_sdk
-from django.db import router
+from django.db import DatabaseError, router
 from django.utils import timezone

 from sentry.debug_files.artifact_bundles import get_redis_cluster_for_artifact_bundles
-from sentry.debug_files.utils import size_in_bytes
-from sentry.locks import locks
 from sentry.models.artifactbundle import (
     NULL_STRING,
     ArtifactBundle,
@@ -23,8 +21,6 @@ from sentry.models.artifactbundle import (
 )
 from sentry.utils import json, metrics
 from sentry.utils.db import atomic_transaction
-from sentry.utils.locking.lock import Lock
-from sentry.utils.retries import TimedRetryPolicy

 logger = logging.getLogger(__name__)

@@ -39,6 +35,31 @@ class BundleMeta:
     id: int
     timestamp: datetime

+    @staticmethod
+    def from_artifact_bundle(artifact_bundle: ArtifactBundle) -> BundleMeta:
+        return BundleMeta(
+            id=artifact_bundle.id,
+            # We give priority to the date last modified for total ordering.
+            timestamp=(artifact_bundle.date_last_modified or artifact_bundle.date_uploaded),
+        )
+
+
+@dataclass(frozen=True)
+class BundleManifest:
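+    # Everything the indexer needs from an `ArtifactBundleArchive`, extracted
+    # up-front so the index update below can operate on plain lists without
+    # having to re-open the archive.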
+    meta: BundleMeta
+    urls: List[str]
+    debug_ids: List[str]
+
+    @staticmethod
+    def from_artifact_bundle(
+        artifact_bundle: ArtifactBundle, archive: ArtifactBundleArchive
+    ) -> BundleManifest:
+        meta = BundleMeta.from_artifact_bundle(artifact_bundle)
+        urls = archive.get_all_urls()
+        debug_ids = archive.get_all_debug_ids()
+
+        return BundleManifest(meta=meta, urls=urls, debug_ids=debug_ids)
+

 @dataclass(frozen=True)
 class FlatFileMeta:
@@ -64,77 +85,6 @@ class FlatFileMeta:
         return self.id == -1 and self.date == datetime.utcfromtimestamp(0)


-@sentry_sdk.tracing.trace
-def mark_bundle_for_flat_file_indexing(
-    artifact_bundle: ArtifactBundle,
-    project_ids: List[int],
-    release: Optional[str],
-    dist: Optional[str],
-):
-    identifiers = []
-
-    for project_id in project_ids:
-        if release:
-            identifiers.append(
-                FlatFileIdentifier(project_id, release=release, dist=dist or NULL_STRING)
-            )
-
-        identifiers.append(FlatFileIdentifier.for_debug_id(project_id))
-
-    # Create / Update the indexing state in the database
-    for identifier in identifiers:
-        # It turns out our DB integrity, and usage of `get_or_create` is not safe
-        # in the sense that it can end up with duplicates in the database, yay!
-        # So just wrap all of this in a lock so we are definitely not creating
-        # duplicated index entries concurrently.
-        lock = identifier.get_lock()
-        with TimedRetryPolicy(60)(lock.acquire), atomic_transaction(
-            using=(
-                router.db_for_write(ArtifactBundleFlatFileIndex),
-                router.db_for_write(FlatFileIndexState),
-            )
-        ):
-            # This used to be `get_or_create`, but that is completely broken
-            # when you end up with duplicates, so now we gotta clean that mess up:
-            flat_file_indexes = list(
-                ArtifactBundleFlatFileIndex.objects.filter(
-                    project_id=identifier.project_id,
-                    release_name=identifier.release,
-                    dist_name=identifier.dist,
-                )
-            )
-            if len(flat_file_indexes) > 0:
-                flat_file_index = flat_file_indexes.pop(0)
-                # remove duplicates from the DB:
-                if len(flat_file_indexes) > 0:
-                    ids = [index.id for index in flat_file_indexes]
-                    ArtifactBundleFlatFileIndex.objects.filter(id__in=ids).delete()
-            else:
-                flat_file_index = ArtifactBundleFlatFileIndex.objects.create(
-                    project_id=identifier.project_id,
-                    release_name=identifier.release,
-                    dist_name=identifier.dist,
-                )
-
-            # Lol, turns out that `update_or_create` will also do a `get` under the hood,
-            # which is equally broken if you end up with duplicates.
-            rows_updated = FlatFileIndexState.objects.filter(
-                flat_file_index=flat_file_index,
-                artifact_bundle=artifact_bundle,
-            ).update(
-                indexing_state=ArtifactBundleIndexingState.NOT_INDEXED.value,
-                date_added=timezone.now(),
-            )
-            if rows_updated == 0:
-                FlatFileIndexState.objects.create(
-                    flat_file_index=flat_file_index,
-                    artifact_bundle=artifact_bundle,
-                    indexing_state=ArtifactBundleIndexingState.NOT_INDEXED.value,
-                )
-
-    return identifiers
-
-
 class FlatFileIdentifier(NamedTuple):
     project_id: int
     release: str
@@ -155,12 +105,6 @@ class FlatFileIdentifier(NamedTuple):
     def _flat_file_meta_cache_key(self):
         return f"flat_file_index:{self._hashed()}"

-    def delete_flat_file_meta_from_cache(self):
-        cache_key = self._flat_file_meta_cache_key()
-        redis_client = get_redis_cluster_for_artifact_bundles()
-
-        redis_client.delete(cache_key)
-
     def set_flat_file_meta_in_cache(self, flat_file_meta: FlatFileMeta):
         cache_key = self._flat_file_meta_cache_key()
         redis_client = get_redis_cluster_for_artifact_bundles()
@@ -227,70 +171,146 @@ class FlatFileIdentifier(NamedTuple):

         return meta

-    def _locking_key(self):
-        return f"bundle_index:write:{self._hashed()}"
-
-    def get_lock(self) -> Lock:
-        locking_key = self._locking_key()
-        return locks.get(locking_key, duration=60 * 10, name="bundle_index")
+
+@sentry_sdk.tracing.trace
+def mark_bundle_for_flat_file_indexing(
+    artifact_bundle: ArtifactBundle,
+    project_ids: List[int],
+    release: Optional[str],
+    dist: Optional[str],
+) -> List[FlatFileIdentifier]:
+    identifiers = []
+
+    for project_id in project_ids:
+        if release:
+            identifiers.append(
+                FlatFileIdentifier(project_id, release=release, dist=dist or NULL_STRING)
+            )
+
+        identifiers.append(FlatFileIdentifier.for_debug_id(project_id))
+
+    # Create / Update the indexing state in the database
+    for identifier in identifiers:
+        with atomic_transaction(
+            using=(
+                router.db_for_write(ArtifactBundleFlatFileIndex),
+                router.db_for_write(FlatFileIndexState),
+            )
+        ):
+            flat_file_index, _created = ArtifactBundleFlatFileIndex.objects.get_or_create(
+                project_id=identifier.project_id,
+                release_name=identifier.release,
+                dist_name=identifier.dist,
+            )
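+            # Reset the state to `NOT_INDEXED` so the bundle gets picked up
+            # (again) by the next indexing run for this identifier.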
+            FlatFileIndexState.objects.update_or_create(
+                flat_file_index=flat_file_index,
+                artifact_bundle=artifact_bundle,
+                defaults={
+                    "indexing_state": ArtifactBundleIndexingState.NOT_INDEXED.value,
+                    "date_added": timezone.now(),
+                },
+            )
+
+    return identifiers
+
+
+@sentry_sdk.tracing.trace
+def remove_artifact_bundle_from_indexes(artifact_bundle: ArtifactBundle):
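+    # Find all the indexes that reference this bundle through their `FlatFileIndexState`.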
+    flat_file_indexes = ArtifactBundleFlatFileIndex.objects.filter(
+        flatfileindexstate__artifact_bundle=artifact_bundle
+    )
+    for idx in flat_file_indexes:
+        identifier = FlatFileIdentifier(
+            project_id=idx.project_id, release=idx.release_name, dist=idx.dist_name
+        )
+
+        was_removed = update_artifact_bundle_index(
+            identifier, bundles_to_remove=[artifact_bundle.id]
+        )
+        if not was_removed:
+            metrics.incr("artifact_bundle_flat_file_indexing.removal.would_block")
+            # TODO: mark this for async removal
+            pass


 @sentry_sdk.tracing.trace
 def update_artifact_bundle_index(
-    bundle_meta: BundleMeta, bundle_archive: ArtifactBundleArchive, identifier: FlatFileIdentifier
-):
+    identifier: FlatFileIdentifier,
+    blocking: bool = False,
+    bundles_to_add: List[BundleManifest] | None = None,
+    bundles_to_remove: List[int] | None = None,
+) -> bool:
     """
-    This will merge the `ArtifactBundle` given via `bundle_meta` and `bundle_archive`
-    into the index identified via `identifier`.
+    This will update the index identified via `identifier`.
+    Multiple manifests given in `bundles_to_add` and `bundles_to_remove` will be merged
+    into the index as one batched operation.

-    If this function fails for any reason, it can be, and *has to be* retried at a later point.
+    If this function fails for any reason, it can be, and *has to be* retried at a later point,
+    as not doing so will leave inconsistent indexes around.
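+
+    Returns `True` if the index was updated, and `False` if the underlying index
+    row is currently locked by another writer and `blocking` is not set.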
     """
-    # TODO: maybe query `FlatFileIndexState` to avoid double-indexing?
-
-    lock = identifier.get_lock()
-    with TimedRetryPolicy(60)(lock.acquire):
-        flat_file_index = ArtifactBundleFlatFileIndex.objects.filter(
-            project_id=identifier.project_id,
-            release_name=identifier.release,
-            dist_name=identifier.dist,
-        ).first()
+    with atomic_transaction(
+        using=(
+            router.db_for_write(ArtifactBundleFlatFileIndex),
+            router.db_for_write(FlatFileIndexState),
+        )
+    ):
+        # The `nowait=True` will opportunistically lock the row/index without blocking,
+        # and throw an error otherwise which we will pass down and handle in the caller.
+        try:
+            flat_file_index = (
+                ArtifactBundleFlatFileIndex.objects.filter(
+                    project_id=identifier.project_id,
+                    release_name=identifier.release,
+                    dist_name=identifier.dist,
+                )
+                .select_for_update(nowait=not blocking)
+                .first()
+            )
+        except DatabaseError:
+            return False
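+        # NOTE: the index row is created up-front by `mark_bundle_for_flat_file_indexing`,
+        # so `flat_file_index` is expected to exist at this point.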

         index = FlatFileIndex()
         # Load the index from the file if it exists
         if existing_index := flat_file_index.load_flat_file_index():
             index.from_json(existing_index)

-        # Before merging new data into the index, we will clear any existing
-        # data from the index related to this bundle.
-        # This is related to an edge-case in which the same `bundle_id` could be
-        # re-used but with different file contents.
-        index.remove(bundle_meta.id)
+        for bundle in bundles_to_add or []:
+            # Before merging new data into the index, we will clear any existing
+            # data from the index related to this bundle.
+            # This is related to an edge-case in which the same `bundle_id` could be
+            # re-used but with different file contents.
+            index.remove(bundle.meta.id)

-        # We merge the index based on the identifier type.
-        if identifier.is_indexing_by_release():
-            index.merge_urls(bundle_meta, bundle_archive)
-        else:
-            index.merge_debug_ids(bundle_meta, bundle_archive)
+            # We merge the index based on the identifier type.
+            if identifier.is_indexing_by_release():
+                index.merge_urls(bundle.meta, bundle.urls)
+            else:
+                index.merge_debug_ids(bundle.meta, bundle.debug_ids)
+
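+        # Bundles that are being deleted are dropped from the index entirely.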
+        for bundle_id in bundles_to_remove or []:
+            index.remove(bundle_id)

         # Store the updated index file
         new_json_index = index.to_json()
         flat_file_index.update_flat_file_index(new_json_index)

-        # And then mark the bundle as indexed
-        was_updated = FlatFileIndexState.compare_state_and_set(
-            flat_file_index.id,
-            bundle_meta.id,
-            ArtifactBundleIndexingState.NOT_INDEXED,
-            ArtifactBundleIndexingState.WAS_INDEXED,
-        )
-        if not was_updated:
-            metrics.incr("artifact_bundle_flat_file_indexing.duplicated_indexing")
-            logger.error("`ArtifactBundle` %r was already indexed into %r", bundle_meta, identifier)
+        # And then mark the bundles as indexed
+        for bundle in bundles_to_add or []:
+            was_updated = FlatFileIndexState.mark_as_indexed(
+                flat_file_index_id=flat_file_index.id, artifact_bundle_id=bundle.meta.id
+            )
+            if not was_updated:
+                metrics.incr("artifact_bundle_flat_file_indexing.duplicated_indexing")
+                logger.error(
+                    "`ArtifactBundle` %r was already indexed into %r", bundle.meta, identifier
+                )

-    # We invalidate the cache which is holding the FlatFileMeta for this specific identifier. This is done
+    # We update the cache which is holding the FlatFileMeta for this specific identifier. This is done
     # so that any upcoming event will load the new meta from the db and store it in cache.
-    identifier.delete_flat_file_meta_from_cache()
+    identifier.set_flat_file_meta_in_cache(
+        FlatFileMeta(id=flat_file_index.id, date=flat_file_index.date_added)
+    )
+    return True


 Bundles = List[BundleMeta]
@@ -340,35 +360,22 @@ class FlatFileIndex:
             "files_by_debug_id": self._files_by_debug_id,
         }

-        json_index = json.dumps(json_idx)
-
-        if len(self._files_by_url) == 0:
-            metrics.timing(
-                "artifact_bundle_flat_file_indexing.debug_id_index.size_in_bytes",
-                value=size_in_bytes(json_index),
-            )
-        else:
-            metrics.timing(
-                "artifact_bundle_flat_file_indexing.url_index.size_in_bytes",
-                value=size_in_bytes(json_index),
-            )
-
-        return json_index
+        return json.dumps(json_idx)

-    def merge_urls(self, bundle_meta: BundleMeta, bundle_archive: ArtifactBundleArchive):
+    def merge_urls(self, bundle_meta: BundleMeta, urls: List[str]):
         bundle_index = self._add_or_update_bundle(bundle_meta)
         if bundle_index is None:
             return

-        for url in bundle_archive.get_all_urls():
+        for url in urls:
             self._add_sorted_entry(self._files_by_url, url, bundle_index)

-    def merge_debug_ids(self, bundle_meta: BundleMeta, bundle_archive: ArtifactBundleArchive):
+    def merge_debug_ids(self, bundle_meta: BundleMeta, debug_ids: List[str]):
         bundle_index = self._add_or_update_bundle(bundle_meta)
         if bundle_index is None:
             return

-        for debug_id, _ in bundle_archive.get_all_debug_ids():
+        for debug_id in debug_ids:
             self._add_sorted_entry(self._files_by_debug_id, debug_id, bundle_index)

     def _add_or_update_bundle(self, bundle_meta: BundleMeta) -> Optional[int]: