warnings.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596
  1. """Python part of the warnings subsystem."""
  2. import sys
  3. __all__ = ["warn", "warn_explicit", "showwarning",
  4. "formatwarning", "filterwarnings", "simplefilter",
  5. "resetwarnings", "catch_warnings"]
  6. def showwarning(message, category, filename, lineno, file=None, line=None):
  7. """Hook to write a warning to a file; replace if you like."""
  8. msg = WarningMessage(message, category, filename, lineno, file, line)
  9. _showwarnmsg_impl(msg)
  10. def formatwarning(message, category, filename, lineno, line=None):
  11. """Function to format a warning the standard way."""
  12. msg = WarningMessage(message, category, filename, lineno, None, line)
  13. return _formatwarnmsg_impl(msg)
  14. def _showwarnmsg_impl(msg):
  15. file = msg.file
  16. if file is None:
  17. file = sys.stderr
  18. if file is None:
  19. # sys.stderr is None when run with pythonw.exe:
  20. # warnings get lost
  21. return
  22. text = _formatwarnmsg(msg)
  23. try:
  24. file.write(text)
  25. except OSError:
  26. # the file (probably stderr) is invalid - this warning gets lost.
  27. pass
  28. def _formatwarnmsg_impl(msg):
  29. category = msg.category.__name__
  30. s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n"
  31. if msg.line is None:
  32. try:
  33. import linecache
  34. line = linecache.getline(msg.filename, msg.lineno)
  35. except Exception:
  36. # When a warning is logged during Python shutdown, linecache
  37. # and the import machinery don't work anymore
  38. line = None
  39. linecache = None
  40. else:
  41. line = msg.line
  42. if line:
  43. line = line.strip()
  44. s += " %s\n" % line
  45. if msg.source is not None:
  46. try:
  47. import tracemalloc
  48. # Logging a warning should not raise a new exception:
  49. # catch Exception, not only ImportError and RecursionError.
  50. except Exception:
  51. # don't suggest to enable tracemalloc if it's not available
  52. suggest_tracemalloc = False
  53. tb = None
  54. else:
  55. try:
  56. suggest_tracemalloc = not tracemalloc.is_tracing()
  57. tb = tracemalloc.get_object_traceback(msg.source)
  58. except Exception:
  59. # When a warning is logged during Python shutdown, tracemalloc
  60. # and the import machinery don't work anymore
  61. suggest_tracemalloc = False
  62. tb = None
  63. if tb is not None:
  64. s += 'Object allocated at (most recent call last):\n'
  65. for frame in tb:
  66. s += (' File "%s", lineno %s\n'
  67. % (frame.filename, frame.lineno))
  68. try:
  69. if linecache is not None:
  70. line = linecache.getline(frame.filename, frame.lineno)
  71. else:
  72. line = None
  73. except Exception:
  74. line = None
  75. if line:
  76. line = line.strip()
  77. s += ' %s\n' % line
  78. elif suggest_tracemalloc:
  79. s += (f'{category}: Enable tracemalloc to get the object '
  80. f'allocation traceback\n')
  81. return s
  82. # Keep a reference to check if the function was replaced
  83. _showwarning_orig = showwarning
  84. def _showwarnmsg(msg):
  85. """Hook to write a warning to a file; replace if you like."""
  86. try:
  87. sw = showwarning
  88. except NameError:
  89. pass
  90. else:
  91. if sw is not _showwarning_orig:
  92. # warnings.showwarning() was replaced
  93. if not callable(sw):
  94. raise TypeError("warnings.showwarning() must be set to a "
  95. "function or method")
  96. sw(msg.message, msg.category, msg.filename, msg.lineno,
  97. msg.file, msg.line)
  98. return
  99. _showwarnmsg_impl(msg)
  100. # Keep a reference to check if the function was replaced
  101. _formatwarning_orig = formatwarning
  102. def _formatwarnmsg(msg):
  103. """Function to format a warning the standard way."""
  104. try:
  105. fw = formatwarning
  106. except NameError:
  107. pass
  108. else:
  109. if fw is not _formatwarning_orig:
  110. # warnings.formatwarning() was replaced
  111. return fw(msg.message, msg.category,
  112. msg.filename, msg.lineno, msg.line)
  113. return _formatwarnmsg_impl(msg)
  114. def filterwarnings(action, message="", category=Warning, module="", lineno=0,
  115. append=False):
  116. """Insert an entry into the list of warnings filters (at the front).
  117. 'action' -- one of "error", "ignore", "always", "default", "module",
  118. or "once"
  119. 'message' -- a regex that the warning message must match
  120. 'category' -- a class that the warning must be a subclass of
  121. 'module' -- a regex that the module name must match
  122. 'lineno' -- an integer line number, 0 matches all warnings
  123. 'append' -- if true, append to the list of filters
  124. """
  125. assert action in ("error", "ignore", "always", "default", "module",
  126. "once"), "invalid action: %r" % (action,)
  127. assert isinstance(message, str), "message must be a string"
  128. assert isinstance(category, type), "category must be a class"
  129. assert issubclass(category, Warning), "category must be a Warning subclass"
  130. assert isinstance(module, str), "module must be a string"
  131. assert isinstance(lineno, int) and lineno >= 0, \
  132. "lineno must be an int >= 0"
  133. if message or module:
  134. import re
  135. if message:
  136. message = re.compile(message, re.I)
  137. else:
  138. message = None
  139. if module:
  140. module = re.compile(module)
  141. else:
  142. module = None
  143. _add_filter(action, message, category, module, lineno, append=append)
  144. def simplefilter(action, category=Warning, lineno=0, append=False):
  145. """Insert a simple entry into the list of warnings filters (at the front).
  146. A simple filter matches all modules and messages.
  147. 'action' -- one of "error", "ignore", "always", "default", "module",
  148. or "once"
  149. 'category' -- a class that the warning must be a subclass of
  150. 'lineno' -- an integer line number, 0 matches all warnings
  151. 'append' -- if true, append to the list of filters
  152. """
  153. assert action in ("error", "ignore", "always", "default", "module",
  154. "once"), "invalid action: %r" % (action,)
  155. assert isinstance(lineno, int) and lineno >= 0, \
  156. "lineno must be an int >= 0"
  157. _add_filter(action, None, category, None, lineno, append=append)
  158. def _add_filter(*item, append):
  159. # Remove possible duplicate filters, so new one will be placed
  160. # in correct place. If append=True and duplicate exists, do nothing.
  161. if not append:
  162. try:
  163. filters.remove(item)
  164. except ValueError:
  165. pass
  166. filters.insert(0, item)
  167. else:
  168. if item not in filters:
  169. filters.append(item)
  170. _filters_mutated()
  171. def resetwarnings():
  172. """Clear the list of warning filters, so that no filters are active."""
  173. filters[:] = []
  174. _filters_mutated()
  175. class _OptionError(Exception):
  176. """Exception used by option processing helpers."""
  177. pass
  178. # Helper to process -W options passed via sys.warnoptions
  179. def _processoptions(args):
  180. for arg in args:
  181. try:
  182. _setoption(arg)
  183. except _OptionError as msg:
  184. print("Invalid -W option ignored:", msg, file=sys.stderr)
  185. # Helper for _processoptions()
  186. def _setoption(arg):
  187. parts = arg.split(':')
  188. if len(parts) > 5:
  189. raise _OptionError("too many fields (max 5): %r" % (arg,))
  190. while len(parts) < 5:
  191. parts.append('')
  192. action, message, category, module, lineno = [s.strip()
  193. for s in parts]
  194. action = _getaction(action)
  195. category = _getcategory(category)
  196. if message or module:
  197. import re
  198. if message:
  199. message = re.escape(message)
  200. if module:
  201. module = re.escape(module) + r'\Z'
  202. if lineno:
  203. try:
  204. lineno = int(lineno)
  205. if lineno < 0:
  206. raise ValueError
  207. except (ValueError, OverflowError):
  208. raise _OptionError("invalid lineno %r" % (lineno,)) from None
  209. else:
  210. lineno = 0
  211. filterwarnings(action, message, category, module, lineno)
  212. # Helper for _setoption()
  213. def _getaction(action):
  214. if not action:
  215. return "default"
  216. if action == "all": return "always" # Alias
  217. for a in ('default', 'always', 'ignore', 'module', 'once', 'error'):
  218. if a.startswith(action):
  219. return a
  220. raise _OptionError("invalid action: %r" % (action,))
  221. # Helper for _setoption()
  222. def _getcategory(category):
  223. if not category:
  224. return Warning
  225. if '.' not in category:
  226. import builtins as m
  227. klass = category
  228. else:
  229. module, _, klass = category.rpartition('.')
  230. try:
  231. m = __import__(module, None, None, [klass])
  232. except ImportError:
  233. raise _OptionError("invalid module name: %r" % (module,)) from None
  234. try:
  235. cat = getattr(m, klass)
  236. except AttributeError:
  237. raise _OptionError("unknown warning category: %r" % (category,)) from None
  238. if not issubclass(cat, Warning):
  239. raise _OptionError("invalid warning category: %r" % (category,))
  240. return cat
  241. def _is_internal_filename(filename):
  242. return 'importlib' in filename and '_bootstrap' in filename
  243. def _is_filename_to_skip(filename, skip_file_prefixes):
  244. return any(filename.startswith(prefix) for prefix in skip_file_prefixes)
  245. def _is_internal_frame(frame):
  246. """Signal whether the frame is an internal CPython implementation detail."""
  247. return _is_internal_filename(frame.f_code.co_filename)
  248. def _next_external_frame(frame, skip_file_prefixes):
  249. """Find the next frame that doesn't involve Python or user internals."""
  250. frame = frame.f_back
  251. while frame is not None and (
  252. _is_internal_filename(filename := frame.f_code.co_filename) or
  253. _is_filename_to_skip(filename, skip_file_prefixes)):
  254. frame = frame.f_back
  255. return frame
  256. # Code typically replaced by _warnings
  257. def warn(message, category=None, stacklevel=1, source=None,
  258. *, skip_file_prefixes=()):
  259. """Issue a warning, or maybe ignore it or raise an exception."""
  260. # Check if message is already a Warning object
  261. if isinstance(message, Warning):
  262. category = message.__class__
  263. # Check category argument
  264. if category is None:
  265. category = UserWarning
  266. if not (isinstance(category, type) and issubclass(category, Warning)):
  267. raise TypeError("category must be a Warning subclass, "
  268. "not '{:s}'".format(type(category).__name__))
  269. if not isinstance(skip_file_prefixes, tuple):
  270. # The C version demands a tuple for implementation performance.
  271. raise TypeError('skip_file_prefixes must be a tuple of strs.')
  272. if skip_file_prefixes:
  273. stacklevel = max(2, stacklevel)
  274. # Get context information
  275. try:
  276. if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)):
  277. # If frame is too small to care or if the warning originated in
  278. # internal code, then do not try to hide any frames.
  279. frame = sys._getframe(stacklevel)
  280. else:
  281. frame = sys._getframe(1)
  282. # Look for one frame less since the above line starts us off.
  283. for x in range(stacklevel-1):
  284. frame = _next_external_frame(frame, skip_file_prefixes)
  285. if frame is None:
  286. raise ValueError
  287. except ValueError:
  288. globals = sys.__dict__
  289. filename = "sys"
  290. lineno = 1
  291. else:
  292. globals = frame.f_globals
  293. filename = frame.f_code.co_filename
  294. lineno = frame.f_lineno
  295. if '__name__' in globals:
  296. module = globals['__name__']
  297. else:
  298. module = "<string>"
  299. registry = globals.setdefault("__warningregistry__", {})
  300. warn_explicit(message, category, filename, lineno, module, registry,
  301. globals, source)
  302. def warn_explicit(message, category, filename, lineno,
  303. module=None, registry=None, module_globals=None,
  304. source=None):
  305. lineno = int(lineno)
  306. if module is None:
  307. module = filename or "<unknown>"
  308. if module[-3:].lower() == ".py":
  309. module = module[:-3] # XXX What about leading pathname?
  310. if registry is None:
  311. registry = {}
  312. if registry.get('version', 0) != _filters_version:
  313. registry.clear()
  314. registry['version'] = _filters_version
  315. if isinstance(message, Warning):
  316. text = str(message)
  317. category = message.__class__
  318. else:
  319. text = message
  320. message = category(message)
  321. key = (text, category, lineno)
  322. # Quick test for common case
  323. if registry.get(key):
  324. return
  325. # Search the filters
  326. for item in filters:
  327. action, msg, cat, mod, ln = item
  328. if ((msg is None or msg.match(text)) and
  329. issubclass(category, cat) and
  330. (mod is None or mod.match(module)) and
  331. (ln == 0 or lineno == ln)):
  332. break
  333. else:
  334. action = defaultaction
  335. # Early exit actions
  336. if action == "ignore":
  337. return
  338. # Prime the linecache for formatting, in case the
  339. # "file" is actually in a zipfile or something.
  340. import linecache
  341. linecache.getlines(filename, module_globals)
  342. if action == "error":
  343. raise message
  344. # Other actions
  345. if action == "once":
  346. registry[key] = 1
  347. oncekey = (text, category)
  348. if onceregistry.get(oncekey):
  349. return
  350. onceregistry[oncekey] = 1
  351. elif action == "always":
  352. pass
  353. elif action == "module":
  354. registry[key] = 1
  355. altkey = (text, category, 0)
  356. if registry.get(altkey):
  357. return
  358. registry[altkey] = 1
  359. elif action == "default":
  360. registry[key] = 1
  361. else:
  362. # Unrecognized actions are errors
  363. raise RuntimeError(
  364. "Unrecognized action (%r) in warnings.filters:\n %s" %
  365. (action, item))
  366. # Print message and context
  367. msg = WarningMessage(message, category, filename, lineno, source)
  368. _showwarnmsg(msg)
  369. class WarningMessage(object):
  370. _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
  371. "line", "source")
  372. def __init__(self, message, category, filename, lineno, file=None,
  373. line=None, source=None):
  374. self.message = message
  375. self.category = category
  376. self.filename = filename
  377. self.lineno = lineno
  378. self.file = file
  379. self.line = line
  380. self.source = source
  381. self._category_name = category.__name__ if category else None
  382. def __str__(self):
  383. return ("{message : %r, category : %r, filename : %r, lineno : %s, "
  384. "line : %r}" % (self.message, self._category_name,
  385. self.filename, self.lineno, self.line))
  386. class catch_warnings(object):
  387. """A context manager that copies and restores the warnings filter upon
  388. exiting the context.
  389. The 'record' argument specifies whether warnings should be captured by a
  390. custom implementation of warnings.showwarning() and be appended to a list
  391. returned by the context manager. Otherwise None is returned by the context
  392. manager. The objects appended to the list are arguments whose attributes
  393. mirror the arguments to showwarning().
  394. The 'module' argument is to specify an alternative module to the module
  395. named 'warnings' and imported under that name. This argument is only useful
  396. when testing the warnings module itself.
  397. If the 'action' argument is not None, the remaining arguments are passed
  398. to warnings.simplefilter() as if it were called immediately on entering the
  399. context.
  400. """
  401. def __init__(self, *, record=False, module=None,
  402. action=None, category=Warning, lineno=0, append=False):
  403. """Specify whether to record warnings and if an alternative module
  404. should be used other than sys.modules['warnings'].
  405. For compatibility with Python 3.0, please consider all arguments to be
  406. keyword-only.
  407. """
  408. self._record = record
  409. self._module = sys.modules['warnings'] if module is None else module
  410. self._entered = False
  411. if action is None:
  412. self._filter = None
  413. else:
  414. self._filter = (action, category, lineno, append)
  415. def __repr__(self):
  416. args = []
  417. if self._record:
  418. args.append("record=True")
  419. if self._module is not sys.modules['warnings']:
  420. args.append("module=%r" % self._module)
  421. name = type(self).__name__
  422. return "%s(%s)" % (name, ", ".join(args))
  423. def __enter__(self):
  424. if self._entered:
  425. raise RuntimeError("Cannot enter %r twice" % self)
  426. self._entered = True
  427. self._filters = self._module.filters
  428. self._module.filters = self._filters[:]
  429. self._module._filters_mutated()
  430. self._showwarning = self._module.showwarning
  431. self._showwarnmsg_impl = self._module._showwarnmsg_impl
  432. if self._filter is not None:
  433. simplefilter(*self._filter)
  434. if self._record:
  435. log = []
  436. self._module._showwarnmsg_impl = log.append
  437. # Reset showwarning() to the default implementation to make sure
  438. # that _showwarnmsg() calls _showwarnmsg_impl()
  439. self._module.showwarning = self._module._showwarning_orig
  440. return log
  441. else:
  442. return None
  443. def __exit__(self, *exc_info):
  444. if not self._entered:
  445. raise RuntimeError("Cannot exit %r without entering first" % self)
  446. self._module.filters = self._filters
  447. self._module._filters_mutated()
  448. self._module.showwarning = self._showwarning
  449. self._module._showwarnmsg_impl = self._showwarnmsg_impl
  450. _DEPRECATED_MSG = "{name!r} is deprecated and slated for removal in Python {remove}"
  451. def _deprecated(name, message=_DEPRECATED_MSG, *, remove, _version=sys.version_info):
  452. """Warn that *name* is deprecated or should be removed.
  453. RuntimeError is raised if *remove* specifies a major/minor tuple older than
  454. the current Python version or the same version but past the alpha.
  455. The *message* argument is formatted with *name* and *remove* as a Python
  456. version (e.g. "3.11").
  457. """
  458. remove_formatted = f"{remove[0]}.{remove[1]}"
  459. if (_version[:2] > remove) or (_version[:2] == remove and _version[3] != "alpha"):
  460. msg = f"{name!r} was slated for removal after Python {remove_formatted} alpha"
  461. raise RuntimeError(msg)
  462. else:
  463. msg = message.format(name=name, remove=remove_formatted)
  464. warn(msg, DeprecationWarning, stacklevel=3)
  465. # Private utility function called by _PyErr_WarnUnawaitedCoroutine
  466. def _warn_unawaited_coroutine(coro):
  467. msg_lines = [
  468. f"coroutine '{coro.__qualname__}' was never awaited\n"
  469. ]
  470. if coro.cr_origin is not None:
  471. import linecache, traceback
  472. def extract():
  473. for filename, lineno, funcname in reversed(coro.cr_origin):
  474. line = linecache.getline(filename, lineno)
  475. yield (filename, lineno, funcname, line)
  476. msg_lines.append("Coroutine created at (most recent call last)\n")
  477. msg_lines += traceback.format_list(list(extract()))
  478. msg = "".join(msg_lines).rstrip("\n")
  479. # Passing source= here means that if the user happens to have tracemalloc
  480. # enabled and tracking where the coroutine was created, the warning will
  481. # contain that traceback. This does mean that if they have *both*
  482. # coroutine origin tracking *and* tracemalloc enabled, they'll get two
  483. # partially-redundant tracebacks. If we wanted to be clever we could
  484. # probably detect this case and avoid it, but for now we don't bother.
  485. warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
  486. # filters contains a sequence of filter 5-tuples
  487. # The components of the 5-tuple are:
  488. # - an action: error, ignore, always, default, module, or once
  489. # - a compiled regex that must match the warning message
  490. # - a class representing the warning category
  491. # - a compiled regex that must match the module that is being warned
  492. # - a line number for the line being warning, or 0 to mean any line
  493. # If either if the compiled regexs are None, match anything.
  494. try:
  495. from _warnings import (filters, _defaultaction, _onceregistry,
  496. warn, warn_explicit, _filters_mutated)
  497. defaultaction = _defaultaction
  498. onceregistry = _onceregistry
  499. _warnings_defaults = True
  500. except ImportError:
  501. filters = []
  502. defaultaction = "default"
  503. onceregistry = {}
  504. _filters_version = 1
  505. def _filters_mutated():
  506. global _filters_version
  507. _filters_version += 1
  508. _warnings_defaults = False
  509. # Module initialization
  510. _processoptions(sys.warnoptions)
  511. if not _warnings_defaults:
  512. # Several warning categories are ignored by default in regular builds
  513. if not hasattr(sys, 'gettotalrefcount'):
  514. filterwarnings("default", category=DeprecationWarning,
  515. module="__main__", append=1)
  516. simplefilter("ignore", category=DeprecationWarning, append=1)
  517. simplefilter("ignore", category=PendingDeprecationWarning, append=1)
  518. simplefilter("ignore", category=ImportWarning, append=1)
  519. simplefilter("ignore", category=ResourceWarning, append=1)
  520. del _warnings_defaults