123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- import errno
- import os
- import selectors
- import signal
- import socket
- import struct
- import sys
- import threading
- import warnings
- from . import connection
- from . import process
- from .context import reduction
- from . import resource_tracker
- from . import spawn
- from . import util
# Public API of this module; each name is bound at the bottom of the file to
# a method of the module-level ForkServer instance.
__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
           'set_forkserver_preload']

#
# Module-level constants
#

# Limit on the number of fds exchanged per fork request: the client side
# raises ValueError when len(fds) + 4 reaches it, and the server side raises
# RuntimeError when it receives more than this many.
MAXFDS_TO_SEND = 256
# Wire format used by read_signed()/write_signed() for pids and exit codes.
SIGNED_STRUCT = struct.Struct('q')     # large enough for pid_t
- #
- # Forkserver class
- #
class ForkServer(object):
    '''Client-side handle on a fork server process.

    An instance records how to reach the server (the address of its AF_UNIX
    listening socket), the write end of the "alive" pipe that keeps the
    server running, and the server's pid.  All state changes made by
    ensure_running() and _stop() happen under an internal lock.
    '''

    def __init__(self):
        # Address of the server's listening socket; None until a server has
        # been launched (or after it has been stopped).
        self._forkserver_address = None
        # Write end of the "alive" pipe; closing it asks the server to stop.
        self._forkserver_alive_fd = None
        # Pid of the server launched by this process, if any.
        self._forkserver_pid = None
        # Extra fds handed to this process by the fork server, or None.
        self._inherited_fds = None
        self._lock = threading.Lock()
        # Modules the server tries to import before forking children.
        self._preload_modules = ['__main__']

    def _stop(self):
        '''Stop the fork server (used by unit tests).'''
        with self._lock:
            self._stop_unlocked()

    def _stop_unlocked(self):
        '''Stop the fork server; the caller must already hold self._lock.

        Does nothing when no server pid is recorded.
        '''
        if self._forkserver_pid is None:
            return

        # closing the "alive" file descriptor asks the server to stop
        os.close(self._forkserver_alive_fd)
        self._forkserver_alive_fd = None

        # Block until the server has exited so that it is reaped.
        os.waitpid(self._forkserver_pid, 0)
        self._forkserver_pid = None

        # Abstract-namespace sockets have no filesystem entry to remove.
        if not util.is_abstract_socket_namespace(self._forkserver_address):
            os.unlink(self._forkserver_address)
        self._forkserver_address = None

    def set_forkserver_preload(self, modules_names):
        '''Set list of module names to try to load in forkserver process.

        modules_names may be any iterable of str.  It is copied into a list
        before validation so that a one-shot iterable is not stored already
        exhausted, and so that later changes to the caller's object do not
        alter the stored value.

        Raises TypeError if any element is not exactly of type str; in that
        case the previously stored list is left untouched.
        '''
        modules_names = list(modules_names)
        if not all(type(mod) is str for mod in modules_names):
            raise TypeError('module_names must be a list of strings')
        self._preload_modules = modules_names

    def get_inherited_fds(self):
        '''Return list of fds inherited from parent process.

        This returns None if the current process was not started by fork
        server.
        '''
        return self._inherited_fds

    def connect_to_new_process(self, fds):
        '''Request forkserver to create a child process.

        Returns a pair of fds (status_r, data_w).  The calling process can read
        the child process's pid and (eventually) its returncode from status_r.
        The calling process should write to data_w the pickled preparation and
        process data.

        Raises ValueError when len(fds) + 4 reaches MAXFDS_TO_SEND.  Any
        exception raised while the pipes are being set up or sent is
        propagated after every pipe fd created here has been closed.
        '''
        self.ensure_running()
        if len(fds) + 4 >= MAXFDS_TO_SEND:
            raise ValueError('too many fds')
        with socket.socket(socket.AF_UNIX) as client:
            client.connect(self._forkserver_address)
            parent_r, child_w = os.pipe()
            try:
                child_r, parent_w = os.pipe()
            except BaseException:
                # Second pipe could not be created (e.g. fd exhaustion):
                # do not leak the first one.
                os.close(parent_r)
                os.close(child_w)
                raise
            try:
                # Building the fd list is inside the try block so that a
                # failure in resource_tracker.getfd() cannot leak the pipes.
                allfds = [child_r, child_w, self._forkserver_alive_fd,
                          resource_tracker.getfd()]
                allfds += fds
                reduction.sendfds(client, allfds)
                return parent_r, parent_w
            except BaseException:
                # The caller never receives these ends on failure.
                os.close(parent_r)
                os.close(parent_w)
                raise
            finally:
                # The child-side ends are closed locally on success and
                # failure alike once sending has been attempted.
                os.close(child_r)
                os.close(child_w)

    def ensure_running(self):
        '''Make sure that a fork server is running.

        This can be called from any process.  Note that usually a child
        process will just reuse the forkserver started by its parent, so
        ensure_running() will do nothing.
        '''
        with self._lock:
            resource_tracker.ensure_running()
            if self._forkserver_pid is not None:
                # forkserver was launched before, is it still running?
                pid, _ = os.waitpid(self._forkserver_pid, os.WNOHANG)
                if not pid:
                    # still alive
                    return
                # dead, launch it again
                os.close(self._forkserver_alive_fd)
                self._forkserver_address = None
                self._forkserver_alive_fd = None
                self._forkserver_pid = None

            # Command line for the server interpreter; the placeholders are
            # filled with the listener fd, the alive-pipe read fd, the
            # preload list and the keyword data computed below.
            cmd = ('from multiprocessing.forkserver import main; ' +
                   'main(%d, %d, %r, **%r)')

            if self._preload_modules:
                # Only these two preparation entries are forwarded.
                desired_keys = {'main_path', 'sys_path'}
                data = spawn.get_preparation_data('ignore')
                data = {x: y for x, y in data.items() if x in desired_keys}
            else:
                data = {}

            with socket.socket(socket.AF_UNIX) as listener:
                address = connection.arbitrary_address('AF_UNIX')
                listener.bind(address)
                if not util.is_abstract_socket_namespace(address):
                    # Restrict the socket file to its owner.
                    os.chmod(address, 0o600)
                listener.listen()

                # all client processes own the write end of the "alive" pipe;
                # when they all terminate the read end becomes ready.
                alive_r, alive_w = os.pipe()
                try:
                    fds_to_pass = [listener.fileno(), alive_r]
                    cmd %= (listener.fileno(), alive_r, self._preload_modules,
                            data)
                    exe = spawn.get_executable()
                    args = [exe] + util._args_from_interpreter_flags()
                    args += ['-c', cmd]
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                except BaseException:
                    # Launch failed: nobody will use the write end.
                    os.close(alive_w)
                    raise
                finally:
                    # The read end is only needed by the server process.
                    os.close(alive_r)
                self._forkserver_address = address
                self._forkserver_alive_fd = alive_w
                self._forkserver_pid = pid
- #
- #
- #
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
    '''Run forkserver.

    listener_fd is the fd of an already-listening AF_UNIX socket and alive_r
    is the read end of the "alive" pipe.  preload is a collection of module
    names to import before serving; main_path is handed to
    spawn.import_main_path() when '__main__' is among them.  sys_path is
    accepted but not used in this function body.

    The function loops until alive_r becomes readable, at which point it
    raises SystemExit.  An OSError whose errno is not ECONNABORTED is
    propagated to the caller.
    '''
    if preload:
        if '__main__' in preload and main_path is not None:
            # Flag the current process as "inheriting" only for the duration
            # of the main-module import.
            process.current_process()._inheriting = True
            try:
                spawn.import_main_path(main_path)
            finally:
                del process.current_process()._inheriting
        for modname in preload:
            try:
                __import__(modname)
            except ImportError:
                # Preloading is best effort: a module that cannot be
                # imported is skipped.
                pass

    util._close_stdin()

    # Pipe used as the signal wakeup fd; both ends are non-blocking.
    sig_r, sig_w = os.pipe()
    os.set_blocking(sig_r, False)
    os.set_blocking(sig_w, False)

    def sigchld_handler(*_unused):
        # Dummy signal handler, doesn't do anything
        pass

    handlers = {
        # unblocking SIGCHLD allows the wakeup fd to notify our event loop
        signal.SIGCHLD: sigchld_handler,
        # protect the process from ^C
        signal.SIGINT: signal.SIG_IGN,
        }
    # Install the handlers and remember the previous ones; they are passed
    # to _serve_one() so that each forked child can restore them.
    old_handlers = {sig: signal.signal(sig, val)
                    for (sig, val) in handlers.items()}

    # calling os.write() in the Python signal handler is racy
    signal.set_wakeup_fd(sig_w)

    # map child pids to client fds
    pid_to_fd = {}

    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
         selectors.DefaultSelector() as selector:
        _forkserver._forkserver_address = listener.getsockname()

        # Three event sources: fork requests, the alive pipe, and signals.
        selector.register(listener, selectors.EVENT_READ)
        selector.register(alive_r, selectors.EVENT_READ)
        selector.register(sig_r, selectors.EVENT_READ)

        while True:
            try:
                # Wait until at least one registered object is ready.
                while True:
                    rfds = [key.fileobj for (key, events) in selector.select()]
                    if rfds:
                        break

                if alive_r in rfds:
                    # EOF because no more client processes left
                    # NOTE(review): the read happens inside the assert, so it
                    # is skipped under -O; SystemExit is raised either way.
                    assert os.read(alive_r, 1) == b'', "Not at EOF?"
                    raise SystemExit

                if sig_r in rfds:
                    # Got SIGCHLD
                    os.read(sig_r, 65536)  # exhaust
                    while True:
                        # Scan for child processes
                        try:
                            pid, sts = os.waitpid(-1, os.WNOHANG)
                        except ChildProcessError:
                            # No child processes at all.
                            break
                        if pid == 0:
                            # Children exist but none has exited yet.
                            break
                        child_w = pid_to_fd.pop(pid, None)
                        if child_w is not None:
                            returncode = os.waitstatus_to_exitcode(sts)
                            # Send exit code to client process
                            try:
                                write_signed(child_w, returncode)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            os.close(child_w)
                        else:
                            # This shouldn't happen really
                            warnings.warn('forkserver: waitpid returned '
                                          'unexpected pid %d' % pid)

                if listener in rfds:
                    # Incoming fork request
                    with listener.accept()[0] as s:
                        # Receive fds from client
                        fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
                        if len(fds) > MAXFDS_TO_SEND:
                            raise RuntimeError(
                                "Too many ({0:n}) fds to send".format(
                                    len(fds)))
                        # The first two fds are the child's ends of the
                        # data and status pipes; the rest go to _serve_one().
                        child_r, child_w, *fds = fds
                        s.close()
                        pid = os.fork()
                        if pid == 0:
                            # Child
                            code = 1
                            try:
                                listener.close()
                                selector.close()
                                # fds that _serve_one() closes in the child
                                unused_fds = [alive_r, child_w, sig_r, sig_w]
                                unused_fds.extend(pid_to_fd.values())
                                code = _serve_one(child_r, fds,
                                                  unused_fds,
                                                  old_handlers)
                            except Exception:
                                sys.excepthook(*sys.exc_info())
                                sys.stderr.flush()
                            finally:
                                # The child always leaves through os._exit()
                                # and never returns into the server loop.
                                os._exit(code)
                        else:
                            # Send pid to client process
                            try:
                                write_signed(child_w, pid)
                            except BrokenPipeError:
                                # client vanished
                                pass
                            # Keep child_w open so that the exit code can be
                            # written to it when the child is reaped.
                            pid_to_fd[pid] = child_w
                            os.close(child_r)
                            for fd in fds:
                                os.close(fd)

            except OSError as e:
                # ECONNABORTED is ignored and the loop continues; any other
                # OSError is re-raised.
                if e.errno != errno.ECONNABORTED:
                    raise
def _serve_one(child_r, fds, unused_fds, handlers):
    '''Prepare a freshly forked child and run the process it was created for.

    handlers maps signal numbers to the handlers to reinstall, unused_fds
    lists descriptors to close, and fds supplies, in order, the alive fd,
    the resource tracker fd and any further inherited fds.  Returns whatever
    spawn._main() returns.
    '''
    # Detach from the wakeup fd and reinstall the given signal handlers.
    signal.set_wakeup_fd(-1)
    for signum, previous in handlers.items():
        signal.signal(signum, previous)

    # Close the descriptors this process has no use for.
    for unused in unused_fds:
        os.close(unused)

    # Record the received fds on the module-level state objects.
    alive_fd, tracker_fd, *inherited = fds
    _forkserver._forkserver_alive_fd = alive_fd
    resource_tracker._resource_tracker._fd = tracker_fd
    _forkserver._inherited_fds = inherited

    # Run process object received over pipe; a duplicate of child_r is
    # passed as the second argument.
    return spawn._main(child_r, os.dup(child_r))
- #
- # Read and write signed numbers
- #
def read_signed(fd):
    '''Read one SIGNED_STRUCT-encoded integer from fd and return it.

    Keeps reading until SIGNED_STRUCT.size bytes have arrived; raises
    EOFError if the descriptor reports end of file before that.
    '''
    remaining = SIGNED_STRUCT.size
    chunks = []
    while remaining > 0:
        chunk = os.read(fd, remaining)
        if not chunk:
            raise EOFError('unexpected EOF')
        chunks.append(chunk)
        remaining -= len(chunk)
    (value,) = SIGNED_STRUCT.unpack(b''.join(chunks))
    return value
def write_signed(fd, n):
    '''Write the integer n to fd in SIGNED_STRUCT encoding.

    Repeats os.write() until the whole packed value has been written;
    raises RuntimeError if a write reports zero bytes.
    '''
    payload = SIGNED_STRUCT.pack(n)
    offset = 0
    while offset < len(payload):
        written = os.write(fd, payload[offset:])
        if written == 0:
            raise RuntimeError('should not get here')
        offset += written
- #
- #
- #
# Module-level ForkServer instance.  The public functions listed in __all__
# are bound methods of this one object, so they all operate on its state.
_forkserver = ForkServer()
ensure_running = _forkserver.ensure_running
get_inherited_fds = _forkserver.get_inherited_fds
connect_to_new_process = _forkserver.connect_to_new_process
set_forkserver_preload = _forkserver.set_forkserver_preload
|