process.py

# -*- test-case-name: twisted.test.test_process -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
UNIX Process management.

Do NOT use this module directly - use reactor.spawnProcess() instead.

Maintainer: Itamar Shtull-Trauring
"""
from __future__ import division, absolute_import, print_function

from twisted.python.runtime import platform

if platform.isWindows():
    raise ImportError(("twisted.internet.process does not work on Windows. "
                       "Use the reactor.spawnProcess() API instead."))

import errno
import gc
import os
import io
import signal
import stat
import sys
import traceback

try:
    import pty
except ImportError:
    pty = None

try:
    import fcntl, termios
except ImportError:
    fcntl = None

from zope.interface import implementer

from twisted.python import log, failure
from twisted.python.util import switchUID
from twisted.python.compat import items, range, _PY3
from twisted.internet import fdesc, abstract, error
from twisted.internet.main import CONNECTION_LOST, CONNECTION_DONE
from twisted.internet._baseprocess import BaseProcess
from twisted.internet.interfaces import IProcessTransport


# Some people were importing this, which is incorrect, just keeping it
# here for backwards compatibility:
ProcessExitedAlready = error.ProcessExitedAlready

reapProcessHandlers = {}


def reapAllProcesses():
    """
    Reap all registered processes.
    """
    # Coerce this to a list, as reaping the process changes the dictionary and
    # causes a "size changed during iteration" exception
    for process in list(reapProcessHandlers.values()):
        process.reapProcess()


def registerReapProcessHandler(pid, process):
    """
    Register a process handler for the given pid, in case L{reapAllProcesses}
    is called.

    @param pid: the pid of the process.
    @param process: a process handler.
    """
    if pid in reapProcessHandlers:
        raise RuntimeError("Try to register an already registered process.")
    try:
        auxPID, status = os.waitpid(pid, os.WNOHANG)
    except:
        log.msg('Failed to reap %d:' % pid)
        log.err()
        auxPID = None
    if auxPID:
        process.processEnded(status)
    else:
        # if auxPID is 0, there are children but none have exited
        reapProcessHandlers[pid] = process


def unregisterReapProcessHandler(pid, process):
    """
    Unregister a process handler previously registered with
    L{registerReapProcessHandler}.
    """
    if not (pid in reapProcessHandlers
            and reapProcessHandlers[pid] == process):
        raise RuntimeError("Try to unregister a process not registered.")
    del reapProcessHandlers[pid]
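
# Hedged sketch of how the registry above is used; the proc object is
# hypothetical and would normally be a Process created via spawnProcess():
#
#     registerReapProcessHandler(proc.pid, proc)   # done once after fork()
#     ...
#     reapAllProcesses()   # waitpid(..., WNOHANG) on each registered handler;
#                          # a handler whose child has exited calls
#                          # processEnded() and is removed from the registry.
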

class ProcessWriter(abstract.FileDescriptor):
    """
    (Internal) Helper class to write into a Process's input pipe.

    I am a helper which describes a selectable asynchronous writer to a
    process's input pipe, including stdin.

    @ivar enableReadHack: A flag which determines how readability on this
        write descriptor will be handled.  If C{True}, then readability may
        indicate the reader for this write descriptor has been closed (ie,
        the connection has been lost).  If C{False}, then readability events
        are ignored.
    """
    connected = 1
    ic = 0
    enableReadHack = False

    def __init__(self, reactor, proc, name, fileno, forceReadHack=False):
        """
        Initialize, specifying a Process instance to connect to.
        """
        abstract.FileDescriptor.__init__(self, reactor)
        fdesc.setNonBlocking(fileno)
        self.proc = proc
        self.name = name
        self.fd = fileno

        if not stat.S_ISFIFO(os.fstat(self.fileno()).st_mode):
            # If the fd is not a pipe, then the read hack is never
            # applicable.  This case arises when ProcessWriter is used by
            # StandardIO and stdout is redirected to a normal file.
            self.enableReadHack = False
        elif forceReadHack:
            self.enableReadHack = True
        else:
            # Detect if this fd is actually a write-only fd. If it's
            # valid to read, don't try to detect closing via read.
            # This really only means that we cannot detect a TTY's write
            # pipe being closed.
            try:
                os.read(self.fileno(), 0)
            except OSError:
                # It's a write-only pipe end, enable hack
                self.enableReadHack = True

        if self.enableReadHack:
            self.startReading()

    def fileno(self):
        """
        Return the fileno() of my process's stdin.
        """
        return self.fd

    def writeSomeData(self, data):
        """
        Write some data to the open process.
        """
        rv = fdesc.writeToFD(self.fd, data)
        if rv == len(data) and self.enableReadHack:
            # If the send buffer is now empty and it is necessary to monitor
            # this descriptor for readability to detect close, try detecting
            # readability now.
            self.startReading()
        return rv

    def write(self, data):
        self.stopReading()
        abstract.FileDescriptor.write(self, data)

    def doRead(self):
        """
        The only way a write pipe can become "readable" is at EOF, because the
        child has closed it, and we're using a reactor which doesn't
        distinguish between readable and closed (such as the select reactor).

        Except that's not true on linux < 2.6.11. It has the following
        characteristics: write pipe is completely empty => POLLOUT (writable in
        select), write pipe is not completely empty => POLLIN (readable in
        select), write pipe's reader closed => POLLIN|POLLERR (readable and
        writable in select)

        That's what this funky code is for. If linux was not broken, this
        function could be simply "return CONNECTION_LOST".
        """
        if self.enableReadHack:
            return CONNECTION_LOST
        else:
            self.stopReading()

    def connectionLost(self, reason):
        """
        See abstract.FileDescriptor.connectionLost.
        """
        # At least on macOS 10.4, exiting while stdout is non-blocking can
        # result in data loss.  For some reason putting the file descriptor
        # back into blocking mode seems to resolve this issue.
        fdesc.setBlocking(self.fd)

        abstract.FileDescriptor.connectionLost(self, reason)
        self.proc.childConnectionLost(self.name, reason)


class ProcessReader(abstract.FileDescriptor):
    """
    ProcessReader

    I am a selectable representation of a process's output pipe, such as
    stdout and stderr.
    """
    connected = True

    def __init__(self, reactor, proc, name, fileno):
        """
        Initialize, specifying a process to connect to.
        """
        abstract.FileDescriptor.__init__(self, reactor)
        fdesc.setNonBlocking(fileno)
        self.proc = proc
        self.name = name
        self.fd = fileno
        self.startReading()

    def fileno(self):
        """
        Return the fileno() of my process's stderr.
        """
        return self.fd

    def writeSomeData(self, data):
        # the only time this is actually called is after .loseConnection Any
        # actual write attempt would fail, so we must avoid that. This hack
        # allows us to use .loseConnection on both readers and writers.
        assert data == b""
        return CONNECTION_LOST

    def doRead(self):
        """
        This is called when the pipe becomes readable.
        """
        return fdesc.readFromFD(self.fd, self.dataReceived)

    def dataReceived(self, data):
        self.proc.childDataReceived(self.name, data)

    def loseConnection(self):
        if self.connected and not self.disconnecting:
            self.disconnecting = 1
            self.stopReading()
            self.reactor.callLater(0, self.connectionLost,
                                   failure.Failure(CONNECTION_DONE))

    def connectionLost(self, reason):
        """
        Close my end of the pipe, signal the Process (which signals the
        ProcessProtocol).
        """
        abstract.FileDescriptor.connectionLost(self, reason)
        self.proc.childConnectionLost(self.name, reason)


class _BaseProcess(BaseProcess, object):
    """
    Base class for Process and PTYProcess.
    """
    status = None
    pid = None

    def reapProcess(self):
        """
        Try to reap a process (without blocking) via waitpid.

        This is called when sigchild is caught or a Process object loses its
        "connection" (stdout is closed) This ought to result in reaping all
        zombie processes, since it will be called twice as often as it needs
        to be.

        (Unfortunately, this is a slightly experimental approach, since
        UNIX has no way to be really sure that your process is going to
        go away w/o blocking.  I don't want to block.)
        """
        try:
            try:
                pid, status = os.waitpid(self.pid, os.WNOHANG)
            except OSError as e:
                if e.errno == errno.ECHILD:
                    # no child process
                    pid = None
                else:
                    raise
        except:
            log.msg('Failed to reap %d:' % self.pid)
            log.err()
            pid = None

        if pid:
            self.processEnded(status)
            unregisterReapProcessHandler(pid, self)

    def _getReason(self, status):
        exitCode = sig = None
        if os.WIFEXITED(status):
            exitCode = os.WEXITSTATUS(status)
        else:
            sig = os.WTERMSIG(status)
        if exitCode or sig:
            return error.ProcessTerminated(exitCode, sig, status)
        return error.ProcessDone(status)

    def signalProcess(self, signalID):
        """
        Send the given signal C{signalID} to the process. It'll translate a
        few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string
        representation to its int value, otherwise it'll pass directly the
        value provided

        @type signalID: C{str} or C{int}
        """
        if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
            signalID = getattr(signal, 'SIG%s' % (signalID,))
        if self.pid is None:
            raise ProcessExitedAlready()
        try:
            os.kill(self.pid, signalID)
        except OSError as e:
            if e.errno == errno.ESRCH:
                raise ProcessExitedAlready()
            else:
                raise
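
    # Hedged illustration of the translation performed by signalProcess();
    # the transport variable is hypothetical:
    #
    #     transport.signalProcess('TERM')          # resolved to signal.SIGTERM
    #     transport.signalProcess(signal.SIGUSR1)  # ints are passed through
    #
    # Either call raises ProcessExitedAlready if the child is already gone.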

    def _resetSignalDisposition(self):
        # The Python interpreter ignores some signals, and our child
        # process will inherit that behaviour. To have a child process
        # that responds to signals normally, we need to reset our
        # child process's signal handling (just) after we fork and
        # before we execvpe.
        for signalnum in range(1, signal.NSIG):
            if signal.getsignal(signalnum) == signal.SIG_IGN:
                # Reset signal handling to the default
                signal.signal(signalnum, signal.SIG_DFL)

    def _fork(self, path, uid, gid, executable, args, environment, **kwargs):
        """
        Fork and then exec sub-process.

        @param path: the path where to run the new process.
        @type path: L{bytes} or L{unicode}
        @param uid: if defined, the uid used to run the new process.
        @type uid: L{int}
        @param gid: if defined, the gid used to run the new process.
        @type gid: L{int}
        @param executable: the executable to run in a new process.
        @type executable: L{str}
        @param args: arguments used to create the new process.
        @type args: L{list}.
        @param environment: environment used for the new process.
        @type environment: L{dict}.
        @param kwargs: keyword arguments to L{_setupChild} method.
        """
        collectorEnabled = gc.isenabled()
        gc.disable()
        try:
            self.pid = os.fork()
        except:
            # Still in the parent process
            if collectorEnabled:
                gc.enable()
            raise
        else:
            if self.pid == 0:
                # A return value of 0 from fork() indicates that we are now
                # executing in the child process.

                # Do not put *ANY* code outside the try block. The child
                # process must either exec or _exit. If it gets outside this
                # block (due to an exception that is not handled here, but
                # which might be handled higher up), there will be two copies
                # of the parent running in parallel, doing all kinds of damage.

                # After each change to this code, review it to make sure there
                # are no exit paths.
                try:
                    # Stop debugging. If I am, I don't care anymore.
                    sys.settrace(None)
                    self._setupChild(**kwargs)
                    self._execChild(path, uid, gid, executable, args,
                                    environment)
                except:
                    # If there are errors, try to write something descriptive
                    # to stderr before exiting.

                    # The parent's stderr isn't *necessarily* fd 2 anymore, or
                    # even still available; however, even libc assumes that
                    # write(2, err) is a useful thing to attempt.
                    try:
                        stderr = os.fdopen(2, 'wb')
                        msg = ("Upon execvpe {0} {1} in environment id {2}"
                               "\n:").format(executable, str(args),
                                             id(environment))

                        if _PY3:
                            # On Python 3, print_exc takes a text stream, but
                            # on Python 2 it still takes a byte stream.  So on
                            # Python 3 we will wrap up the byte stream returned
                            # by os.fdopen using TextIOWrapper.

                            # We hard-code UTF-8 as the encoding here, rather
                            # than looking at something like
                            # getfilesystemencoding() or sys.stderr.encoding,
                            # because we want an encoding that will be able to
                            # encode the full range of code points.  We are
                            # (most likely) talking to the parent process on
                            # the other end of this pipe and not the filesystem
                            # or the original sys.stderr, so there's no point
                            # in trying to match the encoding of one of those
                            # objects.
                            stderr = io.TextIOWrapper(stderr, encoding="utf-8")

                        stderr.write(msg)
                        traceback.print_exc(file=stderr)
                        stderr.flush()

                        for fd in range(3):
                            os.close(fd)
                    except:
                        # Handle all errors during the error-reporting process
                        # silently to ensure that the child terminates.
                        pass

                # See comment above about making sure that we reach this line
                # of code.
                os._exit(1)

        # we are now in parent process
        if collectorEnabled:
            gc.enable()
        self.status = -1  # this records the exit status of the child

    def _setupChild(self, *args, **kwargs):
        """
        Setup the child process. Override in subclasses.
        """
        raise NotImplementedError()

    def _execChild(self, path, uid, gid, executable, args, environment):
        """
        The exec() which is done in the forked child.
        """
        if path:
            os.chdir(path)
        if uid is not None or gid is not None:
            if uid is None:
                uid = os.geteuid()
            if gid is None:
                gid = os.getegid()
            # set the UID before I actually exec the process
            os.setuid(0)
            os.setgid(0)
            switchUID(uid, gid)
        os.execvpe(executable, args, environment)

    def __repr__(self):
        """
        String representation of a process.
        """
        return "<%s pid=%s status=%s>" % (self.__class__.__name__,
                                          self.pid, self.status)


class _FDDetector(object):
    """
    This class contains the logic necessary to decide which of the available
    system techniques should be used to detect the open file descriptors for
    the current process.  The chosen technique gets monkey-patched into the
    _listOpenFDs method of this class so that the detection only needs to
    occur once.

    @ivar listdir: The implementation of listdir to use. This gets overwritten
        by the test cases.
    @ivar getpid: The implementation of getpid to use, returns the PID of the
        running process.
    @ivar openfile: The implementation of open() to use, by default the Python
        builtin.
    """
    # So that we can unit test this
    listdir = os.listdir
    getpid = os.getpid
    openfile = open

    def __init__(self):
        self._implementations = [
            self._procFDImplementation, self._devFDImplementation,
            self._fallbackFDImplementation]

    def _listOpenFDs(self):
        """
        Return an iterable of file descriptors which I{may} be open in this
        process.

        This will try to return the fewest possible descriptors without
        missing any.
        """
        self._listOpenFDs = self._getImplementation()
        return self._listOpenFDs()

    def _getImplementation(self):
        """
        Pick a method which gives correct results for C{_listOpenFDs} in this
        runtime environment.

        This involves a lot of very platform-specific checks, some of which
        may be relatively expensive.  Therefore the returned method should be
        saved and re-used, rather than always calling this method to determine
        what it is.

        See the implementation for the details of how a method is selected.
        """
        for impl in self._implementations:
            try:
                before = impl()
            except:
                continue
            with self.openfile("/dev/null", "r"):
                after = impl()
            if before != after:
                return impl
        # If no implementation can detect the newly opened file above, then
        # just return the last one.  The last one should therefore always be
        # one which makes a simple static guess which includes all possible
        # open file descriptors, but perhaps also many other values which do
        # not correspond to file descriptors.  For example, the scheme
        # implemented by _fallbackFDImplementation is suitable to be the last
        # entry.
        return impl

    def _devFDImplementation(self):
        """
        Simple implementation for systems where /dev/fd actually works.
        See: http://www.freebsd.org/cgi/man.cgi?fdescfs
        """
        dname = "/dev/fd"
        result = [int(fd) for fd in self.listdir(dname)]
        return result

    def _procFDImplementation(self):
        """
        Simple implementation for systems where /proc/pid/fd exists (we assume
        it works).
        """
        dname = "/proc/%d/fd" % (self.getpid(),)
        return [int(fd) for fd in self.listdir(dname)]

    def _fallbackFDImplementation(self):
        """
        Fallback implementation where either the resource module can inform us
        about the upper bound of how many FDs to expect, or where we just
        guess a constant maximum if there is no resource module.

        All possible file descriptors from 0 to that upper bound are returned
        with no attempt to exclude invalid file descriptor values.
        """
        try:
            import resource
        except ImportError:
            maxfds = 1024
        else:
            # OS-X reports 9223372036854775808. That's a lot of fds to close.
            # OS-X should get the /dev/fd implementation instead, so mostly
            # this check probably isn't necessary.
            maxfds = min(1024, resource.getrlimit(resource.RLIMIT_NOFILE)[1])
        return range(maxfds)


detector = _FDDetector()

def _listOpenFDs():
    """
    Use the global detector object to figure out which FD implementation to
    use.
    """
    return detector._listOpenFDs()
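
# Hedged illustration of the detection flow above; the fd values shown are
# hypothetical and will differ per process:
#
#     fds = _listOpenFDs()   # first call probes /proc/<pid>/fd, then /dev/fd,
#                            # then falls back to range(maxfds)
#     fds = _listOpenFDs()   # later calls reuse the chosen implementation,
#                            # which was memoized onto detector._listOpenFDs
#     # e.g. fds might be [0, 1, 2, 5, 7]
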

@implementer(IProcessTransport)
class Process(_BaseProcess):
    """
    An operating-system Process.

    This represents an operating-system process with arbitrary input/output
    pipes connected to it. Those pipes may represent standard input,
    standard output, and standard error, or any other file descriptor.

    On UNIX, this is implemented using fork(), exec(), pipe()
    and fcntl(). These calls may not exist elsewhere so this
    code is not cross-platform. (also, windows can only select
    on sockets...)
    """
    debug = False
    debug_child = False

    status = -1
    pid = None

    processWriterFactory = ProcessWriter
    processReaderFactory = ProcessReader

    def __init__(self,
                 reactor, executable, args, environment, path, proto,
                 uid=None, gid=None, childFDs=None):
        """
        Spawn an operating-system process.

        This is where the hard work of disconnecting all currently open
        files / forking / executing the new process happens.  (This is
        executed automatically when a Process is instantiated.)

        This will also run the subprocess as a given user ID and group ID, if
        specified.  (Implementation Note: this doesn't support all the arcane
        nuances of setXXuid on UNIX: it will assume that either your effective
        or real UID is 0.)
        """
        if not proto:
            assert 'r' not in childFDs.values()
            assert 'w' not in childFDs.values()
        _BaseProcess.__init__(self, proto)

        self.pipes = {}
        # keys are childFDs, we can sense them closing
        # values are ProcessReader/ProcessWriters

        helpers = {}
        # keys are childFDs
        # values are parentFDs

        if childFDs is None:
            childFDs = {0: "w",  # we write to the child's stdin
                        1: "r",  # we read from their stdout
                        2: "r",  # and we read from their stderr
                        }

        debug = self.debug
        if debug: print("childFDs", childFDs)

        _openedPipes = []
        def pipe():
            r, w = os.pipe()
            _openedPipes.extend([r, w])
            return r, w

        # fdmap.keys() are filenos of pipes that are used by the child.
        fdmap = {}  # maps childFD to parentFD
        try:
            for childFD, target in items(childFDs):
                if debug: print("[%d]" % childFD, target)
                if target == "r":
                    # we need a pipe that the parent can read from
                    readFD, writeFD = pipe()
                    if debug: print("readFD=%d, writeFD=%d" % (readFD, writeFD))
                    fdmap[childFD] = writeFD     # child writes to this
                    helpers[childFD] = readFD    # parent reads from this
                elif target == "w":
                    # we need a pipe that the parent can write to
                    readFD, writeFD = pipe()
                    if debug: print("readFD=%d, writeFD=%d" % (readFD, writeFD))
                    fdmap[childFD] = readFD      # child reads from this
                    helpers[childFD] = writeFD   # parent writes to this
                else:
                    assert type(target) == int, '%r should be an int' % (target,)
                    fdmap[childFD] = target      # parent ignores this
            if debug: print("fdmap", fdmap)
            if debug: print("helpers", helpers)

            # the child only cares about fdmap.values()

            self._fork(path, uid, gid, executable, args, environment,
                       fdmap=fdmap)
        except:
            for pipe in _openedPipes:
                os.close(pipe)
            raise

        # we are the parent process:
        self.proto = proto

        # arrange for the parent-side pipes to be read and written
        for childFD, parentFD in items(helpers):
            os.close(fdmap[childFD])
            if childFDs[childFD] == "r":
                reader = self.processReaderFactory(reactor, self, childFD,
                                                   parentFD)
                self.pipes[childFD] = reader
            if childFDs[childFD] == "w":
                writer = self.processWriterFactory(reactor, self, childFD,
                                                   parentFD,
                                                   forceReadHack=True)
                self.pipes[childFD] = writer

        try:
            # the 'transport' is used for some compatibility methods
            if self.proto is not None:
                self.proto.makeConnection(self)
        except:
            log.err()

        # The reactor might not be running yet.  This might call back into
        # processEnded synchronously, triggering an application-visible
        # callback.  That's probably not ideal.  The replacement API for
        # spawnProcess should improve upon this situation.
        registerReapProcessHandler(self.pid, self)

    def _setupChild(self, fdmap):
        """
        fdmap[childFD] = parentFD

        The child wants to end up with 'childFD' attached to what used to be
        the parent's parentFD. As an example, a bash command run like
        'command 2>&1' would correspond to an fdmap of {0:0, 1:1, 2:1}.
        'command >foo.txt' would be {0:0, 1:os.open('foo.txt'), 2:2}.

        This is accomplished in two steps::

          1. close all file descriptors that aren't values of fdmap.  This
             means 0 .. maxfds (or just the open fds within that range, if
             the platform supports '/proc/<pid>/fd').

          2. for each childFD::

             - if fdmap[childFD] == childFD, the descriptor is already in
               place.  Make sure the CLOEXEC flag is not set, then delete
               the entry from fdmap.

             - if childFD is in fdmap.values(), then the target descriptor
               is busy. Use os.dup() to move it elsewhere, update all
               fdmap[childFD] items that point to it, then close the
               original. Then fall through to the next case.

             - now fdmap[childFD] is not in fdmap.values(), and is free.
               Use os.dup2() to move it to the right place, then close the
               original.
        """
        debug = self.debug_child
        if debug:
            errfd = sys.stderr
            errfd.write("starting _setupChild\n")

        destList = fdmap.values()
        for fd in _listOpenFDs():
            if fd in destList:
                continue
            if debug and fd == errfd.fileno():
                continue
            try:
                os.close(fd)
            except:
                pass

        # at this point, the only fds still open are the ones that need to
        # be moved to their appropriate positions in the child (the targets
        # of fdmap, i.e. fdmap.values() )

        if debug: print("fdmap", fdmap, file=errfd)
        for child in sorted(fdmap.keys()):
            target = fdmap[child]
            if target == child:
                # fd is already in place
                if debug: print("%d already in place" % target, file=errfd)
                fdesc._unsetCloseOnExec(child)
            else:
                if child in fdmap.values():
                    # we can't replace child-fd yet, as some other mapping
                    # still needs the fd it wants to target. We must preserve
                    # that old fd by duping it to a new home.
                    newtarget = os.dup(child)  # give it a safe home
                    if debug: print("os.dup(%d) -> %d" % (child, newtarget),
                                    file=errfd)
                    os.close(child)  # close the original
                    for c, p in items(fdmap):
                        if p == child:
                            fdmap[c] = newtarget  # update all pointers
                # now it should be available
                if debug: print("os.dup2(%d,%d)" % (target, child), file=errfd)
                os.dup2(target, child)

        # At this point, the child has everything it needs. We want to close
        # everything that isn't going to be used by the child, i.e.
        # everything not in fdmap.keys(). The only remaining fds open are
        # those in fdmap.values().

        # Any given fd may appear in fdmap.values() multiple times, so we
        # need to remove duplicates first.

        old = []
        for fd in fdmap.values():
            if not fd in old:
                if not fd in fdmap.keys():
                    old.append(fd)
        if debug: print("old", old, file=errfd)
        for fd in old:
            os.close(fd)

        self._resetSignalDisposition()

    def writeToChild(self, childFD, data):
        self.pipes[childFD].write(data)

    def closeChildFD(self, childFD):
        # for writer pipes, loseConnection tries to write the remaining data
        # out to the pipe before closing it
        # if childFD is not in the list of pipes, assume that it is already
        # closed
        if childFD in self.pipes:
            self.pipes[childFD].loseConnection()

    def pauseProducing(self):
        for p in self.pipes.values():
            if isinstance(p, ProcessReader):
                p.stopReading()

    def resumeProducing(self):
        for p in self.pipes.values():
            if isinstance(p, ProcessReader):
                p.startReading()

    # compatibility
    def closeStdin(self):
        """
        Call this to close standard input on this process.
        """
        self.closeChildFD(0)

    def closeStdout(self):
        self.closeChildFD(1)

    def closeStderr(self):
        self.closeChildFD(2)

    def loseConnection(self):
        self.closeStdin()
        self.closeStderr()
        self.closeStdout()

    def write(self, data):
        """
        Call this to write to standard input on this process.

        NOTE: This will silently lose data if there is no standard input.
        """
        if 0 in self.pipes:
            self.pipes[0].write(data)

    def registerProducer(self, producer, streaming):
        """
        Call this to register producer for standard input.

        If there is no standard input producer.stopProducing() will
        be called immediately.
        """
        if 0 in self.pipes:
            self.pipes[0].registerProducer(producer, streaming)
        else:
            producer.stopProducing()

    def unregisterProducer(self):
        """
        Call this to unregister producer for standard input."""
        if 0 in self.pipes:
            self.pipes[0].unregisterProducer()

    def writeSequence(self, seq):
        """
        Call this to write to standard input on this process.

        NOTE: This will silently lose data if there is no standard input.
        """
        if 0 in self.pipes:
            self.pipes[0].writeSequence(seq)

    def childDataReceived(self, name, data):
        self.proto.childDataReceived(name, data)

    def childConnectionLost(self, childFD, reason):
        # this is called when one of the helpers (ProcessReader or
        # ProcessWriter) notices their pipe has been closed
        os.close(self.pipes[childFD].fileno())
        del self.pipes[childFD]
        try:
            self.proto.childConnectionLost(childFD)
        except:
            log.err()
        self.maybeCallProcessEnded()

    def maybeCallProcessEnded(self):
        # we don't call ProcessProtocol.processEnded until:
        #  the child has terminated, AND
        #  all writers have indicated an error status, AND
        #  all readers have indicated EOF
        # This insures that we've gathered all output from the process.
        if self.pipes:
            return
        if not self.lostProcess:
            self.reapProcess()
            return
        _BaseProcess.maybeCallProcessEnded(self)


@implementer(IProcessTransport)
class PTYProcess(abstract.FileDescriptor, _BaseProcess):
    """
    An operating-system Process that uses PTY support.
    """
    status = -1
    pid = None

    def __init__(self, reactor, executable, args, environment, path, proto,
                 uid=None, gid=None, usePTY=None):
        """
        Spawn an operating-system process.

        This is where the hard work of disconnecting all currently open
        files / forking / executing the new process happens.  (This is
        executed automatically when a Process is instantiated.)

        This will also run the subprocess as a given user ID and group ID, if
        specified.  (Implementation Note: this doesn't support all the arcane
        nuances of setXXuid on UNIX: it will assume that either your effective
        or real UID is 0.)
        """
        if pty is None and not isinstance(usePTY, (tuple, list)):
            # no pty module and we didn't get a pty to use
            raise NotImplementedError(
                "cannot use PTYProcess on platforms without the pty module.")
        abstract.FileDescriptor.__init__(self, reactor)
        _BaseProcess.__init__(self, proto)

        if isinstance(usePTY, (tuple, list)):
            masterfd, slavefd, _ = usePTY
        else:
            masterfd, slavefd = pty.openpty()

        try:
            self._fork(path, uid, gid, executable, args, environment,
                       masterfd=masterfd, slavefd=slavefd)
        except:
            if not isinstance(usePTY, (tuple, list)):
                os.close(masterfd)
                os.close(slavefd)
            raise

        # we are now in parent process:
        os.close(slavefd)
        fdesc.setNonBlocking(masterfd)
        self.fd = masterfd
        self.startReading()
        self.connected = 1
        self.status = -1
        try:
            self.proto.makeConnection(self)
        except:
            log.err()
        registerReapProcessHandler(self.pid, self)
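
    # Hedged sketch of how this class is normally reached: spawnProcess()
    # constructs a PTYProcess when usePTY is true.  The ShellProtocol class
    # here is an illustrative assumption:
    #
    #     from twisted.internet import reactor
    #     reactor.spawnProcess(ShellProtocol(), "/bin/sh",
    #                          args=["sh", "-i"], env={}, usePTY=True)
    #
    # With a PTY the child's stdin/stdout/stderr all share the slave end of
    # one pseudo-terminal, so childDataReceived() is always invoked with fd 1.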

    def _setupChild(self, masterfd, slavefd):
        """
        Set up child process after C{fork()} but before C{exec()}.

        This involves:

            - closing C{masterfd}, since it is not used in the subprocess

            - creating a new session with C{os.setsid}

            - changing the controlling terminal of the process (and the new
              session) to point at C{slavefd}

            - duplicating C{slavefd} to standard input, output, and error

            - closing all other open file descriptors (according to
              L{_listOpenFDs})

            - re-setting all signal handlers to C{SIG_DFL}

        @param masterfd: The master end of a PTY opened with C{openpty}.
        @type masterfd: L{int}

        @param slavefd: The slave end of a PTY opened with C{openpty}.
        @type slavefd: L{int}
        """
        os.close(masterfd)
        os.setsid()
        fcntl.ioctl(slavefd, termios.TIOCSCTTY, '')

        for fd in range(3):
            if fd != slavefd:
                os.close(fd)

        os.dup2(slavefd, 0)  # stdin
        os.dup2(slavefd, 1)  # stdout
        os.dup2(slavefd, 2)  # stderr

        for fd in _listOpenFDs():
            if fd > 2:
                try:
                    os.close(fd)
                except:
                    pass

        self._resetSignalDisposition()

    def closeStdin(self):
        # PTYs do not have stdin/stdout/stderr. They only have in and out,
        # just like sockets. You cannot close one without closing off the
        # entire PTY.
        pass

    def closeStdout(self):
        pass

    def closeStderr(self):
        pass

    def doRead(self):
        """
        Called when my standard output stream is ready for reading.
        """
        return fdesc.readFromFD(
            self.fd,
            lambda data: self.proto.childDataReceived(1, data))

    def fileno(self):
        """
        This returns the file number of standard output on this process.
        """
        return self.fd

    def maybeCallProcessEnded(self):
        # two things must happen before we call the ProcessProtocol's
        # processEnded method. 1: the child process must die and be reaped
        # (which calls our own processEnded method). 2: the child must close
        # their stdin/stdout/stderr fds, causing the pty to close, causing
        # our connectionLost method to be called. #2 can also be triggered
        # by calling .loseConnection().
        if self.lostProcess == 2:
            _BaseProcess.maybeCallProcessEnded(self)

    def connectionLost(self, reason):
        """
        I call this to clean up when one or all of my connections has died.
        """
        abstract.FileDescriptor.connectionLost(self, reason)
        os.close(self.fd)
        self.lostProcess += 1
        self.maybeCallProcessEnded()

    def writeSomeData(self, data):
        """
        Write some data to the open process.
        """
        return fdesc.writeToFD(self.fd, data)