__init__.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. from __future__ import annotations
  2. import contextlib
  3. import errno
  4. import functools
  5. import operator
  6. import os
  7. import platform
  8. import shutil
  9. import stat
  10. import subprocess
  11. import sys
  12. import tempfile
  13. import urllib.request
  14. from typing import Iterator
  15. if sys.version_info < (3, 12):
  16. from backports import tarfile
  17. else:
  18. import tarfile
  19. @contextlib.contextmanager
  20. def pushd(dir: str | os.PathLike) -> Iterator[str | os.PathLike]:
  21. """
  22. >>> tmp_path = getfixture('tmp_path')
  23. >>> with pushd(tmp_path):
  24. ... assert os.getcwd() == os.fspath(tmp_path)
  25. >>> assert os.getcwd() != os.fspath(tmp_path)
  26. """
  27. orig = os.getcwd()
  28. os.chdir(dir)
  29. try:
  30. yield dir
  31. finally:
  32. os.chdir(orig)
  33. @contextlib.contextmanager
  34. def tarball(
  35. url, target_dir: str | os.PathLike | None = None
  36. ) -> Iterator[str | os.PathLike]:
  37. """
  38. Get a URL to a tarball, download, extract, yield, then clean up.
  39. Assumes everything in the tarball is prefixed with a common
  40. directory. That common path is stripped and the contents
  41. are extracted to ``target_dir``, similar to passing
  42. ``-C {target} --strip-components 1`` to the ``tar`` command.
  43. Uses the streaming protocol to extract the contents from a
  44. stream in a single pass without loading the whole file into
  45. memory.
  46. >>> import urllib.request
  47. >>> url = getfixture('tarfile_served')
  48. >>> target = getfixture('tmp_path') / 'out'
  49. >>> tb = tarball(url, target_dir=target)
  50. >>> import pathlib
  51. >>> with tb as extracted:
  52. ... contents = pathlib.Path(extracted, 'contents.txt').read_text(encoding='utf-8')
  53. >>> assert not os.path.exists(extracted)
  54. If the target is not specified, contents are extracted to a
  55. directory relative to the current working directory named after
  56. the name of the file as extracted from the URL.
  57. >>> target = getfixture('tmp_path')
  58. >>> with pushd(target), tarball(url):
  59. ... target.joinpath('served').is_dir()
  60. True
  61. """
  62. if target_dir is None:
  63. target_dir = os.path.basename(url).replace('.tar.gz', '').replace('.tgz', '')
  64. os.mkdir(target_dir)
  65. try:
  66. req = urllib.request.urlopen(url)
  67. with tarfile.open(fileobj=req, mode='r|*') as tf:
  68. tf.extractall(path=target_dir, filter=strip_first_component)
  69. yield target_dir
  70. finally:
  71. shutil.rmtree(target_dir)
  72. def strip_first_component(
  73. member: tarfile.TarInfo,
  74. path,
  75. ) -> tarfile.TarInfo:
  76. _, member.name = member.name.split('/', 1)
  77. return member
  78. def _compose(*cmgrs):
  79. """
  80. Compose any number of dependent context managers into a single one.
  81. The last, innermost context manager may take arbitrary arguments, but
  82. each successive context manager should accept the result from the
  83. previous as a single parameter.
  84. Like :func:`jaraco.functools.compose`, behavior works from right to
  85. left, so the context manager should be indicated from outermost to
  86. innermost.
  87. Example, to create a context manager to change to a temporary
  88. directory:
  89. >>> temp_dir_as_cwd = _compose(pushd, temp_dir)
  90. >>> with temp_dir_as_cwd() as dir:
  91. ... assert os.path.samefile(os.getcwd(), dir)
  92. """
  93. def compose_two(inner, outer):
  94. def composed(*args, **kwargs):
  95. with inner(*args, **kwargs) as saved, outer(saved) as res:
  96. yield res
  97. return contextlib.contextmanager(composed)
  98. return functools.reduce(compose_two, reversed(cmgrs))
# Built with ``_compose(pushd, tarball)``: ``tarball`` is entered first with
# the caller's arguments, and the directory it yields is passed to ``pushd``.
tarball_cwd = _compose(pushd, tarball)
"""
A tarball context with the current working directory pointing to the contents.
"""
  103. def remove_readonly(func, path, exc_info):
  104. """
  105. Add support for removing read-only files on Windows.
  106. """
  107. _, exc, _ = exc_info
  108. if func in (os.rmdir, os.remove, os.unlink) and exc.errno == errno.EACCES:
  109. # change the file to be readable,writable,executable: 0777
  110. os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
  111. # retry
  112. func(path)
  113. else:
  114. raise
  115. def robust_remover():
  116. return (
  117. functools.partial(shutil.rmtree, onerror=remove_readonly)
  118. if platform.system() == 'Windows'
  119. else shutil.rmtree
  120. )
  121. @contextlib.contextmanager
  122. def temp_dir(remover=shutil.rmtree):
  123. """
  124. Create a temporary directory context. Pass a custom remover
  125. to override the removal behavior.
  126. >>> import pathlib
  127. >>> with temp_dir() as the_dir:
  128. ... assert os.path.isdir(the_dir)
  129. >>> assert not os.path.exists(the_dir)
  130. """
  131. temp_dir = tempfile.mkdtemp()
  132. try:
  133. yield temp_dir
  134. finally:
  135. remover(temp_dir)
# ``temp_dir`` with its ``remover`` preset to the result of
# ``robust_remover()``; that call is made once, when this line executes.
robust_temp_dir = functools.partial(temp_dir, remover=robust_remover())
  137. @contextlib.contextmanager
  138. def repo_context(
  139. url, branch: str | None = None, quiet: bool = True, dest_ctx=robust_temp_dir
  140. ):
  141. """
  142. Check out the repo indicated by url.
  143. If dest_ctx is supplied, it should be a context manager
  144. to yield the target directory for the check out.
  145. >>> repo = repo_context('https://github.com/jaraco/jaraco.context')
  146. >>> with repo as dest:
  147. ... listing = os.listdir(dest)
  148. >>> 'README.rst' in listing
  149. True
  150. """
  151. exe = 'git' if 'git' in url else 'hg'
  152. with dest_ctx() as repo_dir:
  153. cmd = [exe, 'clone', url, repo_dir]
  154. cmd.extend(['--branch', branch] * bool(branch))
  155. stream = subprocess.DEVNULL if quiet else None
  156. subprocess.check_call(cmd, stdout=stream, stderr=stream)
  157. yield repo_dir
  158. class ExceptionTrap:
  159. """
  160. A context manager that will catch certain exceptions and provide an
  161. indication they occurred.
  162. >>> with ExceptionTrap() as trap:
  163. ... raise Exception()
  164. >>> bool(trap)
  165. True
  166. >>> with ExceptionTrap() as trap:
  167. ... pass
  168. >>> bool(trap)
  169. False
  170. >>> with ExceptionTrap(ValueError) as trap:
  171. ... raise ValueError("1 + 1 is not 3")
  172. >>> bool(trap)
  173. True
  174. >>> trap.value
  175. ValueError('1 + 1 is not 3')
  176. >>> trap.tb
  177. <traceback object at ...>
  178. >>> with ExceptionTrap(ValueError) as trap:
  179. ... raise Exception()
  180. Traceback (most recent call last):
  181. ...
  182. Exception
  183. >>> bool(trap)
  184. False
  185. """
  186. exc_info = None, None, None
  187. def __init__(self, exceptions=(Exception,)):
  188. self.exceptions = exceptions
  189. def __enter__(self):
  190. return self
  191. @property
  192. def type(self):
  193. return self.exc_info[0]
  194. @property
  195. def value(self):
  196. return self.exc_info[1]
  197. @property
  198. def tb(self):
  199. return self.exc_info[2]
  200. def __exit__(self, *exc_info):
  201. type = exc_info[0]
  202. matches = type and issubclass(type, self.exceptions)
  203. if matches:
  204. self.exc_info = exc_info
  205. return matches
  206. def __bool__(self):
  207. return bool(self.type)
  208. def raises(self, func, *, _test=bool):
  209. """
  210. Wrap func and replace the result with the truth
  211. value of the trap (True if an exception occurred).
  212. First, give the decorator an alias to support Python 3.8
  213. Syntax.
  214. >>> raises = ExceptionTrap(ValueError).raises
  215. Now decorate a function that always fails.
  216. >>> @raises
  217. ... def fail():
  218. ... raise ValueError('failed')
  219. >>> fail()
  220. True
  221. """
  222. @functools.wraps(func)
  223. def wrapper(*args, **kwargs):
  224. with ExceptionTrap(self.exceptions) as trap:
  225. func(*args, **kwargs)
  226. return _test(trap)
  227. return wrapper
  228. def passes(self, func):
  229. """
  230. Wrap func and replace the result with the truth
  231. value of the trap (True if no exception).
  232. First, give the decorator an alias to support Python 3.8
  233. Syntax.
  234. >>> passes = ExceptionTrap(ValueError).passes
  235. Now decorate a function that always fails.
  236. >>> @passes
  237. ... def fail():
  238. ... raise ValueError('failed')
  239. >>> fail()
  240. False
  241. """
  242. return self.raises(func, _test=operator.not_)
  243. class suppress(contextlib.suppress, contextlib.ContextDecorator):
  244. """
  245. A version of contextlib.suppress with decorator support.
  246. >>> @suppress(KeyError)
  247. ... def key_error():
  248. ... {}['']
  249. >>> key_error()
  250. """
  251. class on_interrupt(contextlib.ContextDecorator):
  252. """
  253. Replace a KeyboardInterrupt with SystemExit(1).
  254. Useful in conjunction with console entry point functions.
  255. >>> def do_interrupt():
  256. ... raise KeyboardInterrupt()
  257. >>> on_interrupt('error')(do_interrupt)()
  258. Traceback (most recent call last):
  259. ...
  260. SystemExit: 1
  261. >>> on_interrupt('error', code=255)(do_interrupt)()
  262. Traceback (most recent call last):
  263. ...
  264. SystemExit: 255
  265. >>> on_interrupt('suppress')(do_interrupt)()
  266. >>> with __import__('pytest').raises(KeyboardInterrupt):
  267. ... on_interrupt('ignore')(do_interrupt)()
  268. """
  269. def __init__(self, action='error', /, code=1):
  270. self.action = action
  271. self.code = code
  272. def __enter__(self):
  273. return self
  274. def __exit__(self, exctype, excinst, exctb):
  275. if exctype is not KeyboardInterrupt or self.action == 'ignore':
  276. return
  277. elif self.action == 'error':
  278. raise SystemExit(self.code) from excinst
  279. return self.action == 'suppress'