12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780 |
- # -*- coding: utf-8 -*-
- """ core implementation of testing process: init, session, runtest loop. """
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import print_function
- import contextlib
- import fnmatch
- import functools
- import os
- import pkgutil
- import sys
- import warnings
- import attr
- import py
- import six
- import _pytest._code
- from _pytest import nodes
- from _pytest.config import directory_arg
- from _pytest.config import hookimpl
- from _pytest.config import UsageError
- from _pytest.deprecated import PYTEST_CONFIG_GLOBAL
- from _pytest.outcomes import exit
- from _pytest.runner import collect_one_node
# Exit codes handed back to the command line by ``wrap_session`` / ``_main``.
EXIT_OK = 0  # initial status of every session
EXIT_TESTSFAILED = 1  # failures were counted, or ``Failed`` was raised
EXIT_INTERRUPTED = 2  # KeyboardInterrupt / ``exit.Exception`` without a returncode
EXIT_INTERNALERROR = 3  # any other exception escaped the run
EXIT_USAGEERROR = 4  # a ``UsageError`` was raised
EXIT_NOTESTSCOLLECTED = 5  # nothing failed, but nothing was collected either
def pytest_addoption(parser):
    """Register the ini values and command line options owned by this module.

    Adds the ``norecursedirs`` and ``testpaths`` ini values, then fills the
    "general", "collect" and "debugconfig" option groups of ``parser``.

    :param parser: the option parser object passed to this hook; only its
        ``addini`` and ``getgroup`` methods are used here.
    """
    parser.addini(
        "norecursedirs",
        "directory patterns to avoid for recursion",
        type="args",
        default=[".*", "build", "dist", "CVS", "_darcs", "{arch}", "*.egg", "venv"],
    )
    parser.addini(
        "testpaths",
        "directories to search for tests when no files or directories are given in the "
        "command line.",
        type="args",
        default=[],
    )

    group = parser.getgroup("general", "running and selection options")
    # -x is just --maxfail=1: both options write to the same destination.
    group._addoption(
        "-x",
        "--exitfirst",
        action="store_const",
        dest="maxfail",
        const=1,
        help="exit instantly on first error or failed test.",
    )
    group._addoption(
        "--maxfail",
        metavar="num",
        action="store",
        type=int,
        dest="maxfail",
        default=0,
        help="exit after first num failures or errors.",
    )
    group._addoption(
        "--strict-markers",
        "--strict",
        action="store_true",
        help="markers not registered in the `markers` section of the configuration file raise errors.",
    )
    group._addoption(
        "-c",
        metavar="file",
        type=str,
        dest="inifilename",
        help="load configuration from `file` instead of trying to locate one of the implicit "
        "configuration files.",
    )
    group._addoption(
        "--continue-on-collection-errors",
        action="store_true",
        default=False,
        dest="continue_on_collection_errors",
        help="Force test execution even if collection errors occur.",
    )
    group._addoption(
        "--rootdir",
        action="store",
        dest="rootdir",
        help="Define root directory for tests. Can be relative path: 'root_dir', './root_dir', "
        "'root_dir/another_dir/'; absolute path: '/home/user/root_dir'; path with variables: "
        "'$HOME/root_dir'.",
    )

    group = parser.getgroup("collect", "collection")
    group.addoption(
        "--collectonly",
        "--collect-only",
        action="store_true",
        help="only collect tests, don't execute them.",
    )
    group.addoption(
        "--pyargs",
        action="store_true",
        help="try to interpret all arguments as python packages.",
    )
    group.addoption(
        "--ignore",
        action="append",
        metavar="path",
        help="ignore path during collection (multi-allowed).",
    )
    group.addoption(
        "--ignore-glob",
        action="append",
        metavar="path",
        help="ignore path pattern during collection (multi-allowed).",
    )
    group.addoption(
        "--deselect",
        action="append",
        metavar="nodeid_prefix",
        help="deselect item during collection (multi-allowed).",
    )
    # when changing this to --conf-cut-dir, config.py Conftest.setinitial
    # needs upgrading as well
    group.addoption(
        "--confcutdir",
        dest="confcutdir",
        default=None,
        metavar="dir",
        type=functools.partial(directory_arg, optname="--confcutdir"),
        help="only load conftest.py's relative to specified dir.",
    )
    group.addoption(
        "--noconftest",
        action="store_true",
        dest="noconftest",
        default=False,
        help="Don't load any conftest.py files.",
    )
    group.addoption(
        "--keepduplicates",
        "--keep-duplicates",
        action="store_true",
        dest="keepduplicates",
        default=False,
        help="Keep duplicate tests.",
    )
    group.addoption(
        "--collect-in-virtualenv",
        action="store_true",
        dest="collect_in_virtualenv",
        default=False,
        help="Don't ignore tests in a local virtualenv directory",
    )

    group = parser.getgroup("debugconfig", "test session debugging and configuration")
    group.addoption(
        "--basetemp",
        dest="basetemp",
        default=None,
        metavar="dir",
        help=(
            "base temporary directory for this test run. "
            "(warning: this directory is removed if it exists)"
        ),
    )
- class _ConfigDeprecated(object):
- def __init__(self, config):
- self.__dict__["_config"] = config
- def __getattr__(self, attr):
- warnings.warn(PYTEST_CONFIG_GLOBAL, stacklevel=2)
- return getattr(self._config, attr)
- def __setattr__(self, attr, val):
- warnings.warn(PYTEST_CONFIG_GLOBAL, stacklevel=2)
- return setattr(self._config, attr, val)
- def __repr__(self):
- return "{}({!r})".format(type(self).__name__, self._config)
def pytest_configure(config):
    """Publish ``config`` as ``pytest.config``, wrapped in the deprecation proxy."""
    deprecated_proxy = _ConfigDeprecated(config)
    pytest_module = __import__("pytest")
    pytest_module.config = deprecated_proxy  # compatibility
def wrap_session(config, doit):
    """Run ``doit(config, session)`` inside the session start/finish skeleton.

    A ``Session`` is created, the config is configured, the sessionstart hook
    is fired and ``doit`` is called; exceptions are translated into the
    ``EXIT_*`` codes. The session's final ``exitstatus`` is returned.
    ``UsageError`` is recorded and then re-raised to the caller.
    """
    session = Session(config)
    session.exitstatus = EXIT_OK
    # 0: nothing done yet, 1: config configured, 2: sessionstart hook has run.
    progress = 0
    try:
        try:
            config._do_configure()
            progress = 1
            config.hook.pytest_sessionstart(session=session)
            progress = 2
            session.exitstatus = doit(config, session) or 0
        except UsageError:
            session.exitstatus = EXIT_USAGEERROR
            raise
        except Failed:
            session.exitstatus = EXIT_TESTSFAILED
        except (KeyboardInterrupt, exit.Exception):
            excinfo = _pytest._code.ExceptionInfo.from_current()
            status = EXIT_INTERRUPTED
            if isinstance(excinfo.value, exit.Exception):
                # An explicit exit may carry its own return code.
                if excinfo.value.returncode is not None:
                    status = excinfo.value.returncode
                if progress < 2:
                    # The session never started, so write the reason to
                    # stderr directly.
                    message = "{}: {}\n".format(excinfo.typename, excinfo.value.msg)
                    sys.stderr.write(message)
            config.hook.pytest_keyboard_interrupt(excinfo=excinfo)
            session.exitstatus = status
        except:  # noqa
            excinfo = _pytest._code.ExceptionInfo.from_current()
            config.notify_exception(excinfo, config.option)
            session.exitstatus = EXIT_INTERNALERROR
            if excinfo.errisinstance(SystemExit):
                sys.stderr.write("mainloop: caught unexpected SystemExit!\n")

    finally:
        excinfo = None  # Explicitly break reference cycle.
        session.startdir.chdir()
        # The finish hook only fires when the start hook completed.
        if progress >= 2:
            config.hook.pytest_sessionfinish(
                session=session, exitstatus=session.exitstatus
            )
        config._ensure_unconfigure()
    return session.exitstatus
def pytest_cmdline_main(config):
    """Run the default ``_main`` protocol through ``wrap_session``."""
    exitstatus = wrap_session(config, _main)
    return exitstatus
- def _main(config, session):
- """ default command line protocol for initialization, session,
- running tests and reporting. """
- config.hook.pytest_collection(session=session)
- config.hook.pytest_runtestloop(session=session)
- if session.testsfailed:
- return EXIT_TESTSFAILED
- elif session.testscollected == 0:
- return EXIT_NOTESTSCOLLECTED
def pytest_collection(session):
    """Collection hook: hand the work to ``session.perform_collect()``."""
    collected = session.perform_collect()
    return collected
def pytest_runtestloop(session):
    """Run the runtest protocol for every item of ``session.items`` in order.

    Raises ``session.Interrupted`` when collection produced failures (unless
    ``continue_on_collection_errors`` is set) or when ``session.shouldstop``
    gets set, and ``session.Failed`` when ``session.shouldfail`` gets set.
    Returns ``True`` otherwise, including the collect-only case where no item
    is run.
    """
    options = session.config.option
    if session.testsfailed and not options.continue_on_collection_errors:
        raise session.Interrupted("%d errors during collection" % session.testsfailed)

    if options.collectonly:
        return True

    for position, item in enumerate(session.items):
        # Each item is told which item follows it (None for the last one).
        following = position + 1
        nextitem = None
        if following < len(session.items):
            nextitem = session.items[following]
        item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
        if session.shouldfail:
            raise session.Failed(session.shouldfail)
        if session.shouldstop:
            raise session.Interrupted(session.shouldstop)
    return True
- def _in_venv(path):
- """Attempts to detect if ``path`` is the root of a Virtual Environment by
- checking for the existence of the appropriate activate script"""
- bindir = path.join("Scripts" if sys.platform.startswith("win") else "bin")
- if not bindir.isdir():
- return False
- activates = (
- "activate",
- "activate.csh",
- "activate.fish",
- "Activate",
- "Activate.bat",
- "Activate.ps1",
- )
- return any([fname.basename in activates for fname in bindir.listdir()])
def pytest_ignore_collect(path, config):
    """Decide whether ``path`` must be left out of collection.

    Returns ``True`` when the path equals an entry of ``collect_ignore`` /
    ``--ignore``, matches a pattern from ``collect_ignore_glob`` /
    ``--ignore-glob``, or looks like a virtualenv root while
    ``--collect-in-virtualenv`` is off. Returns ``False`` otherwise.
    """
    # Exact paths: conftest-provided entries first, then the command line.
    ignore_paths = (
        config._getconftest_pathlist("collect_ignore", path=path.dirpath()) or []
    )
    cli_paths = config.getoption("ignore")
    if cli_paths:
        ignore_paths.extend([py.path.local(entry) for entry in cli_paths])
    if py.path.local(path) in ignore_paths:
        return True

    # Glob patterns, gathered from the same two sources.
    ignore_globs = (
        config._getconftest_pathlist("collect_ignore_glob", path=path.dirpath())
        or []
    )
    cli_globs = config.getoption("ignore_glob")
    if cli_globs:
        ignore_globs.extend([py.path.local(entry) for entry in cli_globs])
    for pattern in ignore_globs:
        if fnmatch.fnmatch(six.text_type(path), six.text_type(pattern)):
            return True

    if config.getoption("collect_in_virtualenv"):
        return False
    return True if _in_venv(path) else False
def pytest_collection_modifyitems(items, config):
    """Drop items whose node id starts with one of the ``--deselect`` prefixes.

    ``items`` is edited in place; the removed items are announced through the
    ``pytest_deselected`` hook. Nothing happens without any prefix or without
    any match.
    """
    prefixes = tuple(config.getoption("deselect") or [])
    if not prefixes:
        return

    kept, dropped = [], []
    for candidate in items:
        bucket = dropped if candidate.nodeid.startswith(prefixes) else kept
        bucket.append(candidate)

    if dropped:
        config.hook.pytest_deselected(items=dropped)
        items[:] = kept
@contextlib.contextmanager
def _patched_find_module():
    """Temporarily work around a bug in pkgutil.ImpImporter.find_module

    On python<3.4, pkgutil.find_loader strips symlinks from the path because
    it calls os.path.realpath. The real import machinery of those versions
    does not do that (pkgutil and __import__ did not share the same
    underlying code), which can break conftest discovery for pytest when
    symlinks are involved.

    The only python<3.4 supported by pytest is python 2.7, so the patch is
    applied on Python 2 only; elsewhere this context manager does nothing.
    """
    if not six.PY2:
        # python 3.4+ uses importlib instead
        yield
        return

    def find_module_patched(self, fullname, path=None):
        # Note: we ignore 'path' argument since it is only used via meta_path
        subname = fullname.split(".")[-1]
        if subname != fullname and self.path is None:
            return None
        # original: path = [os.path.realpath(self.path)]
        search_path = None if self.path is None else [self.path]
        try:
            handle, filename, etc = pkgutil.imp.find_module(subname, search_path)
        except ImportError:
            return None
        return pkgutil.ImpLoader(fullname, handle, filename, etc)

    original_find_module = pkgutil.ImpImporter.find_module
    pkgutil.ImpImporter.find_module = find_module_patched
    try:
        yield
    finally:
        # Always put the original implementation back.
        pkgutil.ImpImporter.find_module = original_find_module
class FSHookProxy(object):
    """Hook proxy for ``fspath`` that leaves the ``remove_mods`` plugins out.

    Hook callers are built on first access through ``pm.subset_hook_caller``
    and then stored on the instance, so later lookups of the same name no
    longer reach ``__getattr__``.
    """

    def __init__(self, fspath, pm, remove_mods):
        self.fspath, self.pm, self.remove_mods = fspath, pm, remove_mods

    def __getattr__(self, name):
        caller = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods)
        vars(self)[name] = caller
        return caller
class NoMatch(Exception):
    """ raised when matching cannot locate any node for the given names. """
class Interrupted(KeyboardInterrupt):
    """ raised to signal that the test run was interrupted. """

    __module__ = "builtins"  # for py3
class Failed(Exception):
    """ raised to stop the test run and mark it as failed. """
@attr.s
class _bestrelpath_cache(dict):
    """Dict that fills missing keys with ``self.path.bestrelpath(key)``."""

    path = attr.ib()

    def __missing__(self, path):
        # Compute once, remember the answer, and hand it back.
        relpath = self[path] = self.path.bestrelpath(path)
        return relpath
class Session(nodes.FSCollector):
    """Root collector of a test run.

    Keeps the run-wide counters (``testsfailed``, ``testscollected``), the
    stop flags (``shouldstop``, ``shouldfail``) and the caches used while
    turning command line arguments into collected nodes and items.
    """

    Interrupted = Interrupted
    Failed = Failed

    def __init__(self, config):
        """Create the session for ``config`` and register it as the "session" plugin."""
        nodes.FSCollector.__init__(
            self, config.rootdir, parent=None, config=config, session=self, nodeid=""
        )
        self.testsfailed = 0
        self.testscollected = 0
        self.shouldstop = False
        self.shouldfail = False
        self.trace = config.trace.root.get("collection")
        self._norecursepatterns = config.getini("norecursedirs")
        self.startdir = config.invocation_dir
        self._initialpaths = frozenset()
        # Keep track of any collected nodes in here, so we don't duplicate fixtures
        self._node_cache = {}
        self._bestrelpathcache = _bestrelpath_cache(config.rootdir)
        # Dirnames of pkgs with dunder-init files.
        self._pkg_roots = {}

        self.config.pluginmanager.register(self, name="session")

    def __repr__(self):
        return "<%s %s exitstatus=%r testsfailed=%d testscollected=%d>" % (
            self.__class__.__name__,
            self.name,
            getattr(self, "exitstatus", "<UNSET>"),
            self.testsfailed,
            self.testscollected,
        )

    def _node_location_to_relpath(self, node_path):
        """Return the cached ``bestrelpath`` of ``node_path`` from the rootdir."""
        # bestrelpath is a quite slow function
        return self._bestrelpathcache[node_path]

    @hookimpl(tryfirst=True)
    def pytest_collectstart(self):
        """Abort collection as soon as one of the stop flags has been set."""
        if self.shouldfail:
            raise self.Failed(self.shouldfail)
        if self.shouldstop:
            raise self.Interrupted(self.shouldstop)

    @hookimpl(tryfirst=True)
    def pytest_runtest_logreport(self, report):
        """Count failed reports and set ``shouldfail`` once ``maxfail`` is reached.

        Reports carrying a ``wasxfail`` attribute are not counted.
        """
        if report.failed and not hasattr(report, "wasxfail"):
            self.testsfailed += 1
            maxfail = self.config.getvalue("maxfail")
            if maxfail and self.testsfailed >= maxfail:
                self.shouldfail = "stopping after %d failures" % (self.testsfailed)

    # Collection reports are counted exactly like test reports.
    pytest_collectreport = pytest_runtest_logreport

    def isinitpath(self, path):
        """Return whether ``path`` is one of the paths given as initial arguments."""
        return path in self._initialpaths

    def gethookproxy(self, fspath):
        """Return the hook caller to use for ``fspath``.

        This is the plain ``config.hook`` unless some registered conftest
        modules do not apply to ``fspath``; then an ``FSHookProxy`` leaving
        those modules out is returned.
        """
        # check if we have the common case of running
        # hooks with all conftest.py files
        pm = self.config.pluginmanager
        my_conftestmodules = pm._getconftestmodules(fspath)
        remove_mods = pm._conftest_plugins.difference(my_conftestmodules)
        if remove_mods:
            # one or more conftests are not in use at this fspath
            proxy = FSHookProxy(fspath, pm, remove_mods)
        else:
            # all plugins are active for this fspath
            proxy = self.config.hook
        return proxy

    def perform_collect(self, args=None, genitems=True):
        """Collect for ``args`` and run the collection hooks around it.

        ``pytest_collection_finish`` is fired even when collection raises.
        On success ``testscollected`` is updated and the result returned.
        """
        hook = self.config.hook
        try:
            items = self._perform_collect(args, genitems)
            self.config.pluginmanager.check_pending()
            hook.pytest_collection_modifyitems(
                session=self, config=self.config, items=items
            )
        finally:
            hook.pytest_collection_finish(session=self)
        self.testscollected = len(items)
        return items

    def _perform_collect(self, args, genitems):
        """Parse ``args`` (default: ``config.args``) and collect from them.

        Returns the raw collection result when ``genitems`` is false and the
        flattened item list (also stored as ``self.items``) otherwise.
        Raises ``UsageError`` when an argument matched nothing.
        """
        if args is None:
            args = self.config.args
        self.trace("perform_collect", self, args)
        self.trace.root.indent += 1
        self._notfound = []
        initialpaths = []
        self._initialparts = []
        self.items = items = []
        for arg in args:
            parts = self._parsearg(arg)
            self._initialparts.append(parts)
            initialpaths.append(parts[0])
        self._initialpaths = frozenset(initialpaths)
        rep = collect_one_node(self)
        self.ihook.pytest_collectreport(report=rep)
        self.trace.root.indent -= 1
        if self._notfound:
            errors = []
            for arg, exc in self._notfound:
                line = "(no name %r in any of %r)" % (arg, exc.args[0])
                errors.append("not found: %s\n%s" % (arg, line))
                # XXX: test this
            raise UsageError(*errors)
        if not genitems:
            return rep.result
        else:
            if rep.passed:
                for node in rep.result:
                    self.items.extend(self.genitems(node))
            return items

    def collect(self):
        """Yield the nodes collected for every initial argument, in order.

        Arguments that match nothing are remembered in ``self._notfound``
        instead of raising.
        """
        for initialpart in self._initialparts:
            arg = "::".join(map(str, initialpart))
            self.trace("processing argument", arg)
            self.trace.root.indent += 1
            try:
                for x in self._collect(arg):
                    yield x
            except NoMatch:
                # we are inside a make_report hook so
                # we cannot directly pass through the exception
                self._notfound.append((arg, sys.exc_info()[1]))

            self.trace.root.indent -= 1

    def _collect(self, arg):
        """Yield the nodes for one ``path[::name...]`` argument."""
        from _pytest.python import Package

        names = self._parsearg(arg)
        argpath = names.pop(0)

        # Start with a Session root, and delve to argpath item (dir or file)
        # and stack all Packages found on the way.
        # No point in finding packages when collecting doctests
        if not self.config.getoption("doctestmodules", False):
            pm = self.config.pluginmanager
            for parent in reversed(argpath.parts()):
                if pm._confcutdir and pm._confcutdir.relto(parent):
                    break

                if parent.isdir():
                    pkginit = parent.join("__init__.py")
                    if pkginit.isfile():
                        if pkginit not in self._node_cache:
                            col = self._collectfile(pkginit, handle_dupes=False)
                            if col:
                                if isinstance(col[0], Package):
                                    self._pkg_roots[parent] = col[0]
                                # always store a list in the cache, matchnodes expects it
                                self._node_cache[col[0].fspath] = [col[0]]

        # If it's a directory argument, recurse and look for any Subpackages.
        # Let the Package collector deal with subnodes, don't collect here.
        if argpath.check(dir=1):
            assert not names, "invalid arg %r" % (arg,)

            seen_dirs = set()
            for path in argpath.visit(
                fil=self._visit_filter, rec=self._recurse, bf=True, sort=True
            ):
                dirpath = path.dirpath()
                if dirpath not in seen_dirs:
                    # Collect packages first.
                    seen_dirs.add(dirpath)
                    pkginit = dirpath.join("__init__.py")
                    if pkginit.exists():
                        for x in self._collectfile(pkginit):
                            yield x
                            if isinstance(x, Package):
                                self._pkg_roots[dirpath] = x
                if dirpath in self._pkg_roots:
                    # Do not collect packages here.
                    continue

                for x in self._collectfile(path):
                    key = (type(x), x.fspath)
                    if key in self._node_cache:
                        yield self._node_cache[key]
                    else:
                        self._node_cache[key] = x
                        yield x
        else:
            assert argpath.check(file=1)

            if argpath in self._node_cache:
                col = self._node_cache[argpath]
            else:
                collect_root = self._pkg_roots.get(argpath.dirname, self)
                col = collect_root._collectfile(argpath, handle_dupes=False)
                if col:
                    self._node_cache[argpath] = col
            m = self.matchnodes(col, names)
            # If __init__.py was the only file requested, then the matched node will be
            # the corresponding Package, and the first yielded item will be the __init__
            # Module itself, so just use that. If this special case isn't taken, then all
            # the files in the package will be yielded.
            if argpath.basename == "__init__.py":
                try:
                    yield next(m[0].collect())
                except StopIteration:
                    # The package collects nothing with only an __init__.py
                    # file in it, which gets ignored by the default
                    # "python_files" option.
                    pass
                return
            for y in m:
                yield y

    def _collectfile(self, path, handle_dupes=True):
        """Return the collectors the ``pytest_collect_file`` hook creates for ``path``.

        Returns ``()`` when the path is ignored or, with ``handle_dupes``,
        when it was already seen and ``--keepduplicates`` is off.
        """
        assert path.isfile(), "%r is not a file (isdir=%r, exists=%r, islink=%r)" % (
            path,
            path.isdir(),
            path.exists(),
            path.islink(),
        )
        ihook = self.gethookproxy(path)
        # Paths named explicitly as arguments are never ignored.
        if not self.isinitpath(path):
            if ihook.pytest_ignore_collect(path=path, config=self.config):
                return ()

        if handle_dupes:
            keepduplicates = self.config.getoption("keepduplicates")
            if not keepduplicates:
                duplicate_paths = self.config.pluginmanager._duplicatepaths
                if path in duplicate_paths:
                    return ()
                else:
                    duplicate_paths.add(path)

        return ihook.pytest_collect_file(path=path, parent=self)

    def _recurse(self, dirpath):
        """Return whether directory traversal should descend into ``dirpath``.

        When the answer is yes, the ``pytest_collect_directory`` hook is
        fired for the directory before returning.
        """
        if dirpath.basename == "__pycache__":
            return False
        ihook = self.gethookproxy(dirpath.dirpath())
        if ihook.pytest_ignore_collect(path=dirpath, config=self.config):
            return False
        for pat in self._norecursepatterns:
            if dirpath.check(fnmatch=pat):
                return False
        ihook = self.gethookproxy(dirpath)
        ihook.pytest_collect_directory(path=dirpath, parent=self)
        return True

    if six.PY2:

        @staticmethod
        def _visit_filter(f):
            """Accept regular files, skipping compiled ``.pyc`` files."""
            # str.endswith compares literally, so the suffix must not be
            # written as a glob ("*.pyc" would never match anything).
            return f.check(file=1) and not f.strpath.endswith(".pyc")

    else:

        @staticmethod
        def _visit_filter(f):
            """Accept regular files."""
            return f.check(file=1)

    def _tryconvertpyarg(self, x):
        """Convert a dotted module name to path.

        ``x`` is returned unchanged when no loader can be found for it.
        """
        try:
            with _patched_find_module():
                loader = pkgutil.find_loader(x)
        except ImportError:
            return x
        if loader is None:
            return x
        # This method is sometimes invoked when AssertionRewritingHook, which
        # does not define a get_filename method, is already in place:
        try:
            with _patched_find_module():
                path = loader.get_filename(x)
        except AttributeError:
            # Retrieve path from AssertionRewritingHook:
            path = loader.modules[x][0].co_filename
        if loader.is_package(x):
            path = os.path.dirname(path)
        return path

    def _parsearg(self, arg):
        """ return (fspath, names) tuple after checking the file exists.

        The result is a list whose first element is the resolved path and
        whose remaining elements are the ``::``-separated names. Raises
        ``UsageError`` when the path does not exist.
        """
        parts = str(arg).split("::")
        if self.config.option.pyargs:
            parts[0] = self._tryconvertpyarg(parts[0])
        relpath = parts[0].replace("/", os.sep)
        path = self.config.invocation_dir.join(relpath, abs=True)
        if not path.check():
            if self.config.option.pyargs:
                raise UsageError(
                    "file or package not found: " + arg + " (missing __init__.py?)"
                )
            raise UsageError("file not found: " + arg)
        parts[0] = path.realpath()
        return parts

    def matchnodes(self, matching, names):
        """Return the nodes below ``matching`` selected by the ``names`` chain.

        Raises ``NoMatch`` when nothing is selected.
        """
        self.trace("matchnodes", matching, names)
        self.trace.root.indent += 1
        matched = self._matchnodes(matching, names)
        num = len(matched)
        self.trace("matchnodes finished -> ", num, "nodes")
        self.trace.root.indent -= 1
        if num == 0:
            raise NoMatch(matching, names[:1])
        return matched

    def _matchnodes(self, matching, names):
        """Recursive worker behind ``matchnodes``.

        ``matching`` is returned as-is when either it or ``names`` is empty.
        """
        if not matching or not names:
            return matching
        name = names[0]
        assert name
        nextnames = names[1:]
        resultnodes = []
        for node in matching:
            if isinstance(node, nodes.Item):
                # ``names`` is known to be non-empty here, and an item has no
                # children that a further name could select.
                continue
            assert isinstance(node, nodes.Collector)
            key = (type(node), node.nodeid)
            if key in self._node_cache:
                rep = self._node_cache[key]
            else:
                rep = collect_one_node(node)
                self._node_cache[key] = rep
            if rep.passed:
                has_matched = False
                for x in rep.result:
                    # TODO: remove parametrized workaround once collection structure contains parametrization
                    if x.name == name or x.name.split("[")[0] == name:
                        resultnodes.extend(self.matchnodes([x], nextnames))
                        has_matched = True
                # XXX accept IDs that don't have "()" for class instances
                if not has_matched and len(rep.result) == 1 and x.name == "()":
                    # Retry the same name one level down. Build a fresh list
                    # so the extra name does not leak into the next node of
                    # ``matching``.
                    resultnodes.extend(self.matchnodes([x], [name] + nextnames))
            else:
                # report collection failures here to avoid failing to run some test
                # specified in the command line because the module could not be
                # imported (#134)
                node.ihook.pytest_collectreport(report=rep)
        return resultnodes

    def genitems(self, node):
        """Yield every item below ``node``, firing the collection hooks on the way."""
        self.trace("genitems", node)
        if isinstance(node, nodes.Item):
            node.ihook.pytest_itemcollected(item=node)
            yield node
        else:
            assert isinstance(node, nodes.Collector)
            rep = collect_one_node(node)
            if rep.passed:
                for subnode in rep.result:
                    for x in self.genitems(subnode):
                        yield x
            node.ihook.pytest_collectreport(report=rep)
|