_posixstdio.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. # -*- test-case-name: twisted.test.test_stdio -*-
  2. """Standard input/out/err support.
  3. Future Plans::
  4. support for stderr, perhaps
  5. Rewrite to use the reactor instead of an ad-hoc mechanism for connecting
  6. protocols to transport.
  7. Maintainer: James Y Knight
  8. """
  9. from zope.interface import implementer
  10. from twisted.internet import error, interfaces, process
  11. from twisted.python import failure, log
  12. @implementer(interfaces.IAddress)
  13. class PipeAddress:
  14. pass
  15. @implementer(
  16. interfaces.ITransport,
  17. interfaces.IProducer,
  18. interfaces.IConsumer,
  19. interfaces.IHalfCloseableDescriptor,
  20. )
  21. class StandardIO:
  22. _reader = None
  23. _writer = None
  24. disconnected = False
  25. disconnecting = False
  26. def __init__(self, proto, stdin=0, stdout=1, reactor=None):
  27. if reactor is None:
  28. from twisted.internet import reactor
  29. self.protocol = proto
  30. self._writer = process.ProcessWriter(reactor, self, "write", stdout)
  31. self._reader = process.ProcessReader(reactor, self, "read", stdin)
  32. self._reader.startReading()
  33. self.protocol.makeConnection(self)
  34. # ITransport
  35. # XXX Actually, see #3597.
  36. def loseWriteConnection(self):
  37. if self._writer is not None:
  38. self._writer.loseConnection()
  39. def write(self, data):
  40. if self._writer is not None:
  41. self._writer.write(data)
  42. def writeSequence(self, data):
  43. if self._writer is not None:
  44. self._writer.writeSequence(data)
  45. def loseConnection(self):
  46. self.disconnecting = True
  47. if self._writer is not None:
  48. self._writer.loseConnection()
  49. if self._reader is not None:
  50. # Don't loseConnection, because we don't want to SIGPIPE it.
  51. self._reader.stopReading()
  52. def getPeer(self):
  53. return PipeAddress()
  54. def getHost(self):
  55. return PipeAddress()
  56. # Callbacks from process.ProcessReader/ProcessWriter
  57. def childDataReceived(self, fd, data):
  58. self.protocol.dataReceived(data)
  59. def childConnectionLost(self, fd, reason):
  60. if self.disconnected:
  61. return
  62. if reason.value.__class__ == error.ConnectionDone:
  63. # Normal close
  64. if fd == "read":
  65. self._readConnectionLost(reason)
  66. else:
  67. self._writeConnectionLost(reason)
  68. else:
  69. self.connectionLost(reason)
  70. def connectionLost(self, reason):
  71. self.disconnected = True
  72. # Make sure to cleanup the other half
  73. _reader = self._reader
  74. _writer = self._writer
  75. protocol = self.protocol
  76. self._reader = self._writer = None
  77. self.protocol = None
  78. if _writer is not None and not _writer.disconnected:
  79. _writer.connectionLost(reason)
  80. if _reader is not None and not _reader.disconnected:
  81. _reader.connectionLost(reason)
  82. try:
  83. protocol.connectionLost(reason)
  84. except BaseException:
  85. log.err()
  86. def _writeConnectionLost(self, reason):
  87. self._writer = None
  88. if self.disconnecting:
  89. self.connectionLost(reason)
  90. return
  91. p = interfaces.IHalfCloseableProtocol(self.protocol, None)
  92. if p:
  93. try:
  94. p.writeConnectionLost()
  95. except BaseException:
  96. log.err()
  97. self.connectionLost(failure.Failure())
  98. def _readConnectionLost(self, reason):
  99. self._reader = None
  100. p = interfaces.IHalfCloseableProtocol(self.protocol, None)
  101. if p:
  102. try:
  103. p.readConnectionLost()
  104. except BaseException:
  105. log.err()
  106. self.connectionLost(failure.Failure())
  107. else:
  108. self.connectionLost(reason)
  109. # IConsumer
  110. def registerProducer(self, producer, streaming):
  111. if self._writer is None:
  112. producer.stopProducing()
  113. else:
  114. self._writer.registerProducer(producer, streaming)
  115. def unregisterProducer(self):
  116. if self._writer is not None:
  117. self._writer.unregisterProducer()
  118. # IProducer
  119. def stopProducing(self):
  120. self.loseConnection()
  121. def pauseProducing(self):
  122. if self._reader is not None:
  123. self._reader.pauseProducing()
  124. def resumeProducing(self):
  125. if self._reader is not None:
  126. self._reader.resumeProducing()
  127. def stopReading(self):
  128. """Compatibility only, don't use. Call pauseProducing."""
  129. self.pauseProducing()
  130. def startReading(self):
  131. """Compatibility only, don't use. Call resumeProducing."""
  132. self.resumeProducing()
  133. def readConnectionLost(self, reason):
  134. # L{IHalfCloseableDescriptor.readConnectionLost}
  135. raise NotImplementedError()
  136. def writeConnectionLost(self, reason):
  137. # L{IHalfCloseableDescriptor.writeConnectionLost}
  138. raise NotImplementedError()