# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Abstract file handle class
"""

from twisted.internet import main, error, interfaces
from twisted.internet.abstract import _ConsumerMixin, _LogOwner
from twisted.python import failure
from twisted.python.compat import unicode

from zope.interface import implementer

import errno

from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF
from twisted.internet.iocpreactor.const import ERROR_IO_PENDING
from twisted.internet.iocpreactor import iocpsupport as _iocp
  15. @implementer(interfaces.IPushProducer, interfaces.IConsumer,
  16. interfaces.ITransport, interfaces.IHalfCloseableDescriptor)
  17. class FileHandle(_ConsumerMixin, _LogOwner):
  18. """
  19. File handle that can read and write asynchronously
  20. """
  21. # read stuff
  22. maxReadBuffers = 16
  23. readBufferSize = 4096
  24. reading = False
  25. dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs
  26. _readNextBuffer = 0
  27. _readSize = 0 # how much data we have in the read buffer
  28. _readScheduled = None
  29. _readScheduledInOS = False
  30. def startReading(self):
  31. self.reactor.addActiveHandle(self)
  32. if not self._readScheduled and not self.reading:
  33. self.reading = True
  34. self._readScheduled = self.reactor.callLater(0,
  35. self._resumeReading)
  36. def stopReading(self):
  37. if self._readScheduled:
  38. self._readScheduled.cancel()
  39. self._readScheduled = None
  40. self.reading = False
  41. def _resumeReading(self):
  42. self._readScheduled = None
  43. if self._dispatchData() and not self._readScheduledInOS:
  44. self.doRead()
  45. def _dispatchData(self):
  46. """
  47. Dispatch previously read data. Return True if self.reading and we don't
  48. have any more data
  49. """
  50. if not self._readSize:
  51. return self.reading
  52. size = self._readSize
  53. full_buffers = size // self.readBufferSize
  54. while self._readNextBuffer < full_buffers:
  55. self.dataReceived(self._readBuffers[self._readNextBuffer])
  56. self._readNextBuffer += 1
  57. if not self.reading:
  58. return False
  59. remainder = size % self.readBufferSize
  60. if remainder:
  61. self.dataReceived(self._readBuffers[full_buffers][0:remainder])
  62. if self.dynamicReadBuffers:
  63. total_buffer_size = self.readBufferSize * len(self._readBuffers)
  64. # we have one buffer too many
  65. if size < total_buffer_size - self.readBufferSize:
  66. del self._readBuffers[-1]
  67. # we filled all buffers, so allocate one more
  68. elif (size == total_buffer_size and
  69. len(self._readBuffers) < self.maxReadBuffers):
  70. self._readBuffers.append(bytearray(self.readBufferSize))
  71. self._readNextBuffer = 0
  72. self._readSize = 0
  73. return self.reading
  74. def _cbRead(self, rc, data, evt):
  75. self._readScheduledInOS = False
  76. if self._handleRead(rc, data, evt):
  77. self.doRead()
  78. def _handleRead(self, rc, data, evt):
  79. """
  80. Returns False if we should stop reading for now
  81. """
  82. if self.disconnected:
  83. return False
  84. # graceful disconnection
  85. if (not (rc or data)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF):
  86. self.reactor.removeActiveHandle(self)
  87. self.readConnectionLost(failure.Failure(main.CONNECTION_DONE))
  88. return False
  89. # XXX: not handling WSAEWOULDBLOCK
  90. # ("too many outstanding overlapped I/O requests")
  91. elif rc:
  92. self.connectionLost(failure.Failure(
  93. error.ConnectionLost("read error -- %s (%s)" %
  94. (errno.errorcode.get(rc, 'unknown'), rc))))
  95. return False
  96. else:
  97. assert self._readSize == 0
  98. assert self._readNextBuffer == 0
  99. self._readSize = data
  100. return self._dispatchData()
  101. def doRead(self):
  102. evt = _iocp.Event(self._cbRead, self)
  103. evt.buff = buff = self._readBuffers
  104. rc, numBytesRead = self.readFromHandle(buff, evt)
  105. if not rc or rc == ERROR_IO_PENDING:
  106. self._readScheduledInOS = True
  107. else:
  108. self._handleRead(rc, numBytesRead, evt)
  109. def readFromHandle(self, bufflist, evt):
  110. raise NotImplementedError() # TODO: this should default to ReadFile
  111. def dataReceived(self, data):
  112. raise NotImplementedError
  113. def readConnectionLost(self, reason):
  114. self.connectionLost(reason)
  115. # write stuff
  116. dataBuffer = b''
  117. offset = 0
  118. writing = False
  119. _writeScheduled = None
  120. _writeDisconnecting = False
  121. _writeDisconnected = False
  122. writeBufferSize = 2**2**2**2
  123. def loseWriteConnection(self):
  124. self._writeDisconnecting = True
  125. self.startWriting()
  126. def _closeWriteConnection(self):
  127. # override in subclasses
  128. pass
  129. def writeConnectionLost(self, reason):
  130. # in current code should never be called
  131. self.connectionLost(reason)
  132. def startWriting(self):
  133. self.reactor.addActiveHandle(self)
  134. if not self._writeScheduled and not self.writing:
  135. self.writing = True
  136. self._writeScheduled = self.reactor.callLater(0,
  137. self._resumeWriting)
  138. def stopWriting(self):
  139. if self._writeScheduled:
  140. self._writeScheduled.cancel()
  141. self._writeScheduled = None
  142. self.writing = False
  143. def _resumeWriting(self):
  144. self._writeScheduled = None
  145. self.doWrite()
  146. def _cbWrite(self, rc, numBytesWritten, evt):
  147. if self._handleWrite(rc, numBytesWritten, evt):
  148. self.doWrite()
  149. def _handleWrite(self, rc, numBytesWritten, evt):
  150. """
  151. Returns false if we should stop writing for now
  152. """
  153. if self.disconnected or self._writeDisconnected:
  154. return False
  155. # XXX: not handling WSAEWOULDBLOCK
  156. # ("too many outstanding overlapped I/O requests")
  157. if rc:
  158. self.connectionLost(failure.Failure(
  159. error.ConnectionLost("write error -- %s (%s)" %
  160. (errno.errorcode.get(rc, 'unknown'), rc))))
  161. return False
  162. else:
  163. self.offset += numBytesWritten
  164. # If there is nothing left to send,
  165. if self.offset == len(self.dataBuffer) and not self._tempDataLen:
  166. self.dataBuffer = b""
  167. self.offset = 0
  168. # stop writing
  169. self.stopWriting()
  170. # If I've got a producer who is supposed to supply me with data
  171. if self.producer is not None and ((not self.streamingProducer)
  172. or self.producerPaused):
  173. # tell them to supply some more.
  174. self.producerPaused = True
  175. self.producer.resumeProducing()
  176. elif self.disconnecting:
  177. # But if I was previously asked to let the connection die,
  178. # do so.
  179. self.connectionLost(failure.Failure(main.CONNECTION_DONE))
  180. elif self._writeDisconnecting:
  181. # I was previously asked to half-close the connection.
  182. self._writeDisconnected = True
  183. self._closeWriteConnection()
  184. return False
  185. else:
  186. return True
  187. def doWrite(self):
  188. if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
  189. # If there is currently less than SEND_LIMIT bytes left to send
  190. # in the string, extend it with the array data.
  191. self.dataBuffer = (self.dataBuffer[self.offset:] +
  192. b"".join(self._tempDataBuffer))
  193. self.offset = 0
  194. self._tempDataBuffer = []
  195. self._tempDataLen = 0
  196. evt = _iocp.Event(self._cbWrite, self)
  197. # Send as much data as you can.
  198. if self.offset:
  199. sendView = memoryview(self.dataBuffer)
  200. evt.buff = buff = sendView[self.offset:]
  201. else:
  202. evt.buff = buff = self.dataBuffer
  203. rc, data = self.writeToHandle(buff, evt)
  204. if rc and rc != ERROR_IO_PENDING:
  205. self._handleWrite(rc, data, evt)
  206. def writeToHandle(self, buff, evt):
  207. raise NotImplementedError() # TODO: this should default to WriteFile
  208. def write(self, data):
  209. """Reliably write some data.
  210. The data is buffered until his file descriptor is ready for writing.
  211. """
  212. if isinstance(data, unicode): # no, really, I mean it
  213. raise TypeError("Data must not be unicode")
  214. if not self.connected or self._writeDisconnected:
  215. return
  216. if data:
  217. self._tempDataBuffer.append(data)
  218. self._tempDataLen += len(data)
  219. if self.producer is not None and self.streamingProducer:
  220. if (len(self.dataBuffer) + self._tempDataLen
  221. > self.writeBufferSize):
  222. self.producerPaused = True
  223. self.producer.pauseProducing()
  224. self.startWriting()
  225. def writeSequence(self, iovec):
  226. for i in iovec:
  227. if isinstance(i, unicode): # no, really, I mean it
  228. raise TypeError("Data must not be unicode")
  229. if not self.connected or not iovec or self._writeDisconnected:
  230. return
  231. self._tempDataBuffer.extend(iovec)
  232. for i in iovec:
  233. self._tempDataLen += len(i)
  234. if self.producer is not None and self.streamingProducer:
  235. if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize:
  236. self.producerPaused = True
  237. self.producer.pauseProducing()
  238. self.startWriting()
  239. # general stuff
  240. connected = False
  241. disconnected = False
  242. disconnecting = False
  243. logstr = "Uninitialized"
  244. SEND_LIMIT = 128*1024
  245. def __init__(self, reactor = None):
  246. if not reactor:
  247. from twisted.internet import reactor
  248. self.reactor = reactor
  249. self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
  250. self._tempDataLen = 0
  251. self._readBuffers = [bytearray(self.readBufferSize)]
  252. def connectionLost(self, reason):
  253. """
  254. The connection was lost.
  255. This is called when the connection on a selectable object has been
  256. lost. It will be called whether the connection was closed explicitly,
  257. an exception occurred in an event handler, or the other end of the
  258. connection closed it first.
  259. Clean up state here, but make sure to call back up to FileDescriptor.
  260. """
  261. self.disconnected = True
  262. self.connected = False
  263. if self.producer is not None:
  264. self.producer.stopProducing()
  265. self.producer = None
  266. self.stopReading()
  267. self.stopWriting()
  268. self.reactor.removeActiveHandle(self)
  269. def getFileHandle(self):
  270. return -1
  271. def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
  272. """
  273. Close the connection at the next available opportunity.
  274. Call this to cause this FileDescriptor to lose its connection. It will
  275. first write any data that it has buffered.
  276. If there is data buffered yet to be written, this method will cause the
  277. transport to lose its connection as soon as it's done flushing its
  278. write buffer. If you have a producer registered, the connection won't
  279. be closed until the producer is finished. Therefore, make sure you
  280. unregister your producer when it's finished, or the connection will
  281. never close.
  282. """
  283. if self.connected and not self.disconnecting:
  284. if self._writeDisconnected:
  285. # doWrite won't trigger the connection close anymore
  286. self.stopReading()
  287. self.stopWriting
  288. self.connectionLost(_connDone)
  289. else:
  290. self.stopReading()
  291. self.startWriting()
  292. self.disconnecting = 1
  293. # Producer/consumer implementation
  294. def stopConsuming(self):
  295. """
  296. Stop consuming data.
  297. This is called when a producer has lost its connection, to tell the
  298. consumer to go lose its connection (and break potential circular
  299. references).
  300. """
  301. self.unregisterProducer()
  302. self.loseConnection()
  303. # producer interface implementation
  304. def resumeProducing(self):
  305. if self.connected and not self.disconnecting:
  306. self.startReading()
  307. def pauseProducing(self):
  308. self.stopReading()
  309. def stopProducing(self):
  310. self.loseConnection()
  311. __all__ = ['FileHandle']