threadexception.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import threading
  2. import traceback
  3. import warnings
  4. from types import TracebackType
  5. from typing import Any
  6. from typing import Callable
  7. from typing import Generator
  8. from typing import Optional
  9. from typing import Type
  10. import pytest
  11. # Copied from cpython/Lib/test/support/threading_helper.py, with modifications.
  12. class catch_threading_exception:
  13. """Context manager catching threading.Thread exception using
  14. threading.excepthook.
  15. Storing exc_value using a custom hook can create a reference cycle. The
  16. reference cycle is broken explicitly when the context manager exits.
  17. Storing thread using a custom hook can resurrect it if it is set to an
  18. object which is being finalized. Exiting the context manager clears the
  19. stored object.
  20. Usage:
  21. with threading_helper.catch_threading_exception() as cm:
  22. # code spawning a thread which raises an exception
  23. ...
  24. # check the thread exception: use cm.args
  25. ...
  26. # cm.args attribute no longer exists at this point
  27. # (to break a reference cycle)
  28. """
  29. def __init__(self) -> None:
  30. self.args: Optional["threading.ExceptHookArgs"] = None
  31. self._old_hook: Optional[Callable[["threading.ExceptHookArgs"], Any]] = None
  32. def _hook(self, args: "threading.ExceptHookArgs") -> None:
  33. self.args = args
  34. def __enter__(self) -> "catch_threading_exception":
  35. self._old_hook = threading.excepthook
  36. threading.excepthook = self._hook
  37. return self
  38. def __exit__(
  39. self,
  40. exc_type: Optional[Type[BaseException]],
  41. exc_val: Optional[BaseException],
  42. exc_tb: Optional[TracebackType],
  43. ) -> None:
  44. assert self._old_hook is not None
  45. threading.excepthook = self._old_hook
  46. self._old_hook = None
  47. del self.args
  48. def thread_exception_runtest_hook() -> Generator[None, None, None]:
  49. with catch_threading_exception() as cm:
  50. yield
  51. if cm.args:
  52. thread_name = "<unknown>" if cm.args.thread is None else cm.args.thread.name
  53. msg = f"Exception in thread {thread_name}\n\n"
  54. msg += "".join(
  55. traceback.format_exception(
  56. cm.args.exc_type,
  57. cm.args.exc_value,
  58. cm.args.exc_traceback,
  59. )
  60. )
  61. warnings.warn(pytest.PytestUnhandledThreadExceptionWarning(msg))
  62. @pytest.hookimpl(hookwrapper=True, trylast=True)
  63. def pytest_runtest_setup() -> Generator[None, None, None]:
  64. yield from thread_exception_runtest_hook()
  65. @pytest.hookimpl(hookwrapper=True, tryfirst=True)
  66. def pytest_runtest_call() -> Generator[None, None, None]:
  67. yield from thread_exception_runtest_hook()
  68. @pytest.hookimpl(hookwrapper=True, tryfirst=True)
  69. def pytest_runtest_teardown() -> Generator[None, None, None]:
  70. yield from thread_exception_runtest_hook()