capture.py 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082
  1. """Per-test stdout/stderr capturing mechanism."""
  2. import abc
  3. import collections
  4. import contextlib
  5. import io
  6. import os
  7. import sys
  8. from io import UnsupportedOperation
  9. from tempfile import TemporaryFile
  10. from types import TracebackType
  11. from typing import Any
  12. from typing import AnyStr
  13. from typing import BinaryIO
  14. from typing import Generator
  15. from typing import Generic
  16. from typing import Iterable
  17. from typing import Iterator
  18. from typing import List
  19. from typing import NamedTuple
  20. from typing import Optional
  21. from typing import TextIO
  22. from typing import Tuple
  23. from typing import Type
  24. from typing import TYPE_CHECKING
  25. from typing import Union
  26. from _pytest.compat import final
  27. from _pytest.config import Config
  28. from _pytest.config import hookimpl
  29. from _pytest.config.argparsing import Parser
  30. from _pytest.deprecated import check_ispytest
  31. from _pytest.fixtures import fixture
  32. from _pytest.fixtures import SubRequest
  33. from _pytest.nodes import Collector
  34. from _pytest.nodes import File
  35. from _pytest.nodes import Item
if TYPE_CHECKING:
    # These imports exist for static type checkers only; ``TYPE_CHECKING`` is
    # false at runtime, so ``typing_extensions`` is not imported then.
    from typing_extensions import Final
    from typing_extensions import Literal

    # The values accepted by the ``--capture`` option.  The visible code
    # refers to this alias only through string annotations.
    _CaptureMethod = Literal["fd", "sys", "no", "tee-sys"]
  40. def pytest_addoption(parser: Parser) -> None:
  41. group = parser.getgroup("general")
  42. group._addoption(
  43. "--capture",
  44. action="store",
  45. default="fd",
  46. metavar="method",
  47. choices=["fd", "sys", "no", "tee-sys"],
  48. help="Per-test capturing method: one of fd|sys|no|tee-sys",
  49. )
  50. group._addoption(
  51. "-s",
  52. action="store_const",
  53. const="no",
  54. dest="capture",
  55. help="Shortcut for --capture=no",
  56. )
  57. def _colorama_workaround() -> None:
  58. """Ensure colorama is imported so that it attaches to the correct stdio
  59. handles on Windows.
  60. colorama uses the terminal on import time. So if something does the
  61. first import of colorama while I/O capture is active, colorama will
  62. fail in various ways.
  63. """
  64. if sys.platform.startswith("win32"):
  65. try:
  66. import colorama # noqa: F401
  67. except ImportError:
  68. pass
  69. def _windowsconsoleio_workaround(stream: TextIO) -> None:
  70. """Workaround for Windows Unicode console handling.
  71. Python 3.6 implemented Unicode console handling for Windows. This works
  72. by reading/writing to the raw console handle using
  73. ``{Read,Write}ConsoleW``.
  74. The problem is that we are going to ``dup2`` over the stdio file
  75. descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the
  76. handles used by Python to write to the console. Though there is still some
  77. weirdness and the console handle seems to only be closed randomly and not
  78. on the first call to ``CloseHandle``, or maybe it gets reopened with the
  79. same handle value when we suspend capturing.
  80. The workaround in this case will reopen stdio with a different fd which
  81. also means a different handle by replicating the logic in
  82. "Py_lifecycle.c:initstdio/create_stdio".
  83. :param stream:
  84. In practice ``sys.stdout`` or ``sys.stderr``, but given
  85. here as parameter for unittesting purposes.
  86. See https://github.com/pytest-dev/py/issues/103.
  87. """
  88. if not sys.platform.startswith("win32") or hasattr(sys, "pypy_version_info"):
  89. return
  90. # Bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666).
  91. if not hasattr(stream, "buffer"): # type: ignore[unreachable]
  92. return
  93. buffered = hasattr(stream.buffer, "raw")
  94. raw_stdout = stream.buffer.raw if buffered else stream.buffer # type: ignore[attr-defined]
  95. if not isinstance(raw_stdout, io._WindowsConsoleIO): # type: ignore[attr-defined]
  96. return
  97. def _reopen_stdio(f, mode):
  98. if not buffered and mode[0] == "w":
  99. buffering = 0
  100. else:
  101. buffering = -1
  102. return io.TextIOWrapper(
  103. open(os.dup(f.fileno()), mode, buffering),
  104. f.encoding,
  105. f.errors,
  106. f.newlines,
  107. f.line_buffering,
  108. )
  109. sys.stdin = _reopen_stdio(sys.stdin, "rb")
  110. sys.stdout = _reopen_stdio(sys.stdout, "wb")
  111. sys.stderr = _reopen_stdio(sys.stderr, "wb")
  112. @hookimpl(hookwrapper=True)
  113. def pytest_load_initial_conftests(early_config: Config):
  114. ns = early_config.known_args_namespace
  115. if ns.capture == "fd":
  116. _windowsconsoleio_workaround(sys.stdout)
  117. _colorama_workaround()
  118. pluginmanager = early_config.pluginmanager
  119. capman = CaptureManager(ns.capture)
  120. pluginmanager.register(capman, "capturemanager")
  121. # Make sure that capturemanager is properly reset at final shutdown.
  122. early_config.add_cleanup(capman.stop_global_capturing)
  123. # Finally trigger conftest loading but while capturing (issue #93).
  124. capman.start_global_capturing()
  125. outcome = yield
  126. capman.suspend_global_capture()
  127. if outcome.excinfo is not None:
  128. out, err = capman.read_global_capture()
  129. sys.stdout.write(out)
  130. sys.stderr.write(err)
  131. # IO Helpers.
  132. class EncodedFile(io.TextIOWrapper):
  133. __slots__ = ()
  134. @property
  135. def name(self) -> str:
  136. # Ensure that file.name is a string. Workaround for a Python bug
  137. # fixed in >=3.7.4: https://bugs.python.org/issue36015
  138. return repr(self.buffer)
  139. @property
  140. def mode(self) -> str:
  141. # TextIOWrapper doesn't expose a mode, but at least some of our
  142. # tests check it.
  143. return self.buffer.mode.replace("b", "")
  144. class CaptureIO(io.TextIOWrapper):
  145. def __init__(self) -> None:
  146. super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True)
  147. def getvalue(self) -> str:
  148. assert isinstance(self.buffer, io.BytesIO)
  149. return self.buffer.getvalue().decode("UTF-8")
  150. class TeeCaptureIO(CaptureIO):
  151. def __init__(self, other: TextIO) -> None:
  152. self._other = other
  153. super().__init__()
  154. def write(self, s: str) -> int:
  155. super().write(s)
  156. return self._other.write(s)
  157. class DontReadFromInput(TextIO):
  158. @property
  159. def encoding(self) -> str:
  160. return sys.__stdin__.encoding
  161. def read(self, size: int = -1) -> str:
  162. raise OSError(
  163. "pytest: reading from stdin while output is captured! Consider using `-s`."
  164. )
  165. readline = read
  166. def __next__(self) -> str:
  167. return self.readline()
  168. def readlines(self, hint: Optional[int] = -1) -> List[str]:
  169. raise OSError(
  170. "pytest: reading from stdin while output is captured! Consider using `-s`."
  171. )
  172. def __iter__(self) -> Iterator[str]:
  173. return self
  174. def fileno(self) -> int:
  175. raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()")
  176. def flush(self) -> None:
  177. raise UnsupportedOperation("redirected stdin is pseudofile, has no flush()")
  178. def isatty(self) -> bool:
  179. return False
  180. def close(self) -> None:
  181. pass
  182. def readable(self) -> bool:
  183. return False
  184. def seek(self, offset: int, whence: int = 0) -> int:
  185. raise UnsupportedOperation("redirected stdin is pseudofile, has no seek(int)")
  186. def seekable(self) -> bool:
  187. return False
  188. def tell(self) -> int:
  189. raise UnsupportedOperation("redirected stdin is pseudofile, has no tell()")
  190. def truncate(self, size: Optional[int] = None) -> int:
  191. raise UnsupportedOperation("cannot truncate stdin")
  192. def write(self, data: str) -> int:
  193. raise UnsupportedOperation("cannot write to stdin")
  194. def writelines(self, lines: Iterable[str]) -> None:
  195. raise UnsupportedOperation("Cannot write to stdin")
  196. def writable(self) -> bool:
  197. return False
  198. def __enter__(self) -> "DontReadFromInput":
  199. return self
  200. def __exit__(
  201. self,
  202. type: Optional[Type[BaseException]],
  203. value: Optional[BaseException],
  204. traceback: Optional[TracebackType],
  205. ) -> None:
  206. pass
  207. @property
  208. def buffer(self) -> BinaryIO:
  209. # The str/bytes doesn't actually matter in this type, so OK to fake.
  210. return self # type: ignore[return-value]
  211. # Capture classes.
  212. class CaptureBase(abc.ABC, Generic[AnyStr]):
  213. EMPTY_BUFFER: AnyStr
  214. @abc.abstractmethod
  215. def __init__(self, fd: int) -> None:
  216. raise NotImplementedError()
  217. @abc.abstractmethod
  218. def start(self) -> None:
  219. raise NotImplementedError()
  220. @abc.abstractmethod
  221. def done(self) -> None:
  222. raise NotImplementedError()
  223. @abc.abstractmethod
  224. def suspend(self) -> None:
  225. raise NotImplementedError()
  226. @abc.abstractmethod
  227. def resume(self) -> None:
  228. raise NotImplementedError()
  229. @abc.abstractmethod
  230. def writeorg(self, data: AnyStr) -> None:
  231. raise NotImplementedError()
  232. @abc.abstractmethod
  233. def snap(self) -> AnyStr:
  234. raise NotImplementedError()
  235. patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
  236. class NoCapture(CaptureBase[str]):
  237. EMPTY_BUFFER = ""
  238. def __init__(self, fd: int) -> None:
  239. pass
  240. def start(self) -> None:
  241. pass
  242. def done(self) -> None:
  243. pass
  244. def suspend(self) -> None:
  245. pass
  246. def resume(self) -> None:
  247. pass
  248. def snap(self) -> str:
  249. return ""
  250. def writeorg(self, data: str) -> None:
  251. pass
  252. class SysCaptureBase(CaptureBase[AnyStr]):
  253. def __init__(
  254. self, fd: int, tmpfile: Optional[TextIO] = None, *, tee: bool = False
  255. ) -> None:
  256. name = patchsysdict[fd]
  257. self._old: TextIO = getattr(sys, name)
  258. self.name = name
  259. if tmpfile is None:
  260. if name == "stdin":
  261. tmpfile = DontReadFromInput()
  262. else:
  263. tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old)
  264. self.tmpfile = tmpfile
  265. self._state = "initialized"
  266. def repr(self, class_name: str) -> str:
  267. return "<{} {} _old={} _state={!r} tmpfile={!r}>".format(
  268. class_name,
  269. self.name,
  270. hasattr(self, "_old") and repr(self._old) or "<UNSET>",
  271. self._state,
  272. self.tmpfile,
  273. )
  274. def __repr__(self) -> str:
  275. return "<{} {} _old={} _state={!r} tmpfile={!r}>".format(
  276. self.__class__.__name__,
  277. self.name,
  278. hasattr(self, "_old") and repr(self._old) or "<UNSET>",
  279. self._state,
  280. self.tmpfile,
  281. )
  282. def _assert_state(self, op: str, states: Tuple[str, ...]) -> None:
  283. assert (
  284. self._state in states
  285. ), "cannot {} in state {!r}: expected one of {}".format(
  286. op, self._state, ", ".join(states)
  287. )
  288. def start(self) -> None:
  289. self._assert_state("start", ("initialized",))
  290. setattr(sys, self.name, self.tmpfile)
  291. self._state = "started"
  292. def done(self) -> None:
  293. self._assert_state("done", ("initialized", "started", "suspended", "done"))
  294. if self._state == "done":
  295. return
  296. setattr(sys, self.name, self._old)
  297. del self._old
  298. self.tmpfile.close()
  299. self._state = "done"
  300. def suspend(self) -> None:
  301. self._assert_state("suspend", ("started", "suspended"))
  302. setattr(sys, self.name, self._old)
  303. self._state = "suspended"
  304. def resume(self) -> None:
  305. self._assert_state("resume", ("started", "suspended"))
  306. if self._state == "started":
  307. return
  308. setattr(sys, self.name, self.tmpfile)
  309. self._state = "started"
  310. class SysCaptureBinary(SysCaptureBase[bytes]):
  311. EMPTY_BUFFER = b""
  312. def snap(self) -> bytes:
  313. self._assert_state("snap", ("started", "suspended"))
  314. self.tmpfile.seek(0)
  315. res = self.tmpfile.buffer.read()
  316. self.tmpfile.seek(0)
  317. self.tmpfile.truncate()
  318. return res
  319. def writeorg(self, data: bytes) -> None:
  320. self._assert_state("writeorg", ("started", "suspended"))
  321. self._old.flush()
  322. self._old.buffer.write(data)
  323. self._old.buffer.flush()
  324. class SysCapture(SysCaptureBase[str]):
  325. EMPTY_BUFFER = ""
  326. def snap(self) -> str:
  327. self._assert_state("snap", ("started", "suspended"))
  328. assert isinstance(self.tmpfile, CaptureIO)
  329. res = self.tmpfile.getvalue()
  330. self.tmpfile.seek(0)
  331. self.tmpfile.truncate()
  332. return res
  333. def writeorg(self, data: str) -> None:
  334. self._assert_state("writeorg", ("started", "suspended"))
  335. self._old.write(data)
  336. self._old.flush()
  337. class FDCaptureBase(CaptureBase[AnyStr]):
  338. def __init__(self, targetfd: int) -> None:
  339. self.targetfd = targetfd
  340. try:
  341. os.fstat(targetfd)
  342. except OSError:
  343. # FD capturing is conceptually simple -- create a temporary file,
  344. # redirect the FD to it, redirect back when done. But when the
  345. # target FD is invalid it throws a wrench into this lovely scheme.
  346. #
  347. # Tests themselves shouldn't care if the FD is valid, FD capturing
  348. # should work regardless of external circumstances. So falling back
  349. # to just sys capturing is not a good option.
  350. #
  351. # Further complications are the need to support suspend() and the
  352. # possibility of FD reuse (e.g. the tmpfile getting the very same
  353. # target FD). The following approach is robust, I believe.
  354. self.targetfd_invalid: Optional[int] = os.open(os.devnull, os.O_RDWR)
  355. os.dup2(self.targetfd_invalid, targetfd)
  356. else:
  357. self.targetfd_invalid = None
  358. self.targetfd_save = os.dup(targetfd)
  359. if targetfd == 0:
  360. self.tmpfile = open(os.devnull, encoding="utf-8")
  361. self.syscapture: CaptureBase[str] = SysCapture(targetfd)
  362. else:
  363. self.tmpfile = EncodedFile(
  364. TemporaryFile(buffering=0),
  365. encoding="utf-8",
  366. errors="replace",
  367. newline="",
  368. write_through=True,
  369. )
  370. if targetfd in patchsysdict:
  371. self.syscapture = SysCapture(targetfd, self.tmpfile)
  372. else:
  373. self.syscapture = NoCapture(targetfd)
  374. self._state = "initialized"
  375. def __repr__(self) -> str:
  376. return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format(
  377. self.__class__.__name__,
  378. self.targetfd,
  379. self.targetfd_save,
  380. self._state,
  381. self.tmpfile,
  382. )
  383. def _assert_state(self, op: str, states: Tuple[str, ...]) -> None:
  384. assert (
  385. self._state in states
  386. ), "cannot {} in state {!r}: expected one of {}".format(
  387. op, self._state, ", ".join(states)
  388. )
  389. def start(self) -> None:
  390. """Start capturing on targetfd using memorized tmpfile."""
  391. self._assert_state("start", ("initialized",))
  392. os.dup2(self.tmpfile.fileno(), self.targetfd)
  393. self.syscapture.start()
  394. self._state = "started"
  395. def done(self) -> None:
  396. """Stop capturing, restore streams, return original capture file,
  397. seeked to position zero."""
  398. self._assert_state("done", ("initialized", "started", "suspended", "done"))
  399. if self._state == "done":
  400. return
  401. os.dup2(self.targetfd_save, self.targetfd)
  402. os.close(self.targetfd_save)
  403. if self.targetfd_invalid is not None:
  404. if self.targetfd_invalid != self.targetfd:
  405. os.close(self.targetfd)
  406. os.close(self.targetfd_invalid)
  407. self.syscapture.done()
  408. self.tmpfile.close()
  409. self._state = "done"
  410. def suspend(self) -> None:
  411. self._assert_state("suspend", ("started", "suspended"))
  412. if self._state == "suspended":
  413. return
  414. self.syscapture.suspend()
  415. os.dup2(self.targetfd_save, self.targetfd)
  416. self._state = "suspended"
  417. def resume(self) -> None:
  418. self._assert_state("resume", ("started", "suspended"))
  419. if self._state == "started":
  420. return
  421. self.syscapture.resume()
  422. os.dup2(self.tmpfile.fileno(), self.targetfd)
  423. self._state = "started"
  424. class FDCaptureBinary(FDCaptureBase[bytes]):
  425. """Capture IO to/from a given OS-level file descriptor.
  426. snap() produces `bytes`.
  427. """
  428. EMPTY_BUFFER = b""
  429. def snap(self) -> bytes:
  430. self._assert_state("snap", ("started", "suspended"))
  431. self.tmpfile.seek(0)
  432. res = self.tmpfile.buffer.read()
  433. self.tmpfile.seek(0)
  434. self.tmpfile.truncate()
  435. return res
  436. def writeorg(self, data: bytes) -> None:
  437. """Write to original file descriptor."""
  438. self._assert_state("writeorg", ("started", "suspended"))
  439. os.write(self.targetfd_save, data)
  440. class FDCapture(FDCaptureBase[str]):
  441. """Capture IO to/from a given OS-level file descriptor.
  442. snap() produces text.
  443. """
  444. EMPTY_BUFFER = ""
  445. def snap(self) -> str:
  446. self._assert_state("snap", ("started", "suspended"))
  447. self.tmpfile.seek(0)
  448. res = self.tmpfile.read()
  449. self.tmpfile.seek(0)
  450. self.tmpfile.truncate()
  451. return res
  452. def writeorg(self, data: str) -> None:
  453. """Write to original file descriptor."""
  454. self._assert_state("writeorg", ("started", "suspended"))
  455. # XXX use encoding of original stream
  456. os.write(self.targetfd_save, data.encode("utf-8"))
  457. # MultiCapture
  458. # Generic NamedTuple only supported since Python 3.11.
# Two definitions of the same (out, err) result type.  The first is used on
# Python >= 3.11 and is the one static type checkers see; the second is the
# runtime fallback for older interpreters.
if sys.version_info >= (3, 11) or TYPE_CHECKING:

    @final
    class CaptureResult(NamedTuple, Generic[AnyStr]):
        """The result of :meth:`CaptureFixture.readouterr`."""

        # Captured standard output.
        out: AnyStr
        # Captured standard error.
        err: AnyStr

else:

    class CaptureResult(
        collections.namedtuple("CaptureResult", ["out", "err"]), Generic[AnyStr]
    ):
        """The result of :meth:`CaptureFixture.readouterr`."""

        # No per-instance ``__dict__`` on top of the namedtuple base.
        __slots__ = ()
  471. class MultiCapture(Generic[AnyStr]):
  472. _state = None
  473. _in_suspended = False
  474. def __init__(
  475. self,
  476. in_: Optional[CaptureBase[AnyStr]],
  477. out: Optional[CaptureBase[AnyStr]],
  478. err: Optional[CaptureBase[AnyStr]],
  479. ) -> None:
  480. self.in_: Optional[CaptureBase[AnyStr]] = in_
  481. self.out: Optional[CaptureBase[AnyStr]] = out
  482. self.err: Optional[CaptureBase[AnyStr]] = err
  483. def __repr__(self) -> str:
  484. return "<MultiCapture out={!r} err={!r} in_={!r} _state={!r} _in_suspended={!r}>".format(
  485. self.out,
  486. self.err,
  487. self.in_,
  488. self._state,
  489. self._in_suspended,
  490. )
  491. def start_capturing(self) -> None:
  492. self._state = "started"
  493. if self.in_:
  494. self.in_.start()
  495. if self.out:
  496. self.out.start()
  497. if self.err:
  498. self.err.start()
  499. def pop_outerr_to_orig(self) -> Tuple[AnyStr, AnyStr]:
  500. """Pop current snapshot out/err capture and flush to orig streams."""
  501. out, err = self.readouterr()
  502. if out:
  503. assert self.out is not None
  504. self.out.writeorg(out)
  505. if err:
  506. assert self.err is not None
  507. self.err.writeorg(err)
  508. return out, err
  509. def suspend_capturing(self, in_: bool = False) -> None:
  510. self._state = "suspended"
  511. if self.out:
  512. self.out.suspend()
  513. if self.err:
  514. self.err.suspend()
  515. if in_ and self.in_:
  516. self.in_.suspend()
  517. self._in_suspended = True
  518. def resume_capturing(self) -> None:
  519. self._state = "started"
  520. if self.out:
  521. self.out.resume()
  522. if self.err:
  523. self.err.resume()
  524. if self._in_suspended:
  525. assert self.in_ is not None
  526. self.in_.resume()
  527. self._in_suspended = False
  528. def stop_capturing(self) -> None:
  529. """Stop capturing and reset capturing streams."""
  530. if self._state == "stopped":
  531. raise ValueError("was already stopped")
  532. self._state = "stopped"
  533. if self.out:
  534. self.out.done()
  535. if self.err:
  536. self.err.done()
  537. if self.in_:
  538. self.in_.done()
  539. def is_started(self) -> bool:
  540. """Whether actively capturing -- not suspended or stopped."""
  541. return self._state == "started"
  542. def readouterr(self) -> CaptureResult[AnyStr]:
  543. out = self.out.snap() if self.out else ""
  544. err = self.err.snap() if self.err else ""
  545. # TODO: This type error is real, need to fix.
  546. return CaptureResult(out, err) # type: ignore[arg-type]
  547. def _get_multicapture(method: "_CaptureMethod") -> MultiCapture[str]:
  548. if method == "fd":
  549. return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2))
  550. elif method == "sys":
  551. return MultiCapture(in_=SysCapture(0), out=SysCapture(1), err=SysCapture(2))
  552. elif method == "no":
  553. return MultiCapture(in_=None, out=None, err=None)
  554. elif method == "tee-sys":
  555. return MultiCapture(
  556. in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True)
  557. )
  558. raise ValueError(f"unknown capturing method: {method!r}")
  559. # CaptureManager and CaptureFixture
  560. class CaptureManager:
  561. """The capture plugin.
  562. Manages that the appropriate capture method is enabled/disabled during
  563. collection and each test phase (setup, call, teardown). After each of
  564. those points, the captured output is obtained and attached to the
  565. collection/runtest report.
  566. There are two levels of capture:
  567. * global: enabled by default and can be suppressed by the ``-s``
  568. option. This is always enabled/disabled during collection and each test
  569. phase.
  570. * fixture: when a test function or one of its fixture depend on the
  571. ``capsys`` or ``capfd`` fixtures. In this case special handling is
  572. needed to ensure the fixtures take precedence over the global capture.
  573. """
  574. def __init__(self, method: "_CaptureMethod") -> None:
  575. self._method: Final = method
  576. self._global_capturing: Optional[MultiCapture[str]] = None
  577. self._capture_fixture: Optional[CaptureFixture[Any]] = None
  578. def __repr__(self) -> str:
  579. return "<CaptureManager _method={!r} _global_capturing={!r} _capture_fixture={!r}>".format(
  580. self._method, self._global_capturing, self._capture_fixture
  581. )
  582. def is_capturing(self) -> Union[str, bool]:
  583. if self.is_globally_capturing():
  584. return "global"
  585. if self._capture_fixture:
  586. return "fixture %s" % self._capture_fixture.request.fixturename
  587. return False
  588. # Global capturing control
  589. def is_globally_capturing(self) -> bool:
  590. return self._method != "no"
  591. def start_global_capturing(self) -> None:
  592. assert self._global_capturing is None
  593. self._global_capturing = _get_multicapture(self._method)
  594. self._global_capturing.start_capturing()
  595. def stop_global_capturing(self) -> None:
  596. if self._global_capturing is not None:
  597. self._global_capturing.pop_outerr_to_orig()
  598. self._global_capturing.stop_capturing()
  599. self._global_capturing = None
  600. def resume_global_capture(self) -> None:
  601. # During teardown of the python process, and on rare occasions, capture
  602. # attributes can be `None` while trying to resume global capture.
  603. if self._global_capturing is not None:
  604. self._global_capturing.resume_capturing()
  605. def suspend_global_capture(self, in_: bool = False) -> None:
  606. if self._global_capturing is not None:
  607. self._global_capturing.suspend_capturing(in_=in_)
  608. def suspend(self, in_: bool = False) -> None:
  609. # Need to undo local capsys-et-al if it exists before disabling global capture.
  610. self.suspend_fixture()
  611. self.suspend_global_capture(in_)
  612. def resume(self) -> None:
  613. self.resume_global_capture()
  614. self.resume_fixture()
  615. def read_global_capture(self) -> CaptureResult[str]:
  616. assert self._global_capturing is not None
  617. return self._global_capturing.readouterr()
  618. # Fixture Control
  619. def set_fixture(self, capture_fixture: "CaptureFixture[Any]") -> None:
  620. if self._capture_fixture:
  621. current_fixture = self._capture_fixture.request.fixturename
  622. requested_fixture = capture_fixture.request.fixturename
  623. capture_fixture.request.raiseerror(
  624. "cannot use {} and {} at the same time".format(
  625. requested_fixture, current_fixture
  626. )
  627. )
  628. self._capture_fixture = capture_fixture
  629. def unset_fixture(self) -> None:
  630. self._capture_fixture = None
  631. def activate_fixture(self) -> None:
  632. """If the current item is using ``capsys`` or ``capfd``, activate
  633. them so they take precedence over the global capture."""
  634. if self._capture_fixture:
  635. self._capture_fixture._start()
  636. def deactivate_fixture(self) -> None:
  637. """Deactivate the ``capsys`` or ``capfd`` fixture of this item, if any."""
  638. if self._capture_fixture:
  639. self._capture_fixture.close()
  640. def suspend_fixture(self) -> None:
  641. if self._capture_fixture:
  642. self._capture_fixture._suspend()
  643. def resume_fixture(self) -> None:
  644. if self._capture_fixture:
  645. self._capture_fixture._resume()
  646. # Helper context managers
  647. @contextlib.contextmanager
  648. def global_and_fixture_disabled(self) -> Generator[None, None, None]:
  649. """Context manager to temporarily disable global and current fixture capturing."""
  650. do_fixture = self._capture_fixture and self._capture_fixture._is_started()
  651. if do_fixture:
  652. self.suspend_fixture()
  653. do_global = self._global_capturing and self._global_capturing.is_started()
  654. if do_global:
  655. self.suspend_global_capture()
  656. try:
  657. yield
  658. finally:
  659. if do_global:
  660. self.resume_global_capture()
  661. if do_fixture:
  662. self.resume_fixture()
  663. @contextlib.contextmanager
  664. def item_capture(self, when: str, item: Item) -> Generator[None, None, None]:
  665. self.resume_global_capture()
  666. self.activate_fixture()
  667. try:
  668. yield
  669. finally:
  670. self.deactivate_fixture()
  671. self.suspend_global_capture(in_=False)
  672. out, err = self.read_global_capture()
  673. item.add_report_section(when, "stdout", out)
  674. item.add_report_section(when, "stderr", err)
  675. # Hooks
  676. @hookimpl(hookwrapper=True)
  677. def pytest_make_collect_report(self, collector: Collector):
  678. if isinstance(collector, File):
  679. self.resume_global_capture()
  680. outcome = yield
  681. self.suspend_global_capture()
  682. out, err = self.read_global_capture()
  683. rep = outcome.get_result()
  684. if out:
  685. rep.sections.append(("Captured stdout", out))
  686. if err:
  687. rep.sections.append(("Captured stderr", err))
  688. else:
  689. yield
  690. @hookimpl(hookwrapper=True)
  691. def pytest_runtest_setup(self, item: Item) -> Generator[None, None, None]:
  692. with self.item_capture("setup", item):
  693. yield
  694. @hookimpl(hookwrapper=True)
  695. def pytest_runtest_call(self, item: Item) -> Generator[None, None, None]:
  696. with self.item_capture("call", item):
  697. yield
  698. @hookimpl(hookwrapper=True)
  699. def pytest_runtest_teardown(self, item: Item) -> Generator[None, None, None]:
  700. with self.item_capture("teardown", item):
  701. yield
  702. @hookimpl(tryfirst=True)
  703. def pytest_keyboard_interrupt(self) -> None:
  704. self.stop_global_capturing()
  705. @hookimpl(tryfirst=True)
  706. def pytest_internalerror(self) -> None:
  707. self.stop_global_capturing()
  708. class CaptureFixture(Generic[AnyStr]):
  709. """Object returned by the :fixture:`capsys`, :fixture:`capsysbinary`,
  710. :fixture:`capfd` and :fixture:`capfdbinary` fixtures."""
  711. def __init__(
  712. self,
  713. captureclass: Type[CaptureBase[AnyStr]],
  714. request: SubRequest,
  715. *,
  716. _ispytest: bool = False,
  717. ) -> None:
  718. check_ispytest(_ispytest)
  719. self.captureclass: Type[CaptureBase[AnyStr]] = captureclass
  720. self.request = request
  721. self._capture: Optional[MultiCapture[AnyStr]] = None
  722. self._captured_out: AnyStr = self.captureclass.EMPTY_BUFFER
  723. self._captured_err: AnyStr = self.captureclass.EMPTY_BUFFER
  724. def _start(self) -> None:
  725. if self._capture is None:
  726. self._capture = MultiCapture(
  727. in_=None,
  728. out=self.captureclass(1),
  729. err=self.captureclass(2),
  730. )
  731. self._capture.start_capturing()
  732. def close(self) -> None:
  733. if self._capture is not None:
  734. out, err = self._capture.pop_outerr_to_orig()
  735. self._captured_out += out
  736. self._captured_err += err
  737. self._capture.stop_capturing()
  738. self._capture = None
  739. def readouterr(self) -> CaptureResult[AnyStr]:
  740. """Read and return the captured output so far, resetting the internal
  741. buffer.
  742. :returns:
  743. The captured content as a namedtuple with ``out`` and ``err``
  744. string attributes.
  745. """
  746. captured_out, captured_err = self._captured_out, self._captured_err
  747. if self._capture is not None:
  748. out, err = self._capture.readouterr()
  749. captured_out += out
  750. captured_err += err
  751. self._captured_out = self.captureclass.EMPTY_BUFFER
  752. self._captured_err = self.captureclass.EMPTY_BUFFER
  753. return CaptureResult(captured_out, captured_err)
  754. def _suspend(self) -> None:
  755. """Suspend this fixture's own capturing temporarily."""
  756. if self._capture is not None:
  757. self._capture.suspend_capturing()
  758. def _resume(self) -> None:
  759. """Resume this fixture's own capturing temporarily."""
  760. if self._capture is not None:
  761. self._capture.resume_capturing()
  762. def _is_started(self) -> bool:
  763. """Whether actively capturing -- not disabled or closed."""
  764. if self._capture is not None:
  765. return self._capture.is_started()
  766. return False
  767. @contextlib.contextmanager
  768. def disabled(self) -> Generator[None, None, None]:
  769. """Temporarily disable capturing while inside the ``with`` block."""
  770. capmanager: CaptureManager = self.request.config.pluginmanager.getplugin(
  771. "capturemanager"
  772. )
  773. with capmanager.global_and_fixture_disabled():
  774. yield
  775. # The fixtures.
  776. @fixture
  777. def capsys(request: SubRequest) -> Generator[CaptureFixture[str], None, None]:
  778. r"""Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``.
  779. The captured output is made available via ``capsys.readouterr()`` method
  780. calls, which return a ``(out, err)`` namedtuple.
  781. ``out`` and ``err`` will be ``text`` objects.
  782. Returns an instance of :class:`CaptureFixture[str] <pytest.CaptureFixture>`.
  783. Example:
  784. .. code-block:: python
  785. def test_output(capsys):
  786. print("hello")
  787. captured = capsys.readouterr()
  788. assert captured.out == "hello\n"
  789. """
  790. capman: CaptureManager = request.config.pluginmanager.getplugin("capturemanager")
  791. capture_fixture = CaptureFixture(SysCapture, request, _ispytest=True)
  792. capman.set_fixture(capture_fixture)
  793. capture_fixture._start()
  794. yield capture_fixture
  795. capture_fixture.close()
  796. capman.unset_fixture()
  797. @fixture
  798. def capsysbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]:
  799. r"""Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``.
  800. The captured output is made available via ``capsysbinary.readouterr()``
  801. method calls, which return a ``(out, err)`` namedtuple.
  802. ``out`` and ``err`` will be ``bytes`` objects.
  803. Returns an instance of :class:`CaptureFixture[bytes] <pytest.CaptureFixture>`.
  804. Example:
  805. .. code-block:: python
  806. def test_output(capsysbinary):
  807. print("hello")
  808. captured = capsysbinary.readouterr()
  809. assert captured.out == b"hello\n"
  810. """
  811. capman: CaptureManager = request.config.pluginmanager.getplugin("capturemanager")
  812. capture_fixture = CaptureFixture(SysCaptureBinary, request, _ispytest=True)
  813. capman.set_fixture(capture_fixture)
  814. capture_fixture._start()
  815. yield capture_fixture
  816. capture_fixture.close()
  817. capman.unset_fixture()
  818. @fixture
  819. def capfd(request: SubRequest) -> Generator[CaptureFixture[str], None, None]:
  820. r"""Enable text capturing of writes to file descriptors ``1`` and ``2``.
  821. The captured output is made available via ``capfd.readouterr()`` method
  822. calls, which return a ``(out, err)`` namedtuple.
  823. ``out`` and ``err`` will be ``text`` objects.
  824. Returns an instance of :class:`CaptureFixture[str] <pytest.CaptureFixture>`.
  825. Example:
  826. .. code-block:: python
  827. def test_system_echo(capfd):
  828. os.system('echo "hello"')
  829. captured = capfd.readouterr()
  830. assert captured.out == "hello\n"
  831. """
  832. capman: CaptureManager = request.config.pluginmanager.getplugin("capturemanager")
  833. capture_fixture = CaptureFixture(FDCapture, request, _ispytest=True)
  834. capman.set_fixture(capture_fixture)
  835. capture_fixture._start()
  836. yield capture_fixture
  837. capture_fixture.close()
  838. capman.unset_fixture()
  839. @fixture
  840. def capfdbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]:
  841. r"""Enable bytes capturing of writes to file descriptors ``1`` and ``2``.
  842. The captured output is made available via ``capfd.readouterr()`` method
  843. calls, which return a ``(out, err)`` namedtuple.
  844. ``out`` and ``err`` will be ``byte`` objects.
  845. Returns an instance of :class:`CaptureFixture[bytes] <pytest.CaptureFixture>`.
  846. Example:
  847. .. code-block:: python
  848. def test_system_echo(capfdbinary):
  849. os.system('echo "hello"')
  850. captured = capfdbinary.readouterr()
  851. assert captured.out == b"hello\n"
  852. """
  853. capman: CaptureManager = request.config.pluginmanager.getplugin("capturemanager")
  854. capture_fixture = CaptureFixture(FDCaptureBinary, request, _ispytest=True)
  855. capman.set_fixture(capture_fixture)
  856. capture_fixture._start()
  857. yield capture_fixture
  858. capture_fixture.close()
  859. capman.unset_fixture()