subprocess.py 7.6 KB


# Names exported by `from <this module> import *`: the two coroutine
# functions defined at the bottom of this file.
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'

import subprocess

from . import events
from . import protocols
from . import streams
from . import tasks
from .log import logger


# Aliases for the `subprocess` module constants of the same names, so the
# values can be referenced through this module as well.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
  11. class SubprocessStreamProtocol(streams.FlowControlMixin,
  12. protocols.SubprocessProtocol):
  13. """Like StreamReaderProtocol, but for a subprocess."""
  14. def __init__(self, limit, loop):
  15. super().__init__(loop=loop)
  16. self._limit = limit
  17. self.stdin = self.stdout = self.stderr = None
  18. self._transport = None
  19. self._process_exited = False
  20. self._pipe_fds = []
  21. self._stdin_closed = self._loop.create_future()
  22. def __repr__(self):
  23. info = [self.__class__.__name__]
  24. if self.stdin is not None:
  25. info.append(f'stdin={self.stdin!r}')
  26. if self.stdout is not None:
  27. info.append(f'stdout={self.stdout!r}')
  28. if self.stderr is not None:
  29. info.append(f'stderr={self.stderr!r}')
  30. return '<{}>'.format(' '.join(info))
  31. def connection_made(self, transport):
  32. self._transport = transport
  33. stdout_transport = transport.get_pipe_transport(1)
  34. if stdout_transport is not None:
  35. self.stdout = streams.StreamReader(limit=self._limit,
  36. loop=self._loop)
  37. self.stdout.set_transport(stdout_transport)
  38. self._pipe_fds.append(1)
  39. stderr_transport = transport.get_pipe_transport(2)
  40. if stderr_transport is not None:
  41. self.stderr = streams.StreamReader(limit=self._limit,
  42. loop=self._loop)
  43. self.stderr.set_transport(stderr_transport)
  44. self._pipe_fds.append(2)
  45. stdin_transport = transport.get_pipe_transport(0)
  46. if stdin_transport is not None:
  47. self.stdin = streams.StreamWriter(stdin_transport,
  48. protocol=self,
  49. reader=None,
  50. loop=self._loop)
  51. def pipe_data_received(self, fd, data):
  52. if fd == 1:
  53. reader = self.stdout
  54. elif fd == 2:
  55. reader = self.stderr
  56. else:
  57. reader = None
  58. if reader is not None:
  59. reader.feed_data(data)
  60. def pipe_connection_lost(self, fd, exc):
  61. if fd == 0:
  62. pipe = self.stdin
  63. if pipe is not None:
  64. pipe.close()
  65. self.connection_lost(exc)
  66. if exc is None:
  67. self._stdin_closed.set_result(None)
  68. else:
  69. self._stdin_closed.set_exception(exc)
  70. # Since calling `wait_closed()` is not mandatory,
  71. # we shouldn't log the traceback if this is not awaited.
  72. self._stdin_closed._log_traceback = False
  73. return
  74. if fd == 1:
  75. reader = self.stdout
  76. elif fd == 2:
  77. reader = self.stderr
  78. else:
  79. reader = None
  80. if reader is not None:
  81. if exc is None:
  82. reader.feed_eof()
  83. else:
  84. reader.set_exception(exc)
  85. if fd in self._pipe_fds:
  86. self._pipe_fds.remove(fd)
  87. self._maybe_close_transport()
  88. def process_exited(self):
  89. self._process_exited = True
  90. self._maybe_close_transport()
  91. def _maybe_close_transport(self):
  92. if len(self._pipe_fds) == 0 and self._process_exited:
  93. self._transport.close()
  94. self._transport = None
  95. def _get_close_waiter(self, stream):
  96. if stream is self.stdin:
  97. return self._stdin_closed
  98. class Process:
  99. def __init__(self, transport, protocol, loop):
  100. self._transport = transport
  101. self._protocol = protocol
  102. self._loop = loop
  103. self.stdin = protocol.stdin
  104. self.stdout = protocol.stdout
  105. self.stderr = protocol.stderr
  106. self.pid = transport.get_pid()
  107. def __repr__(self):
  108. return f'<{self.__class__.__name__} {self.pid}>'
  109. @property
  110. def returncode(self):
  111. return self._transport.get_returncode()
  112. async def wait(self):
  113. """Wait until the process exit and return the process return code."""
  114. return await self._transport._wait()
  115. def send_signal(self, signal):
  116. self._transport.send_signal(signal)
  117. def terminate(self):
  118. self._transport.terminate()
  119. def kill(self):
  120. self._transport.kill()
  121. async def _feed_stdin(self, input):
  122. debug = self._loop.get_debug()
  123. try:
  124. if input is not None:
  125. self.stdin.write(input)
  126. if debug:
  127. logger.debug(
  128. '%r communicate: feed stdin (%s bytes)', self, len(input))
  129. await self.stdin.drain()
  130. except (BrokenPipeError, ConnectionResetError) as exc:
  131. # communicate() ignores BrokenPipeError and ConnectionResetError.
  132. # write() and drain() can raise these exceptions.
  133. if debug:
  134. logger.debug('%r communicate: stdin got %r', self, exc)
  135. if debug:
  136. logger.debug('%r communicate: close stdin', self)
  137. self.stdin.close()
  138. async def _noop(self):
  139. return None
  140. async def _read_stream(self, fd):
  141. transport = self._transport.get_pipe_transport(fd)
  142. if fd == 2:
  143. stream = self.stderr
  144. else:
  145. assert fd == 1
  146. stream = self.stdout
  147. if self._loop.get_debug():
  148. name = 'stdout' if fd == 1 else 'stderr'
  149. logger.debug('%r communicate: read %s', self, name)
  150. output = await stream.read()
  151. if self._loop.get_debug():
  152. name = 'stdout' if fd == 1 else 'stderr'
  153. logger.debug('%r communicate: close %s', self, name)
  154. transport.close()
  155. return output
  156. async def communicate(self, input=None):
  157. if self.stdin is not None:
  158. stdin = self._feed_stdin(input)
  159. else:
  160. stdin = self._noop()
  161. if self.stdout is not None:
  162. stdout = self._read_stream(1)
  163. else:
  164. stdout = self._noop()
  165. if self.stderr is not None:
  166. stderr = self._read_stream(2)
  167. else:
  168. stderr = self._noop()
  169. stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
  170. await self.wait()
  171. return (stdout, stderr)
  172. async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
  173. limit=streams._DEFAULT_LIMIT, **kwds):
  174. loop = events.get_running_loop()
  175. protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
  176. loop=loop)
  177. transport, protocol = await loop.subprocess_shell(
  178. protocol_factory,
  179. cmd, stdin=stdin, stdout=stdout,
  180. stderr=stderr, **kwds)
  181. return Process(transport, protocol, loop)
  182. async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
  183. stderr=None, limit=streams._DEFAULT_LIMIT,
  184. **kwds):
  185. loop = events.get_running_loop()
  186. protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
  187. loop=loop)
  188. transport, protocol = await loop.subprocess_exec(
  189. protocol_factory,
  190. program, *args,
  191. stdin=stdin, stdout=stdout,
  192. stderr=stderr, **kwds)
  193. return Process(transport, protocol, loop)