_pollingfile.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. # -*- test-case-name: twisted.internet.test.test_pollingfile -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Implements a simple polling interface for file descriptors that don't work with
  6. select() - this is pretty much only useful on Windows.
  7. """
  8. from zope.interface import implementer
  9. from twisted.internet.interfaces import IConsumer, IPushProducer
  10. MIN_TIMEOUT = 0.000000001
  11. MAX_TIMEOUT = 0.1
  12. class _PollableResource:
  13. active = True
  14. def activate(self):
  15. self.active = True
  16. def deactivate(self):
  17. self.active = False
  18. class _PollingTimer:
  19. # Everything is private here because it is really an implementation detail.
  20. def __init__(self, reactor):
  21. self.reactor = reactor
  22. self._resources = []
  23. self._pollTimer = None
  24. self._currentTimeout = MAX_TIMEOUT
  25. self._paused = False
  26. def _addPollableResource(self, res):
  27. self._resources.append(res)
  28. self._checkPollingState()
  29. def _checkPollingState(self):
  30. for resource in self._resources:
  31. if resource.active:
  32. self._startPolling()
  33. break
  34. else:
  35. self._stopPolling()
  36. def _startPolling(self):
  37. if self._pollTimer is None:
  38. self._pollTimer = self._reschedule()
  39. def _stopPolling(self):
  40. if self._pollTimer is not None:
  41. self._pollTimer.cancel()
  42. self._pollTimer = None
  43. def _pause(self):
  44. self._paused = True
  45. def _unpause(self):
  46. self._paused = False
  47. self._checkPollingState()
  48. def _reschedule(self):
  49. if not self._paused:
  50. return self.reactor.callLater(self._currentTimeout, self._pollEvent)
  51. def _pollEvent(self):
  52. workUnits = 0.0
  53. anyActive = []
  54. for resource in self._resources:
  55. if resource.active:
  56. workUnits += resource.checkWork()
  57. # Check AFTER work has been done
  58. if resource.active:
  59. anyActive.append(resource)
  60. newTimeout = self._currentTimeout
  61. if workUnits:
  62. newTimeout = self._currentTimeout / (workUnits + 1.0)
  63. if newTimeout < MIN_TIMEOUT:
  64. newTimeout = MIN_TIMEOUT
  65. else:
  66. newTimeout = self._currentTimeout * 2.0
  67. if newTimeout > MAX_TIMEOUT:
  68. newTimeout = MAX_TIMEOUT
  69. self._currentTimeout = newTimeout
  70. if anyActive:
  71. self._pollTimer = self._reschedule()
  72. # If we ever (let's hope not) need the above functionality on UNIX, this could
  73. # be factored into a different module.
  74. import pywintypes
  75. import win32api
  76. import win32file
  77. import win32pipe
  78. @implementer(IPushProducer)
  79. class _PollableReadPipe(_PollableResource):
  80. def __init__(self, pipe, receivedCallback, lostCallback):
  81. # security attributes for pipes
  82. self.pipe = pipe
  83. self.receivedCallback = receivedCallback
  84. self.lostCallback = lostCallback
  85. def checkWork(self):
  86. finished = 0
  87. fullDataRead = []
  88. while 1:
  89. try:
  90. buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
  91. # finished = (result == -1)
  92. if not bytesToRead:
  93. break
  94. hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
  95. fullDataRead.append(data)
  96. except win32api.error:
  97. finished = 1
  98. break
  99. dataBuf = b"".join(fullDataRead)
  100. if dataBuf:
  101. self.receivedCallback(dataBuf)
  102. if finished:
  103. self.cleanup()
  104. return len(dataBuf)
  105. def cleanup(self):
  106. self.deactivate()
  107. self.lostCallback()
  108. def close(self):
  109. try:
  110. win32api.CloseHandle(self.pipe)
  111. except pywintypes.error:
  112. # You can't close std handles...?
  113. pass
  114. def stopProducing(self):
  115. self.close()
  116. def pauseProducing(self):
  117. self.deactivate()
  118. def resumeProducing(self):
  119. self.activate()
  120. FULL_BUFFER_SIZE = 64 * 1024
  121. @implementer(IConsumer)
  122. class _PollableWritePipe(_PollableResource):
  123. def __init__(self, writePipe, lostCallback):
  124. self.disconnecting = False
  125. self.producer = None
  126. self.producerPaused = False
  127. self.streamingProducer = 0
  128. self.outQueue = []
  129. self.writePipe = writePipe
  130. self.lostCallback = lostCallback
  131. try:
  132. win32pipe.SetNamedPipeHandleState(
  133. writePipe, win32pipe.PIPE_NOWAIT, None, None
  134. )
  135. except pywintypes.error:
  136. # Maybe it's an invalid handle. Who knows.
  137. pass
  138. def close(self):
  139. self.disconnecting = True
  140. def bufferFull(self):
  141. if self.producer is not None:
  142. self.producerPaused = True
  143. self.producer.pauseProducing()
  144. def bufferEmpty(self):
  145. if self.producer is not None and (
  146. (not self.streamingProducer) or self.producerPaused
  147. ):
  148. self.producer.producerPaused = False
  149. self.producer.resumeProducing()
  150. return True
  151. return False
  152. # almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh
  153. def registerProducer(self, producer, streaming):
  154. """Register to receive data from a producer.
  155. This sets this selectable to be a consumer for a producer. When this
  156. selectable runs out of data on a write() call, it will ask the producer
  157. to resumeProducing(). A producer should implement the IProducer
  158. interface.
  159. FileDescriptor provides some infrastructure for producer methods.
  160. """
  161. if self.producer is not None:
  162. raise RuntimeError(
  163. "Cannot register producer %s, because producer %s was never "
  164. "unregistered." % (producer, self.producer)
  165. )
  166. if not self.active:
  167. producer.stopProducing()
  168. else:
  169. self.producer = producer
  170. self.streamingProducer = streaming
  171. if not streaming:
  172. producer.resumeProducing()
  173. def unregisterProducer(self):
  174. """Stop consuming data from a producer, without disconnecting."""
  175. self.producer = None
  176. def writeConnectionLost(self):
  177. self.deactivate()
  178. try:
  179. win32api.CloseHandle(self.writePipe)
  180. except pywintypes.error:
  181. # OMG what
  182. pass
  183. self.lostCallback()
  184. def writeSequence(self, seq):
  185. """
  186. Append a C{list} or C{tuple} of bytes to the output buffer.
  187. @param seq: C{list} or C{tuple} of C{str} instances to be appended to
  188. the output buffer.
  189. @raise TypeError: If C{seq} contains C{unicode}.
  190. """
  191. if str in map(type, seq):
  192. raise TypeError("Unicode not allowed in output buffer.")
  193. self.outQueue.extend(seq)
  194. def write(self, data):
  195. """
  196. Append some bytes to the output buffer.
  197. @param data: C{str} to be appended to the output buffer.
  198. @type data: C{str}.
  199. @raise TypeError: If C{data} is C{unicode} instead of C{str}.
  200. """
  201. if isinstance(data, str):
  202. raise TypeError("Unicode not allowed in output buffer.")
  203. if self.disconnecting:
  204. return
  205. self.outQueue.append(data)
  206. if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
  207. self.bufferFull()
  208. def checkWork(self):
  209. numBytesWritten = 0
  210. if not self.outQueue:
  211. if self.disconnecting:
  212. self.writeConnectionLost()
  213. return 0
  214. try:
  215. win32file.WriteFile(self.writePipe, b"", None)
  216. except pywintypes.error:
  217. self.writeConnectionLost()
  218. return numBytesWritten
  219. while self.outQueue:
  220. data = self.outQueue.pop(0)
  221. errCode = 0
  222. try:
  223. errCode, nBytesWritten = win32file.WriteFile(self.writePipe, data, None)
  224. except win32api.error:
  225. self.writeConnectionLost()
  226. break
  227. else:
  228. # assert not errCode, "wtf an error code???"
  229. numBytesWritten += nBytesWritten
  230. if len(data) > nBytesWritten:
  231. self.outQueue.insert(0, data[nBytesWritten:])
  232. break
  233. else:
  234. resumed = self.bufferEmpty()
  235. if not resumed and self.disconnecting:
  236. self.writeConnectionLost()
  237. return numBytesWritten