_pollingfile.py 8.8 KB


  1. # -*- test-case-name: twisted.internet.test.test_pollingfile -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Implements a simple polling interface for file descriptors that don't work with
  6. select() - this is pretty much only useful on Windows.
  7. """
  8. from __future__ import absolute_import, division
  9. from zope.interface import implementer
  10. from twisted.internet.interfaces import IConsumer, IPushProducer
  11. from twisted.python.compat import unicode
  12. MIN_TIMEOUT = 0.000000001
  13. MAX_TIMEOUT = 0.1
  14. class _PollableResource:
  15. active = True
  16. def activate(self):
  17. self.active = True
  18. def deactivate(self):
  19. self.active = False
  20. class _PollingTimer:
  21. # Everything is private here because it is really an implementation detail.
  22. def __init__(self, reactor):
  23. self.reactor = reactor
  24. self._resources = []
  25. self._pollTimer = None
  26. self._currentTimeout = MAX_TIMEOUT
  27. self._paused = False
  28. def _addPollableResource(self, res):
  29. self._resources.append(res)
  30. self._checkPollingState()
  31. def _checkPollingState(self):
  32. for resource in self._resources:
  33. if resource.active:
  34. self._startPolling()
  35. break
  36. else:
  37. self._stopPolling()
  38. def _startPolling(self):
  39. if self._pollTimer is None:
  40. self._pollTimer = self._reschedule()
  41. def _stopPolling(self):
  42. if self._pollTimer is not None:
  43. self._pollTimer.cancel()
  44. self._pollTimer = None
  45. def _pause(self):
  46. self._paused = True
  47. def _unpause(self):
  48. self._paused = False
  49. self._checkPollingState()
  50. def _reschedule(self):
  51. if not self._paused:
  52. return self.reactor.callLater(self._currentTimeout, self._pollEvent)
  53. def _pollEvent(self):
  54. workUnits = 0.
  55. anyActive = []
  56. for resource in self._resources:
  57. if resource.active:
  58. workUnits += resource.checkWork()
  59. # Check AFTER work has been done
  60. if resource.active:
  61. anyActive.append(resource)
  62. newTimeout = self._currentTimeout
  63. if workUnits:
  64. newTimeout = self._currentTimeout / (workUnits + 1.)
  65. if newTimeout < MIN_TIMEOUT:
  66. newTimeout = MIN_TIMEOUT
  67. else:
  68. newTimeout = self._currentTimeout * 2.
  69. if newTimeout > MAX_TIMEOUT:
  70. newTimeout = MAX_TIMEOUT
  71. self._currentTimeout = newTimeout
  72. if anyActive:
  73. self._pollTimer = self._reschedule()
  74. # If we ever (let's hope not) need the above functionality on UNIX, this could
  75. # be factored into a different module.
  76. import win32pipe
  77. import win32file
  78. import win32api
  79. import pywintypes
  80. @implementer(IPushProducer)
  81. class _PollableReadPipe(_PollableResource):
  82. def __init__(self, pipe, receivedCallback, lostCallback):
  83. # security attributes for pipes
  84. self.pipe = pipe
  85. self.receivedCallback = receivedCallback
  86. self.lostCallback = lostCallback
  87. def checkWork(self):
  88. finished = 0
  89. fullDataRead = []
  90. while 1:
  91. try:
  92. buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
  93. # finished = (result == -1)
  94. if not bytesToRead:
  95. break
  96. hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
  97. fullDataRead.append(data)
  98. except win32api.error:
  99. finished = 1
  100. break
  101. dataBuf = b''.join(fullDataRead)
  102. if dataBuf:
  103. self.receivedCallback(dataBuf)
  104. if finished:
  105. self.cleanup()
  106. return len(dataBuf)
  107. def cleanup(self):
  108. self.deactivate()
  109. self.lostCallback()
  110. def close(self):
  111. try:
  112. win32api.CloseHandle(self.pipe)
  113. except pywintypes.error:
  114. # You can't close std handles...?
  115. pass
  116. def stopProducing(self):
  117. self.close()
  118. def pauseProducing(self):
  119. self.deactivate()
  120. def resumeProducing(self):
  121. self.activate()
  122. FULL_BUFFER_SIZE = 64 * 1024
  123. @implementer(IConsumer)
  124. class _PollableWritePipe(_PollableResource):
  125. def __init__(self, writePipe, lostCallback):
  126. self.disconnecting = False
  127. self.producer = None
  128. self.producerPaused = False
  129. self.streamingProducer = 0
  130. self.outQueue = []
  131. self.writePipe = writePipe
  132. self.lostCallback = lostCallback
  133. try:
  134. win32pipe.SetNamedPipeHandleState(writePipe,
  135. win32pipe.PIPE_NOWAIT,
  136. None,
  137. None)
  138. except pywintypes.error:
  139. # Maybe it's an invalid handle. Who knows.
  140. pass
  141. def close(self):
  142. self.disconnecting = True
  143. def bufferFull(self):
  144. if self.producer is not None:
  145. self.producerPaused = True
  146. self.producer.pauseProducing()
  147. def bufferEmpty(self):
  148. if self.producer is not None and ((not self.streamingProducer) or
  149. self.producerPaused):
  150. self.producer.producerPaused = False
  151. self.producer.resumeProducing()
  152. return True
  153. return False
  154. # almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh
  155. def registerProducer(self, producer, streaming):
  156. """Register to receive data from a producer.
  157. This sets this selectable to be a consumer for a producer. When this
  158. selectable runs out of data on a write() call, it will ask the producer
  159. to resumeProducing(). A producer should implement the IProducer
  160. interface.
  161. FileDescriptor provides some infrastructure for producer methods.
  162. """
  163. if self.producer is not None:
  164. raise RuntimeError(
  165. "Cannot register producer %s, because producer %s was never "
  166. "unregistered." % (producer, self.producer))
  167. if not self.active:
  168. producer.stopProducing()
  169. else:
  170. self.producer = producer
  171. self.streamingProducer = streaming
  172. if not streaming:
  173. producer.resumeProducing()
  174. def unregisterProducer(self):
  175. """Stop consuming data from a producer, without disconnecting.
  176. """
  177. self.producer = None
  178. def writeConnectionLost(self):
  179. self.deactivate()
  180. try:
  181. win32api.CloseHandle(self.writePipe)
  182. except pywintypes.error:
  183. # OMG what
  184. pass
  185. self.lostCallback()
  186. def writeSequence(self, seq):
  187. """
  188. Append a C{list} or C{tuple} of bytes to the output buffer.
  189. @param seq: C{list} or C{tuple} of C{str} instances to be appended to
  190. the output buffer.
  191. @raise TypeError: If C{seq} contains C{unicode}.
  192. """
  193. if unicode in map(type, seq):
  194. raise TypeError("Unicode not allowed in output buffer.")
  195. self.outQueue.extend(seq)
  196. def write(self, data):
  197. """
  198. Append some bytes to the output buffer.
  199. @param data: C{str} to be appended to the output buffer.
  200. @type data: C{str}.
  201. @raise TypeError: If C{data} is C{unicode} instead of C{str}.
  202. """
  203. if isinstance(data, unicode):
  204. raise TypeError("Unicode not allowed in output buffer.")
  205. if self.disconnecting:
  206. return
  207. self.outQueue.append(data)
  208. if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
  209. self.bufferFull()
  210. def checkWork(self):
  211. numBytesWritten = 0
  212. if not self.outQueue:
  213. if self.disconnecting:
  214. self.writeConnectionLost()
  215. return 0
  216. try:
  217. win32file.WriteFile(self.writePipe, b'', None)
  218. except pywintypes.error:
  219. self.writeConnectionLost()
  220. return numBytesWritten
  221. while self.outQueue:
  222. data = self.outQueue.pop(0)
  223. errCode = 0
  224. try:
  225. errCode, nBytesWritten = win32file.WriteFile(self.writePipe,
  226. data, None)
  227. except win32api.error:
  228. self.writeConnectionLost()
  229. break
  230. else:
  231. # assert not errCode, "wtf an error code???"
  232. numBytesWritten += nBytesWritten
  233. if len(data) > nBytesWritten:
  234. self.outQueue.insert(0, data[nBytesWritten:])
  235. break
  236. else:
  237. resumed = self.bufferEmpty()
  238. if not resumed and self.disconnecting:
  239. self.writeConnectionLost()
  240. return numBytesWritten