123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920 |
- """Access and control log capturing."""
- import io
- import logging
- import os
- import re
- from contextlib import contextmanager
- from contextlib import nullcontext
- from datetime import datetime
- from datetime import timedelta
- from datetime import timezone
- from io import StringIO
- from logging import LogRecord
- from pathlib import Path
- from typing import AbstractSet
- from typing import Dict
- from typing import Generator
- from typing import List
- from typing import Mapping
- from typing import Optional
- from typing import Tuple
- from typing import TYPE_CHECKING
- from typing import TypeVar
- from typing import Union
- from _pytest import nodes
- from _pytest._io import TerminalWriter
- from _pytest.capture import CaptureManager
- from _pytest.compat import final
- from _pytest.config import _strtobool
- from _pytest.config import Config
- from _pytest.config import create_terminal_writer
- from _pytest.config import hookimpl
- from _pytest.config import UsageError
- from _pytest.config.argparsing import Parser
- from _pytest.deprecated import check_ispytest
- from _pytest.fixtures import fixture
- from _pytest.fixtures import FixtureRequest
- from _pytest.main import Session
- from _pytest.stash import StashKey
- from _pytest.terminal import TerminalReporter
- if TYPE_CHECKING:
- logging_StreamHandler = logging.StreamHandler[StringIO]
- from typing_extensions import Literal
- else:
- logging_StreamHandler = logging.StreamHandler
- DEFAULT_LOG_FORMAT = "%(levelname)-8s %(name)s:%(filename)s:%(lineno)d %(message)s"
- DEFAULT_LOG_DATE_FORMAT = "%H:%M:%S"
- _ANSI_ESCAPE_SEQ = re.compile(r"\x1b\[[\d;]+m")
- caplog_handler_key = StashKey["LogCaptureHandler"]()
- caplog_records_key = StashKey[Dict[str, List[logging.LogRecord]]]()
- def _remove_ansi_escape_sequences(text: str) -> str:
- return _ANSI_ESCAPE_SEQ.sub("", text)
- class DatetimeFormatter(logging.Formatter):
- """A logging formatter which formats record with
- :func:`datetime.datetime.strftime` formatter instead of
- :func:`time.strftime` in case of microseconds in format string.
- """
- def formatTime(self, record: LogRecord, datefmt=None) -> str:
- if datefmt and "%f" in datefmt:
- ct = self.converter(record.created)
- tz = timezone(timedelta(seconds=ct.tm_gmtoff), ct.tm_zone)
- # Construct `datetime.datetime` object from `struct_time`
- # and msecs information from `record`
- dt = datetime(*ct[0:6], microsecond=round(record.msecs * 1000), tzinfo=tz)
- return dt.strftime(datefmt)
- # Use `logging.Formatter` for non-microsecond formats
- return super().formatTime(record, datefmt)
- class ColoredLevelFormatter(DatetimeFormatter):
- """A logging formatter which colorizes the %(levelname)..s part of the
- log format passed to __init__."""
- LOGLEVEL_COLOROPTS: Mapping[int, AbstractSet[str]] = {
- logging.CRITICAL: {"red"},
- logging.ERROR: {"red", "bold"},
- logging.WARNING: {"yellow"},
- logging.WARN: {"yellow"},
- logging.INFO: {"green"},
- logging.DEBUG: {"purple"},
- logging.NOTSET: set(),
- }
- LEVELNAME_FMT_REGEX = re.compile(r"%\(levelname\)([+-.]?\d*(?:\.\d+)?s)")
- def __init__(self, terminalwriter: TerminalWriter, *args, **kwargs) -> None:
- super().__init__(*args, **kwargs)
- self._terminalwriter = terminalwriter
- self._original_fmt = self._style._fmt
- self._level_to_fmt_mapping: Dict[int, str] = {}
- for level, color_opts in self.LOGLEVEL_COLOROPTS.items():
- self.add_color_level(level, *color_opts)
- def add_color_level(self, level: int, *color_opts: str) -> None:
- """Add or update color opts for a log level.
- :param level:
- Log level to apply a style to, e.g. ``logging.INFO``.
- :param color_opts:
- ANSI escape sequence color options. Capitalized colors indicates
- background color, i.e. ``'green', 'Yellow', 'bold'`` will give bold
- green text on yellow background.
- .. warning::
- This is an experimental API.
- """
- assert self._fmt is not None
- levelname_fmt_match = self.LEVELNAME_FMT_REGEX.search(self._fmt)
- if not levelname_fmt_match:
- return
- levelname_fmt = levelname_fmt_match.group()
- formatted_levelname = levelname_fmt % {"levelname": logging.getLevelName(level)}
- # add ANSI escape sequences around the formatted levelname
- color_kwargs = {name: True for name in color_opts}
- colorized_formatted_levelname = self._terminalwriter.markup(
- formatted_levelname, **color_kwargs
- )
- self._level_to_fmt_mapping[level] = self.LEVELNAME_FMT_REGEX.sub(
- colorized_formatted_levelname, self._fmt
- )
- def format(self, record: logging.LogRecord) -> str:
- fmt = self._level_to_fmt_mapping.get(record.levelno, self._original_fmt)
- self._style._fmt = fmt
- return super().format(record)
- class PercentStyleMultiline(logging.PercentStyle):
- """A logging style with special support for multiline messages.
- If the message of a record consists of multiple lines, this style
- formats the message as if each line were logged separately.
- """
- def __init__(self, fmt: str, auto_indent: Union[int, str, bool, None]) -> None:
- super().__init__(fmt)
- self._auto_indent = self._get_auto_indent(auto_indent)
- @staticmethod
- def _get_auto_indent(auto_indent_option: Union[int, str, bool, None]) -> int:
- """Determine the current auto indentation setting.
- Specify auto indent behavior (on/off/fixed) by passing in
- extra={"auto_indent": [value]} to the call to logging.log() or
- using a --log-auto-indent [value] command line or the
- log_auto_indent [value] config option.
- Default behavior is auto-indent off.
- Using the string "True" or "on" or the boolean True as the value
- turns auto indent on, using the string "False" or "off" or the
- boolean False or the int 0 turns it off, and specifying a
- positive integer fixes the indentation position to the value
- specified.
- Any other values for the option are invalid, and will silently be
- converted to the default.
- :param None|bool|int|str auto_indent_option:
- User specified option for indentation from command line, config
- or extra kwarg. Accepts int, bool or str. str option accepts the
- same range of values as boolean config options, as well as
- positive integers represented in str form.
- :returns:
- Indentation value, which can be
- -1 (automatically determine indentation) or
- 0 (auto-indent turned off) or
- >0 (explicitly set indentation position).
- """
- if auto_indent_option is None:
- return 0
- elif isinstance(auto_indent_option, bool):
- if auto_indent_option:
- return -1
- else:
- return 0
- elif isinstance(auto_indent_option, int):
- return int(auto_indent_option)
- elif isinstance(auto_indent_option, str):
- try:
- return int(auto_indent_option)
- except ValueError:
- pass
- try:
- if _strtobool(auto_indent_option):
- return -1
- except ValueError:
- return 0
- return 0
- def format(self, record: logging.LogRecord) -> str:
- if "\n" in record.message:
- if hasattr(record, "auto_indent"):
- # Passed in from the "extra={}" kwarg on the call to logging.log().
- auto_indent = self._get_auto_indent(record.auto_indent) # type: ignore[attr-defined]
- else:
- auto_indent = self._auto_indent
- if auto_indent:
- lines = record.message.splitlines()
- formatted = self._fmt % {**record.__dict__, "message": lines[0]}
- if auto_indent < 0:
- indentation = _remove_ansi_escape_sequences(formatted).find(
- lines[0]
- )
- else:
- # Optimizes logging by allowing a fixed indentation.
- indentation = auto_indent
- lines[0] = formatted
- return ("\n" + " " * indentation).join(lines)
- return self._fmt % record.__dict__
- def get_option_ini(config: Config, *names: str):
- for name in names:
- ret = config.getoption(name) # 'default' arg won't work as expected
- if ret is None:
- ret = config.getini(name)
- if ret:
- return ret
- def pytest_addoption(parser: Parser) -> None:
- """Add options to control log capturing."""
- group = parser.getgroup("logging")
- def add_option_ini(option, dest, default=None, type=None, **kwargs):
- parser.addini(
- dest, default=default, type=type, help="Default value for " + option
- )
- group.addoption(option, dest=dest, **kwargs)
- add_option_ini(
- "--log-level",
- dest="log_level",
- default=None,
- metavar="LEVEL",
- help=(
- "Level of messages to catch/display."
- " Not set by default, so it depends on the root/parent log handler's"
- ' effective level, where it is "WARNING" by default.'
- ),
- )
- add_option_ini(
- "--log-format",
- dest="log_format",
- default=DEFAULT_LOG_FORMAT,
- help="Log format used by the logging module",
- )
- add_option_ini(
- "--log-date-format",
- dest="log_date_format",
- default=DEFAULT_LOG_DATE_FORMAT,
- help="Log date format used by the logging module",
- )
- parser.addini(
- "log_cli",
- default=False,
- type="bool",
- help='Enable log display during test run (also known as "live logging")',
- )
- add_option_ini(
- "--log-cli-level", dest="log_cli_level", default=None, help="CLI logging level"
- )
- add_option_ini(
- "--log-cli-format",
- dest="log_cli_format",
- default=None,
- help="Log format used by the logging module",
- )
- add_option_ini(
- "--log-cli-date-format",
- dest="log_cli_date_format",
- default=None,
- help="Log date format used by the logging module",
- )
- add_option_ini(
- "--log-file",
- dest="log_file",
- default=None,
- help="Path to a file when logging will be written to",
- )
- add_option_ini(
- "--log-file-level",
- dest="log_file_level",
- default=None,
- help="Log file logging level",
- )
- add_option_ini(
- "--log-file-format",
- dest="log_file_format",
- default=DEFAULT_LOG_FORMAT,
- help="Log format used by the logging module",
- )
- add_option_ini(
- "--log-file-date-format",
- dest="log_file_date_format",
- default=DEFAULT_LOG_DATE_FORMAT,
- help="Log date format used by the logging module",
- )
- add_option_ini(
- "--log-auto-indent",
- dest="log_auto_indent",
- default=None,
- help="Auto-indent multiline messages passed to the logging module. Accepts true|on, false|off or an integer.",
- )
- group.addoption(
- "--log-disable",
- action="append",
- default=[],
- dest="logger_disable",
- help="Disable a logger by name. Can be passed multiple times.",
- )
- _HandlerType = TypeVar("_HandlerType", bound=logging.Handler)
- # Not using @contextmanager for performance reasons.
- class catching_logs:
- """Context manager that prepares the whole logging machinery properly."""
- __slots__ = ("handler", "level", "orig_level")
- def __init__(self, handler: _HandlerType, level: Optional[int] = None) -> None:
- self.handler = handler
- self.level = level
- def __enter__(self):
- root_logger = logging.getLogger()
- if self.level is not None:
- self.handler.setLevel(self.level)
- root_logger.addHandler(self.handler)
- if self.level is not None:
- self.orig_level = root_logger.level
- root_logger.setLevel(min(self.orig_level, self.level))
- return self.handler
- def __exit__(self, type, value, traceback):
- root_logger = logging.getLogger()
- if self.level is not None:
- root_logger.setLevel(self.orig_level)
- root_logger.removeHandler(self.handler)
- class LogCaptureHandler(logging_StreamHandler):
- """A logging handler that stores log records and the log text."""
- def __init__(self) -> None:
- """Create a new log handler."""
- super().__init__(StringIO())
- self.records: List[logging.LogRecord] = []
- def emit(self, record: logging.LogRecord) -> None:
- """Keep the log records in a list in addition to the log text."""
- self.records.append(record)
- super().emit(record)
- def reset(self) -> None:
- self.records = []
- self.stream = StringIO()
- def clear(self) -> None:
- self.records.clear()
- self.stream = StringIO()
- def handleError(self, record: logging.LogRecord) -> None:
- if logging.raiseExceptions:
- # Fail the test if the log message is bad (emit failed).
- # The default behavior of logging is to print "Logging error"
- # to stderr with the call stack and some extra details.
- # pytest wants to make such mistakes visible during testing.
- raise
- @final
- class LogCaptureFixture:
- """Provides access and control of log capturing."""
- def __init__(self, item: nodes.Node, *, _ispytest: bool = False) -> None:
- check_ispytest(_ispytest)
- self._item = item
- self._initial_handler_level: Optional[int] = None
- # Dict of log name -> log level.
- self._initial_logger_levels: Dict[Optional[str], int] = {}
- self._initial_disabled_logging_level: Optional[int] = None
- def _finalize(self) -> None:
- """Finalize the fixture.
- This restores the log levels and the disabled logging levels changed by :meth:`set_level`.
- """
- # Restore log levels.
- if self._initial_handler_level is not None:
- self.handler.setLevel(self._initial_handler_level)
- for logger_name, level in self._initial_logger_levels.items():
- logger = logging.getLogger(logger_name)
- logger.setLevel(level)
- # Disable logging at the original disabled logging level.
- if self._initial_disabled_logging_level is not None:
- logging.disable(self._initial_disabled_logging_level)
- self._initial_disabled_logging_level = None
- @property
- def handler(self) -> LogCaptureHandler:
- """Get the logging handler used by the fixture."""
- return self._item.stash[caplog_handler_key]
- def get_records(
- self, when: "Literal['setup', 'call', 'teardown']"
- ) -> List[logging.LogRecord]:
- """Get the logging records for one of the possible test phases.
- :param when:
- Which test phase to obtain the records from.
- Valid values are: "setup", "call" and "teardown".
- :returns: The list of captured records at the given stage.
- .. versionadded:: 3.4
- """
- return self._item.stash[caplog_records_key].get(when, [])
- @property
- def text(self) -> str:
- """The formatted log text."""
- return _remove_ansi_escape_sequences(self.handler.stream.getvalue())
- @property
- def records(self) -> List[logging.LogRecord]:
- """The list of log records."""
- return self.handler.records
- @property
- def record_tuples(self) -> List[Tuple[str, int, str]]:
- """A list of a stripped down version of log records intended
- for use in assertion comparison.
- The format of the tuple is:
- (logger_name, log_level, message)
- """
- return [(r.name, r.levelno, r.getMessage()) for r in self.records]
- @property
- def messages(self) -> List[str]:
- """A list of format-interpolated log messages.
- Unlike 'records', which contains the format string and parameters for
- interpolation, log messages in this list are all interpolated.
- Unlike 'text', which contains the output from the handler, log
- messages in this list are unadorned with levels, timestamps, etc,
- making exact comparisons more reliable.
- Note that traceback or stack info (from :func:`logging.exception` or
- the `exc_info` or `stack_info` arguments to the logging functions) is
- not included, as this is added by the formatter in the handler.
- .. versionadded:: 3.7
- """
- return [r.getMessage() for r in self.records]
- def clear(self) -> None:
- """Reset the list of log records and the captured log text."""
- self.handler.clear()
- def _force_enable_logging(
- self, level: Union[int, str], logger_obj: logging.Logger
- ) -> int:
- """Enable the desired logging level if the global level was disabled via ``logging.disabled``.
- Only enables logging levels greater than or equal to the requested ``level``.
- Does nothing if the desired ``level`` wasn't disabled.
- :param level:
- The logger level caplog should capture.
- All logging is enabled if a non-standard logging level string is supplied.
- Valid level strings are in :data:`logging._nameToLevel`.
- :param logger_obj: The logger object to check.
- :return: The original disabled logging level.
- """
- original_disable_level: int = logger_obj.manager.disable # type: ignore[attr-defined]
- if isinstance(level, str):
- # Try to translate the level string to an int for `logging.disable()`
- level = logging.getLevelName(level)
- if not isinstance(level, int):
- # The level provided was not valid, so just un-disable all logging.
- logging.disable(logging.NOTSET)
- elif not logger_obj.isEnabledFor(level):
- # Each level is `10` away from other levels.
- # https://docs.python.org/3/library/logging.html#logging-levels
- disable_level = max(level - 10, logging.NOTSET)
- logging.disable(disable_level)
- return original_disable_level
- def set_level(self, level: Union[int, str], logger: Optional[str] = None) -> None:
- """Set the threshold level of a logger for the duration of a test.
- Logging messages which are less severe than this level will not be captured.
- .. versionchanged:: 3.4
- The levels of the loggers changed by this function will be
- restored to their initial values at the end of the test.
- Will enable the requested logging level if it was disabled via :meth:`logging.disable`.
- :param level: The level.
- :param logger: The logger to update. If not given, the root logger.
- """
- logger_obj = logging.getLogger(logger)
- # Save the original log-level to restore it during teardown.
- self._initial_logger_levels.setdefault(logger, logger_obj.level)
- logger_obj.setLevel(level)
- if self._initial_handler_level is None:
- self._initial_handler_level = self.handler.level
- self.handler.setLevel(level)
- initial_disabled_logging_level = self._force_enable_logging(level, logger_obj)
- if self._initial_disabled_logging_level is None:
- self._initial_disabled_logging_level = initial_disabled_logging_level
- @contextmanager
- def at_level(
- self, level: Union[int, str], logger: Optional[str] = None
- ) -> Generator[None, None, None]:
- """Context manager that sets the level for capturing of logs. After
- the end of the 'with' statement the level is restored to its original
- value.
- Will enable the requested logging level if it was disabled via :meth:`logging.disable`.
- :param level: The level.
- :param logger: The logger to update. If not given, the root logger.
- """
- logger_obj = logging.getLogger(logger)
- orig_level = logger_obj.level
- logger_obj.setLevel(level)
- handler_orig_level = self.handler.level
- self.handler.setLevel(level)
- original_disable_level = self._force_enable_logging(level, logger_obj)
- try:
- yield
- finally:
- logger_obj.setLevel(orig_level)
- self.handler.setLevel(handler_orig_level)
- logging.disable(original_disable_level)
- @fixture
- def caplog(request: FixtureRequest) -> Generator[LogCaptureFixture, None, None]:
- """Access and control log capturing.
- Captured logs are available through the following properties/methods::
- * caplog.messages -> list of format-interpolated log messages
- * caplog.text -> string containing formatted log output
- * caplog.records -> list of logging.LogRecord instances
- * caplog.record_tuples -> list of (logger_name, level, message) tuples
- * caplog.clear() -> clear captured records and formatted log output string
- """
- result = LogCaptureFixture(request.node, _ispytest=True)
- yield result
- result._finalize()
- def get_log_level_for_setting(config: Config, *setting_names: str) -> Optional[int]:
- for setting_name in setting_names:
- log_level = config.getoption(setting_name)
- if log_level is None:
- log_level = config.getini(setting_name)
- if log_level:
- break
- else:
- return None
- if isinstance(log_level, str):
- log_level = log_level.upper()
- try:
- return int(getattr(logging, log_level, log_level))
- except ValueError as e:
- # Python logging does not recognise this as a logging level
- raise UsageError(
- "'{}' is not recognized as a logging level name for "
- "'{}'. Please consider passing the "
- "logging level num instead.".format(log_level, setting_name)
- ) from e
- # run after terminalreporter/capturemanager are configured
- @hookimpl(trylast=True)
- def pytest_configure(config: Config) -> None:
- config.pluginmanager.register(LoggingPlugin(config), "logging-plugin")
- class LoggingPlugin:
- """Attaches to the logging module and captures log messages for each test."""
- def __init__(self, config: Config) -> None:
- """Create a new plugin to capture log messages.
- The formatter can be safely shared across all handlers so
- create a single one for the entire test session here.
- """
- self._config = config
- # Report logging.
- self.formatter = self._create_formatter(
- get_option_ini(config, "log_format"),
- get_option_ini(config, "log_date_format"),
- get_option_ini(config, "log_auto_indent"),
- )
- self.log_level = get_log_level_for_setting(config, "log_level")
- self.caplog_handler = LogCaptureHandler()
- self.caplog_handler.setFormatter(self.formatter)
- self.report_handler = LogCaptureHandler()
- self.report_handler.setFormatter(self.formatter)
- # File logging.
- self.log_file_level = get_log_level_for_setting(config, "log_file_level")
- log_file = get_option_ini(config, "log_file") or os.devnull
- if log_file != os.devnull:
- directory = os.path.dirname(os.path.abspath(log_file))
- if not os.path.isdir(directory):
- os.makedirs(directory)
- self.log_file_handler = _FileHandler(log_file, mode="w", encoding="UTF-8")
- log_file_format = get_option_ini(config, "log_file_format", "log_format")
- log_file_date_format = get_option_ini(
- config, "log_file_date_format", "log_date_format"
- )
- log_file_formatter = DatetimeFormatter(
- log_file_format, datefmt=log_file_date_format
- )
- self.log_file_handler.setFormatter(log_file_formatter)
- # CLI/live logging.
- self.log_cli_level = get_log_level_for_setting(
- config, "log_cli_level", "log_level"
- )
- if self._log_cli_enabled():
- terminal_reporter = config.pluginmanager.get_plugin("terminalreporter")
- # Guaranteed by `_log_cli_enabled()`.
- assert terminal_reporter is not None
- capture_manager = config.pluginmanager.get_plugin("capturemanager")
- # if capturemanager plugin is disabled, live logging still works.
- self.log_cli_handler: Union[
- _LiveLoggingStreamHandler, _LiveLoggingNullHandler
- ] = _LiveLoggingStreamHandler(terminal_reporter, capture_manager)
- else:
- self.log_cli_handler = _LiveLoggingNullHandler()
- log_cli_formatter = self._create_formatter(
- get_option_ini(config, "log_cli_format", "log_format"),
- get_option_ini(config, "log_cli_date_format", "log_date_format"),
- get_option_ini(config, "log_auto_indent"),
- )
- self.log_cli_handler.setFormatter(log_cli_formatter)
- self._disable_loggers(loggers_to_disable=config.option.logger_disable)
- def _disable_loggers(self, loggers_to_disable: List[str]) -> None:
- if not loggers_to_disable:
- return
- for name in loggers_to_disable:
- logger = logging.getLogger(name)
- logger.disabled = True
- def _create_formatter(self, log_format, log_date_format, auto_indent):
- # Color option doesn't exist if terminal plugin is disabled.
- color = getattr(self._config.option, "color", "no")
- if color != "no" and ColoredLevelFormatter.LEVELNAME_FMT_REGEX.search(
- log_format
- ):
- formatter: logging.Formatter = ColoredLevelFormatter(
- create_terminal_writer(self._config), log_format, log_date_format
- )
- else:
- formatter = DatetimeFormatter(log_format, log_date_format)
- formatter._style = PercentStyleMultiline(
- formatter._style._fmt, auto_indent=auto_indent
- )
- return formatter
- def set_log_path(self, fname: str) -> None:
- """Set the filename parameter for Logging.FileHandler().
- Creates parent directory if it does not exist.
- .. warning::
- This is an experimental API.
- """
- fpath = Path(fname)
- if not fpath.is_absolute():
- fpath = self._config.rootpath / fpath
- if not fpath.parent.exists():
- fpath.parent.mkdir(exist_ok=True, parents=True)
- # https://github.com/python/mypy/issues/11193
- stream: io.TextIOWrapper = fpath.open(mode="w", encoding="UTF-8") # type: ignore[assignment]
- old_stream = self.log_file_handler.setStream(stream)
- if old_stream:
- old_stream.close()
- def _log_cli_enabled(self):
- """Return whether live logging is enabled."""
- enabled = self._config.getoption(
- "--log-cli-level"
- ) is not None or self._config.getini("log_cli")
- if not enabled:
- return False
- terminal_reporter = self._config.pluginmanager.get_plugin("terminalreporter")
- if terminal_reporter is None:
- # terminal reporter is disabled e.g. by pytest-xdist.
- return False
- return True
- @hookimpl(hookwrapper=True, tryfirst=True)
- def pytest_sessionstart(self) -> Generator[None, None, None]:
- self.log_cli_handler.set_when("sessionstart")
- with catching_logs(self.log_cli_handler, level=self.log_cli_level):
- with catching_logs(self.log_file_handler, level=self.log_file_level):
- yield
- @hookimpl(hookwrapper=True, tryfirst=True)
- def pytest_collection(self) -> Generator[None, None, None]:
- self.log_cli_handler.set_when("collection")
- with catching_logs(self.log_cli_handler, level=self.log_cli_level):
- with catching_logs(self.log_file_handler, level=self.log_file_level):
- yield
- @hookimpl(hookwrapper=True)
- def pytest_runtestloop(self, session: Session) -> Generator[None, None, None]:
- if session.config.option.collectonly:
- yield
- return
- if self._log_cli_enabled() and self._config.getoption("verbose") < 1:
- # The verbose flag is needed to avoid messy test progress output.
- self._config.option.verbose = 1
- with catching_logs(self.log_cli_handler, level=self.log_cli_level):
- with catching_logs(self.log_file_handler, level=self.log_file_level):
- yield # Run all the tests.
- @hookimpl
- def pytest_runtest_logstart(self) -> None:
- self.log_cli_handler.reset()
- self.log_cli_handler.set_when("start")
- @hookimpl
- def pytest_runtest_logreport(self) -> None:
- self.log_cli_handler.set_when("logreport")
- def _runtest_for(self, item: nodes.Item, when: str) -> Generator[None, None, None]:
- """Implement the internals of the pytest_runtest_xxx() hooks."""
- with catching_logs(
- self.caplog_handler,
- level=self.log_level,
- ) as caplog_handler, catching_logs(
- self.report_handler,
- level=self.log_level,
- ) as report_handler:
- caplog_handler.reset()
- report_handler.reset()
- item.stash[caplog_records_key][when] = caplog_handler.records
- item.stash[caplog_handler_key] = caplog_handler
- yield
- log = report_handler.stream.getvalue().strip()
- item.add_report_section(when, "log", log)
- @hookimpl(hookwrapper=True)
- def pytest_runtest_setup(self, item: nodes.Item) -> Generator[None, None, None]:
- self.log_cli_handler.set_when("setup")
- empty: Dict[str, List[logging.LogRecord]] = {}
- item.stash[caplog_records_key] = empty
- yield from self._runtest_for(item, "setup")
- @hookimpl(hookwrapper=True)
- def pytest_runtest_call(self, item: nodes.Item) -> Generator[None, None, None]:
- self.log_cli_handler.set_when("call")
- yield from self._runtest_for(item, "call")
- @hookimpl(hookwrapper=True)
- def pytest_runtest_teardown(self, item: nodes.Item) -> Generator[None, None, None]:
- self.log_cli_handler.set_when("teardown")
- yield from self._runtest_for(item, "teardown")
- del item.stash[caplog_records_key]
- del item.stash[caplog_handler_key]
- @hookimpl
- def pytest_runtest_logfinish(self) -> None:
- self.log_cli_handler.set_when("finish")
- @hookimpl(hookwrapper=True, tryfirst=True)
- def pytest_sessionfinish(self) -> Generator[None, None, None]:
- self.log_cli_handler.set_when("sessionfinish")
- with catching_logs(self.log_cli_handler, level=self.log_cli_level):
- with catching_logs(self.log_file_handler, level=self.log_file_level):
- yield
- @hookimpl
- def pytest_unconfigure(self) -> None:
- # Close the FileHandler explicitly.
- # (logging.shutdown might have lost the weakref?!)
- self.log_file_handler.close()
- class _FileHandler(logging.FileHandler):
- """A logging FileHandler with pytest tweaks."""
- def handleError(self, record: logging.LogRecord) -> None:
- # Handled by LogCaptureHandler.
- pass
- class _LiveLoggingStreamHandler(logging_StreamHandler):
- """A logging StreamHandler used by the live logging feature: it will
- write a newline before the first log message in each test.
- During live logging we must also explicitly disable stdout/stderr
- capturing otherwise it will get captured and won't appear in the
- terminal.
- """
- # Officially stream needs to be a IO[str], but TerminalReporter
- # isn't. So force it.
- stream: TerminalReporter = None # type: ignore
- def __init__(
- self,
- terminal_reporter: TerminalReporter,
- capture_manager: Optional[CaptureManager],
- ) -> None:
- super().__init__(stream=terminal_reporter) # type: ignore[arg-type]
- self.capture_manager = capture_manager
- self.reset()
- self.set_when(None)
- self._test_outcome_written = False
- def reset(self) -> None:
- """Reset the handler; should be called before the start of each test."""
- self._first_record_emitted = False
- def set_when(self, when: Optional[str]) -> None:
- """Prepare for the given test phase (setup/call/teardown)."""
- self._when = when
- self._section_name_shown = False
- if when == "start":
- self._test_outcome_written = False
- def emit(self, record: logging.LogRecord) -> None:
- ctx_manager = (
- self.capture_manager.global_and_fixture_disabled()
- if self.capture_manager
- else nullcontext()
- )
- with ctx_manager:
- if not self._first_record_emitted:
- self.stream.write("\n")
- self._first_record_emitted = True
- elif self._when in ("teardown", "finish"):
- if not self._test_outcome_written:
- self._test_outcome_written = True
- self.stream.write("\n")
- if not self._section_name_shown and self._when:
- self.stream.section("live log " + self._when, sep="-", bold=True)
- self._section_name_shown = True
- super().emit(record)
- def handleError(self, record: logging.LogRecord) -> None:
- # Handled by LogCaptureHandler.
- pass
- class _LiveLoggingNullHandler(logging.NullHandler):
- """A logging handler used when live logging is disabled."""
- def reset(self) -> None:
- pass
- def set_when(self, when: str) -> None:
- pass
- def handleError(self, record: logging.LogRecord) -> None:
- # Handled by LogCaptureHandler.
- pass
|