__init__.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. """
  2. A Path-like interface for zipfiles.
  3. This codebase is shared between zipfile.Path in the stdlib
  4. and zipp in PyPI. See
  5. https://github.com/python/importlib_metadata/wiki/Development-Methodology
  6. for more detail.
  7. """
  8. import io
  9. import posixpath
  10. import zipfile
  11. import itertools
  12. import contextlib
  13. import pathlib
  14. import re
  15. from .glob import translate
  16. __all__ = ['Path']
  17. def _parents(path):
  18. """
  19. Given a path with elements separated by
  20. posixpath.sep, generate all parents of that path.
  21. >>> list(_parents('b/d'))
  22. ['b']
  23. >>> list(_parents('/b/d/'))
  24. ['/b']
  25. >>> list(_parents('b/d/f/'))
  26. ['b/d', 'b']
  27. >>> list(_parents('b'))
  28. []
  29. >>> list(_parents(''))
  30. []
  31. """
  32. return itertools.islice(_ancestry(path), 1, None)
  33. def _ancestry(path):
  34. """
  35. Given a path with elements separated by
  36. posixpath.sep, generate all elements of that path.
  37. >>> list(_ancestry('b/d'))
  38. ['b/d', 'b']
  39. >>> list(_ancestry('/b/d/'))
  40. ['/b/d', '/b']
  41. >>> list(_ancestry('b/d/f/'))
  42. ['b/d/f', 'b/d', 'b']
  43. >>> list(_ancestry('b'))
  44. ['b']
  45. >>> list(_ancestry(''))
  46. []
  47. Multiple separators are treated like a single.
  48. >>> list(_ancestry('//b//d///f//'))
  49. ['//b//d///f', '//b//d', '//b']
  50. """
  51. path = path.rstrip(posixpath.sep)
  52. while path.rstrip(posixpath.sep):
  53. yield path
  54. path, tail = posixpath.split(path)
# Alias for dict.fromkeys: called with an iterable it returns a dict whose
# keys are the distinct items in first-seen order (the values are None).
# Callers iterate the result, or wrap it in list(), to get the unique items.
_dedupe = dict.fromkeys
"""Deduplicate an iterable in original order"""
  57. def _difference(minuend, subtrahend):
  58. """
  59. Return items in minuend not in subtrahend, retaining order
  60. with O(1) lookup.
  61. """
  62. return itertools.filterfalse(set(subtrahend).__contains__, minuend)
  63. class InitializedState:
  64. """
  65. Mix-in to save the initialization state for pickling.
  66. """
  67. def __init__(self, *args, **kwargs):
  68. self.__args = args
  69. self.__kwargs = kwargs
  70. super().__init__(*args, **kwargs)
  71. def __getstate__(self):
  72. return self.__args, self.__kwargs
  73. def __setstate__(self, state):
  74. args, kwargs = state
  75. super().__init__(*args, **kwargs)
  76. class CompleteDirs(InitializedState, zipfile.ZipFile):
  77. """
  78. A ZipFile subclass that ensures that implied directories
  79. are always included in the namelist.
  80. >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt']))
  81. ['foo/', 'foo/bar/']
  82. >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt', 'foo/bar/']))
  83. ['foo/']
  84. """
  85. @staticmethod
  86. def _implied_dirs(names):
  87. parents = itertools.chain.from_iterable(map(_parents, names))
  88. as_dirs = (p + posixpath.sep for p in parents)
  89. return _dedupe(_difference(as_dirs, names))
  90. def namelist(self):
  91. names = super().namelist()
  92. return names + list(self._implied_dirs(names))
  93. def _name_set(self):
  94. return set(self.namelist())
  95. def resolve_dir(self, name):
  96. """
  97. If the name represents a directory, return that name
  98. as a directory (with the trailing slash).
  99. """
  100. names = self._name_set()
  101. dirname = name + '/'
  102. dir_match = name not in names and dirname in names
  103. return dirname if dir_match else name
  104. def getinfo(self, name):
  105. """
  106. Supplement getinfo for implied dirs.
  107. """
  108. try:
  109. return super().getinfo(name)
  110. except KeyError:
  111. if not name.endswith('/') or name not in self._name_set():
  112. raise
  113. return zipfile.ZipInfo(filename=name)
  114. @classmethod
  115. def make(cls, source):
  116. """
  117. Given a source (filename or zipfile), return an
  118. appropriate CompleteDirs subclass.
  119. """
  120. if isinstance(source, CompleteDirs):
  121. return source
  122. if not isinstance(source, zipfile.ZipFile):
  123. return cls(source)
  124. # Only allow for FastLookup when supplied zipfile is read-only
  125. if 'r' not in source.mode:
  126. cls = CompleteDirs
  127. source.__class__ = cls
  128. return source
  129. class FastLookup(CompleteDirs):
  130. """
  131. ZipFile subclass to ensure implicit
  132. dirs exist and are resolved rapidly.
  133. """
  134. def namelist(self):
  135. with contextlib.suppress(AttributeError):
  136. return self.__names
  137. self.__names = super().namelist()
  138. return self.__names
  139. def _name_set(self):
  140. with contextlib.suppress(AttributeError):
  141. return self.__lookup
  142. self.__lookup = super()._name_set()
  143. return self.__lookup
  144. def _extract_text_encoding(encoding=None, *args, **kwargs):
  145. # stacklevel=3 so that the caller of the caller see any warning.
  146. return io.text_encoding(encoding, 3), args, kwargs
  147. class Path:
  148. """
  149. A :class:`importlib.resources.abc.Traversable` interface for zip files.
  150. Implements many of the features users enjoy from
  151. :class:`pathlib.Path`.
  152. Consider a zip file with this structure::
  153. .
  154. ├── a.txt
  155. └── b
  156. ├── c.txt
  157. └── d
  158. └── e.txt
  159. >>> data = io.BytesIO()
  160. >>> zf = ZipFile(data, 'w')
  161. >>> zf.writestr('a.txt', 'content of a')
  162. >>> zf.writestr('b/c.txt', 'content of c')
  163. >>> zf.writestr('b/d/e.txt', 'content of e')
  164. >>> zf.filename = 'mem/abcde.zip'
  165. Path accepts the zipfile object itself or a filename
  166. >>> root = Path(zf)
  167. From there, several path operations are available.
  168. Directory iteration (including the zip file itself):
  169. >>> a, b = root.iterdir()
  170. >>> a
  171. Path('mem/abcde.zip', 'a.txt')
  172. >>> b
  173. Path('mem/abcde.zip', 'b/')
  174. name property:
  175. >>> b.name
  176. 'b'
  177. join with divide operator:
  178. >>> c = b / 'c.txt'
  179. >>> c
  180. Path('mem/abcde.zip', 'b/c.txt')
  181. >>> c.name
  182. 'c.txt'
  183. Read text:
  184. >>> c.read_text(encoding='utf-8')
  185. 'content of c'
  186. existence:
  187. >>> c.exists()
  188. True
  189. >>> (b / 'missing.txt').exists()
  190. False
  191. Coercion to string:
  192. >>> import os
  193. >>> str(c).replace(os.sep, posixpath.sep)
  194. 'mem/abcde.zip/b/c.txt'
  195. At the root, ``name``, ``filename``, and ``parent``
  196. resolve to the zipfile. Note these attributes are not
  197. valid and will raise a ``ValueError`` if the zipfile
  198. has no filename.
  199. >>> root.name
  200. 'abcde.zip'
  201. >>> str(root.filename).replace(os.sep, posixpath.sep)
  202. 'mem/abcde.zip'
  203. >>> str(root.parent)
  204. 'mem'
  205. """
  206. __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"
  207. def __init__(self, root, at=""):
  208. """
  209. Construct a Path from a ZipFile or filename.
  210. Note: When the source is an existing ZipFile object,
  211. its type (__class__) will be mutated to a
  212. specialized type. If the caller wishes to retain the
  213. original type, the caller should either create a
  214. separate ZipFile object or pass a filename.
  215. """
  216. self.root = FastLookup.make(root)
  217. self.at = at
  218. def __eq__(self, other):
  219. """
  220. >>> Path(zipfile.ZipFile(io.BytesIO(), 'w')) == 'foo'
  221. False
  222. """
  223. if self.__class__ is not other.__class__:
  224. return NotImplemented
  225. return (self.root, self.at) == (other.root, other.at)
  226. def __hash__(self):
  227. return hash((self.root, self.at))
  228. def open(self, mode='r', *args, pwd=None, **kwargs):
  229. """
  230. Open this entry as text or binary following the semantics
  231. of ``pathlib.Path.open()`` by passing arguments through
  232. to io.TextIOWrapper().
  233. """
  234. if self.is_dir():
  235. raise IsADirectoryError(self)
  236. zip_mode = mode[0]
  237. if zip_mode == 'r' and not self.exists():
  238. raise FileNotFoundError(self)
  239. stream = self.root.open(self.at, zip_mode, pwd=pwd)
  240. if 'b' in mode:
  241. if args or kwargs:
  242. raise ValueError("encoding args invalid for binary operation")
  243. return stream
  244. # Text mode:
  245. encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
  246. return io.TextIOWrapper(stream, encoding, *args, **kwargs)
  247. def _base(self):
  248. return pathlib.PurePosixPath(self.at or self.root.filename)
  249. @property
  250. def name(self):
  251. return self._base().name
  252. @property
  253. def suffix(self):
  254. return self._base().suffix
  255. @property
  256. def suffixes(self):
  257. return self._base().suffixes
  258. @property
  259. def stem(self):
  260. return self._base().stem
  261. @property
  262. def filename(self):
  263. return pathlib.Path(self.root.filename).joinpath(self.at)
  264. def read_text(self, *args, **kwargs):
  265. encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
  266. with self.open('r', encoding, *args, **kwargs) as strm:
  267. return strm.read()
  268. def read_bytes(self):
  269. with self.open('rb') as strm:
  270. return strm.read()
  271. def _is_child(self, path):
  272. return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")
  273. def _next(self, at):
  274. return self.__class__(self.root, at)
  275. def is_dir(self):
  276. return not self.at or self.at.endswith("/")
  277. def is_file(self):
  278. return self.exists() and not self.is_dir()
  279. def exists(self):
  280. return self.at in self.root._name_set()
  281. def iterdir(self):
  282. if not self.is_dir():
  283. raise ValueError("Can't listdir a file")
  284. subs = map(self._next, self.root.namelist())
  285. return filter(self._is_child, subs)
  286. def match(self, path_pattern):
  287. return pathlib.PurePosixPath(self.at).match(path_pattern)
  288. def is_symlink(self):
  289. """
  290. Return whether this path is a symlink. Always false (python/cpython#82102).
  291. """
  292. return False
  293. def glob(self, pattern):
  294. if not pattern:
  295. raise ValueError(f"Unacceptable pattern: {pattern!r}")
  296. prefix = re.escape(self.at)
  297. matches = re.compile(prefix + translate(pattern)).fullmatch
  298. return map(self._next, filter(matches, self.root.namelist()))
  299. def rglob(self, pattern):
  300. return self.glob(f'**/{pattern}')
  301. def relative_to(self, other, *extra):
  302. return posixpath.relpath(str(self), str(other.joinpath(*extra)))
  303. def __str__(self):
  304. return posixpath.join(self.root.filename, self.at)
  305. def __repr__(self):
  306. return self.__repr.format(self=self)
  307. def joinpath(self, *other):
  308. next = posixpath.join(self.at, *other)
  309. return self._next(self.root.resolve_dir(next))
  310. __truediv__ = joinpath
  311. @property
  312. def parent(self):
  313. if not self.at:
  314. return self.filename.parent
  315. parent_at = posixpath.dirname(self.at.rstrip('/'))
  316. if parent_at:
  317. parent_at += '/'
  318. return self._next(parent_at)