_posixstdio.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. # -*- test-case-name: twisted.test.test_stdio -*-
  2. """Standard input/out/err support.
  3. Future Plans::
  4. support for stderr, perhaps
  5. Rewrite to use the reactor instead of an ad-hoc mechanism for connecting
  6. protocols to transport.
  7. Maintainer: James Y Knight
  8. """
  9. from __future__ import annotations
  10. from zope.interface import implementer
  11. from twisted.internet import interfaces, process
  12. from twisted.internet.interfaces import IProtocol, IReactorFDSet
  13. from twisted.logger import Logger
  14. from twisted.python.failure import Failure
  15. _log = Logger()
  16. @implementer(interfaces.IAddress)
  17. class PipeAddress:
  18. pass
  19. @implementer(
  20. interfaces.ITransport,
  21. interfaces.IProducer,
  22. interfaces.IConsumer,
  23. interfaces.IHalfCloseableDescriptor,
  24. )
  25. class StandardIO:
  26. _reader = None
  27. _writer = None
  28. disconnected = False
  29. disconnecting = False
  30. def __init__(
  31. self,
  32. proto: IProtocol,
  33. stdin: int = 0,
  34. stdout: int = 1,
  35. reactor: IReactorFDSet | None = None,
  36. ):
  37. if reactor is None:
  38. from twisted.internet import reactor # type:ignore[assignment]
  39. self.protocol: IProtocol = proto
  40. self._writer = process.ProcessWriter(reactor, self, "write", stdout)
  41. self._reader = process.ProcessReader(reactor, self, "read", stdin)
  42. self._reader.startReading()
  43. self.protocol.makeConnection(self)
  44. # ITransport
  45. # XXX Actually, see #3597.
  46. def loseWriteConnection(self):
  47. if self._writer is not None:
  48. self._writer.loseConnection()
  49. def write(self, data):
  50. if self._writer is not None:
  51. self._writer.write(data)
  52. def writeSequence(self, data):
  53. if self._writer is not None:
  54. self._writer.writeSequence(data)
  55. def loseConnection(self):
  56. self.disconnecting = True
  57. if self._writer is not None:
  58. self._writer.loseConnection()
  59. if self._reader is not None:
  60. # Don't loseConnection, because we don't want to SIGPIPE it.
  61. self._reader.stopReading()
  62. def getPeer(self):
  63. return PipeAddress()
  64. def getHost(self):
  65. return PipeAddress()
  66. # Callbacks from process.ProcessReader/ProcessWriter
  67. def childDataReceived(self, fd: str, data: bytes) -> None:
  68. self.protocol.dataReceived(data)
  69. def childConnectionLost(self, fd: str, reason: Failure) -> None:
  70. if self.disconnected:
  71. return
  72. if fd == "read":
  73. self._readConnectionLost(reason)
  74. else:
  75. self._writeConnectionLost(reason)
  76. def connectionLost(self, reason):
  77. self.disconnected = True
  78. # Make sure to cleanup the other half
  79. _reader = self._reader
  80. _writer = self._writer
  81. protocol = self.protocol
  82. self._reader = self._writer = None
  83. self.protocol = None # type:ignore[assignment]
  84. if _writer is not None and not _writer.disconnected:
  85. _writer.connectionLost(reason)
  86. if _reader is not None and not _reader.disconnected:
  87. _reader.connectionLost(reason)
  88. with _log.failuresHandled("while calling stdio connectionLost:"):
  89. protocol.connectionLost(reason)
  90. def _writeConnectionLost(self, reason: Failure) -> None:
  91. self._writer = None
  92. if self.disconnecting:
  93. self.connectionLost(reason)
  94. return
  95. p = interfaces.IHalfCloseableProtocol(self.protocol, None)
  96. if p:
  97. with _log.failuresHandled(
  98. "while calling stdio writeConnectionLost:"
  99. ) as wcl:
  100. p.writeConnectionLost()
  101. if wcl.failed:
  102. self.connectionLost(wcl.failure)
  103. def _readConnectionLost(self, reason: Failure) -> None:
  104. self._reader = None
  105. p = interfaces.IHalfCloseableProtocol(self.protocol, None)
  106. if p:
  107. with _log.failuresHandled("while calling stdio readConnectionLost:") as rcl:
  108. p.readConnectionLost()
  109. if rcl.failed:
  110. self.connectionLost(rcl.failure)
  111. else:
  112. self.connectionLost(reason)
  113. # IConsumer
  114. def registerProducer(self, producer, streaming):
  115. if self._writer is None:
  116. producer.stopProducing()
  117. else:
  118. self._writer.registerProducer(producer, streaming)
  119. def unregisterProducer(self):
  120. if self._writer is not None:
  121. self._writer.unregisterProducer()
  122. # IProducer
  123. def stopProducing(self):
  124. self.loseConnection()
  125. def pauseProducing(self):
  126. if self._reader is not None:
  127. self._reader.pauseProducing()
  128. def resumeProducing(self):
  129. if self._reader is not None:
  130. self._reader.resumeProducing()
  131. def stopReading(self):
  132. """Compatibility only, don't use. Call pauseProducing."""
  133. self.pauseProducing()
  134. def startReading(self):
  135. """Compatibility only, don't use. Call resumeProducing."""
  136. self.resumeProducing()
  137. def readConnectionLost(self, reason):
  138. # L{IHalfCloseableDescriptor.readConnectionLost}
  139. raise NotImplementedError()
  140. def writeConnectionLost(self, reason):
  141. # L{IHalfCloseableDescriptor.writeConnectionLost}
  142. raise NotImplementedError()