123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324 |
- """Support for providing temporary directories to test functions."""
- import dataclasses
- import os
- import re
- import tempfile
- from pathlib import Path
- from shutil import rmtree
- from typing import Any
- from typing import Dict
- from typing import Generator
- from typing import Optional
- from typing import TYPE_CHECKING
- from typing import Union
- from _pytest.nodes import Item
- from _pytest.reports import CollectReport
- from _pytest.stash import StashKey
- if TYPE_CHECKING:
- from typing_extensions import Literal
- RetentionType = Literal["all", "failed", "none"]
- from _pytest.config.argparsing import Parser
- from .pathlib import LOCK_TIMEOUT
- from .pathlib import make_numbered_dir
- from .pathlib import make_numbered_dir_with_cleanup
- from .pathlib import rm_rf
- from .pathlib import cleanup_dead_symlinks
- from _pytest.compat import final, get_user_id
- from _pytest.config import Config
- from _pytest.config import ExitCode
- from _pytest.config import hookimpl
- from _pytest.deprecated import check_ispytest
- from _pytest.fixtures import fixture
- from _pytest.fixtures import FixtureRequest
- from _pytest.monkeypatch import MonkeyPatch
# Stash key under which a test item keeps a ``Dict[str, bool]`` of results.
# pytest_runtest_makereport() stores ``result.passed`` under ``result.when``,
# and the tmp_path fixture reads and then deletes the entry on teardown.
tmppath_result_key = StashKey[Dict[str, bool]]()
@final
@dataclasses.dataclass
class TempPathFactory:
    """Create temporary directories beneath one shared base temp directory.

    The base directory can be chosen with the ``--basetemp`` option.
    """

    # Absolute form of the base directory handed to the constructor, or None.
    _given_basetemp: Optional[Path]
    # pluggy TagTracerSub, not currently exposed, so Any.
    _trace: Any
    # Base directory in use; None until getbasetemp() has produced one,
    # unless a value was passed to the constructor.
    _basetemp: Optional[Path]
    _retention_count: int
    _retention_policy: "RetentionType"

    def __init__(
        self,
        given_basetemp: Optional[Path],
        retention_count: int,
        retention_policy: "RetentionType",
        trace,
        basetemp: Optional[Path] = None,
        *,
        _ispytest: bool = False,
    ) -> None:
        """Store the configuration; ``given_basetemp`` is made absolute."""
        check_ispytest(_ispytest)
        # os.path.abspath() is used on purpose: resolve() does not behave the
        # same on every platform (see #4427), and Path.absolute() is not a
        # public API (see https://bugs.python.org/issue25012).
        self._given_basetemp = (
            None
            if given_basetemp is None
            else Path(os.path.abspath(str(given_basetemp)))
        )
        self._trace = trace
        self._retention_count = retention_count
        self._retention_policy = retention_policy
        self._basetemp = basetemp

    @classmethod
    def from_config(
        cls,
        config: Config,
        *,
        _ispytest: bool = False,
    ) -> "TempPathFactory":
        """Build a factory from the pytest configuration.

        :meta private:
        """
        check_ispytest(_ispytest)

        retention_count = int(config.getini("tmp_path_retention_count"))
        if retention_count < 0:
            raise ValueError(
                f"tmp_path_retention_count must be >= 0. Current input: {retention_count}."
            )

        retention_policy = config.getini("tmp_path_retention_policy")
        if retention_policy not in ("all", "failed", "none"):
            raise ValueError(
                f"tmp_path_retention_policy must be either all, failed, none. Current input: {retention_policy}."
            )

        return cls(
            given_basetemp=config.option.basetemp,
            trace=config.trace.get("tmpdir"),
            retention_count=retention_count,
            retention_policy=retention_policy,
            _ispytest=True,
        )

    def _ensure_relative_to_basetemp(self, basename: str) -> str:
        """Normalize ``basename`` and require it to be a direct child of the base.

        :raises ValueError:
            If the resolved path's parent is not the base temp directory.
        """
        basename = os.path.normpath(basename)
        base = self.getbasetemp()
        candidate_parent = (base / basename).resolve().parent
        if candidate_parent != base:
            raise ValueError(f"{basename} is not a normalized and relative path")
        return basename

    def mktemp(self, basename: str, numbered: bool = True) -> Path:
        """Make a fresh temporary directory owned by this factory.

        :param basename:
            Base name of the directory; it must be a relative path.
        :param numbered:
            When ``True``, uniqueness is guaranteed by appending a number
            larger than any already in use: with ``basename="foo-"`` the
            directories created are ``"foo-0"``, ``"foo-1"``, ``"foo-2"``
            and so on.
        :returns:
            Path of the directory that was created.
        """
        checked_name = self._ensure_relative_to_basetemp(basename)
        if numbered:
            new_dir = make_numbered_dir(
                root=self.getbasetemp(), prefix=checked_name, mode=0o700
            )
            self._trace("mktemp", new_dir)
            return new_dir

        new_dir = self.getbasetemp().joinpath(checked_name)
        new_dir.mkdir(mode=0o700)
        return new_dir

    def getbasetemp(self) -> Path:
        """Return the base temporary directory, creating it on first use.

        :returns:
            The base temporary directory.
        """
        cached = self._basetemp
        if cached is not None:
            return cached

        if self._given_basetemp is None:
            base = self._make_default_basetemp()
        else:
            # An explicitly requested base directory is recreated from scratch.
            base = self._given_basetemp
            if base.exists():
                rm_rf(base)
            base.mkdir(mode=0o700)
            base = base.resolve()

        assert base is not None, base
        self._basetemp = base
        self._trace("new basetemp", base)
        return base

    def _make_default_basetemp(self) -> Path:
        """Create a numbered ``pytest-N`` directory under the per-user root."""
        env_root = os.environ.get("PYTEST_DEBUG_TEMPROOT")
        temproot = Path(env_root or tempfile.gettempdir()).resolve()
        user = get_user() or "unknown"
        # A per-user sub-directory of the temproot keeps the
        # make_numbered_dir() call fast.
        rootdir = temproot.joinpath(f"pytest-of-{user}")
        try:
            rootdir.mkdir(mode=0o700, exist_ok=True)
        except OSError:
            # The user name probably contains characters that are illegal in
            # a directory name on this platform; fall back to a fixed name.
            rootdir = temproot.joinpath("pytest-of-unknown")
            rootdir.mkdir(mode=0o700, exist_ok=True)

        # exist_ok=True combined with a predictable name means the directory
        # may have been created by somebody else, so verify that we own it
        # (on unix the temproot is usually shared).
        # For privacy, also strip group/other permissions from the rootdir.
        # Historically 0o755 was used, so erroring out on this is not an
        # option, at least for a while.
        uid = get_user_id()
        if uid is not None:
            stat_result = rootdir.stat()
            if stat_result.st_uid != uid:
                raise OSError(
                    f"The temporary directory {rootdir} is not owned by the current user. "
                    "Fix this and try again."
                )
            if (stat_result.st_mode & 0o077) != 0:
                os.chmod(rootdir, stat_result.st_mode & ~0o077)

        # The "none" policy keeps no directories from previous sessions.
        keep = 0 if self._retention_policy == "none" else self._retention_count
        return make_numbered_dir_with_cleanup(
            prefix="pytest-",
            root=rootdir,
            keep=keep,
            lock_timeout=LOCK_TIMEOUT,
            mode=0o700,
        )
def get_user() -> Optional[str]:
    """Return the current user name, or None if it cannot be determined.

    ``getpass.getuser()`` does not work in every environment (see #1010).

    :returns:
        The user name reported by ``getpass.getuser()``, or ``None`` when
        the lookup fails.
    """
    try:
        # In some exotic environments, getpass may not be importable.
        import getpass

        return getpass.getuser()
    except (ImportError, OSError, KeyError):
        # Before Python 3.13 a failed lookup surfaced as ImportError or
        # KeyError; from 3.13 on, getpass.getuser() raises OSError instead.
        return None
def pytest_configure(config: Config) -> None:
    """Build a TempPathFactory and hang it on the config object.

    Existing plugins expect the factory to be available as soon as
    pytest_configure has run; ideally this would live entirely in the
    tmp_path_factory session fixture.
    """
    patcher = MonkeyPatch()
    # Register the undo first so the attribute is removed again on cleanup.
    config.add_cleanup(patcher.undo)
    patcher.setattr(
        config,
        "_tmp_path_factory",
        TempPathFactory.from_config(config, _ispytest=True),
        raising=False,
    )
def pytest_addoption(parser: Parser) -> None:
    """Register the ini options that control ``tmp_path`` retention."""
    # (name, help text, default) for each ini option, in registration order.
    ini_options = (
        (
            "tmp_path_retention_count",
            "How many sessions should we keep the `tmp_path` directories, according to `tmp_path_retention_policy`.",
            3,
        ),
        (
            "tmp_path_retention_policy",
            "Controls which directories created by the `tmp_path` fixture are kept around, based on test outcome. "
            "(all/failed/none)",
            "all",
        ),
    )
    for option_name, help_text, default_value in ini_options:
        parser.addini(option_name, help=help_text, default=default_value)
@fixture(scope="session")
def tmp_path_factory(request: FixtureRequest) -> TempPathFactory:
    """Return a :class:`pytest.TempPathFactory` instance for the test session."""
    config = request.config
    # The attribute is attached dynamically by pytest_configure().
    factory: TempPathFactory = config._tmp_path_factory  # type: ignore
    return factory
def _mk_tmp(request: FixtureRequest, factory: TempPathFactory) -> Path:
    """Create a numbered temp directory named after the requesting node.

    Every non-word character of the node name is replaced by ``_`` and the
    result is cut to 30 characters before being passed to the factory.
    """
    max_length = 30
    sanitized = re.sub(r"[\W]", "_", request.node.name)
    return factory.mktemp(sanitized[:max_length], numbered=True)
@fixture
def tmp_path(
    request: FixtureRequest, tmp_path_factory: TempPathFactory
) -> Generator[Path, None, None]:
    """Return a temporary directory path object which is unique to each test
    function invocation, created as a sub directory of the base temporary
    directory.

    By default, a new base temporary directory is created each test session,
    and old bases are removed after 3 sessions, to aid in debugging.
    This behavior can be configured with :confval:`tmp_path_retention_count` and
    :confval:`tmp_path_retention_policy`.
    If ``--basetemp`` is used then it is cleared each session. See :ref:`base
    temporary directory`.

    The returned object is a :class:`pathlib.Path` object.
    """
    path = _mk_tmp(request, tmp_path_factory)
    yield path

    # Remove the tmpdir if the policy is "failed" and the test passed.
    # The policy is read from the injected factory, i.e. the same object that
    # created ``path``, instead of re-fetching one from the session config.
    policy = tmp_path_factory._retention_policy
    result_dict = request.node.stash[tmppath_result_key]

    # A missing "call" entry is treated like a passed call.
    if policy == "failed" and result_dict.get("call", True):
        # We do a "best effort" to remove files, but it might not be possible due to some leaked resource,
        # permissions, etc, in which case we ignore it.
        rmtree(path, ignore_errors=True)

    # Drop the per-item results once they have been consumed.
    del request.node.stash[tmppath_result_key]
def pytest_sessionfinish(session, exitstatus: Union[int, ExitCode]):
    """Clean up the base temp directory when the session ends.

    The base directory is removed when the exit status is 0, the policy is
    "failed", and the basetemp was not specified by the user. Dead symlinks
    are removed from the base directory if it still exists afterwards.
    """
    factory: TempPathFactory = session.config._tmp_path_factory
    base_dir = factory._basetemp
    if base_dir is None:
        # No base directory was ever created, so there is nothing to clean.
        return

    retention_policy = factory._retention_policy
    if (
        exitstatus == 0
        and retention_policy == "failed"
        and factory._given_basetemp is None
        and base_dir.is_dir()
    ):
        # Removal is "best effort": leaked resources, permissions, etc. may
        # make it impossible, in which case the errors are ignored.
        rmtree(base_dir, ignore_errors=True)

    # Remove dead symlinks.
    if base_dir.is_dir():
        cleanup_dead_symlinks(base_dir)
@hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item: Item, call):
    """Record on the item's stash whether each reported phase passed."""
    outcome = yield
    report: CollectReport = outcome.get_result()
    passed = report.passed
    # The first report for an item creates the dict; later ones reuse it.
    fresh: Dict[str, bool] = {}
    phase_results = item.stash.setdefault(tmppath_result_key, fresh)
    phase_results[report.when] = passed
|