- """Terminal reporting of the full testing process.
- This is a good source for looking at the various reporting hooks.
- """
- import argparse
- import dataclasses
- import datetime
- import inspect
- import platform
- import sys
- import textwrap
- import warnings
- from collections import Counter
- from functools import partial
- from pathlib import Path
- from typing import Any
- from typing import Callable
- from typing import cast
- from typing import ClassVar
- from typing import Dict
- from typing import Generator
- from typing import List
- from typing import Mapping
- from typing import NamedTuple
- from typing import Optional
- from typing import Sequence
- from typing import Set
- from typing import TextIO
- from typing import Tuple
- from typing import TYPE_CHECKING
- from typing import Union
- import pluggy
- import _pytest._version
- from _pytest import nodes
- from _pytest import timing
- from _pytest._code import ExceptionInfo
- from _pytest._code.code import ExceptionRepr
- from _pytest._io import TerminalWriter
- from _pytest._io.wcwidth import wcswidth
- from _pytest.assertion.util import running_on_ci
- from _pytest.compat import final
- from _pytest.config import _PluggyPlugin
- from _pytest.config import Config
- from _pytest.config import ExitCode
- from _pytest.config import hookimpl
- from _pytest.config.argparsing import Parser
- from _pytest.nodes import Item
- from _pytest.nodes import Node
- from _pytest.pathlib import absolutepath
- from _pytest.pathlib import bestrelpath
- from _pytest.reports import BaseReport
- from _pytest.reports import CollectReport
- from _pytest.reports import TestReport
- if TYPE_CHECKING:
- from typing_extensions import Literal
- from _pytest.main import Session
- REPORT_COLLECTING_RESOLUTION = 0.5
- KNOWN_TYPES = (
- "failed",
- "passed",
- "skipped",
- "deselected",
- "xfailed",
- "xpassed",
- "warnings",
- "error",
- )
- _REPORTCHARS_DEFAULT = "fE"
- class MoreQuietAction(argparse.Action):
- """A modified copy of the argparse count action which counts down and updates
- the legacy quiet attribute at the same time.
- Used to unify verbosity handling.
- """
- def __init__(
- self,
- option_strings: Sequence[str],
- dest: str,
- default: object = None,
- required: bool = False,
- help: Optional[str] = None,
- ) -> None:
- super().__init__(
- option_strings=option_strings,
- dest=dest,
- nargs=0,
- default=default,
- required=required,
- help=help,
- )
- def __call__(
- self,
- parser: argparse.ArgumentParser,
- namespace: argparse.Namespace,
- values: Union[str, Sequence[object], None],
- option_string: Optional[str] = None,
- ) -> None:
- new_count = getattr(namespace, self.dest, 0) - 1
- setattr(namespace, self.dest, new_count)
- # TODO: Deprecate config.quiet
- namespace.quiet = getattr(namespace, "quiet", 0) + 1
- class TestShortLogReport(NamedTuple):
- """Used to store the test status result category, shortletter and verbose word.
- For example ``"rerun", "R", ("RERUN", {"yellow": True})``.
- :ivar category:
- The class of result, for example ``"passed"``, ``"skipped"``, ``"error"``, or the empty string.
- :ivar letter:
- The short letter shown as testing progresses, for example ``"."``, ``"s"``, ``"E"``, or the empty string.
- :ivar word:
- Verbose word is shown as testing progresses in verbose mode, for example ``"PASSED"``, ``"SKIPPED"``,
- ``"ERROR"``, or the empty string.
- """
- category: str
- letter: str
- word: Union[str, Tuple[str, Mapping[str, bool]]]
- def pytest_addoption(parser: Parser) -> None:
- group = parser.getgroup("terminal reporting", "Reporting", after="general")
- group._addoption(
- "-v",
- "--verbose",
- action="count",
- default=0,
- dest="verbose",
- help="Increase verbosity",
- )
- group._addoption(
- "--no-header",
- action="store_true",
- default=False,
- dest="no_header",
- help="Disable header",
- )
- group._addoption(
- "--no-summary",
- action="store_true",
- default=False,
- dest="no_summary",
- help="Disable summary",
- )
- group._addoption(
- "-q",
- "--quiet",
- action=MoreQuietAction,
- default=0,
- dest="verbose",
- help="Decrease verbosity",
- )
- group._addoption(
- "--verbosity",
- dest="verbose",
- type=int,
- default=0,
- help="Set verbosity. Default: 0.",
- )
- group._addoption(
- "-r",
- action="store",
- dest="reportchars",
- default=_REPORTCHARS_DEFAULT,
- metavar="chars",
- help="Show extra test summary info as specified by chars: (f)ailed, "
- "(E)rror, (s)kipped, (x)failed, (X)passed, "
- "(p)assed, (P)assed with output, (a)ll except passed (p/P), or (A)ll. "
- "(w)arnings are enabled by default (see --disable-warnings), "
- "'N' can be used to reset the list. (default: 'fE').",
- )
- group._addoption(
- "--disable-warnings",
- "--disable-pytest-warnings",
- default=False,
- dest="disable_warnings",
- action="store_true",
- help="Disable warnings summary",
- )
- group._addoption(
- "-l",
- "--showlocals",
- action="store_true",
- dest="showlocals",
- default=False,
- help="Show locals in tracebacks (disabled by default)",
- )
- group._addoption(
- "--no-showlocals",
- action="store_false",
- dest="showlocals",
- help="Hide locals in tracebacks (negate --showlocals passed through addopts)",
- )
- group._addoption(
- "--tb",
- metavar="style",
- action="store",
- dest="tbstyle",
- default="auto",
- choices=["auto", "long", "short", "no", "line", "native"],
- help="Traceback print mode (auto/long/short/line/native/no)",
- )
- group._addoption(
- "--show-capture",
- action="store",
- dest="showcapture",
- choices=["no", "stdout", "stderr", "log", "all"],
- default="all",
- help="Controls how captured stdout/stderr/log is shown on failed tests. "
- "Default: all.",
- )
- group._addoption(
- "--fulltrace",
- "--full-trace",
- action="store_true",
- default=False,
- help="Don't cut any tracebacks (default is to cut)",
- )
- group._addoption(
- "--color",
- metavar="color",
- action="store",
- dest="color",
- default="auto",
- choices=["yes", "no", "auto"],
- help="Color terminal output (yes/no/auto)",
- )
- group._addoption(
- "--code-highlight",
- default="yes",
- choices=["yes", "no"],
- help="Whether code should be highlighted (only if --color is also enabled). "
- "Default: yes.",
- )
- parser.addini(
- "console_output_style",
- help='Console output: "classic", or with additional progress information '
- '("progress" (percentage) | "count" | "progress-even-when-capture-no" (forces '
- "progress even when capture=no))",
- default="progress",
- )
- def pytest_configure(config: Config) -> None:
- reporter = TerminalReporter(config, sys.stdout)
- config.pluginmanager.register(reporter, "terminalreporter")
- if config.option.debug or config.option.traceconfig:
- def mywriter(tags, args):
- msg = " ".join(map(str, args))
- reporter.write_line("[traceconfig] " + msg)
- config.trace.root.setprocessor("pytest:config", mywriter)
- def getreportopt(config: Config) -> str:
- reportchars: str = config.option.reportchars
- old_aliases = {"F", "S"}
- reportopts = ""
- for char in reportchars:
- if char in old_aliases:
- char = char.lower()
- if char == "a":
- reportopts = "sxXEf"
- elif char == "A":
- reportopts = "PpsxXEf"
- elif char == "N":
- reportopts = ""
- elif char not in reportopts:
- reportopts += char
- if not config.option.disable_warnings and "w" not in reportopts:
- reportopts = "w" + reportopts
- elif config.option.disable_warnings and "w" in reportopts:
- reportopts = reportopts.replace("w", "")
- return reportopts
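- # Illustrative examples derived from the logic above, assuming warnings are not
- # disabled (so "w" is prepended):
- #   "-r fE" (the default) -> "wfE"
- #   "-ra"                 -> "wsxXEf"   ("a": all except passed)
- #   "-rA"                 -> "wPpsxXEf" ("A": all)
- #   "-rN"                 -> "w"        ("N" resets the list)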
- @hookimpl(trylast=True) # after _pytest.runner
- def pytest_report_teststatus(report: BaseReport) -> Tuple[str, str, str]:
- letter = "F"
- if report.passed:
- letter = "."
- elif report.skipped:
- letter = "s"
- outcome: str = report.outcome
- if report.when in ("collect", "setup", "teardown") and outcome == "failed":
- outcome = "error"
- letter = "E"
- return outcome, letter, outcome.upper()
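- # For example, a passed "call" report maps to ("passed", ".", "PASSED"), while a
- # failed "setup" or "teardown" report maps to ("error", "E", "ERROR"). Because this
- # implementation is registered trylast, more specific hook implementations (for
- # example for skips and xfails) run first and take precedence.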
- @dataclasses.dataclass
- class WarningReport:
- """Simple structure to hold warnings information captured by ``pytest_warning_recorded``.
- :ivar str message:
- User friendly message about the warning.
- :ivar str|None nodeid:
- nodeid that generated the warning (see ``get_location``).
- :ivar tuple fslocation:
- File system location of the source of the warning (see ``get_location``).
- """
- message: str
- nodeid: Optional[str] = None
- fslocation: Optional[Tuple[str, int]] = None
- count_towards_summary: ClassVar = True
- def get_location(self, config: Config) -> Optional[str]:
- """Return the more user-friendly information about the location of a warning, or None."""
- if self.nodeid:
- return self.nodeid
- if self.fslocation:
- filename, linenum = self.fslocation
- relpath = bestrelpath(config.invocation_params.dir, absolutepath(filename))
- return f"{relpath}:{linenum}"
- return None
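- # For example (hypothetical paths): a report with nodeid "tests/test_foo.py::test_bar"
- # returns that nodeid as-is, while a report with only fslocation
- # ("/home/user/proj/src/mod.py", 17) returns a path relative to the invocation
- # directory, such as "src/mod.py:17".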
- @final
- class TerminalReporter:
- def __init__(self, config: Config, file: Optional[TextIO] = None) -> None:
- import _pytest.config
- self.config = config
- self._numcollected = 0
- self._session: Optional[Session] = None
- self._showfspath: Optional[bool] = None
- self.stats: Dict[str, List[Any]] = {}
- self._main_color: Optional[str] = None
- self._known_types: Optional[List[str]] = None
- self.startpath = config.invocation_params.dir
- if file is None:
- file = sys.stdout
- self._tw = _pytest.config.create_terminal_writer(config, file)
- self._screen_width = self._tw.fullwidth
- self.currentfspath: Union[None, Path, str, int] = None
- self.reportchars = getreportopt(config)
- self.hasmarkup = self._tw.hasmarkup
- self.isatty = file.isatty()
- self._progress_nodeids_reported: Set[str] = set()
- self._show_progress_info = self._determine_show_progress_info()
- self._collect_report_last_write: Optional[float] = None
- self._already_displayed_warnings: Optional[int] = None
- self._keyboardinterrupt_memo: Optional[ExceptionRepr] = None
- def _determine_show_progress_info(self) -> "Literal['progress', 'count', False]":
- """Return whether we should display progress information based on the current config."""
- # do not show progress if we are not capturing output (#3038) unless explicitly
- # overridden by progress-even-when-capture-no
- if (
- self.config.getoption("capture", "no") == "no"
- and self.config.getini("console_output_style")
- != "progress-even-when-capture-no"
- ):
- return False
- # do not show progress if we are showing fixture setup/teardown
- if self.config.getoption("setupshow", False):
- return False
- cfg: str = self.config.getini("console_output_style")
- if cfg == "progress" or cfg == "progress-even-when-capture-no":
- return "progress"
- elif cfg == "count":
- return "count"
- else:
- return False
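- # For example, running with "-s" (capture=no) under the default "progress" style
- # disables progress output (returns False), while console_output_style=count with
- # capturing enabled returns "count".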
- @property
- def verbosity(self) -> int:
- verbosity: int = self.config.option.verbose
- return verbosity
- @property
- def showheader(self) -> bool:
- return self.verbosity >= 0
- @property
- def no_header(self) -> bool:
- return bool(self.config.option.no_header)
- @property
- def no_summary(self) -> bool:
- return bool(self.config.option.no_summary)
- @property
- def showfspath(self) -> bool:
- if self._showfspath is None:
- return self.verbosity >= 0
- return self._showfspath
- @showfspath.setter
- def showfspath(self, value: Optional[bool]) -> None:
- self._showfspath = value
- @property
- def showlongtestinfo(self) -> bool:
- return self.verbosity > 0
- def hasopt(self, char: str) -> bool:
- char = {"xfailed": "x", "skipped": "s"}.get(char, char)
- return char in self.reportchars
- def write_fspath_result(self, nodeid: str, res, **markup: bool) -> None:
- fspath = self.config.rootpath / nodeid.split("::")[0]
- if self.currentfspath is None or fspath != self.currentfspath:
- if self.currentfspath is not None and self._show_progress_info:
- self._write_progress_information_filling_space()
- self.currentfspath = fspath
- relfspath = bestrelpath(self.startpath, fspath)
- self._tw.line()
- self._tw.write(relfspath + " ")
- self._tw.write(res, flush=True, **markup)
- def write_ensure_prefix(self, prefix: str, extra: str = "", **kwargs) -> None:
- if self.currentfspath != prefix:
- self._tw.line()
- self.currentfspath = prefix
- self._tw.write(prefix)
- if extra:
- self._tw.write(extra, **kwargs)
- self.currentfspath = -2
- def ensure_newline(self) -> None:
- if self.currentfspath:
- self._tw.line()
- self.currentfspath = None
- def wrap_write(
- self,
- content: str,
- *,
- flush: bool = False,
- margin: int = 8,
- line_sep: str = "\n",
- **markup: bool,
- ) -> None:
- """Wrap message with margin for progress info."""
- width_of_current_line = self._tw.width_of_current_line
- wrapped = line_sep.join(
- textwrap.wrap(
- " " * width_of_current_line + content,
- width=self._screen_width - margin,
- drop_whitespace=True,
- replace_whitespace=False,
- ),
- )
- wrapped = wrapped[width_of_current_line:]
- self._tw.write(wrapped, flush=flush, **markup)
- def write(self, content: str, *, flush: bool = False, **markup: bool) -> None:
- self._tw.write(content, flush=flush, **markup)
- def flush(self) -> None:
- self._tw.flush()
- def write_line(self, line: Union[str, bytes], **markup: bool) -> None:
- if not isinstance(line, str):
- line = str(line, errors="replace")
- self.ensure_newline()
- self._tw.line(line, **markup)
- def rewrite(self, line: str, **markup: bool) -> None:
- """Rewinds the terminal cursor to the beginning and writes the given line.
- :param erase:
- If True, will also add spaces until the full terminal width to ensure
- previous lines are properly erased.
- The rest of the keyword arguments are markup instructions.
- """
- erase = markup.pop("erase", False)
- if erase:
- fill_count = self._tw.fullwidth - len(line) - 1
- fill = " " * fill_count
- else:
- fill = ""
- line = str(line)
- self._tw.write("\r" + line + fill, **markup)
- def write_sep(
- self,
- sep: str,
- title: Optional[str] = None,
- fullwidth: Optional[int] = None,
- **markup: bool,
- ) -> None:
- self.ensure_newline()
- self._tw.sep(sep, title, fullwidth, **markup)
- def section(self, title: str, sep: str = "=", **kw: bool) -> None:
- self._tw.sep(sep, title, **kw)
- def line(self, msg: str, **kw: bool) -> None:
- self._tw.line(msg, **kw)
- def _add_stats(self, category: str, items: Sequence[Any]) -> None:
- set_main_color = category not in self.stats
- self.stats.setdefault(category, []).extend(items)
- if set_main_color:
- self._set_main_color()
- def pytest_internalerror(self, excrepr: ExceptionRepr) -> bool:
- for line in str(excrepr).split("\n"):
- self.write_line("INTERNALERROR> " + line)
- return True
- def pytest_warning_recorded(
- self,
- warning_message: warnings.WarningMessage,
- nodeid: str,
- ) -> None:
- from _pytest.warnings import warning_record_to_str
- fslocation = warning_message.filename, warning_message.lineno
- message = warning_record_to_str(warning_message)
- warning_report = WarningReport(
- fslocation=fslocation, message=message, nodeid=nodeid
- )
- self._add_stats("warnings", [warning_report])
- def pytest_plugin_registered(self, plugin: _PluggyPlugin) -> None:
- if self.config.option.traceconfig:
- msg = f"PLUGIN registered: {plugin}"
- # XXX This event may happen during setup/teardown time, when output
- # capturing is unfortunately active, so using self.write_line here
- # can garble our output.
- self.write_line(msg)
- def pytest_deselected(self, items: Sequence[Item]) -> None:
- self._add_stats("deselected", items)
- def pytest_runtest_logstart(
- self, nodeid: str, location: Tuple[str, Optional[int], str]
- ) -> None:
- # Ensure that the path is printed before the
- # 1st test of a module starts running.
- if self.showlongtestinfo:
- line = self._locationline(nodeid, *location)
- self.write_ensure_prefix(line, "")
- self.flush()
- elif self.showfspath:
- self.write_fspath_result(nodeid, "")
- self.flush()
- def pytest_runtest_logreport(self, report: TestReport) -> None:
- self._tests_ran = True
- rep = report
- res = TestShortLogReport(
- *self.config.hook.pytest_report_teststatus(report=rep, config=self.config)
- )
- category, letter, word = res.category, res.letter, res.word
- if not isinstance(word, tuple):
- markup = None
- else:
- word, markup = word
- self._add_stats(category, [rep])
- if not letter and not word:
- # Probably passed setup/teardown.
- return
- running_xdist = hasattr(rep, "node")
- if markup is None:
- was_xfail = hasattr(report, "wasxfail")
- if rep.passed and not was_xfail:
- markup = {"green": True}
- elif rep.passed and was_xfail:
- markup = {"yellow": True}
- elif rep.failed:
- markup = {"red": True}
- elif rep.skipped:
- markup = {"yellow": True}
- else:
- markup = {}
- if self.verbosity <= 0:
- self._tw.write(letter, **markup)
- else:
- self._progress_nodeids_reported.add(rep.nodeid)
- line = self._locationline(rep.nodeid, *rep.location)
- if not running_xdist:
- self.write_ensure_prefix(line, word, **markup)
- if rep.skipped or hasattr(report, "wasxfail"):
- reason = _get_raw_skip_reason(rep)
- if self.config.option.verbose < 2:
- available_width = (
- (self._tw.fullwidth - self._tw.width_of_current_line)
- - len(" [100%]")
- - 1
- )
- formatted_reason = _format_trimmed(
- " ({})", reason, available_width
- )
- else:
- formatted_reason = f" ({reason})"
- if reason and formatted_reason is not None:
- self.wrap_write(formatted_reason)
- if self._show_progress_info:
- self._write_progress_information_filling_space()
- else:
- self.ensure_newline()
- self._tw.write("[%s]" % rep.node.gateway.id)
- if self._show_progress_info:
- self._tw.write(
- self._get_progress_information_message() + " ", cyan=True
- )
- else:
- self._tw.write(" ")
- self._tw.write(word, **markup)
- self._tw.write(" " + line)
- self.currentfspath = -2
- self.flush()
- @property
- def _is_last_item(self) -> bool:
- assert self._session is not None
- return len(self._progress_nodeids_reported) == self._session.testscollected
- def pytest_runtest_logfinish(self, nodeid: str) -> None:
- assert self._session
- if self.verbosity <= 0 and self._show_progress_info:
- if self._show_progress_info == "count":
- num_tests = self._session.testscollected
- progress_length = len(f" [{num_tests}/{num_tests}]")
- else:
- progress_length = len(" [100%]")
- self._progress_nodeids_reported.add(nodeid)
- if self._is_last_item:
- self._write_progress_information_filling_space()
- else:
- main_color, _ = self._get_main_color()
- w = self._width_of_current_line
- past_edge = w + progress_length + 1 >= self._screen_width
- if past_edge:
- msg = self._get_progress_information_message()
- self._tw.write(msg + "\n", **{main_color: True})
- def _get_progress_information_message(self) -> str:
- assert self._session
- collected = self._session.testscollected
- if self._show_progress_info == "count":
- if collected:
- progress = self._progress_nodeids_reported
- counter_format = f"{{:{len(str(collected))}d}}"
- format_string = f" [{counter_format}/{{}}]"
- return format_string.format(len(progress), collected)
- return f" [ {collected} / {collected} ]"
- else:
- if collected:
- return " [{:3d}%]".format(
- len(self._progress_nodeids_reported) * 100 // collected
- )
- return " [100%]"
- def _write_progress_information_filling_space(self) -> None:
- color, _ = self._get_main_color()
- msg = self._get_progress_information_message()
- w = self._width_of_current_line
- fill = self._tw.fullwidth - w - 1
- self.write(msg.rjust(fill), flush=True, **{color: True})
- @property
- def _width_of_current_line(self) -> int:
- """Return the width of the current line."""
- return self._tw.width_of_current_line
- def pytest_collection(self) -> None:
- if self.isatty:
- if self.config.option.verbose >= 0:
- self.write("collecting ... ", flush=True, bold=True)
- self._collect_report_last_write = timing.time()
- elif self.config.option.verbose >= 1:
- self.write("collecting ... ", flush=True, bold=True)
- def pytest_collectreport(self, report: CollectReport) -> None:
- if report.failed:
- self._add_stats("error", [report])
- elif report.skipped:
- self._add_stats("skipped", [report])
- items = [x for x in report.result if isinstance(x, Item)]
- self._numcollected += len(items)
- if self.isatty:
- self.report_collect()
- def report_collect(self, final: bool = False) -> None:
- if self.config.option.verbose < 0:
- return
- if not final:
- # Only write "collecting" report every 0.5s.
- t = timing.time()
- if (
- self._collect_report_last_write is not None
- and self._collect_report_last_write > t - REPORT_COLLECTING_RESOLUTION
- ):
- return
- self._collect_report_last_write = t
- errors = len(self.stats.get("error", []))
- skipped = len(self.stats.get("skipped", []))
- deselected = len(self.stats.get("deselected", []))
- selected = self._numcollected - deselected
- line = "collected " if final else "collecting "
- line += (
- str(self._numcollected) + " item" + ("" if self._numcollected == 1 else "s")
- )
- if errors:
- line += " / %d error%s" % (errors, "s" if errors != 1 else "")
- if deselected:
- line += " / %d deselected" % deselected
- if skipped:
- line += " / %d skipped" % skipped
- if self._numcollected > selected:
- line += " / %d selected" % selected
- if self.isatty:
- self.rewrite(line, bold=True, erase=True)
- if final:
- self.write("\n")
- else:
- self.write_line(line)
- @hookimpl(trylast=True)
- def pytest_sessionstart(self, session: "Session") -> None:
- self._session = session
- self._sessionstarttime = timing.time()
- if not self.showheader:
- return
- self.write_sep("=", "test session starts", bold=True)
- verinfo = platform.python_version()
- if not self.no_header:
- msg = f"platform {sys.platform} -- Python {verinfo}"
- pypy_version_info = getattr(sys, "pypy_version_info", None)
- if pypy_version_info:
- verinfo = ".".join(map(str, pypy_version_info[:3]))
- msg += f"[pypy-{verinfo}-{pypy_version_info[3]}]"
- msg += ", pytest-{}, pluggy-{}".format(
- _pytest._version.version, pluggy.__version__
- )
- if (
- self.verbosity > 0
- or self.config.option.debug
- or getattr(self.config.option, "pastebin", None)
- ):
- msg += " -- " + str(sys.executable)
- self.write_line(msg)
- lines = self.config.hook.pytest_report_header(
- config=self.config, start_path=self.startpath
- )
- self._write_report_lines_from_hooks(lines)
- def _write_report_lines_from_hooks(
- self, lines: Sequence[Union[str, Sequence[str]]]
- ) -> None:
- for line_or_lines in reversed(lines):
- if isinstance(line_or_lines, str):
- self.write_line(line_or_lines)
- else:
- for line in line_or_lines:
- self.write_line(line)
- def pytest_report_header(self, config: Config) -> List[str]:
- result = [f"rootdir: {config.rootpath}"]
- if config.inipath:
- result.append("configfile: " + bestrelpath(config.rootpath, config.inipath))
- if config.args_source == Config.ArgsSource.TESTPATHS:
- testpaths: List[str] = config.getini("testpaths")
- result.append("testpaths: {}".format(", ".join(testpaths)))
- plugininfo = config.pluginmanager.list_plugin_distinfo()
- if plugininfo:
- result.append("plugins: %s" % ", ".join(_plugin_nameversions(plugininfo)))
- return result
- def pytest_collection_finish(self, session: "Session") -> None:
- self.report_collect(True)
- lines = self.config.hook.pytest_report_collectionfinish(
- config=self.config,
- start_path=self.startpath,
- items=session.items,
- )
- self._write_report_lines_from_hooks(lines)
- if self.config.getoption("collectonly"):
- if session.items:
- if self.config.option.verbose > -1:
- self._tw.line("")
- self._printcollecteditems(session.items)
- failed = self.stats.get("failed")
- if failed:
- self._tw.sep("!", "collection failures")
- for rep in failed:
- rep.toterminal(self._tw)
- def _printcollecteditems(self, items: Sequence[Item]) -> None:
- if self.config.option.verbose < 0:
- if self.config.option.verbose < -1:
- counts = Counter(item.nodeid.split("::", 1)[0] for item in items)
- for name, count in sorted(counts.items()):
- self._tw.line("%s: %d" % (name, count))
- else:
- for item in items:
- self._tw.line(item.nodeid)
- return
- stack: List[Node] = []
- indent = ""
- for item in items:
- needed_collectors = item.listchain()[1:] # strip root node
- while stack:
- if stack == needed_collectors[: len(stack)]:
- break
- stack.pop()
- for col in needed_collectors[len(stack) :]:
- stack.append(col)
- indent = (len(stack) - 1) * " "
- self._tw.line(f"{indent}{col}")
- if self.config.option.verbose >= 1:
- obj = getattr(col, "obj", None)
- doc = inspect.getdoc(obj) if obj else None
- if doc:
- for line in doc.splitlines():
- self._tw.line("{}{}".format(indent + " ", line))
- @hookimpl(hookwrapper=True)
- def pytest_sessionfinish(
- self, session: "Session", exitstatus: Union[int, ExitCode]
- ):
- outcome = yield
- outcome.get_result()
- self._tw.line("")
- summary_exit_codes = (
- ExitCode.OK,
- ExitCode.TESTS_FAILED,
- ExitCode.INTERRUPTED,
- ExitCode.USAGE_ERROR,
- ExitCode.NO_TESTS_COLLECTED,
- )
- if exitstatus in summary_exit_codes and not self.no_summary:
- self.config.hook.pytest_terminal_summary(
- terminalreporter=self, exitstatus=exitstatus, config=self.config
- )
- if session.shouldfail:
- self.write_sep("!", str(session.shouldfail), red=True)
- if exitstatus == ExitCode.INTERRUPTED:
- self._report_keyboardinterrupt()
- self._keyboardinterrupt_memo = None
- elif session.shouldstop:
- self.write_sep("!", str(session.shouldstop), red=True)
- self.summary_stats()
- @hookimpl(hookwrapper=True)
- def pytest_terminal_summary(self) -> Generator[None, None, None]:
- self.summary_errors()
- self.summary_failures()
- self.summary_warnings()
- self.summary_passes()
- yield
- self.short_test_summary()
- # Display any extra warnings from teardown here (if any).
- self.summary_warnings()
- def pytest_keyboard_interrupt(self, excinfo: ExceptionInfo[BaseException]) -> None:
- self._keyboardinterrupt_memo = excinfo.getrepr(funcargs=True)
- def pytest_unconfigure(self) -> None:
- if self._keyboardinterrupt_memo is not None:
- self._report_keyboardinterrupt()
- def _report_keyboardinterrupt(self) -> None:
- excrepr = self._keyboardinterrupt_memo
- assert excrepr is not None
- assert excrepr.reprcrash is not None
- msg = excrepr.reprcrash.message
- self.write_sep("!", msg)
- if "KeyboardInterrupt" in msg:
- if self.config.option.fulltrace:
- excrepr.toterminal(self._tw)
- else:
- excrepr.reprcrash.toterminal(self._tw)
- self._tw.line(
- "(to show a full traceback on KeyboardInterrupt use --full-trace)",
- yellow=True,
- )
- def _locationline(
- self, nodeid: str, fspath: str, lineno: Optional[int], domain: str
- ) -> str:
- def mkrel(nodeid: str) -> str:
- line = self.config.cwd_relative_nodeid(nodeid)
- if domain and line.endswith(domain):
- line = line[: -len(domain)]
- values = domain.split("[")
- values[0] = values[0].replace(".", "::") # don't replace '.' in params
- line += "[".join(values)
- return line
- # collect_fspath comes from testid which has a "/"-normalized path.
- if fspath:
- res = mkrel(nodeid)
- if self.verbosity >= 2 and nodeid.split("::")[0] != fspath.replace(
- "\\", nodes.SEP
- ):
- res += " <- " + bestrelpath(self.startpath, Path(fspath))
- else:
- res = "[location]"
- return res + " "
- def _getfailureheadline(self, rep):
- head_line = rep.head_line
- if head_line:
- return head_line
- return "test session" # XXX?
- def _getcrashline(self, rep):
- try:
- return str(rep.longrepr.reprcrash)
- except AttributeError:
- try:
- return str(rep.longrepr)[:50]
- except AttributeError:
- return ""
- #
- # Summaries for sessionfinish.
- #
- def getreports(self, name: str):
- return [x for x in self.stats.get(name, ()) if not hasattr(x, "_pdbshown")]
- def summary_warnings(self) -> None:
- if self.hasopt("w"):
- all_warnings: Optional[List[WarningReport]] = self.stats.get("warnings")
- if not all_warnings:
- return
- final = self._already_displayed_warnings is not None
- if final:
- warning_reports = all_warnings[self._already_displayed_warnings :]
- else:
- warning_reports = all_warnings
- self._already_displayed_warnings = len(warning_reports)
- if not warning_reports:
- return
- reports_grouped_by_message: Dict[str, List[WarningReport]] = {}
- for wr in warning_reports:
- reports_grouped_by_message.setdefault(wr.message, []).append(wr)
- def collapsed_location_report(reports: List[WarningReport]) -> str:
- locations = []
- for w in reports:
- location = w.get_location(self.config)
- if location:
- locations.append(location)
- if len(locations) < 10:
- return "\n".join(map(str, locations))
- counts_by_filename = Counter(
- str(loc).split("::", 1)[0] for loc in locations
- )
- return "\n".join(
- "{}: {} warning{}".format(k, v, "s" if v > 1 else "")
- for k, v in counts_by_filename.items()
- )
- title = "warnings summary (final)" if final else "warnings summary"
- self.write_sep("=", title, yellow=True, bold=False)
- for message, message_reports in reports_grouped_by_message.items():
- maybe_location = collapsed_location_report(message_reports)
- if maybe_location:
- self._tw.line(maybe_location)
- lines = message.splitlines()
- indented = "\n".join(" " + x for x in lines)
- message = indented.rstrip()
- else:
- message = message.rstrip()
- self._tw.line(message)
- self._tw.line()
- self._tw.line(
- "-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html"
- )
- def summary_passes(self) -> None:
- if self.config.option.tbstyle != "no":
- if self.hasopt("P"):
- reports: List[TestReport] = self.getreports("passed")
- if not reports:
- return
- self.write_sep("=", "PASSES")
- for rep in reports:
- if rep.sections:
- msg = self._getfailureheadline(rep)
- self.write_sep("_", msg, green=True, bold=True)
- self._outrep_summary(rep)
- self._handle_teardown_sections(rep.nodeid)
- def _get_teardown_reports(self, nodeid: str) -> List[TestReport]:
- reports = self.getreports("")
- return [
- report
- for report in reports
- if report.when == "teardown" and report.nodeid == nodeid
- ]
- def _handle_teardown_sections(self, nodeid: str) -> None:
- for report in self._get_teardown_reports(nodeid):
- self.print_teardown_sections(report)
- def print_teardown_sections(self, rep: TestReport) -> None:
- showcapture = self.config.option.showcapture
- if showcapture == "no":
- return
- for secname, content in rep.sections:
- if showcapture != "all" and showcapture not in secname:
- continue
- if "teardown" in secname:
- self._tw.sep("-", secname)
- if content[-1:] == "\n":
- content = content[:-1]
- self._tw.line(content)
- def summary_failures(self) -> None:
- if self.config.option.tbstyle != "no":
- reports: List[BaseReport] = self.getreports("failed")
- if not reports:
- return
- self.write_sep("=", "FAILURES")
- if self.config.option.tbstyle == "line":
- for rep in reports:
- line = self._getcrashline(rep)
- self.write_line(line)
- else:
- for rep in reports:
- msg = self._getfailureheadline(rep)
- self.write_sep("_", msg, red=True, bold=True)
- self._outrep_summary(rep)
- self._handle_teardown_sections(rep.nodeid)
- def summary_errors(self) -> None:
- if self.config.option.tbstyle != "no":
- reports: List[BaseReport] = self.getreports("error")
- if not reports:
- return
- self.write_sep("=", "ERRORS")
- for rep in self.stats["error"]:
- msg = self._getfailureheadline(rep)
- if rep.when == "collect":
- msg = "ERROR collecting " + msg
- else:
- msg = f"ERROR at {rep.when} of {msg}"
- self.write_sep("_", msg, red=True, bold=True)
- self._outrep_summary(rep)
- def _outrep_summary(self, rep: BaseReport) -> None:
- rep.toterminal(self._tw)
- showcapture = self.config.option.showcapture
- if showcapture == "no":
- return
- for secname, content in rep.sections:
- if showcapture != "all" and showcapture not in secname:
- continue
- self._tw.sep("-", secname)
- if content[-1:] == "\n":
- content = content[:-1]
- self._tw.line(content)
- def summary_stats(self) -> None:
- if self.verbosity < -1:
- return
- session_duration = timing.time() - self._sessionstarttime
- (parts, main_color) = self.build_summary_stats_line()
- line_parts = []
- display_sep = self.verbosity >= 0
- if display_sep:
- fullwidth = self._tw.fullwidth
- for text, markup in parts:
- with_markup = self._tw.markup(text, **markup)
- if display_sep:
- fullwidth += len(with_markup) - len(text)
- line_parts.append(with_markup)
- msg = ", ".join(line_parts)
- main_markup = {main_color: True}
- duration = f" in {format_session_duration(session_duration)}"
- duration_with_markup = self._tw.markup(duration, **main_markup)
- if display_sep:
- fullwidth += len(duration_with_markup) - len(duration)
- msg += duration_with_markup
- if display_sep:
- markup_for_end_sep = self._tw.markup("", **main_markup)
- if markup_for_end_sep.endswith("\x1b[0m"):
- markup_for_end_sep = markup_for_end_sep[:-4]
- fullwidth += len(markup_for_end_sep)
- msg += markup_for_end_sep
- if display_sep:
- self.write_sep("=", msg, fullwidth=fullwidth, **main_markup)
- else:
- self.write_line(msg, **main_markup)
- def short_test_summary(self) -> None:
- if not self.reportchars:
- return
- def show_simple(lines: List[str], *, stat: str) -> None:
- failed = self.stats.get(stat, [])
- if not failed:
- return
- config = self.config
- for rep in failed:
- color = _color_for_type.get(stat, _color_for_type_default)
- line = _get_line_with_reprcrash_message(
- config, rep, self._tw, {color: True}
- )
- lines.append(line)
- def show_xfailed(lines: List[str]) -> None:
- xfailed = self.stats.get("xfailed", [])
- for rep in xfailed:
- verbose_word = rep._get_verbose_word(self.config)
- markup_word = self._tw.markup(
- verbose_word, **{_color_for_type["warnings"]: True}
- )
- nodeid = _get_node_id_with_markup(self._tw, self.config, rep)
- line = f"{markup_word} {nodeid}"
- reason = rep.wasxfail
- if reason:
- line += " - " + str(reason)
- lines.append(line)
- def show_xpassed(lines: List[str]) -> None:
- xpassed = self.stats.get("xpassed", [])
- for rep in xpassed:
- verbose_word = rep._get_verbose_word(self.config)
- markup_word = self._tw.markup(
- verbose_word, **{_color_for_type["warnings"]: True}
- )
- nodeid = _get_node_id_with_markup(self._tw, self.config, rep)
- reason = rep.wasxfail
- lines.append(f"{markup_word} {nodeid} {reason}")
- def show_skipped(lines: List[str]) -> None:
- skipped: List[CollectReport] = self.stats.get("skipped", [])
- fskips = _folded_skips(self.startpath, skipped) if skipped else []
- if not fskips:
- return
- verbose_word = skipped[0]._get_verbose_word(self.config)
- markup_word = self._tw.markup(
- verbose_word, **{_color_for_type["warnings"]: True}
- )
- prefix = "Skipped: "
- for num, fspath, lineno, reason in fskips:
- if reason.startswith(prefix):
- reason = reason[len(prefix) :]
- if lineno is not None:
- lines.append(
- "%s [%d] %s:%d: %s" % (markup_word, num, fspath, lineno, reason)
- )
- else:
- lines.append("%s [%d] %s: %s" % (markup_word, num, fspath, reason))
- REPORTCHAR_ACTIONS: Mapping[str, Callable[[List[str]], None]] = {
- "x": show_xfailed,
- "X": show_xpassed,
- "f": partial(show_simple, stat="failed"),
- "s": show_skipped,
- "p": partial(show_simple, stat="passed"),
- "E": partial(show_simple, stat="error"),
- }
- lines: List[str] = []
- for char in self.reportchars:
- action = REPORTCHAR_ACTIONS.get(char)
- if action: # skipping e.g. "P" (passed with output) here.
- action(lines)
- if lines:
- self.write_sep("=", "short test summary info", cyan=True, bold=True)
- for line in lines:
- self.write_line(line)
- def _get_main_color(self) -> Tuple[str, List[str]]:
- if self._main_color is None or self._known_types is None or self._is_last_item:
- self._set_main_color()
- assert self._main_color
- assert self._known_types
- return self._main_color, self._known_types
- def _determine_main_color(self, unknown_type_seen: bool) -> str:
- stats = self.stats
- if "failed" in stats or "error" in stats:
- main_color = "red"
- elif "warnings" in stats or "xpassed" in stats or unknown_type_seen:
- main_color = "yellow"
- elif "passed" in stats or not self._is_last_item:
- main_color = "green"
- else:
- main_color = "yellow"
- return main_color
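- # For example: any "failed" or "error" entries give "red"; warnings, xpasses, or
- # unknown result types give "yellow"; a run with passes gives "green"; a finished
- # run containing only skips falls through to "yellow".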
- def _set_main_color(self) -> None:
- unknown_types: List[str] = []
- for found_type in self.stats.keys():
- if found_type: # setup/teardown reports have an empty key, ignore them
- if found_type not in KNOWN_TYPES and found_type not in unknown_types:
- unknown_types.append(found_type)
- self._known_types = list(KNOWN_TYPES) + unknown_types
- self._main_color = self._determine_main_color(bool(unknown_types))
- def build_summary_stats_line(self) -> Tuple[List[Tuple[str, Dict[str, bool]]], str]:
- """
- Build the parts used in the last summary stats line.
- The summary stats line is the line shown at the end, "=== 12 passed, 2 errors in Xs ===".
- This function builds a list of the "parts" that make up the text in that line; in
- the example above it would be:
- [
- ("12 passed", {"green": True}),
- ("2 errors", {"red": True}
- ]
- That last dict for each line is a "markup dictionary", used by TerminalWriter to
- color output.
- The final color of the line is also determined by this function, and is the second
- element of the returned tuple.
- """
- if self.config.getoption("collectonly"):
- return self._build_collect_only_summary_stats_line()
- else:
- return self._build_normal_summary_stats_line()
- def _get_reports_to_display(self, key: str) -> List[Any]:
- """Get test/collection reports for the given status key, such as `passed` or `error`."""
- reports = self.stats.get(key, [])
- return [x for x in reports if getattr(x, "count_towards_summary", True)]
- def _build_normal_summary_stats_line(
- self,
- ) -> Tuple[List[Tuple[str, Dict[str, bool]]], str]:
- main_color, known_types = self._get_main_color()
- parts = []
- for key in known_types:
- reports = self._get_reports_to_display(key)
- if reports:
- count = len(reports)
- color = _color_for_type.get(key, _color_for_type_default)
- markup = {color: True, "bold": color == main_color}
- parts.append(("%d %s" % pluralize(count, key), markup))
- if not parts:
- parts = [("no tests ran", {_color_for_type_default: True})]
- return parts, main_color
- def _build_collect_only_summary_stats_line(
- self,
- ) -> Tuple[List[Tuple[str, Dict[str, bool]]], str]:
- deselected = len(self._get_reports_to_display("deselected"))
- errors = len(self._get_reports_to_display("error"))
- if self._numcollected == 0:
- parts = [("no tests collected", {"yellow": True})]
- main_color = "yellow"
- elif deselected == 0:
- main_color = "green"
- collected_output = "%d %s collected" % pluralize(self._numcollected, "test")
- parts = [(collected_output, {main_color: True})]
- else:
- all_tests_were_deselected = self._numcollected == deselected
- if all_tests_were_deselected:
- main_color = "yellow"
- collected_output = f"no tests collected ({deselected} deselected)"
- else:
- main_color = "green"
- selected = self._numcollected - deselected
- collected_output = f"{selected}/{self._numcollected} tests collected ({deselected} deselected)"
- parts = [(collected_output, {main_color: True})]
- if errors:
- main_color = _color_for_type["error"]
- parts += [("%d %s" % pluralize(errors, "error"), {main_color: True})]
- return parts, main_color
- def _get_node_id_with_markup(tw: TerminalWriter, config: Config, rep: BaseReport):
- nodeid = config.cwd_relative_nodeid(rep.nodeid)
- path, *parts = nodeid.split("::")
- if parts:
- parts_markup = tw.markup("::".join(parts), bold=True)
- return path + "::" + parts_markup
- else:
- return path
- def _format_trimmed(format: str, msg: str, available_width: int) -> Optional[str]:
- """Format msg into format, ellipsizing it if doesn't fit in available_width.
- Returns None if even the ellipsis can't fit.
- """
- # Only use the first line.
- i = msg.find("\n")
- if i != -1:
- msg = msg[:i]
- ellipsis = "..."
- format_width = wcswidth(format.format(""))
- if format_width + len(ellipsis) > available_width:
- return None
- if format_width + wcswidth(msg) > available_width:
- available_width -= len(ellipsis)
- msg = msg[:available_width]
- while format_width + wcswidth(msg) > available_width:
- msg = msg[:-1]
- msg += ellipsis
- return format.format(msg)
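- # For example, _format_trimmed(" ({})", "short", 20) returns " (short)" unchanged,
- # _format_trimmed(" ({})", "a very long reason here", 15) returns " (a very lo...)",
- # and an available_width below 6 (format plus ellipsis) returns None.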
- def _get_line_with_reprcrash_message(
- config: Config, rep: BaseReport, tw: TerminalWriter, word_markup: Dict[str, bool]
- ) -> str:
- """Get summary line for a report, trying to add reprcrash message."""
- verbose_word = rep._get_verbose_word(config)
- word = tw.markup(verbose_word, **word_markup)
- node = _get_node_id_with_markup(tw, config, rep)
- line = f"{word} {node}"
- line_width = wcswidth(line)
- try:
- # Type ignored intentionally -- possible AttributeError expected.
- msg = rep.longrepr.reprcrash.message # type: ignore[union-attr]
- except AttributeError:
- pass
- else:
- if not running_on_ci():
- available_width = tw.fullwidth - line_width
- msg = _format_trimmed(" - {}", msg, available_width)
- else:
- msg = f" - {msg}"
- if msg is not None:
- line += msg
- return line
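- # A produced line looks like (hypothetical test and message):
- #   FAILED tests/test_demo.py::test_answer - assert 42 == 43
- # with the crash message trimmed to the terminal width unless running on CI,
- # where the full message is kept.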
- def _folded_skips(
- startpath: Path,
- skipped: Sequence[CollectReport],
- ) -> List[Tuple[int, str, Optional[int], str]]:
- d: Dict[Tuple[str, Optional[int], str], List[CollectReport]] = {}
- for event in skipped:
- assert event.longrepr is not None
- assert isinstance(event.longrepr, tuple), (event, event.longrepr)
- assert len(event.longrepr) == 3, (event, event.longrepr)
- fspath, lineno, reason = event.longrepr
- # For consistency, report all fspaths in relative form.
- fspath = bestrelpath(startpath, Path(fspath))
- keywords = getattr(event, "keywords", {})
- # Fold reports that come from a global pytestmark variable.
- # This is a workaround, because for now we cannot identify the scope of a skip marker.
- # TODO: Revisit once the scope of marks is fixed.
- if (
- event.when == "setup"
- and "skip" in keywords
- and "pytestmark" not in keywords
- ):
- key: Tuple[str, Optional[int], str] = (fspath, None, reason)
- else:
- key = (fspath, lineno, reason)
- d.setdefault(key, []).append(event)
- values: List[Tuple[int, str, Optional[int], str]] = []
- for key, events in d.items():
- values.append((len(events), *key))
- return values
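- # For example (hypothetical reports): three skips raised from the same line with the
- # same reason fold into a single entry such as
- # (3, "tests/test_foo.py", 42, "Skipped: unsupported platform").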
- _color_for_type = {
- "failed": "red",
- "error": "red",
- "warnings": "yellow",
- "passed": "green",
- }
- _color_for_type_default = "yellow"
- def pluralize(count: int, noun: str) -> Tuple[int, str]:
- # No need to pluralize words such as `failed` or `passed`.
- if noun not in ["error", "warnings", "test"]:
- return count, noun
- # The `warnings` key is plural. To avoid API breakage, we keep it that way but
- # set it to singular here so we can determine plurality in the same way as we do
- # for `error`.
- noun = noun.replace("warnings", "warning")
- return count, noun + "s" if count != 1 else noun
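- # For example: pluralize(1, "error") -> (1, "error"), pluralize(2, "error") ->
- # (2, "errors"), pluralize(1, "warnings") -> (1, "warning"), and outcome words such
- # as pluralize(5, "passed") are returned unchanged as (5, "passed").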
- def _plugin_nameversions(plugininfo) -> List[str]:
- values: List[str] = []
- for plugin, dist in plugininfo:
- # Gets us name and version!
- name = "{dist.project_name}-{dist.version}".format(dist=dist)
- # Questionable convenience, but it keeps things short.
- if name.startswith("pytest-"):
- name = name[7:]
- # We decided to print python package names only once, since a package can provide more than one plugin.
- if name not in values:
- values.append(name)
- return values
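- # For example, a distribution named "pytest-cov" at a hypothetical version 4.1.0
- # contributes the entry "cov-4.1.0" (the "pytest-" prefix is dropped), and a
- # package registering several plugins is listed only once.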
- def format_session_duration(seconds: float) -> str:
- """Format the given seconds in a human readable manner to show in the final summary."""
- if seconds < 60:
- return f"{seconds:.2f}s"
- else:
- dt = datetime.timedelta(seconds=int(seconds))
- return f"{seconds:.2f}s ({dt})"
- def _get_raw_skip_reason(report: TestReport) -> str:
- """Get the reason string of a skip/xfail/xpass test report.
- The string is just the part given by the user.
- """
- if hasattr(report, "wasxfail"):
- reason = cast(str, report.wasxfail)
- if reason.startswith("reason: "):
- reason = reason[len("reason: ") :]
- return reason
- else:
- assert report.skipped
- assert isinstance(report.longrepr, tuple)
- _, _, reason = report.longrepr
- if reason.startswith("Skipped: "):
- reason = reason[len("Skipped: ") :]
- elif reason == "Skipped":
- reason = ""
- return reason
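- # For example (hypothetical reports): an xfail report with wasxfail
- # "reason: flaky on CI" yields "flaky on CI", and a skip report whose longrepr
- # reason is "Skipped: requires linux" yields "requires linux"; a bare "Skipped"
- # yields the empty string.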
|