models.py

from hashlib import sha1
import tempfile
import mmap
from concurrent.futures import ThreadPoolExecutor

from django.core.files.base import File as FileObj
from django.db import models, transaction

from glitchtip.base_models import CreatedModel

from .exceptions import AssembleChecksumMismatch


def _get_size_and_checksum(fileobj):
    """Read a file-like object in 64 KB chunks, returning (size, sha1 hexdigest)."""
    size = 0
    checksum = sha1()
    while True:
        chunk = fileobj.read(65536)
        if not chunk:
            break
        size += len(chunk)
        checksum.update(chunk)
    return size, checksum.hexdigest()
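
# Example (illustrative sketch): any object with a bytes-returning read()
# method works, e.g. an io.BytesIO buffer.
#
#   import io
#   size, digest = _get_size_and_checksum(io.BytesIO(b"hello world"))
#   assert size == 11 and digest == sha1(b"hello world").hexdigest()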

class FileBlob(CreatedModel):
    """
    Port of sentry.models.file.FileBlob with some simplifications

    OSS Sentry stores files in file blob chunks, where one file gets saved as
    many blobs. GlitchTip uses a Django FileField and does not chunk files. At
    this time, GlitchTip attempts to support both situations for compatibility.
    A file upload is saved as just one blob, as though the blob chunk size were
    infinite. In the future it may either support chunked blobs fully or
    eliminate naive chunking support entirely.
    """

    blob = models.FileField(upload_to="uploads/file_blobs")
    size = models.PositiveIntegerField(null=True)
    checksum = models.CharField(max_length=40, unique=True)

    @classmethod
    def from_files(cls, files, organization=None, logger=None):
        # Guard the optional logger so a None default does not raise
        if logger:
            logger.debug("FileBlob.from_files.start")
        # Normalize input to (fileobj, checksum) tuples
        files_with_checksums = []
        for fileobj in files:
            if isinstance(fileobj, tuple):
                files_with_checksums.append(fileobj)
            else:
                files_with_checksums.append((fileobj, None))
        for file_with_checksum in files_with_checksums:
            blob = cls()
            blob_file = file_with_checksum[0]
            blob.size = file_with_checksum[0].size
            blob.checksum = file_with_checksum[1]
            blob.blob.save(blob_file.name, blob_file)
            blob.save()

    @classmethod
    def from_file(cls, fileobj):
        """
        Retrieve a single FileBlob instance for the given file.
        """
        checksum = sha1()
        with fileobj.open("rb") as f:
            if f.multiple_chunks():
                for chunk in f.chunks():
                    checksum.update(chunk)
            else:
                checksum.update(f.read())
        # Significant deviation from OSS Sentry: deduplicate on checksum and
        # store the upload as a single blob instead of many chunks.
        file_blob, _ = cls.objects.get_or_create(
            checksum=checksum.hexdigest(),
            defaults={"blob": fileobj, "size": fileobj.size},
        )
        return file_blob
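
# Example (illustrative sketch): uploading the same content twice should yield
# a single FileBlob row, since get_or_create keys on the unique checksum.
#
#   from django.core.files.base import ContentFile
#   first = FileBlob.from_file(ContentFile(b"payload", name="a.txt"))
#   second = FileBlob.from_file(ContentFile(b"payload", name="b.txt"))
#   assert first.pk == second.pk  # deduplicated by checksum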

class File(CreatedModel):
    """
    Port of sentry.models.file.File
    """

    name = models.TextField()
    type = models.CharField(max_length=64)
    headers = models.JSONField(blank=True, null=True)
    blobs = models.ManyToManyField(FileBlob)
    size = models.PositiveIntegerField(null=True)
    checksum = models.CharField(max_length=40, null=True, db_index=True)

    def put_django_file(self, fileobj):
        """Save a Django File object as a File Blob"""
        self.size = fileobj.size
        file_blob = FileBlob.from_file(fileobj)
        self.checksum = file_blob.checksum
        self.save()

    def putfile(self, fileobj):
        """Save a file-like object as a File Blob"""
        size, checksum = _get_size_and_checksum(fileobj)
        fileobj.seek(0)
        file_blob, _ = FileBlob.objects.get_or_create(
            defaults={"blob": FileObj(fileobj, name=checksum)},
            size=size,
            checksum=checksum,
        )
        self.checksum = checksum
        self.save()
        self.blobs.add(file_blob)
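
    # Example (illustrative sketch): putfile accepts any seekable binary
    # file-like object; the stored blob is named after its sha1 checksum.
    #
    #   import io
    #   f = File.objects.create(name="artifact.js.map", type="release.file")
    #   f.putfile(io.BytesIO(b'{"version": 3}'))
    #   assert f.blobs.count() == 1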

    def _get_chunked_blob(
        self, mode=None, prefetch=False, prefetch_to=None, delete=True
    ):
        return ChunkedFileBlobIndexWrapper(
            FileBlobIndex.objects.filter(file=self)
            .select_related("blob")
            .order_by("offset"),
            mode=mode,
            prefetch=prefetch,
            prefetch_to=prefetch_to,
            delete=delete,
        )

    def getfile(self):
        impl = self._get_chunked_blob()
        return FileObj(impl, self.name)
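
    # Example (illustrative sketch, assuming a saved File instance f with
    # blobs): getfile exposes the ordered FileBlobIndex rows as one
    # read()/seek()-able stream, hiding the chunked storage from callers.
    #
    #   fileobj = f.getfile()
    #   header = fileobj.read(1024)  # may span blob boundaries
    #   fileobj.seek(0)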

    def assemble_from_file_blob_ids(self, file_blob_ids, checksum, commit=True):
        """
        This creates a file from file blobs and returns a temp file with the
        contents.
        """
        tf = tempfile.NamedTemporaryFile()
        with transaction.atomic():
            file_blobs = FileBlob.objects.filter(id__in=file_blob_ids).all()
            # Ensure blobs are in the order and duplication as provided
            blobs_by_id = {blob.id: blob for blob in file_blobs}
            file_blobs = [blobs_by_id[blob_id] for blob_id in file_blob_ids]
            new_checksum = sha1(b"")
            offset = 0
            for blob in file_blobs:
                FileBlobIndex.objects.create(file=self, blob=blob, offset=offset)
                for chunk in blob.blob.chunks():
                    new_checksum.update(chunk)
                    tf.write(chunk)
                offset += blob.size
            self.size = offset
            self.checksum = new_checksum.hexdigest()
            if checksum != self.checksum:
                raise AssembleChecksumMismatch("Checksum mismatch")
        if commit:
            self.save()
        tf.flush()
        tf.seek(0)
        return tf
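
# Example (illustrative sketch, assuming existing blobs blob1 and blob2): the
# caller supplies the expected sha1 of the concatenated content; a mismatch
# raises AssembleChecksumMismatch and the atomic block rolls back the new
# FileBlobIndex rows.
#
#   expected = sha1(b"part1" + b"part2").hexdigest()
#   temp = f.assemble_from_file_blob_ids([blob1.id, blob2.id], expected)
#   data = temp.read()  # full reassembled contents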

class FileBlobIndex(models.Model):
    """Maps a blob to its byte offset within a logical File."""

    file = models.ForeignKey(File, on_delete=models.CASCADE)
    blob = models.ForeignKey(FileBlob, on_delete=models.CASCADE)
    offset = models.PositiveIntegerField()

    class Meta:
        unique_together = (("file", "blob", "offset"),)
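
# Worked example (illustrative): a file stored as three 64 KB blobs is indexed
# at offsets 0, 65536, and 131072. A seek to byte 70000 therefore lands in the
# second blob, at local offset 70000 - 65536 = 4464.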

class ChunkedFileBlobIndexWrapper(object):
    def __init__(
        self, indexes, mode=None, prefetch=False, prefetch_to=None, delete=True
    ):
        # eager load from database in case it's a queryset
        self._indexes = list(indexes)
        self._curfile = None
        self._curidx = None
        if prefetch:
            self.prefetched = True
            self._prefetch(prefetch_to, delete)
        else:
            self.prefetched = False
        self.mode = mode
        self.open()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()

    def detach_tempfile(self):
        if not self.prefetched:
            raise TypeError("Can only detach tempfiles in prefetch mode")
        rv = self._curfile
        self._curfile = None
        self.close()
        rv.seek(0)
        return rv

    def _nextidx(self):
        # Advance to the next blob index, closing the previous blob's file
        assert not self.prefetched, "this makes no sense"
        old_file = self._curfile
        try:
            try:
                self._curidx = next(self._idxiter)
                self._curfile = self._curidx.blob.getfile()
            except StopIteration:
                self._curidx = None
                self._curfile = None
        finally:
            if old_file is not None:
                old_file.close()

    @property
    def size(self):
        return sum(i.blob.size for i in self._indexes)

    def open(self):
        self.closed = False
        self.seek(0)

    def _prefetch(self, prefetch_to=None, delete=True):
        size = self.size
        f = tempfile.NamedTemporaryFile(
            prefix="._prefetch-", dir=prefetch_to, delete=delete
        )
        if size == 0:
            self._curfile = f
            return

        # Zero out the file
        f.seek(size - 1)
        f.write(b"\x00")
        f.flush()
        mem = mmap.mmap(f.fileno(), size)

        def fetch_file(offset, getfile):
            with getfile() as sf:
                while True:
                    chunk = sf.read(65535)
                    if not chunk:
                        break
                    mem[offset : offset + len(chunk)] = chunk
                    offset += len(chunk)

        # Fetch all blobs concurrently; each writes into its own mmap region
        with ThreadPoolExecutor(max_workers=4) as exe:
            for idx in self._indexes:
                exe.submit(fetch_file, idx.offset, idx.blob.getfile)

        mem.flush()
        self._curfile = f
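
    # Example (illustrative sketch, using the private _get_chunked_blob helper
    # on a saved File instance f): prefetching copies every blob into a single
    # mmap-backed temp file; detach_tempfile() then hands that file's
    # ownership to the caller.
    #
    #   wrapper = f._get_chunked_blob(prefetch=True, delete=False)
    #   temp = wrapper.detach_tempfile()
    #   print(temp.name)  # path to the fully assembled temp file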

    def close(self):
        if self._curfile:
            self._curfile.close()
        self._curfile = None
        self._curidx = None
        self.closed = True

    def seek(self, pos):
        if self.closed:
            raise ValueError("I/O operation on closed file")
        if self.prefetched:
            return self._curfile.seek(pos)
        if pos < 0:
            raise IOError("Invalid argument")
        # Scan indexes from the end to find the blob containing pos
        for n, idx in enumerate(self._indexes[::-1]):
            if idx.offset <= pos:
                if idx != self._curidx:
                    self._idxiter = iter(self._indexes[-(n + 1) :])
                    self._nextidx()
                break
        else:
            raise ValueError("Cannot seek to pos")
        self._curfile.seek(pos - self._curidx.offset)

    def tell(self):
        if self.closed:
            raise ValueError("I/O operation on closed file")
        if self.prefetched:
            return self._curfile.tell()
        if self._curfile is None:
            return self.size
        return self._curidx.offset + self._curfile.tell()

    def read(self, n=-1):
        if self.closed:
            raise ValueError("I/O operation on closed file")
        if self.prefetched:
            return self._curfile.read(n)
        result = bytearray()
        # Read to the end of the file
        if n < 0:
            while self._curfile is not None:
                blob_result = self._curfile.read(32768)
                if not blob_result:
                    self._nextidx()
                else:
                    result.extend(blob_result)
        # Read until a certain number of bytes are read
        else:
            while n > 0 and self._curfile is not None:
                blob_result = self._curfile.read(min(n, 32768))
                if not blob_result:
                    self._nextidx()
                else:
                    n -= len(blob_result)
                    result.extend(blob_result)
        return bytes(result)
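
# Example (illustrative sketch, assuming a saved File instance f with blobs):
# the wrapper supports random access across blob boundaries, so callers can
# treat a chunked file as an ordinary binary stream.
#
#   with f._get_chunked_blob() as fileobj:
#       fileobj.seek(100)
#       assert fileobj.tell() == 100
#       tail = fileobj.read()  # bytes from position 100 to EOF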