# models.py

import mmap
import tempfile
from concurrent.futures import ThreadPoolExecutor
from hashlib import sha1

from asgiref.sync import sync_to_async
from django.core.files.base import File as FileObj
from django.db import models, transaction

from glitchtip.base_models import CreatedModel

from .exceptions import AssembleChecksumMismatch


def _get_size_and_checksum(fileobj):
    """Read a file-like object in 64 KiB chunks, returning (size, sha1 hexdigest)."""
    size = 0
    checksum = sha1()
    while True:
        chunk = fileobj.read(65536)
        if not chunk:
            break
        size += len(chunk)
        checksum.update(chunk)
    return size, checksum.hexdigest()
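
# A minimal usage sketch for _get_size_and_checksum (illustrative only; the
# BytesIO payload is a made-up example):
#
#   import io
#   size, digest = _get_size_and_checksum(io.BytesIO(b"hello"))
#   assert size == 5 and digest == sha1(b"hello").hexdigest()
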
class FileBlob(CreatedModel):
    """
    Port of sentry.models.file.FileBlob with simplifications.

    OSS Sentry stores files as file blob chunks, where one file is saved as
    many blobs. GlitchTip uses a Django FileField and does not split files
    into chunks. FileBlobs provide file deduplication: multiple File objects
    may refer to the same FileBlob.
    """

    blob = models.FileField(upload_to="uploads/file_blobs")
    size = models.PositiveIntegerField(null=True)
    checksum = models.CharField(max_length=40, unique=True)

    @classmethod
    async def from_files(cls, files, organization=None, logger=None):
        if logger:
            logger.debug("FileBlob.from_files.start")
        files_with_checksums = []
        for fileobj in files:
            if isinstance(fileobj, tuple):
                files_with_checksums.append(fileobj)
            else:
                files_with_checksums.append((fileobj, None))
        for blob_file, checksum in files_with_checksums:
            if checksum is None:
                # The checksum column is NOT NULL and unique, so compute a
                # missing checksum rather than saving None.
                _, checksum = await sync_to_async(_get_size_and_checksum)(blob_file)
                blob_file.seek(0)
            blob = cls()
            blob.size = blob_file.size
            blob.checksum = checksum
            if not await FileBlob.objects.filter(checksum=checksum).aexists():
                await sync_to_async(blob.blob.save)(blob_file.name, blob_file)
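
    # A minimal usage sketch for from_files, to be run in an async context
    # (illustrative only; the SimpleUploadedFile payload is a made-up example):
    #
    #   from django.core.files.uploadedfile import SimpleUploadedFile
    #   data = b"minified.js contents"
    #   upload = SimpleUploadedFile("minified.js", data)
    #   await FileBlob.from_files([(upload, sha1(data).hexdigest())])
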
    @classmethod
    def from_file(cls, fileobj):
        """
        Retrieve a single FileBlob instance for the given file.
        """
        checksum = sha1()
        with fileobj.open("rb") as f:
            if f.multiple_chunks():
                for chunk in f.chunks():
                    checksum.update(chunk)
            else:
                checksum.update(f.read())
        # Significant deviation from OSS Sentry
        file_blob, _ = cls.objects.get_or_create(
            checksum=checksum.hexdigest(),
            defaults={"blob": fileobj, "size": fileobj.size},
        )
        return file_blob
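
# Deduplication sketch (illustrative only): calling from_file twice with the
# same bytes should yield a single FileBlob row, because get_or_create keys
# on the sha1 checksum of the contents.
#
#   from django.core.files.uploadedfile import SimpleUploadedFile
#   a = FileBlob.from_file(SimpleUploadedFile("a.txt", b"same bytes"))
#   b = FileBlob.from_file(SimpleUploadedFile("b.txt", b"same bytes"))
#   assert a.pk == b.pk
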
class File(CreatedModel):
    """
    Port of sentry.models.file.File
    """

    name = models.TextField()
    # last_used = models.DateTimeField(auto_now=True, db_index=True)
    # debug_id = models.UUIDField(
    #     null=True,
    #     blank=True,
    #     db_index=True,
    #     help_text="Association between file and source code",
    # )  # Remove this?
    type = models.CharField(max_length=64)  # Not currently used
    headers = models.JSONField(blank=True, null=True)
    blob = models.ForeignKey(FileBlob, on_delete=models.CASCADE, null=True)
    size = models.PositiveIntegerField(default=0)  # Not currently used
    checksum = models.CharField(max_length=40, null=True, db_index=True)

    def put_django_file(self, fileobj):
        """Save a Django File object as a FileBlob"""
        self.size = fileobj.size
        file_blob = FileBlob.from_file(fileobj)
        self.checksum = file_blob.checksum
        self.save()

    def putfile(self, fileobj):
        """Save a file-like object as a FileBlob"""
        size, checksum = _get_size_and_checksum(fileobj)
        fileobj.seek(0)
        file_blob, _ = FileBlob.objects.get_or_create(
            defaults={"blob": FileObj(fileobj, name=checksum)},
            size=size,
            checksum=checksum,
        )
        self.checksum = checksum
        self.blob = file_blob
        self.save()
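
    # A minimal usage sketch for putfile (illustrative only; the BytesIO
    # payload is a made-up example):
    #
    #   import io
    #   f = File(name="example.txt")
    #   f.putfile(io.BytesIO(b"file contents"))
    #   assert f.blob.checksum == f.checksum
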
    def _get_chunked_blob(
        self, mode=None, prefetch=False, prefetch_to=None, delete=True
    ):
        return ChunkedFileBlobIndexWrapper(
            FileBlobIndex.objects.filter(file=self)
            .select_related("blob")
            .order_by("offset"),
            mode=mode,
            prefetch=prefetch,
            prefetch_to=prefetch_to,
            delete=delete,
        )

    def getfile(self):
        impl = self._get_chunked_blob()
        return FileObj(impl, self.name)
    def assemble_from_file_blob_ids(self, file_blob_ids, checksum, commit=True):
        """
        Create a file from file blobs and return a temp file with the
        contents.
        """
        tf = tempfile.NamedTemporaryFile()
        with transaction.atomic():
            file_blobs = FileBlob.objects.filter(id__in=file_blob_ids).all()

            # Preserve the order and duplicates of file_blob_ids as provided
            blobs_by_id = {blob.id: blob for blob in file_blobs}
            file_blobs = [blobs_by_id[blob_id] for blob_id in file_blob_ids]

            new_checksum = sha1(b"")
            offset = 0
            for blob in file_blobs:
                FileBlobIndex.objects.create(file=self, blob=blob, offset=offset)
                for chunk in blob.blob.chunks():
                    new_checksum.update(chunk)
                    tf.write(chunk)
                offset += blob.size

            self.size = offset
            self.checksum = new_checksum.hexdigest()

            if checksum != self.checksum:
                raise AssembleChecksumMismatch("Checksum mismatch")

        if commit:
            self.save()
        tf.flush()
        tf.seek(0)
        return tf
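
# Assembly sketch (illustrative only; the blob ids and the expected_checksum
# placeholder are made up):
#
#   f = File.objects.create(name="bundle.js.map")
#   tf = f.assemble_from_file_blob_ids([7, 8, 9], expected_checksum)
#   data = tf.read()  # reassembled, checksum-verified contents
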
class FileBlobIndex(models.Model):
    """
    Ported from OSS Sentry. Should be removed, as GlitchTip does not
    split file blobs into chunks.
    """

    file = models.ForeignKey(File, on_delete=models.CASCADE)
    blob = models.ForeignKey(FileBlob, on_delete=models.CASCADE)
    offset = models.PositiveIntegerField()

    class Meta:
        unique_together = (("file", "blob", "offset"),)
class ChunkedFileBlobIndexWrapper:
    def __init__(
        self, indexes, mode=None, prefetch=False, prefetch_to=None, delete=True
    ):
        # Eagerly load from the database in case it's a queryset
        self._indexes = list(indexes)
        self._curfile = None
        self._curidx = None
        if prefetch:
            self.prefetched = True
            self._prefetch(prefetch_to, delete)
        else:
            self.prefetched = False
        self.mode = mode
        self.open()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()

    def detach_tempfile(self):
        if not self.prefetched:
            raise TypeError("Can only detach tempfiles in prefetch mode")
        rv = self._curfile
        self._curfile = None
        self.close()
        rv.seek(0)
        return rv

    def _nextidx(self):
        assert not self.prefetched, "_nextidx is only used in non-prefetch mode"
        old_file = self._curfile
        try:
            try:
                self._curidx = next(self._idxiter)
                self._curfile = self._curidx.blob.getfile()
            except StopIteration:
                self._curidx = None
                self._curfile = None
        finally:
            if old_file is not None:
                old_file.close()

    @property
    def size(self):
        return sum(i.blob.size for i in self._indexes)

    def open(self):
        self.closed = False
        self.seek(0)

    def _prefetch(self, prefetch_to=None, delete=True):
        size = self.size
        f = tempfile.NamedTemporaryFile(
            prefix="._prefetch-", dir=prefetch_to, delete=delete
        )
        if size == 0:
            self._curfile = f
            return

        # Zero out the file (the tempfile is opened in binary mode)
        f.seek(size - 1)
        f.write(b"\x00")
        f.flush()

        mem = mmap.mmap(f.fileno(), size)

        def fetch_file(offset, getfile):
            with getfile() as sf:
                while True:
                    chunk = sf.read(65535)
                    if not chunk:
                        break
                    mem[offset : offset + len(chunk)] = chunk
                    offset += len(chunk)

        with ThreadPoolExecutor(max_workers=4) as exe:
            for idx in self._indexes:
                exe.submit(fetch_file, idx.offset, idx.blob.getfile)

        mem.flush()
        self._curfile = f

    def close(self):
        if self._curfile:
            self._curfile.close()
        self._curfile = None
        self._curidx = None
        self.closed = True
    def seek(self, pos):
        if self.closed:
            raise ValueError("I/O operation on closed file")

        if self.prefetched:
            return self._curfile.seek(pos)

        if pos < 0:
            raise IOError("Invalid argument")

        # Walk the indexes from the end to find the blob containing pos
        for n, idx in enumerate(self._indexes[::-1]):
            if idx.offset <= pos:
                if idx != self._curidx:
                    self._idxiter = iter(self._indexes[-(n + 1) :])
                    self._nextidx()
                break
        else:
            raise ValueError("Cannot seek to pos")
        self._curfile.seek(pos - self._curidx.offset)

    def tell(self):
        if self.closed:
            raise ValueError("I/O operation on closed file")
        if self.prefetched:
            return self._curfile.tell()
        if self._curfile is None:
            return self.size
        return self._curidx.offset + self._curfile.tell()

    def read(self, n=-1):
        if self.closed:
            raise ValueError("I/O operation on closed file")

        if self.prefetched:
            return self._curfile.read(n)

        result = bytearray()

        # Read to the end of the file
        if n < 0:
            while self._curfile is not None:
                blob_result = self._curfile.read(32768)
                if not blob_result:
                    self._nextidx()
                else:
                    result.extend(blob_result)
        # Read until a certain number of bytes are read
        else:
            while n > 0 and self._curfile is not None:
                blob_result = self._curfile.read(min(n, 32768))
                if not blob_result:
                    self._nextidx()
                else:
                    n -= len(blob_result)
                    result.extend(blob_result)

        return bytes(result)
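
# Read/seek sketch (illustrative only): the wrapper presents many blob rows
# as one contiguous file. Given a File instance `f` with FileBlobIndex rows:
#
#   with f._get_chunked_blob() as impl:
#       impl.seek(100)        # lands inside whichever blob spans offset 100
#       head = impl.read(16)  # may cross a blob boundary transparently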