_log.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. import logging
  2. import collections
  3. from .case import _BaseTestCaseContext
  4. _LoggingWatcher = collections.namedtuple("_LoggingWatcher",
  5. ["records", "output"])
  6. class _CapturingHandler(logging.Handler):
  7. """
  8. A logging handler capturing all (raw and formatted) logging output.
  9. """
  10. def __init__(self):
  11. logging.Handler.__init__(self)
  12. self.watcher = _LoggingWatcher([], [])
  13. def flush(self):
  14. pass
  15. def emit(self, record):
  16. self.watcher.records.append(record)
  17. msg = self.format(record)
  18. self.watcher.output.append(msg)
  19. class _AssertLogsContext(_BaseTestCaseContext):
  20. """A context manager for assertLogs() and assertNoLogs() """
  21. LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"
  22. def __init__(self, test_case, logger_name, level, no_logs):
  23. _BaseTestCaseContext.__init__(self, test_case)
  24. self.logger_name = logger_name
  25. if level:
  26. self.level = logging._nameToLevel.get(level, level)
  27. else:
  28. self.level = logging.INFO
  29. self.msg = None
  30. self.no_logs = no_logs
  31. def __enter__(self):
  32. if isinstance(self.logger_name, logging.Logger):
  33. logger = self.logger = self.logger_name
  34. else:
  35. logger = self.logger = logging.getLogger(self.logger_name)
  36. formatter = logging.Formatter(self.LOGGING_FORMAT)
  37. handler = _CapturingHandler()
  38. handler.setLevel(self.level)
  39. handler.setFormatter(formatter)
  40. self.watcher = handler.watcher
  41. self.old_handlers = logger.handlers[:]
  42. self.old_level = logger.level
  43. self.old_propagate = logger.propagate
  44. logger.handlers = [handler]
  45. logger.setLevel(self.level)
  46. logger.propagate = False
  47. if self.no_logs:
  48. return
  49. return handler.watcher
  50. def __exit__(self, exc_type, exc_value, tb):
  51. self.logger.handlers = self.old_handlers
  52. self.logger.propagate = self.old_propagate
  53. self.logger.setLevel(self.old_level)
  54. if exc_type is not None:
  55. # let unexpected exceptions pass through
  56. return False
  57. if self.no_logs:
  58. # assertNoLogs
  59. if len(self.watcher.records) > 0:
  60. self._raiseFailure(
  61. "Unexpected logs found: {!r}".format(
  62. self.watcher.output
  63. )
  64. )
  65. else:
  66. # assertLogs
  67. if len(self.watcher.records) == 0:
  68. self._raiseFailure(
  69. "no logs of level {} or higher triggered on {}"
  70. .format(logging.getLevelName(self.level), self.logger.name))