util.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. #
  2. # Module providing various facilities to other parts of the package
  3. #
  4. # multiprocessing/util.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk
  7. # Licensed to PSF under a Contributor Agreement.
  8. #
  9. import os
  10. import itertools
  11. import sys
  12. import weakref
  13. import atexit
  14. import threading # we want threading to install it's
  15. # cleanup function before multiprocessing does
  16. from subprocess import _args_from_interpreter_flags
  17. from . import process
  18. __all__ = [
  19. 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
  20. 'log_to_stderr', 'get_temp_dir', 'register_after_fork',
  21. 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
  22. 'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
  23. ]
  24. #
  25. # Logging
  26. #
  27. NOTSET = 0
  28. SUBDEBUG = 5
  29. DEBUG = 10
  30. INFO = 20
  31. SUBWARNING = 25
  32. LOGGER_NAME = 'multiprocessing'
  33. DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
  34. _logger = None
  35. _log_to_stderr = False
  36. def sub_debug(msg, *args):
  37. if _logger:
  38. _logger.log(SUBDEBUG, msg, *args, stacklevel=2)
  39. def debug(msg, *args):
  40. if _logger:
  41. _logger.log(DEBUG, msg, *args, stacklevel=2)
  42. def info(msg, *args):
  43. if _logger:
  44. _logger.log(INFO, msg, *args, stacklevel=2)
  45. def sub_warning(msg, *args):
  46. if _logger:
  47. _logger.log(SUBWARNING, msg, *args, stacklevel=2)
  48. def get_logger():
  49. '''
  50. Returns logger used by multiprocessing
  51. '''
  52. global _logger
  53. import logging
  54. logging._acquireLock()
  55. try:
  56. if not _logger:
  57. _logger = logging.getLogger(LOGGER_NAME)
  58. _logger.propagate = 0
  59. # XXX multiprocessing should cleanup before logging
  60. if hasattr(atexit, 'unregister'):
  61. atexit.unregister(_exit_function)
  62. atexit.register(_exit_function)
  63. else:
  64. atexit._exithandlers.remove((_exit_function, (), {}))
  65. atexit._exithandlers.append((_exit_function, (), {}))
  66. finally:
  67. logging._releaseLock()
  68. return _logger
  69. def log_to_stderr(level=None):
  70. '''
  71. Turn on logging and add a handler which prints to stderr
  72. '''
  73. global _log_to_stderr
  74. import logging
  75. logger = get_logger()
  76. formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
  77. handler = logging.StreamHandler()
  78. handler.setFormatter(formatter)
  79. logger.addHandler(handler)
  80. if level:
  81. logger.setLevel(level)
  82. _log_to_stderr = True
  83. return _logger
  84. # Abstract socket support
  85. def _platform_supports_abstract_sockets():
  86. if sys.platform == "linux":
  87. return True
  88. if hasattr(sys, 'getandroidapilevel'):
  89. return True
  90. return False
  91. def is_abstract_socket_namespace(address):
  92. if not address:
  93. return False
  94. if isinstance(address, bytes):
  95. return address[0] == 0
  96. elif isinstance(address, str):
  97. return address[0] == "\0"
  98. raise TypeError(f'address type of {address!r} unrecognized')
  99. abstract_sockets_supported = _platform_supports_abstract_sockets()
  100. #
  101. # Function returning a temp directory which will be removed on exit
  102. #
  103. def _remove_temp_dir(rmtree, tempdir):
  104. def onerror(func, path, err_info):
  105. if not issubclass(err_info[0], FileNotFoundError):
  106. raise
  107. rmtree(tempdir, onerror=onerror)
  108. current_process = process.current_process()
  109. # current_process() can be None if the finalizer is called
  110. # late during Python finalization
  111. if current_process is not None:
  112. current_process._config['tempdir'] = None
  113. def get_temp_dir():
  114. # get name of a temp directory which will be automatically cleaned up
  115. tempdir = process.current_process()._config.get('tempdir')
  116. if tempdir is None:
  117. import shutil, tempfile
  118. tempdir = tempfile.mkdtemp(prefix='pymp-')
  119. info('created temp directory %s', tempdir)
  120. # keep a strong reference to shutil.rmtree(), since the finalizer
  121. # can be called late during Python shutdown
  122. Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
  123. exitpriority=-100)
  124. process.current_process()._config['tempdir'] = tempdir
  125. return tempdir
  126. #
  127. # Support for reinitialization of objects when bootstrapping a child process
  128. #
  129. _afterfork_registry = weakref.WeakValueDictionary()
  130. _afterfork_counter = itertools.count()
  131. def _run_after_forkers():
  132. items = list(_afterfork_registry.items())
  133. items.sort()
  134. for (index, ident, func), obj in items:
  135. try:
  136. func(obj)
  137. except Exception as e:
  138. info('after forker raised exception %s', e)
  139. def register_after_fork(obj, func):
  140. _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
  141. #
  142. # Finalization using weakrefs
  143. #
  144. _finalizer_registry = {}
  145. _finalizer_counter = itertools.count()
  146. class Finalize(object):
  147. '''
  148. Class which supports object finalization using weakrefs
  149. '''
  150. def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
  151. if (exitpriority is not None) and not isinstance(exitpriority,int):
  152. raise TypeError(
  153. "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
  154. exitpriority, type(exitpriority)))
  155. if obj is not None:
  156. self._weakref = weakref.ref(obj, self)
  157. elif exitpriority is None:
  158. raise ValueError("Without object, exitpriority cannot be None")
  159. self._callback = callback
  160. self._args = args
  161. self._kwargs = kwargs or {}
  162. self._key = (exitpriority, next(_finalizer_counter))
  163. self._pid = os.getpid()
  164. _finalizer_registry[self._key] = self
  165. def __call__(self, wr=None,
  166. # Need to bind these locally because the globals can have
  167. # been cleared at shutdown
  168. _finalizer_registry=_finalizer_registry,
  169. sub_debug=sub_debug, getpid=os.getpid):
  170. '''
  171. Run the callback unless it has already been called or cancelled
  172. '''
  173. try:
  174. del _finalizer_registry[self._key]
  175. except KeyError:
  176. sub_debug('finalizer no longer registered')
  177. else:
  178. if self._pid != getpid():
  179. sub_debug('finalizer ignored because different process')
  180. res = None
  181. else:
  182. sub_debug('finalizer calling %s with args %s and kwargs %s',
  183. self._callback, self._args, self._kwargs)
  184. res = self._callback(*self._args, **self._kwargs)
  185. self._weakref = self._callback = self._args = \
  186. self._kwargs = self._key = None
  187. return res
  188. def cancel(self):
  189. '''
  190. Cancel finalization of the object
  191. '''
  192. try:
  193. del _finalizer_registry[self._key]
  194. except KeyError:
  195. pass
  196. else:
  197. self._weakref = self._callback = self._args = \
  198. self._kwargs = self._key = None
  199. def still_active(self):
  200. '''
  201. Return whether this finalizer is still waiting to invoke callback
  202. '''
  203. return self._key in _finalizer_registry
  204. def __repr__(self):
  205. try:
  206. obj = self._weakref()
  207. except (AttributeError, TypeError):
  208. obj = None
  209. if obj is None:
  210. return '<%s object, dead>' % self.__class__.__name__
  211. x = '<%s object, callback=%s' % (
  212. self.__class__.__name__,
  213. getattr(self._callback, '__name__', self._callback))
  214. if self._args:
  215. x += ', args=' + str(self._args)
  216. if self._kwargs:
  217. x += ', kwargs=' + str(self._kwargs)
  218. if self._key[0] is not None:
  219. x += ', exitpriority=' + str(self._key[0])
  220. return x + '>'
  221. def _run_finalizers(minpriority=None):
  222. '''
  223. Run all finalizers whose exit priority is not None and at least minpriority
  224. Finalizers with highest priority are called first; finalizers with
  225. the same priority will be called in reverse order of creation.
  226. '''
  227. if _finalizer_registry is None:
  228. # This function may be called after this module's globals are
  229. # destroyed. See the _exit_function function in this module for more
  230. # notes.
  231. return
  232. if minpriority is None:
  233. f = lambda p : p[0] is not None
  234. else:
  235. f = lambda p : p[0] is not None and p[0] >= minpriority
  236. # Careful: _finalizer_registry may be mutated while this function
  237. # is running (either by a GC run or by another thread).
  238. # list(_finalizer_registry) should be atomic, while
  239. # list(_finalizer_registry.items()) is not.
  240. keys = [key for key in list(_finalizer_registry) if f(key)]
  241. keys.sort(reverse=True)
  242. for key in keys:
  243. finalizer = _finalizer_registry.get(key)
  244. # key may have been removed from the registry
  245. if finalizer is not None:
  246. sub_debug('calling %s', finalizer)
  247. try:
  248. finalizer()
  249. except Exception:
  250. import traceback
  251. traceback.print_exc()
  252. if minpriority is None:
  253. _finalizer_registry.clear()
  254. #
  255. # Clean up on exit
  256. #
  257. def is_exiting():
  258. '''
  259. Returns true if the process is shutting down
  260. '''
  261. return _exiting or _exiting is None
  262. _exiting = False
  263. def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
  264. active_children=process.active_children,
  265. current_process=process.current_process):
  266. # We hold on to references to functions in the arglist due to the
  267. # situation described below, where this function is called after this
  268. # module's globals are destroyed.
  269. global _exiting
  270. if not _exiting:
  271. _exiting = True
  272. info('process shutting down')
  273. debug('running all "atexit" finalizers with priority >= 0')
  274. _run_finalizers(0)
  275. if current_process() is not None:
  276. # We check if the current process is None here because if
  277. # it's None, any call to ``active_children()`` will raise
  278. # an AttributeError (active_children winds up trying to
  279. # get attributes from util._current_process). One
  280. # situation where this can happen is if someone has
  281. # manipulated sys.modules, causing this module to be
  282. # garbage collected. The destructor for the module type
  283. # then replaces all values in the module dict with None.
  284. # For instance, after setuptools runs a test it replaces
  285. # sys.modules with a copy created earlier. See issues
  286. # #9775 and #15881. Also related: #4106, #9205, and
  287. # #9207.
  288. for p in active_children():
  289. if p.daemon:
  290. info('calling terminate() for daemon %s', p.name)
  291. p._popen.terminate()
  292. for p in active_children():
  293. info('calling join() for process %s', p.name)
  294. p.join()
  295. debug('running the remaining "atexit" finalizers')
  296. _run_finalizers()
  297. atexit.register(_exit_function)
  298. #
  299. # Some fork aware types
  300. #
  301. class ForkAwareThreadLock(object):
  302. def __init__(self):
  303. self._lock = threading.Lock()
  304. self.acquire = self._lock.acquire
  305. self.release = self._lock.release
  306. register_after_fork(self, ForkAwareThreadLock._at_fork_reinit)
  307. def _at_fork_reinit(self):
  308. self._lock._at_fork_reinit()
  309. def __enter__(self):
  310. return self._lock.__enter__()
  311. def __exit__(self, *args):
  312. return self._lock.__exit__(*args)
  313. class ForkAwareLocal(threading.local):
  314. def __new__(cls):
  315. self = threading.local.__new__(cls)
  316. register_after_fork(self, lambda obj: obj.__dict__.clear())
  317. return self
  318. def __reduce__(self):
  319. return type(self), ()
  320. #
  321. # Close fds except those specified
  322. #
  323. try:
  324. MAXFD = os.sysconf("SC_OPEN_MAX")
  325. except Exception:
  326. MAXFD = 256
  327. def close_all_fds_except(fds):
  328. fds = list(fds) + [-1, MAXFD]
  329. fds.sort()
  330. assert fds[-1] == MAXFD, 'fd too large'
  331. for i in range(len(fds) - 1):
  332. os.closerange(fds[i]+1, fds[i+1])
  333. #
  334. # Close sys.stdin and replace stdin with os.devnull
  335. #
  336. def _close_stdin():
  337. if sys.stdin is None:
  338. return
  339. try:
  340. sys.stdin.close()
  341. except (OSError, ValueError):
  342. pass
  343. try:
  344. fd = os.open(os.devnull, os.O_RDONLY)
  345. try:
  346. sys.stdin = open(fd, encoding="utf-8", closefd=False)
  347. except:
  348. os.close(fd)
  349. raise
  350. except (OSError, ValueError):
  351. pass
  352. #
  353. # Flush standard streams, if any
  354. #
  355. def _flush_std_streams():
  356. try:
  357. sys.stdout.flush()
  358. except (AttributeError, ValueError):
  359. pass
  360. try:
  361. sys.stderr.flush()
  362. except (AttributeError, ValueError):
  363. pass
  364. #
  365. # Start a program with only specified fds kept open
  366. #
  367. def _env_list():
  368. # Based on fork_exec in subprocess.py.
  369. env = os.environ.copy()
  370. env['Y_PYTHON_ENTRY_POINT'] = ':main'
  371. env_list = []
  372. for k, v in env.items():
  373. if '=' not in k:
  374. env_list.append(os.fsencode(k) + b'=' + os.fsencode(v))
  375. return env_list
  376. def spawnv_passfds(path, args, passfds):
  377. import _posixsubprocess
  378. import subprocess
  379. passfds = tuple(sorted(map(int, passfds)))
  380. errpipe_read, errpipe_write = os.pipe()
  381. try:
  382. return _posixsubprocess.fork_exec(
  383. args, [path], True, passfds, None, _env_list(),
  384. -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
  385. False, False, -1, None, None, None, -1, None,
  386. subprocess._USE_VFORK)
  387. finally:
  388. os.close(errpipe_read)
  389. os.close(errpipe_write)
  390. def close_fds(*fds):
  391. """Close each file descriptor given as an argument"""
  392. for fd in fds:
  393. os.close(fd)
  394. def _cleanup_tests():
  395. """Cleanup multiprocessing resources when multiprocessing tests
  396. completed."""
  397. from test import support
  398. # cleanup multiprocessing
  399. process._cleanup()
  400. # Stop the ForkServer process if it's running
  401. from multiprocessing import forkserver
  402. forkserver._forkserver._stop()
  403. # Stop the ResourceTracker process if it's running
  404. from multiprocessing import resource_tracker
  405. resource_tracker._resource_tracker._stop()
  406. # bpo-37421: Explicitly call _run_finalizers() to remove immediately
  407. # temporary directories created by multiprocessing.util.get_temp_dir().
  408. _run_finalizers()
  409. support.gc_collect()
  410. support.reap_children()