123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- # -*- coding: utf-8 -*-
- from __future__ import absolute_import
- import atexit
- import errno
- import fnmatch
- import itertools
- import operator
- import os
- import shutil
- import sys
- import uuid
- import warnings
- from functools import partial
- from functools import reduce
- from os.path import expanduser
- from os.path import expandvars
- from os.path import isabs
- from os.path import sep
- from posixpath import sep as posix_sep
- import six
- from six.moves import map
- from .compat import PY36
- from _pytest.warning_types import PytestWarning
- if PY36:
- from pathlib import Path, PurePath
- else:
- from pathlib2 import Path, PurePath
- __all__ = ["Path", "PurePath"]
- LOCK_TIMEOUT = 60 * 60 * 3
- get_lock_path = operator.methodcaller("joinpath", ".lock")
- def ensure_reset_dir(path):
- """
- ensures the given path is an empty directory
- """
- if path.exists():
- rm_rf(path)
- path.mkdir()
- def on_rm_rf_error(func, path, exc, **kwargs):
- """Handles known read-only errors during rmtree.
- The returned value is used only by our own tests.
- """
- start_path = kwargs["start_path"]
- exctype, excvalue = exc[:2]
- # another process removed the file in the middle of the "rm_rf" (xdist for example)
- # more context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018
- if isinstance(excvalue, OSError) and excvalue.errno == errno.ENOENT:
- return False
- if not isinstance(excvalue, OSError) or excvalue.errno not in (
- errno.EACCES,
- errno.EPERM,
- ):
- warnings.warn(
- PytestWarning(
- "(rm_rf) error removing {}\n{}: {}".format(path, exctype, excvalue)
- )
- )
- return False
- if func not in (os.rmdir, os.remove, os.unlink):
- warnings.warn(
- PytestWarning(
- "(rm_rf) unknown function {} when removing {}:\n{}: {}".format(
- path, func, exctype, excvalue
- )
- )
- )
- return False
- # Chmod + retry.
- import stat
- def chmod_rw(p):
- mode = os.stat(p).st_mode
- os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR)
- # For files, we need to recursively go upwards in the directories to
- # ensure they all are also writable.
- p = Path(path)
- if p.is_file():
- for parent in p.parents:
- chmod_rw(str(parent))
- # stop when we reach the original path passed to rm_rf
- if parent == start_path:
- break
- chmod_rw(str(path))
- func(path)
- return True
- def rm_rf(path):
- """Remove the path contents recursively, even if some elements
- are read-only.
- """
- onerror = partial(on_rm_rf_error, start_path=path)
- shutil.rmtree(str(path), onerror=onerror)
- def find_prefixed(root, prefix):
- """finds all elements in root that begin with the prefix, case insensitive"""
- l_prefix = prefix.lower()
- for x in root.iterdir():
- if x.name.lower().startswith(l_prefix):
- yield x
- def extract_suffixes(iter, prefix):
- """
- :param iter: iterator over path names
- :param prefix: expected prefix of the path names
- :returns: the parts of the paths following the prefix
- """
- p_len = len(prefix)
- for p in iter:
- yield p.name[p_len:]
- def find_suffixes(root, prefix):
- """combines find_prefixes and extract_suffixes
- """
- return extract_suffixes(find_prefixed(root, prefix), prefix)
- def parse_num(maybe_num):
- """parses number path suffixes, returns -1 on error"""
- try:
- return int(maybe_num)
- except ValueError:
- return -1
- if six.PY2:
- def _max(iterable, default):
- """needed due to python2.7 lacking the default argument for max"""
- return reduce(max, iterable, default)
- else:
- _max = max
- def _force_symlink(root, target, link_to):
- """helper to create the current symlink
- it's full of race conditions that are reasonably ok to ignore
- for the context of best effort linking to the latest testrun
- the presumption being thatin case of much parallelism
- the inaccuracy is going to be acceptable
- """
- current_symlink = root.joinpath(target)
- try:
- current_symlink.unlink()
- except OSError:
- pass
- try:
- current_symlink.symlink_to(link_to)
- except Exception:
- pass
- def make_numbered_dir(root, prefix):
- """create a directory with an increased number as suffix for the given prefix"""
- for i in range(10):
- # try up to 10 times to create the folder
- max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1)
- new_number = max_existing + 1
- new_path = root.joinpath("{}{}".format(prefix, new_number))
- try:
- new_path.mkdir()
- except Exception:
- pass
- else:
- _force_symlink(root, prefix + "current", new_path)
- return new_path
- else:
- raise EnvironmentError(
- "could not create numbered dir with prefix "
- "{prefix} in {root} after 10 tries".format(prefix=prefix, root=root)
- )
- def create_cleanup_lock(p):
- """crates a lock to prevent premature folder cleanup"""
- lock_path = get_lock_path(p)
- try:
- fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
- except OSError as e:
- if e.errno == errno.EEXIST:
- six.raise_from(
- EnvironmentError("cannot create lockfile in {path}".format(path=p)), e
- )
- else:
- raise
- else:
- pid = os.getpid()
- spid = str(pid)
- if not isinstance(spid, bytes):
- spid = spid.encode("ascii")
- os.write(fd, spid)
- os.close(fd)
- if not lock_path.is_file():
- raise EnvironmentError("lock path got renamed after successful creation")
- return lock_path
- def register_cleanup_lock_removal(lock_path, register=atexit.register):
- """registers a cleanup function for removing a lock, by default on atexit"""
- pid = os.getpid()
- def cleanup_on_exit(lock_path=lock_path, original_pid=pid):
- current_pid = os.getpid()
- if current_pid != original_pid:
- # fork
- return
- try:
- lock_path.unlink()
- except (OSError, IOError):
- pass
- return register(cleanup_on_exit)
- def maybe_delete_a_numbered_dir(path):
- """removes a numbered directory if its lock can be obtained and it does not seem to be in use"""
- lock_path = None
- try:
- lock_path = create_cleanup_lock(path)
- parent = path.parent
- garbage = parent.joinpath("garbage-{}".format(uuid.uuid4()))
- path.rename(garbage)
- rm_rf(garbage)
- except (OSError, EnvironmentError):
- # known races:
- # * other process did a cleanup at the same time
- # * deletable folder was found
- # * process cwd (Windows)
- return
- finally:
- # if we created the lock, ensure we remove it even if we failed
- # to properly remove the numbered dir
- if lock_path is not None:
- try:
- lock_path.unlink()
- except (OSError, IOError):
- pass
- def ensure_deletable(path, consider_lock_dead_if_created_before):
- """checks if a lock exists and breaks it if its considered dead"""
- if path.is_symlink():
- return False
- lock = get_lock_path(path)
- if not lock.exists():
- return True
- try:
- lock_time = lock.stat().st_mtime
- except Exception:
- return False
- else:
- if lock_time < consider_lock_dead_if_created_before:
- lock.unlink()
- return True
- else:
- return False
- def try_cleanup(path, consider_lock_dead_if_created_before):
- """tries to cleanup a folder if we can ensure it's deletable"""
- if ensure_deletable(path, consider_lock_dead_if_created_before):
- maybe_delete_a_numbered_dir(path)
- def cleanup_candidates(root, prefix, keep):
- """lists candidates for numbered directories to be removed - follows py.path"""
- max_existing = _max(map(parse_num, find_suffixes(root, prefix)), default=-1)
- max_delete = max_existing - keep
- paths = find_prefixed(root, prefix)
- paths, paths2 = itertools.tee(paths)
- numbers = map(parse_num, extract_suffixes(paths2, prefix))
- for path, number in zip(paths, numbers):
- if number <= max_delete:
- yield path
- def cleanup_numbered_dir(root, prefix, keep, consider_lock_dead_if_created_before):
- """cleanup for lock driven numbered directories"""
- for path in cleanup_candidates(root, prefix, keep):
- try_cleanup(path, consider_lock_dead_if_created_before)
- for path in root.glob("garbage-*"):
- try_cleanup(path, consider_lock_dead_if_created_before)
- def make_numbered_dir_with_cleanup(root, prefix, keep, lock_timeout):
- """creates a numbered dir with a cleanup lock and removes old ones"""
- e = None
- for i in range(10):
- try:
- p = make_numbered_dir(root, prefix)
- lock_path = create_cleanup_lock(p)
- register_cleanup_lock_removal(lock_path)
- except Exception as exc:
- e = exc
- else:
- consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout
- cleanup_numbered_dir(
- root=root,
- prefix=prefix,
- keep=keep,
- consider_lock_dead_if_created_before=consider_lock_dead_if_created_before,
- )
- return p
- assert e is not None
- raise e
- def resolve_from_str(input, root):
- assert not isinstance(input, Path), "would break on py2"
- root = Path(root)
- input = expanduser(input)
- input = expandvars(input)
- if isabs(input):
- return Path(input)
- else:
- return root.joinpath(input)
- def fnmatch_ex(pattern, path):
- """FNMatcher port from py.path.common which works with PurePath() instances.
- The difference between this algorithm and PurePath.match() is that the latter matches "**" glob expressions
- for each part of the path, while this algorithm uses the whole path instead.
- For example:
- "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" with this algorithm, but not with
- PurePath.match().
- This algorithm was ported to keep backward-compatibility with existing settings which assume paths match according
- this logic.
- References:
- * https://bugs.python.org/issue29249
- * https://bugs.python.org/issue34731
- """
- path = PurePath(path)
- iswin32 = sys.platform.startswith("win")
- if iswin32 and sep not in pattern and posix_sep in pattern:
- # Running on Windows, the pattern has no Windows path separators,
- # and the pattern has one or more Posix path separators. Replace
- # the Posix path separators with the Windows path separator.
- pattern = pattern.replace(posix_sep, sep)
- if sep not in pattern:
- name = path.name
- else:
- name = six.text_type(path)
- return fnmatch.fnmatch(name, pattern)
- def parts(s):
- parts = s.split(sep)
- return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))}
|