
ref(hybrid-cloud): Create control silo file tables (#49663)

Add a symmetric set of tables to represent files in the control silo.
This duplicates the File model graph by abstracting the shared base
functionality into Django abstract models and realizing concrete models
for both silos (a minimal sketch of the pattern follows below).


[Jira](https://getsentry.atlassian.net/browse/HC-704)
Mike Ihbe · 1 year ago · commit 7aa64ca28e
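
The split described in the commit message follows the standard Django abstract-base pattern: shared columns and behavior live on a base class with Meta.abstract = True, and each silo realizes its own concrete table. A minimal, illustrative sketch of that shape (the Widget names are placeholders, not the actual Sentry classes):

from django.db import models


class AbstractWidget(models.Model):
    # Shared columns and behavior; Django creates no table for this class.
    name = models.TextField()

    class Meta:
        abstract = True


class RegionWidget(AbstractWidget):
    # Concrete table for the region silo.
    class Meta:
        db_table = "app_regionwidget"


class ControlWidget(AbstractWidget):
    # Concrete table for the control silo.
    class Meta:
        db_table = "app_controlwidget"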

+ 1 - 1
migrations_lockfile.txt

@@ -6,5 +6,5 @@ To resolve this, rebase against latest master and regenerate your migration. Thi
 will then be regenerated, and you should be able to merge without conflicts.
 
 nodestore: 0002_nodestore_no_dictfield
-sentry: 0466_gh_comment_index
+sentry: 0467_control_files
 social_auth: 0001_initial

+ 135 - 0
src/sentry/migrations/0467_control_files.py

@@ -0,0 +1,135 @@
+# Generated by Django 2.2.28 on 2023-05-24 05:27
+
+import django.db.models.deletion
+import django.utils.timezone
+from django.db import migrations, models
+
+import sentry.db.models.fields.bounded
+import sentry.db.models.fields.foreignkey
+import sentry.db.models.fields.jsonfield
+from sentry.new_migrations.migrations import CheckedMigration
+
+
+class Migration(CheckedMigration):
+    # This flag is used to mark that a migration shouldn't be automatically run in production. For
+    # the most part, this should only be used for operations where it's safe to run the migration
+    # after your code has deployed. So this should not be used for most operations that alter the
+    # schema of a table.
+    # Here are some things that make sense to mark as dangerous:
+    # - Large data migrations. Typically we want these to be run manually by ops so that they can
+    #   be monitored and not block the deploy for a long period of time while they run.
+    # - Adding indexes to large tables. Since this can take a long time, we'd generally prefer to
+    #   have ops run this and not block the deploy. Note that while adding an index is a schema
+    #   change, it's completely safe to run the operation after the code has deployed.
+    is_dangerous = False
+
+    dependencies = [
+        ("sentry", "0466_gh_comment_index"),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name="ControlFile",
+            fields=[
+                (
+                    "id",
+                    sentry.db.models.fields.bounded.BoundedBigAutoField(
+                        primary_key=True, serialize=False
+                    ),
+                ),
+                ("name", models.TextField()),
+                ("type", models.CharField(max_length=64)),
+                (
+                    "timestamp",
+                    models.DateTimeField(db_index=True, default=django.utils.timezone.now),
+                ),
+                ("headers", sentry.db.models.fields.jsonfield.JSONField(default=dict)),
+                ("size", sentry.db.models.fields.bounded.BoundedPositiveIntegerField(null=True)),
+                ("checksum", models.CharField(db_index=True, max_length=40, null=True)),
+            ],
+            options={
+                "db_table": "sentry_controlfile",
+            },
+        ),
+        migrations.CreateModel(
+            name="ControlFileBlob",
+            fields=[
+                (
+                    "id",
+                    sentry.db.models.fields.bounded.BoundedBigAutoField(
+                        primary_key=True, serialize=False
+                    ),
+                ),
+                ("path", models.TextField(null=True)),
+                ("size", sentry.db.models.fields.bounded.BoundedPositiveIntegerField(null=True)),
+                ("checksum", models.CharField(max_length=40, unique=True)),
+                (
+                    "timestamp",
+                    models.DateTimeField(db_index=True, default=django.utils.timezone.now),
+                ),
+            ],
+            options={
+                "db_table": "sentry_controlfileblob",
+            },
+        ),
+        migrations.CreateModel(
+            name="ControlFileBlobIndex",
+            fields=[
+                (
+                    "id",
+                    sentry.db.models.fields.bounded.BoundedBigAutoField(
+                        primary_key=True, serialize=False
+                    ),
+                ),
+                ("offset", sentry.db.models.fields.bounded.BoundedPositiveIntegerField()),
+                (
+                    "blob",
+                    sentry.db.models.fields.foreignkey.FlexibleForeignKey(
+                        on_delete=django.db.models.deletion.PROTECT, to="sentry.ControlFileBlob"
+                    ),
+                ),
+                (
+                    "file",
+                    sentry.db.models.fields.foreignkey.FlexibleForeignKey(
+                        on_delete=django.db.models.deletion.CASCADE, to="sentry.ControlFile"
+                    ),
+                ),
+            ],
+            options={
+                "db_table": "sentry_controlfileblobindex",
+                "unique_together": {("file", "blob", "offset")},
+            },
+        ),
+        migrations.AddField(
+            model_name="controlfile",
+            name="blobs",
+            field=models.ManyToManyField(
+                through="sentry.ControlFileBlobIndex", to="sentry.ControlFileBlob"
+            ),
+        ),
+        migrations.CreateModel(
+            name="ControlFileBlobOwner",
+            fields=[
+                (
+                    "id",
+                    sentry.db.models.fields.bounded.BoundedBigAutoField(
+                        primary_key=True, serialize=False
+                    ),
+                ),
+                (
+                    "organization_id",
+                    sentry.db.models.fields.bounded.BoundedBigIntegerField(db_index=True),
+                ),
+                (
+                    "blob",
+                    sentry.db.models.fields.foreignkey.FlexibleForeignKey(
+                        on_delete=django.db.models.deletion.CASCADE, to="sentry.ControlFileBlob"
+                    ),
+                ),
+            ],
+            options={
+                "db_table": "sentry_controlfileblobowner",
+                "unique_together": {("blob", "organization_id")},
+            },
+        ),
+    ]
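
The concrete control-silo model files themselves fall under the files not shown in this diff, but the migration above fixes their schema. A hypothetical reconstruction of ControlFile, inferred only from the CreateModel/AddField operations here and the AbstractFile base introduced later in this commit (field set and db_table come from the migration; everything else, including any silo decorator, is an assumption):

from django.db import models

from sentry.models.files.abstractfile import AbstractFile


class ControlFile(AbstractFile):
    # name, type, timestamp, headers, size and checksum are inherited from
    # AbstractFile; this M2M mirrors the AddField("controlfile", "blobs", ...)
    # operation above.
    blobs = models.ManyToManyField(
        "sentry.ControlFileBlob", through="sentry.ControlFileBlobIndex"
    )

    class Meta:
        app_label = "sentry"
        db_table = "sentry_controlfile"

The behavioral wiring (which blob, index, and cleanup-task classes the model binds to) is sketched after the AbstractFile definition further down.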

+ 1 - 1
src/sentry/models/__init__.py

@@ -35,7 +35,7 @@ from .eventattachment import *  # NOQA
 from .eventerror import *  # NOQA
 from .eventuser import *  # NOQA
 from .featureadoption import *  # NOQA
-from .file import *  # NOQA
+from .files import *  # NOQA
 from .group import *  # NOQA
 from .groupassignee import *  # NOQA
 from .groupbookmark import *  # NOQA

+ 2 - 1
src/sentry/models/debugfile.py

@@ -43,7 +43,8 @@ from sentry.db.models import (
     region_silo_only_model,
     sane_repr,
 )
-from sentry.models.file import File, clear_cached_files
+from sentry.models.files.file import File
+from sentry.models.files.utils import clear_cached_files
 from sentry.reprocessing import bump_reprocessing_revision, resolve_processing_issue
 from sentry.utils import json
 from sentry.utils.zip import safe_extract_zip

+ 1 - 694
src/sentry/models/file.py

@@ -1,694 +1 @@
-import io
-import mmap
-import os
-import tempfile
-import time
-from concurrent.futures import ThreadPoolExecutor
-from contextlib import contextmanager
-from hashlib import sha1
-from threading import Semaphore
-from uuid import uuid4
-
-from django.conf import settings
-from django.core.files.base import ContentFile
-from django.core.files.base import File as FileObj
-from django.core.files.storage import get_storage_class
-from django.db import IntegrityError, models, router, transaction
-from django.utils import timezone
-
-from sentry.db.models import (
-    BoundedBigIntegerField,
-    BoundedPositiveIntegerField,
-    FlexibleForeignKey,
-    JSONField,
-    Model,
-    region_silo_only_model,
-)
-from sentry.locks import locks
-from sentry.tasks.files import delete_file as delete_file_task
-from sentry.tasks.files import delete_unreferenced_blobs
-from sentry.utils import metrics
-from sentry.utils.db import atomic_transaction
-from sentry.utils.retries import TimedRetryPolicy
-
-ONE_DAY = 60 * 60 * 24
-ONE_DAY_AND_A_HALF = int(ONE_DAY * 1.5)
-
-UPLOAD_RETRY_TIME = settings.SENTRY_UPLOAD_RETRY_TIME
-
-DEFAULT_BLOB_SIZE = 1024 * 1024  # one mb
-CHUNK_STATE_HEADER = "__state"
-MULTI_BLOB_UPLOAD_CONCURRENCY = 8
-MAX_FILE_SIZE = 2**31  # 2GB is the maximum offset supported by fileblob
-
-
-class nooplogger:
-    debug = staticmethod(lambda *a, **kw: None)
-    info = staticmethod(lambda *a, **kw: None)
-    warning = staticmethod(lambda *a, **kw: None)
-    error = staticmethod(lambda *a, **kw: None)
-    critical = staticmethod(lambda *a, **kw: None)
-    log = staticmethod(lambda *a, **kw: None)
-    exception = staticmethod(lambda *a, **kw: None)
-
-
-def _get_size_and_checksum(fileobj, logger=nooplogger):
-    logger.debug("_get_size_and_checksum.start")
-    size = 0
-    checksum = sha1()
-    while True:
-        chunk = fileobj.read(65536)
-        if not chunk:
-            break
-        size += len(chunk)
-        checksum.update(chunk)
-
-    logger.debug("_get_size_and_checksum.end")
-    return size, checksum.hexdigest()
-
-
-@contextmanager
-def _locked_blob(checksum, logger=nooplogger):
-    logger.debug("_locked_blob.start", extra={"checksum": checksum})
-    lock = locks.get(
-        f"fileblob:upload:{checksum}", duration=UPLOAD_RETRY_TIME, name="fileblob_upload_model"
-    )
-    with TimedRetryPolicy(UPLOAD_RETRY_TIME, metric_instance="lock.fileblob.upload")(lock.acquire):
-        logger.debug("_locked_blob.acquired", extra={"checksum": checksum})
-        # test for presence
-        try:
-            existing = FileBlob.objects.get(checksum=checksum)
-        except FileBlob.DoesNotExist:
-            existing = None
-        yield existing
-    logger.debug("_locked_blob.end", extra={"checksum": checksum})
-
-
-class AssembleChecksumMismatch(Exception):
-    pass
-
-
-def get_storage(config=None):
-
-    if config is not None:
-        backend = config["backend"]
-        options = config["options"]
-    else:
-        from sentry import options as options_store
-
-        backend = options_store.get("filestore.backend")
-        options = options_store.get("filestore.options")
-
-    try:
-        backend = settings.SENTRY_FILESTORE_ALIASES[backend]
-    except KeyError:
-        pass
-
-    storage = get_storage_class(backend)
-    return storage(**options)
-
-
-@region_silo_only_model
-class FileBlob(Model):
-    __include_in_export__ = False
-
-    path = models.TextField(null=True)
-    size = BoundedPositiveIntegerField(null=True)
-    checksum = models.CharField(max_length=40, unique=True)
-    timestamp = models.DateTimeField(default=timezone.now, db_index=True)
-
-    class Meta:
-        app_label = "sentry"
-        db_table = "sentry_fileblob"
-
-    @classmethod
-    def from_files(cls, files, organization=None, logger=nooplogger):
-        """A faster version of `from_file` for multiple files at the time.
-        If an organization is provided it will also create `FileBlobOwner`
-        entries.  Files can be a list of files or tuples of file and checksum.
-        If both are provided then a checksum check is performed.
-
-        If the checksums mismatch an `IOError` is raised.
-        """
-        logger.debug("FileBlob.from_files.start")
-
-        files_with_checksums = []
-        for fileobj in files:
-            if isinstance(fileobj, tuple):
-                files_with_checksums.append(fileobj)
-            else:
-                files_with_checksums.append((fileobj, None))
-
-        checksums_seen = set()
-        blobs_created = []
-        blobs_to_save = []
-        locks = set()
-        semaphore = Semaphore(value=MULTI_BLOB_UPLOAD_CONCURRENCY)
-
-        def _upload_and_pend_chunk(fileobj, size, checksum, lock):
-            logger.debug(
-                "FileBlob.from_files._upload_and_pend_chunk.start",
-                extra={"checksum": checksum, "size": size},
-            )
-            blob = cls(size=size, checksum=checksum)
-            blob.path = cls.generate_unique_path()
-            storage = get_storage()
-            storage.save(blob.path, fileobj)
-            blobs_to_save.append((blob, lock))
-            metrics.timing("filestore.blob-size", size, tags={"function": "from_files"})
-            logger.debug(
-                "FileBlob.from_files._upload_and_pend_chunk.end",
-                extra={"checksum": checksum, "path": blob.path},
-            )
-
-        def _ensure_blob_owned(blob):
-            if organization is None:
-                return
-            try:
-                with atomic_transaction(using=router.db_for_write(FileBlobOwner)):
-                    FileBlobOwner.objects.create(organization_id=organization.id, blob=blob)
-            except IntegrityError:
-                pass
-
-        def _save_blob(blob):
-            logger.debug("FileBlob.from_files._save_blob.start", extra={"path": blob.path})
-            blob.save()
-            _ensure_blob_owned(blob)
-            logger.debug("FileBlob.from_files._save_blob.end", extra={"path": blob.path})
-
-        def _flush_blobs():
-            while True:
-                try:
-                    blob, lock = blobs_to_save.pop()
-                except IndexError:
-                    break
-
-                _save_blob(blob)
-                lock.__exit__(None, None, None)
-                locks.discard(lock)
-                semaphore.release()
-
-        try:
-            with ThreadPoolExecutor(max_workers=MULTI_BLOB_UPLOAD_CONCURRENCY) as exe:
-                for fileobj, reference_checksum in files_with_checksums:
-                    logger.debug(
-                        "FileBlob.from_files.executor_start", extra={"checksum": reference_checksum}
-                    )
-                    _flush_blobs()
-
-                    # Before we go and do something with the files we calculate
-                    # the checksums and compare it against the reference.  This
-                    # also deduplicates duplicates uploaded in the same request.
-                    # This is necessary because we acquire multiple locks in one
-                    # go which would let us deadlock otherwise.
-                    size, checksum = _get_size_and_checksum(fileobj)
-                    if reference_checksum is not None and checksum != reference_checksum:
-                        raise OSError("Checksum mismatch")
-                    if checksum in checksums_seen:
-                        continue
-                    checksums_seen.add(checksum)
-
-                    # Check if we need to lock the blob.  If we get a result back
-                    # here it means the blob already exists.
-                    lock = _locked_blob(checksum, logger=logger)
-                    existing = lock.__enter__()
-                    if existing is not None:
-                        lock.__exit__(None, None, None)
-                        blobs_created.append(existing)
-                        _ensure_blob_owned(existing)
-                        continue
-
-                    # Remember the lock to force unlock all at the end if we
-                    # encounter any difficulties.
-                    locks.add(lock)
-
-                    # Otherwise we leave the blob locked and submit the task.
-                    # We use the semaphore to ensure we never schedule too
-                    # many.  The upload will be done with a certain amount
-                    # of concurrency controlled by the semaphore and the
-                    # `_flush_blobs` call will take all those uploaded
-                    # blobs and associate them with the database.
-                    semaphore.acquire()
-                    exe.submit(_upload_and_pend_chunk(fileobj, size, checksum, lock))
-                    logger.debug("FileBlob.from_files.end", extra={"checksum": reference_checksum})
-
-            _flush_blobs()
-        finally:
-            for lock in locks:
-                try:
-                    lock.__exit__(None, None, None)
-                except Exception:
-                    pass
-            logger.debug("FileBlob.from_files.end")
-
-    @classmethod
-    def from_file(cls, fileobj, logger=nooplogger):
-        """
-        Retrieve a single FileBlob instances for the given file.
-        """
-        logger.debug("FileBlob.from_file.start")
-
-        size, checksum = _get_size_and_checksum(fileobj)
-
-        # TODO(dcramer): the database here is safe, but if this lock expires
-        # and duplicate files are uploaded then we need to prune one
-        with _locked_blob(checksum, logger=logger) as existing:
-            if existing is not None:
-                return existing
-
-            blob = cls(size=size, checksum=checksum)
-            blob.path = cls.generate_unique_path()
-            storage = get_storage()
-            storage.save(blob.path, fileobj)
-            blob.save()
-
-        metrics.timing("filestore.blob-size", size)
-        logger.debug("FileBlob.from_file.end")
-        return blob
-
-    @classmethod
-    def generate_unique_path(cls):
-        # We intentionally do not use checksums as path names to avoid concurrency issues
-        # when we attempt concurrent uploads for any reason.
-        uuid_hex = uuid4().hex
-        pieces = [uuid_hex[:2], uuid_hex[2:6], uuid_hex[6:]]
-        return "/".join(pieces)
-
-    def delete(self, *args, **kwargs):
-        if self.path:
-            self.deletefile(commit=False)
-        lock = locks.get(
-            f"fileblob:upload:{self.checksum}",
-            duration=UPLOAD_RETRY_TIME,
-            name="fileblob_upload_delete",
-        )
-        with TimedRetryPolicy(UPLOAD_RETRY_TIME, metric_instance="lock.fileblob.delete")(
-            lock.acquire
-        ):
-            super().delete(*args, **kwargs)
-
-    def deletefile(self, commit=False):
-        assert self.path
-
-        # Defer this by 1 minute just to make sure
-        # we avoid any transaction isolation where the
-        # FileBlob row might still be visible by the
-        # task before transaction is committed.
-        delete_file_task.apply_async(
-            kwargs={"path": self.path, "checksum": self.checksum}, countdown=60
-        )
-
-        self.path = None
-
-        if commit:
-            self.save()
-
-    def getfile(self):
-        """
-        Return a file-like object for this File's content.
-
-        >>> with blob.getfile() as src, open('/tmp/localfile', 'wb') as dst:
-        >>>     for chunk in src.chunks():
-        >>>         dst.write(chunk)
-        """
-        assert self.path
-
-        storage = get_storage()
-        return storage.open(self.path)
-
-
-@region_silo_only_model
-class File(Model):
-    __include_in_export__ = False
-
-    name = models.TextField()
-    type = models.CharField(max_length=64)
-    timestamp = models.DateTimeField(default=timezone.now, db_index=True)
-    headers = JSONField()
-    blobs = models.ManyToManyField("sentry.FileBlob", through="sentry.FileBlobIndex")
-    size = BoundedPositiveIntegerField(null=True)
-    checksum = models.CharField(max_length=40, null=True, db_index=True)
-
-    # <Legacy fields>
-    # Remove in 8.1
-    blob = FlexibleForeignKey("sentry.FileBlob", null=True, related_name="legacy_blob")
-    path = models.TextField(null=True)
-
-    # </Legacy fields>
-
-    class Meta:
-        app_label = "sentry"
-        db_table = "sentry_file"
-
-    def _get_chunked_blob(self, mode=None, prefetch=False, prefetch_to=None, delete=True):
-        return ChunkedFileBlobIndexWrapper(
-            FileBlobIndex.objects.filter(file=self).select_related("blob").order_by("offset"),
-            mode=mode,
-            prefetch=prefetch,
-            prefetch_to=prefetch_to,
-            delete=delete,
-        )
-
-    def getfile(self, mode=None, prefetch=False):
-        """Returns a file object.  By default the file is fetched on
-        demand but if prefetch is enabled the file is fully prefetched
-        into a tempfile before reading can happen.
-        """
-        impl = self._get_chunked_blob(mode, prefetch)
-        return FileObj(impl, self.name)
-
-    def save_to(self, path):
-        """Fetches the file and emplaces it at a certain location.  The
-        write is done atomically to a tempfile first and then moved over.
-        If the directory does not exist it is created.
-        """
-        path = os.path.abspath(path)
-        base = os.path.dirname(path)
-        try:
-            os.makedirs(base)
-        except OSError:
-            pass
-
-        f = None
-        try:
-            f = self._get_chunked_blob(
-                prefetch=True, prefetch_to=base, delete=False
-            ).detach_tempfile()
-
-            # pre-emptively check if the file already exists.
-            # this can happen as a race condition if two processes/threads
-            # are trying to cache the same file and both try to write
-            # at the same time, overwriting each other. Normally this is fine,
-            # but can cause an issue if another process has opened the file
-            # for reading, then the file that was being read gets clobbered.
-            # I don't know if this affects normal filesystems, but it
-            # definitely has an issue if the filesystem is NFS.
-            if not os.path.exists(path):
-                os.rename(f.name, path)
-                f.close()
-                f = None
-        finally:
-            if f is not None:
-                f.close()
-                try:
-                    os.remove(f.name)
-                except Exception:
-                    pass
-
-    def putfile(self, fileobj, blob_size=DEFAULT_BLOB_SIZE, commit=True, logger=nooplogger):
-        """
-        Save a fileobj into a number of chunks.
-
-        Returns a list of `FileBlobIndex` items.
-
-        >>> indexes = file.putfile(fileobj)
-        """
-        results = []
-        offset = 0
-        checksum = sha1(b"")
-
-        while True:
-            contents = fileobj.read(blob_size)
-            if not contents:
-                break
-            checksum.update(contents)
-
-            blob_fileobj = ContentFile(contents)
-            blob = FileBlob.from_file(blob_fileobj, logger=logger)
-            results.append(FileBlobIndex.objects.create(file=self, blob=blob, offset=offset))
-            offset += blob.size
-        self.size = offset
-        self.checksum = checksum.hexdigest()
-        metrics.timing("filestore.file-size", offset)
-        if commit:
-            self.save()
-        return results
-
-    def assemble_from_file_blob_ids(self, file_blob_ids, checksum, commit=True):
-        """
-        This creates a file, from file blobs and returns a temp file with the
-        contents.
-        """
-        tf = tempfile.NamedTemporaryFile()
-        with atomic_transaction(
-            using=(
-                router.db_for_write(FileBlob),
-                router.db_for_write(FileBlobIndex),
-            )
-        ):
-            file_blobs = FileBlob.objects.filter(id__in=file_blob_ids).all()
-
-            # Ensure blobs are in the order and duplication as provided
-            blobs_by_id = {blob.id: blob for blob in file_blobs}
-            file_blobs = [blobs_by_id[blob_id] for blob_id in file_blob_ids]
-
-            new_checksum = sha1(b"")
-            offset = 0
-            for blob in file_blobs:
-                FileBlobIndex.objects.create(file=self, blob=blob, offset=offset)
-                with blob.getfile() as blobfile:
-                    for chunk in blobfile.chunks():
-                        new_checksum.update(chunk)
-                        tf.write(chunk)
-                offset += blob.size
-
-            self.size = offset
-            self.checksum = new_checksum.hexdigest()
-
-            if checksum != self.checksum:
-                raise AssembleChecksumMismatch("Checksum mismatch")
-
-        metrics.timing("filestore.file-size", offset)
-        if commit:
-            self.save()
-        tf.flush()
-        tf.seek(0)
-        return tf
-
-    def delete(self, *args, **kwargs):
-        blob_ids = [blob.id for blob in self.blobs.all()]
-        super().delete(*args, **kwargs)
-
-        # Wait to delete blobs. This helps prevent
-        # races around frequently used blobs in debug images and release files.
-        transaction.on_commit(
-            lambda: delete_unreferenced_blobs.apply_async(
-                kwargs={"blob_ids": blob_ids}, countdown=60 * 5
-            ),
-            using=router.db_for_write(type(self)),
-        )
-
-
-@region_silo_only_model
-class FileBlobIndex(Model):
-    __include_in_export__ = False
-
-    file = FlexibleForeignKey("sentry.File")
-    blob = FlexibleForeignKey("sentry.FileBlob", on_delete=models.PROTECT)
-    offset = BoundedPositiveIntegerField()
-
-    class Meta:
-        app_label = "sentry"
-        db_table = "sentry_fileblobindex"
-        unique_together = (("file", "blob", "offset"),)
-
-
-class ChunkedFileBlobIndexWrapper:
-    def __init__(self, indexes, mode=None, prefetch=False, prefetch_to=None, delete=True):
-        # eager load from database incase its a queryset
-        self._indexes = list(indexes)
-        self._curfile = None
-        self._curidx = None
-        if prefetch:
-            self.prefetched = True
-            self._prefetch(prefetch_to, delete)
-        else:
-            self.prefetched = False
-        self.mode = mode
-        self.open()
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_value, tb):
-        self.close()
-
-    def detach_tempfile(self):
-        if not self.prefetched:
-            raise TypeError("Can only detech tempfiles in prefetch mode")
-        rv = self._curfile
-        self._curfile = None
-        self.close()
-        rv.seek(0)
-        return rv
-
-    def _nextidx(self):
-        assert not self.prefetched, "this makes no sense"
-        old_file = self._curfile
-        try:
-            try:
-                self._curidx = next(self._idxiter)
-                self._curfile = self._curidx.blob.getfile()
-            except StopIteration:
-                self._curidx = None
-                self._curfile = None
-        finally:
-            if old_file is not None:
-                old_file.close()
-
-    @property
-    def size(self):
-        return sum(i.blob.size for i in self._indexes)
-
-    def open(self):
-        self.closed = False
-        self.seek(0)
-
-    def _prefetch(self, prefetch_to=None, delete=True):
-        size = self.size
-        f = tempfile.NamedTemporaryFile(prefix="._prefetch-", dir=prefetch_to, delete=delete)
-        if size == 0:
-            self._curfile = f
-            return
-
-        # Zero out the file
-        f.seek(size - 1)
-        f.write(b"\x00")
-        f.flush()
-
-        mem = mmap.mmap(f.fileno(), size)
-
-        def fetch_file(offset, getfile):
-            with getfile() as sf:
-                while True:
-                    chunk = sf.read(65535)
-                    if not chunk:
-                        break
-                    mem[offset : offset + len(chunk)] = chunk
-                    offset += len(chunk)
-
-        with ThreadPoolExecutor(max_workers=4) as exe:
-            for idx in self._indexes:
-                exe.submit(fetch_file, idx.offset, idx.blob.getfile)
-
-        mem.flush()
-        self._curfile = f
-
-    def close(self):
-        if self._curfile:
-            self._curfile.close()
-        self._curfile = None
-        self._curidx = None
-        self.closed = True
-
-    def _seek(self, pos):
-        if self.closed:
-            raise ValueError("I/O operation on closed file")
-
-        if self.prefetched:
-            return self._curfile.seek(pos)
-
-        if pos < 0:
-            raise OSError("Invalid argument")
-        if pos == 0 and not self._indexes:
-            # Empty file, there's no seeking to be done.
-            return
-
-        for n, idx in enumerate(self._indexes[::-1]):
-            if idx.offset <= pos:
-                if idx != self._curidx:
-                    self._idxiter = iter(self._indexes[-(n + 1) :])
-                    self._nextidx()
-                break
-        else:
-            raise ValueError("Cannot seek to pos")
-        self._curfile.seek(pos - self._curidx.offset)
-
-    def seek(self, pos, whence=io.SEEK_SET):
-        if whence == io.SEEK_SET:
-            return self._seek(pos)
-        if whence == io.SEEK_CUR:
-            return self._seek(self.tell() + pos)
-        if whence == io.SEEK_END:
-            return self._seek(self.size + pos)
-
-        raise ValueError(f"Invalid value for whence: {whence}")
-
-    def tell(self):
-        if self.closed:
-            raise ValueError("I/O operation on closed file")
-        if self.prefetched:
-            return self._curfile.tell()
-        if self._curfile is None:
-            return self.size
-        return self._curidx.offset + self._curfile.tell()
-
-    def read(self, n=-1):
-        if self.closed:
-            raise ValueError("I/O operation on closed file")
-
-        if self.prefetched:
-            return self._curfile.read(n)
-
-        result = bytearray()
-
-        # Read to the end of the file
-        if n < 0:
-            while self._curfile is not None:
-                blob_result = self._curfile.read(32768)
-                if not blob_result:
-                    self._nextidx()
-                else:
-                    result.extend(blob_result)
-
-        # Read until a certain number of bytes are read
-        else:
-            while n > 0 and self._curfile is not None:
-                blob_result = self._curfile.read(min(n, 32768))
-                if not blob_result:
-                    self._nextidx()
-                else:
-                    n -= len(blob_result)
-                    result.extend(blob_result)
-
-        return bytes(result)
-
-
-@region_silo_only_model
-class FileBlobOwner(Model):
-    __include_in_export__ = False
-
-    blob = FlexibleForeignKey("sentry.FileBlob")
-    organization_id = BoundedBigIntegerField(db_index=True)
-
-    class Meta:
-        app_label = "sentry"
-        db_table = "sentry_fileblobowner"
-        unique_together = (("blob", "organization_id"),)
-
-
-def clear_cached_files(cache_path):
-    try:
-        cache_folders = os.listdir(cache_path)
-    except OSError:
-        return
-
-    cutoff = int(time.time()) - ONE_DAY_AND_A_HALF
-
-    for cache_folder in cache_folders:
-        cache_folder = os.path.join(cache_path, cache_folder)
-        try:
-            items = os.listdir(cache_folder)
-        except OSError:
-            continue
-        for cached_file in items:
-            cached_file = os.path.join(cache_folder, cached_file)
-            try:
-                mtime = os.path.getmtime(cached_file)
-            except OSError:
-                continue
-            if mtime < cutoff:
-                try:
-                    os.remove(cached_file)
-                except OSError:
-                    pass
+from sentry.models.files import *  # NOQA
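
With file.py reduced to a star re-export of the new files package, call sites that still import from the old module path should keep resolving to the same classes. A quick illustration of the expected equivalence (assuming a configured Sentry/Django environment):

# Old and new import paths are expected to yield the very same model class.
from sentry.models.file import File as file_via_legacy_path
from sentry.models.files.file import File as file_via_new_path

assert file_via_legacy_path is file_via_new_path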

+ 25 - 0
src/sentry/models/files/__init__.py

@@ -0,0 +1,25 @@
+from .control_file import ControlFile
+from .control_fileblob import ControlFileBlob
+from .control_fileblobindex import ControlFileBlobIndex
+from .control_fileblobowner import ControlFileBlobOwner
+from .file import File
+from .fileblob import FileBlob
+from .fileblobindex import FileBlobIndex
+from .fileblobowner import FileBlobOwner
+from .utils import DEFAULT_BLOB_SIZE, MAX_FILE_SIZE, ONE_DAY, AssembleChecksumMismatch, get_storage
+
+__all__ = (
+    "File",
+    "FileBlob",
+    "FileBlobIndex",
+    "FileBlobOwner",
+    "ControlFile",
+    "ControlFileBlob",
+    "ControlFileBlobIndex",
+    "ControlFileBlobOwner",
+    "ONE_DAY",
+    "DEFAULT_BLOB_SIZE",
+    "MAX_FILE_SIZE",
+    "AssembleChecksumMismatch",
+    "get_storage",
+)

+ 337 - 0
src/sentry/models/files/abstractfile.py

@@ -0,0 +1,337 @@
+import io
+import mmap
+import os
+import tempfile
+from concurrent.futures import ThreadPoolExecutor
+from hashlib import sha1
+
+from django.core.files.base import ContentFile
+from django.core.files.base import File as FileObj
+from django.db import models, router, transaction
+from django.utils import timezone
+
+from sentry.db.models import BoundedPositiveIntegerField, JSONField, Model
+from sentry.models.files.utils import DEFAULT_BLOB_SIZE, AssembleChecksumMismatch, nooplogger
+from sentry.utils import metrics
+from sentry.utils.db import atomic_transaction
+
+
+class ChunkedFileBlobIndexWrapper:
+    def __init__(self, indexes, mode=None, prefetch=False, prefetch_to=None, delete=True):
+        # eager load from database incase its a queryset
+        self._indexes = list(indexes)
+        self._curfile = None
+        self._curidx = None
+        if prefetch:
+            self.prefetched = True
+            self._prefetch(prefetch_to, delete)
+        else:
+            self.prefetched = False
+        self.mode = mode
+        self.open()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, tb):
+        self.close()
+
+    def detach_tempfile(self):
+        if not self.prefetched:
+            raise TypeError("Can only detech tempfiles in prefetch mode")
+        rv = self._curfile
+        self._curfile = None
+        self.close()
+        rv.seek(0)
+        return rv
+
+    def _nextidx(self):
+        assert not self.prefetched, "this makes no sense"
+        old_file = self._curfile
+        try:
+            try:
+                self._curidx = next(self._idxiter)
+                self._curfile = self._curidx.blob.getfile()
+            except StopIteration:
+                self._curidx = None
+                self._curfile = None
+        finally:
+            if old_file is not None:
+                old_file.close()
+
+    @property
+    def size(self):
+        return sum(i.blob.size for i in self._indexes)
+
+    def open(self):
+        self.closed = False
+        self.seek(0)
+
+    def _prefetch(self, prefetch_to=None, delete=True):
+        size = self.size
+        f = tempfile.NamedTemporaryFile(prefix="._prefetch-", dir=prefetch_to, delete=delete)
+        if size == 0:
+            self._curfile = f
+            return
+
+        # Zero out the file
+        f.seek(size - 1)
+        f.write(b"\x00")
+        f.flush()
+
+        mem = mmap.mmap(f.fileno(), size)
+
+        def fetch_file(offset, getfile):
+            with getfile() as sf:
+                while True:
+                    chunk = sf.read(65535)
+                    if not chunk:
+                        break
+                    mem[offset : offset + len(chunk)] = chunk
+                    offset += len(chunk)
+
+        with ThreadPoolExecutor(max_workers=4) as exe:
+            for idx in self._indexes:
+                exe.submit(fetch_file, idx.offset, idx.blob.getfile)
+
+        mem.flush()
+        self._curfile = f
+
+    def close(self):
+        if self._curfile:
+            self._curfile.close()
+        self._curfile = None
+        self._curidx = None
+        self.closed = True
+
+    def _seek(self, pos):
+        if self.closed:
+            raise ValueError("I/O operation on closed file")
+
+        if self.prefetched:
+            return self._curfile.seek(pos)
+
+        if pos < 0:
+            raise OSError("Invalid argument")
+        if pos == 0 and not self._indexes:
+            # Empty file, there's no seeking to be done.
+            return
+
+        for n, idx in enumerate(self._indexes[::-1]):
+            if idx.offset <= pos:
+                if idx != self._curidx:
+                    self._idxiter = iter(self._indexes[-(n + 1) :])
+                    self._nextidx()
+                break
+        else:
+            raise ValueError("Cannot seek to pos")
+        self._curfile.seek(pos - self._curidx.offset)
+
+    def seek(self, pos, whence=io.SEEK_SET):
+        if whence == io.SEEK_SET:
+            return self._seek(pos)
+        if whence == io.SEEK_CUR:
+            return self._seek(self.tell() + pos)
+        if whence == io.SEEK_END:
+            return self._seek(self.size + pos)
+
+        raise ValueError(f"Invalid value for whence: {whence}")
+
+    def tell(self):
+        if self.closed:
+            raise ValueError("I/O operation on closed file")
+        if self.prefetched:
+            return self._curfile.tell()
+        if self._curfile is None:
+            return self.size
+        return self._curidx.offset + self._curfile.tell()
+
+    def read(self, n=-1):
+        if self.closed:
+            raise ValueError("I/O operation on closed file")
+
+        if self.prefetched:
+            return self._curfile.read(n)
+
+        result = bytearray()
+
+        # Read to the end of the file
+        if n < 0:
+            while self._curfile is not None:
+                blob_result = self._curfile.read(32768)
+                if not blob_result:
+                    self._nextidx()
+                else:
+                    result.extend(blob_result)
+
+        # Read until a certain number of bytes are read
+        else:
+            while n > 0 and self._curfile is not None:
+                blob_result = self._curfile.read(min(n, 32768))
+                if not blob_result:
+                    self._nextidx()
+                else:
+                    n -= len(blob_result)
+                    result.extend(blob_result)
+
+        return bytes(result)
+
+
+class AbstractFile(Model):
+    __include_in_export__ = False
+
+    name = models.TextField()
+    type = models.CharField(max_length=64)
+    timestamp = models.DateTimeField(default=timezone.now, db_index=True)
+    headers = JSONField()
+    size = BoundedPositiveIntegerField(null=True)
+    checksum = models.CharField(max_length=40, null=True, db_index=True)
+
+    class Meta:
+        abstract = True
+
+    FILE_BLOB_MODEL = None
+    FILE_BLOB_INDEX_MODEL = None
+    DELETE_UNREFERENCED_BLOB_TASK = None
+
+    def _get_chunked_blob(self, mode=None, prefetch=False, prefetch_to=None, delete=True):
+        return ChunkedFileBlobIndexWrapper(
+            self.FILE_BLOB_INDEX_MODEL.objects.filter(file=self)
+            .select_related("blob")
+            .order_by("offset"),
+            mode=mode,
+            prefetch=prefetch,
+            prefetch_to=prefetch_to,
+            delete=delete,
+        )
+
+    def getfile(self, mode=None, prefetch=False):
+        """Returns a file object.  By default the file is fetched on
+        demand but if prefetch is enabled the file is fully prefetched
+        into a tempfile before reading can happen.
+        """
+        impl = self._get_chunked_blob(mode, prefetch)
+        return FileObj(impl, self.name)
+
+    def save_to(self, path):
+        """Fetches the file and emplaces it at a certain location.  The
+        write is done atomically to a tempfile first and then moved over.
+        If the directory does not exist it is created.
+        """
+        path = os.path.abspath(path)
+        base = os.path.dirname(path)
+        try:
+            os.makedirs(base)
+        except OSError:
+            pass
+
+        f = None
+        try:
+            f = self._get_chunked_blob(
+                prefetch=True, prefetch_to=base, delete=False
+            ).detach_tempfile()
+
+            # pre-emptively check if the file already exists.
+            # this can happen as a race condition if two processes/threads
+            # are trying to cache the same file and both try to write
+            # at the same time, overwriting each other. Normally this is fine,
+            # but can cause an issue if another process has opened the file
+            # for reading, then the file that was being read gets clobbered.
+            # I don't know if this affects normal filesystems, but it
+            # definitely has an issue if the filesystem is NFS.
+            if not os.path.exists(path):
+                os.rename(f.name, path)
+                f.close()
+                f = None
+        finally:
+            if f is not None:
+                f.close()
+                try:
+                    os.remove(f.name)
+                except Exception:
+                    pass
+
+    def putfile(self, fileobj, blob_size=DEFAULT_BLOB_SIZE, commit=True, logger=nooplogger):
+        """
+        Save a fileobj into a number of chunks.
+
+        Returns a list of `FileBlobIndex` items.
+
+        >>> indexes = file.putfile(fileobj)
+        """
+        results = []
+        offset = 0
+        checksum = sha1(b"")
+
+        while True:
+            contents = fileobj.read(blob_size)
+            if not contents:
+                break
+            checksum.update(contents)
+
+            blob_fileobj = ContentFile(contents)
+            blob = self.FILE_BLOB_MODEL.from_file(blob_fileobj, logger=logger)
+            results.append(
+                self.FILE_BLOB_INDEX_MODEL.objects.create(file=self, blob=blob, offset=offset)
+            )
+            offset += blob.size
+        self.size = offset
+        self.checksum = checksum.hexdigest()
+        metrics.timing("filestore.file-size", offset)
+        if commit:
+            self.save()
+        return results
+
+    def assemble_from_file_blob_ids(self, file_blob_ids, checksum, commit=True):
+        """
+        This creates a file, from file blobs and returns a temp file with the
+        contents.
+        """
+        tf = tempfile.NamedTemporaryFile()
+        with atomic_transaction(
+            using=(
+                router.db_for_write(self.FILE_BLOB_MODEL),
+                router.db_for_write(self.FILE_BLOB_INDEX_MODEL),
+            )
+        ):
+            file_blobs = self.FILE_BLOB_MODEL.objects.filter(id__in=file_blob_ids).all()
+
+            # Ensure blobs are in the order and duplication as provided
+            blobs_by_id = {blob.id: blob for blob in file_blobs}
+            file_blobs = [blobs_by_id[blob_id] for blob_id in file_blob_ids]
+
+            new_checksum = sha1(b"")
+            offset = 0
+            for blob in file_blobs:
+                self.FILE_BLOB_INDEX_MODEL.objects.create(file=self, blob=blob, offset=offset)
+                with blob.getfile() as blobfile:
+                    for chunk in blobfile.chunks():
+                        new_checksum.update(chunk)
+                        tf.write(chunk)
+                offset += blob.size
+
+            self.size = offset
+            self.checksum = new_checksum.hexdigest()
+
+            if checksum != self.checksum:
+                raise AssembleChecksumMismatch("Checksum mismatch")
+
+        metrics.timing("filestore.file-size", offset)
+        if commit:
+            self.save()
+        tf.flush()
+        tf.seek(0)
+        return tf
+
+    def delete(self, *args, **kwargs):
+        blob_ids = [blob.id for blob in self.blobs.all()]
+        super().delete(*args, **kwargs)
+
+        # Wait to delete blobs. This helps prevent
+        # races around frequently used blobs in debug images and release files.
+        transaction.on_commit(
+            lambda: self.DELETE_UNREFERENCED_BLOB_TASK.apply_async(
+                kwargs={"blob_ids": blob_ids}, countdown=60 * 5
+            ),
+            using=router.db_for_write(type(self)),
+        )
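
AbstractFile defers its silo-specific pieces to the FILE_BLOB_MODEL, FILE_BLOB_INDEX_MODEL, and DELETE_UNREFERENCED_BLOB_TASK class attributes, so each concrete subclass only has to bind them. The region-silo File is not shown in this diff; a hedged sketch of how that wiring plausibly looks, based on the pre-refactor file.py above (the direct class-attribute assignment is an assumption; the real module may bind these differently):

from django.db import models

from sentry.db.models import region_silo_only_model
from sentry.models.files.abstractfile import AbstractFile
from sentry.models.files.fileblob import FileBlob
from sentry.models.files.fileblobindex import FileBlobIndex
from sentry.tasks.files import delete_unreferenced_blobs


@region_silo_only_model
class File(AbstractFile):
    blobs = models.ManyToManyField("sentry.FileBlob", through="sentry.FileBlobIndex")

    # Hooks consumed by AbstractFile.putfile / getfile / delete.
    FILE_BLOB_MODEL = FileBlob
    FILE_BLOB_INDEX_MODEL = FileBlobIndex
    DELETE_UNREFERENCED_BLOB_TASK = delete_unreferenced_blobs

    # The legacy `blob` FK and `path` columns of the old model are omitted
    # from this sketch.

    class Meta:
        app_label = "sentry"
        db_table = "sentry_file"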

+ 233 - 0
src/sentry/models/files/abstractfileblob.py

@@ -0,0 +1,233 @@
+from concurrent.futures import ThreadPoolExecutor
+from threading import Semaphore
+from uuid import uuid4
+
+from django.db import IntegrityError, models, router
+from django.utils import timezone
+
+from sentry.db.models import BoundedPositiveIntegerField, Model
+from sentry.locks import locks
+from sentry.models.files.utils import (
+    UPLOAD_RETRY_TIME,
+    _get_size_and_checksum,
+    get_storage,
+    locked_blob,
+    nooplogger,
+)
+from sentry.utils import metrics
+from sentry.utils.db import atomic_transaction
+from sentry.utils.retries import TimedRetryPolicy
+
+MULTI_BLOB_UPLOAD_CONCURRENCY = 8
+
+
+class AbstractFileBlob(Model):
+    __include_in_export__ = False
+
+    path = models.TextField(null=True)
+    size = BoundedPositiveIntegerField(null=True)
+    checksum = models.CharField(max_length=40, unique=True)
+    timestamp = models.DateTimeField(default=timezone.now, db_index=True)
+
+    class Meta:
+        abstract = True
+
+    FILE_BLOB_OWNER_MODEL = None
+    DELETE_FILE_TASK = None
+
+    @classmethod
+    def from_files(cls, files, organization=None, logger=nooplogger):
+        """A faster version of `from_file` for multiple files at the time.
+        If an organization is provided it will also create `FileBlobOwner`
+        entries.  Files can be a list of files or tuples of file and checksum.
+        If both are provided then a checksum check is performed.
+
+        If the checksums mismatch an `IOError` is raised.
+        """
+        logger.debug("FileBlob.from_files.start")
+
+        files_with_checksums = []
+        for fileobj in files:
+            if isinstance(fileobj, tuple):
+                files_with_checksums.append(fileobj)
+            else:
+                files_with_checksums.append((fileobj, None))
+
+        checksums_seen = set()
+        blobs_created = []
+        blobs_to_save = []
+        locks = set()
+        semaphore = Semaphore(value=MULTI_BLOB_UPLOAD_CONCURRENCY)
+
+        def _upload_and_pend_chunk(fileobj, size, checksum, lock):
+            logger.debug(
+                "FileBlob.from_files._upload_and_pend_chunk.start",
+                extra={"checksum": checksum, "size": size},
+            )
+            blob = cls(size=size, checksum=checksum)
+            blob.path = cls.generate_unique_path()
+            storage = get_storage()
+            storage.save(blob.path, fileobj)
+            blobs_to_save.append((blob, lock))
+            metrics.timing("filestore.blob-size", size, tags={"function": "from_files"})
+            logger.debug(
+                "FileBlob.from_files._upload_and_pend_chunk.end",
+                extra={"checksum": checksum, "path": blob.path},
+            )
+
+        def _ensure_blob_owned(blob):
+            if organization is None:
+                return
+            try:
+                with atomic_transaction(using=router.db_for_write(cls.FILE_BLOB_OWNER_MODEL)):
+                    cls.FILE_BLOB_OWNER_MODEL.objects.create(
+                        organization_id=organization.id, blob=blob
+                    )
+            except IntegrityError:
+                pass
+
+        def _save_blob(blob):
+            logger.debug("FileBlob.from_files._save_blob.start", extra={"path": blob.path})
+            blob.save()
+            _ensure_blob_owned(blob)
+            logger.debug("FileBlob.from_files._save_blob.end", extra={"path": blob.path})
+
+        def _flush_blobs():
+            while True:
+                try:
+                    blob, lock = blobs_to_save.pop()
+                except IndexError:
+                    break
+
+                _save_blob(blob)
+                lock.__exit__(None, None, None)
+                locks.discard(lock)
+                semaphore.release()
+
+        try:
+            with ThreadPoolExecutor(max_workers=MULTI_BLOB_UPLOAD_CONCURRENCY) as exe:
+                for fileobj, reference_checksum in files_with_checksums:
+                    logger.debug(
+                        "FileBlob.from_files.executor_start", extra={"checksum": reference_checksum}
+                    )
+                    _flush_blobs()
+
+                    # Before we go and do something with the files we calculate
+                    # the checksums and compare it against the reference.  This
+                    # also deduplicates duplicates uploaded in the same request.
+                    # This is necessary because we acquire multiple locks in one
+                    # go which would let us deadlock otherwise.
+                    size, checksum = _get_size_and_checksum(fileobj)
+                    if reference_checksum is not None and checksum != reference_checksum:
+                        raise OSError("Checksum mismatch")
+                    if checksum in checksums_seen:
+                        continue
+                    checksums_seen.add(checksum)
+
+                    # Check if we need to lock the blob.  If we get a result back
+                    # here it means the blob already exists.
+                    lock = locked_blob(cls, checksum, logger=logger)
+                    existing = lock.__enter__()
+                    if existing is not None:
+                        lock.__exit__(None, None, None)
+                        blobs_created.append(existing)
+                        _ensure_blob_owned(existing)
+                        continue
+
+                    # Remember the lock to force unlock all at the end if we
+                    # encounter any difficulties.
+                    locks.add(lock)
+
+                    # Otherwise we leave the blob locked and submit the task.
+                    # We use the semaphore to ensure we never schedule too
+                    # many.  The upload will be done with a certain amount
+                    # of concurrency controlled by the semaphore and the
+                    # `_flush_blobs` call will take all those uploaded
+                    # blobs and associate them with the database.
+                    semaphore.acquire()
+                    exe.submit(_upload_and_pend_chunk(fileobj, size, checksum, lock))
+                    logger.debug("FileBlob.from_files.end", extra={"checksum": reference_checksum})
+
+            _flush_blobs()
+        finally:
+            for lock in locks:
+                try:
+                    lock.__exit__(None, None, None)
+                except Exception:
+                    pass
+            logger.debug("FileBlob.from_files.end")
+
+    @classmethod
+    def from_file(cls, fileobj, logger=nooplogger):
+        """
+        Retrieve a single FileBlob instances for the given file.
+        """
+        logger.debug("FileBlob.from_file.start")
+
+        size, checksum = _get_size_and_checksum(fileobj)
+
+        # TODO(dcramer): the database here is safe, but if this lock expires
+        # and duplicate files are uploaded then we need to prune one
+        with locked_blob(cls, checksum, logger=logger) as existing:
+            if existing is not None:
+                return existing
+
+            blob = cls(size=size, checksum=checksum)
+            blob.path = cls.generate_unique_path()
+            storage = get_storage()
+            storage.save(blob.path, fileobj)
+            blob.save()
+
+        metrics.timing("filestore.blob-size", size)
+        logger.debug("FileBlob.from_file.end")
+        return blob
+
+    @classmethod
+    def generate_unique_path(cls):
+        # We intentionally do not use checksums as path names to avoid concurrency issues
+        # when we attempt concurrent uploads for any reason.
+        uuid_hex = uuid4().hex
+        pieces = [uuid_hex[:2], uuid_hex[2:6], uuid_hex[6:]]
+        return "/".join(pieces)
+
+    def delete(self, *args, **kwargs):
+        if self.path:
+            self.deletefile(commit=False)
+        lock = locks.get(
+            f"fileblob:upload:{self.checksum}",
+            duration=UPLOAD_RETRY_TIME,
+            name="fileblob_upload_delete",
+        )
+        with TimedRetryPolicy(UPLOAD_RETRY_TIME, metric_instance="lock.fileblob.delete")(
+            lock.acquire
+        ):
+            super().delete(*args, **kwargs)
+
+    def deletefile(self, commit=False):
+        assert self.path
+
+        # Defer this by 1 minute just to make sure
+        # we avoid any transaction isolation where the
+        # FileBlob row might still be visible by the
+        # task before transaction is committed.
+        self.DELETE_FILE_TASK.apply_async(
+            kwargs={"path": self.path, "checksum": self.checksum}, countdown=60
+        )
+
+        self.path = None
+
+        if commit:
+            self.save()
+
+    def getfile(self):
+        """
+        Return a file-like object for this File's content.
+
+        >>> with blob.getfile() as src, open('/tmp/localfile', 'wb') as dst:
+        >>>     for chunk in src.chunks():
+        >>>         dst.write(chunk)
+        """
+        assert self.path
+
+        storage = get_storage()
+        return storage.open(self.path)
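
AbstractFileBlob likewise leans on two hooks, FILE_BLOB_OWNER_MODEL and DELETE_FILE_TASK, for ownership bookkeeping and deferred storage deletion. A speculative sketch of a control-silo blob filling them in; ControlFileBlobOwner comes from the new package's __init__, while reusing sentry.tasks.files.delete_file here is purely an assumption (the real control model may point at a control-specific task):

from sentry.models.files.abstractfileblob import AbstractFileBlob
from sentry.models.files.control_fileblobowner import ControlFileBlobOwner
from sentry.tasks.files import delete_file


class ControlFileBlob(AbstractFileBlob):
    # Hooks consumed by AbstractFileBlob.from_files / deletefile.
    FILE_BLOB_OWNER_MODEL = ControlFileBlobOwner
    DELETE_FILE_TASK = delete_file  # assumed; may differ in the actual model

    class Meta:
        app_label = "sentry"
        db_table = "sentry_controlfileblob"

Callers keep using the same classmethods as before (ControlFileBlob.from_file(fileobj), ControlFileBlob.from_files([...], organization=org)); only the concrete class differs per silo.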

+ 10 - 0
src/sentry/models/files/abstractfileblobindex.py

@@ -0,0 +1,10 @@
+from sentry.db.models import BoundedPositiveIntegerField, Model
+
+
+class AbstractFileBlobIndex(Model):
+    __include_in_export__ = False
+
+    offset = BoundedPositiveIntegerField()
+
+    class Meta:
+        abstract = True

+ 10 - 0
src/sentry/models/files/abstractfileblobowner.py

@@ -0,0 +1,10 @@
+from sentry.db.models import BoundedBigIntegerField, Model
+
+
+class AbstractFileBlobOwner(Model):
+    __include_in_export__ = False
+
+    organization_id = BoundedBigIntegerField(db_index=True)
+
+    class Meta:
+        abstract = True
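
Note that the abstract index and owner classes carry only the non-relational columns; the file/blob foreign keys cannot live on the shared base, because each silo's FK has to target its own concrete model (exactly what the migration's FlexibleForeignKey(..., to="sentry.ControlFileBlob") operations show). A hypothetical concrete pair consistent with that migration:

from django.db import models

from sentry.db.models import FlexibleForeignKey
from sentry.models.files.abstractfileblobindex import AbstractFileBlobIndex
from sentry.models.files.abstractfileblobowner import AbstractFileBlobOwner


class ControlFileBlobIndex(AbstractFileBlobIndex):
    # The FKs point at the control-silo concrete models, so they live here
    # rather than on the abstract base; `offset` is inherited.
    file = FlexibleForeignKey("sentry.ControlFile")
    blob = FlexibleForeignKey("sentry.ControlFileBlob", on_delete=models.PROTECT)

    class Meta:
        app_label = "sentry"
        db_table = "sentry_controlfileblobindex"
        unique_together = (("file", "blob", "offset"),)


class ControlFileBlobOwner(AbstractFileBlobOwner):
    # `organization_id` is inherited from the abstract base.
    blob = FlexibleForeignKey("sentry.ControlFileBlob")

    class Meta:
        app_label = "sentry"
        db_table = "sentry_controlfileblobowner"
        unique_together = (("blob", "organization_id"),)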

Some files were not shown because too many files changed in this diff