_win32stdio.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. # -*- test-case-name: twisted.test.test_stdio -*-
  2. """
  3. Windows-specific implementation of the L{twisted.internet.stdio} interface.
  4. """
  5. import msvcrt
  6. import os
  7. from zope.interface import implementer
  8. import win32api
  9. from twisted.internet import _pollingfile, main
  10. from twisted.internet.interfaces import (
  11. IAddress,
  12. IConsumer,
  13. IHalfCloseableProtocol,
  14. IPushProducer,
  15. ITransport,
  16. )
  17. from twisted.logger import Logger
  18. from twisted.python.failure import Failure
  19. _log = Logger()
  20. @implementer(IAddress)
  21. class Win32PipeAddress:
  22. pass
  23. @implementer(ITransport, IConsumer, IPushProducer)
  24. class StandardIO(_pollingfile._PollingTimer):
  25. disconnecting = False
  26. disconnected = False
  27. def __init__(self, proto, reactor=None):
  28. """
  29. Start talking to standard IO with the given protocol.
  30. Also, put it stdin/stdout/stderr into binary mode.
  31. """
  32. if reactor is None:
  33. from twisted.internet import reactor
  34. for stdfd in range(0, 1, 2):
  35. msvcrt.setmode(stdfd, os.O_BINARY)
  36. _pollingfile._PollingTimer.__init__(self, reactor)
  37. self.proto = proto
  38. hstdin = win32api.GetStdHandle(win32api.STD_INPUT_HANDLE)
  39. hstdout = win32api.GetStdHandle(win32api.STD_OUTPUT_HANDLE)
  40. self.stdin = _pollingfile._PollableReadPipe(
  41. hstdin, self.dataReceived, self.readConnectionLost
  42. )
  43. self.stdout = _pollingfile._PollableWritePipe(hstdout, self.writeConnectionLost)
  44. self._addPollableResource(self.stdin)
  45. self._addPollableResource(self.stdout)
  46. self.proto.makeConnection(self)
  47. def dataReceived(self, data):
  48. self.proto.dataReceived(data)
  49. def readConnectionLost(self):
  50. with _log.failuresHandled("read connection lost") as op:
  51. if IHalfCloseableProtocol.providedBy(self.proto):
  52. self.proto.readConnectionLost()
  53. self.checkConnLost()
  54. if not op.succeeded and not self.disconnecting:
  55. self.loseConnection()
  56. def writeConnectionLost(self):
  57. with _log.failuresHandled("write connection lost") as op:
  58. if IHalfCloseableProtocol.providedBy(self.proto):
  59. self.proto.writeConnectionLost()
  60. self.checkConnLost()
  61. if not op.succeeded and not self.disconnecting:
  62. self.loseConnection()
  63. connsLost = 0
  64. def checkConnLost(self):
  65. self.connsLost += 1
  66. if self.connsLost >= 2:
  67. self.disconnecting = True
  68. self.disconnected = True
  69. self.proto.connectionLost(Failure(main.CONNECTION_DONE))
  70. # ITransport
  71. def write(self, data):
  72. self.stdout.write(data)
  73. def writeSequence(self, seq):
  74. self.stdout.write(b"".join(seq))
  75. def loseConnection(self):
  76. self.disconnecting = True
  77. self.stdin.close()
  78. self.stdout.close()
  79. def getPeer(self):
  80. return Win32PipeAddress()
  81. def getHost(self):
  82. return Win32PipeAddress()
  83. # IConsumer
  84. def registerProducer(self, producer, streaming):
  85. return self.stdout.registerProducer(producer, streaming)
  86. def unregisterProducer(self):
  87. return self.stdout.unregisterProducer()
  88. # def write() above
  89. # IProducer
  90. def stopProducing(self):
  91. self.stdin.stopProducing()
  92. # IPushProducer
  93. def pauseProducing(self):
  94. self.stdin.pauseProducing()
  95. def resumeProducing(self):
  96. self.stdin.resumeProducing()