123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Abstract file handle class
- """
- from twisted.internet import main, error, interfaces
- from twisted.internet.abstract import _ConsumerMixin, _LogOwner
- from twisted.python import failure
- from twisted.python.compat import unicode
- from zope.interface import implementer
- import errno
- from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF
- from twisted.internet.iocpreactor.const import ERROR_IO_PENDING
- from twisted.internet.iocpreactor import iocpsupport as _iocp
- @implementer(interfaces.IPushProducer, interfaces.IConsumer,
- interfaces.ITransport, interfaces.IHalfCloseableDescriptor)
- class FileHandle(_ConsumerMixin, _LogOwner):
- """
- File handle that can read and write asynchronously
- """
- # read stuff
- maxReadBuffers = 16
- readBufferSize = 4096
- reading = False
- dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs
- _readNextBuffer = 0
- _readSize = 0 # how much data we have in the read buffer
- _readScheduled = None
- _readScheduledInOS = False
- def startReading(self):
- self.reactor.addActiveHandle(self)
- if not self._readScheduled and not self.reading:
- self.reading = True
- self._readScheduled = self.reactor.callLater(0,
- self._resumeReading)
- def stopReading(self):
- if self._readScheduled:
- self._readScheduled.cancel()
- self._readScheduled = None
- self.reading = False
- def _resumeReading(self):
- self._readScheduled = None
- if self._dispatchData() and not self._readScheduledInOS:
- self.doRead()
- def _dispatchData(self):
- """
- Dispatch previously read data. Return True if self.reading and we don't
- have any more data
- """
- if not self._readSize:
- return self.reading
- size = self._readSize
- full_buffers = size // self.readBufferSize
- while self._readNextBuffer < full_buffers:
- self.dataReceived(self._readBuffers[self._readNextBuffer])
- self._readNextBuffer += 1
- if not self.reading:
- return False
- remainder = size % self.readBufferSize
- if remainder:
- self.dataReceived(self._readBuffers[full_buffers][0:remainder])
- if self.dynamicReadBuffers:
- total_buffer_size = self.readBufferSize * len(self._readBuffers)
- # we have one buffer too many
- if size < total_buffer_size - self.readBufferSize:
- del self._readBuffers[-1]
- # we filled all buffers, so allocate one more
- elif (size == total_buffer_size and
- len(self._readBuffers) < self.maxReadBuffers):
- self._readBuffers.append(bytearray(self.readBufferSize))
- self._readNextBuffer = 0
- self._readSize = 0
- return self.reading
- def _cbRead(self, rc, data, evt):
- self._readScheduledInOS = False
- if self._handleRead(rc, data, evt):
- self.doRead()
- def _handleRead(self, rc, data, evt):
- """
- Returns False if we should stop reading for now
- """
- if self.disconnected:
- return False
- # graceful disconnection
- if (not (rc or data)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF):
- self.reactor.removeActiveHandle(self)
- self.readConnectionLost(failure.Failure(main.CONNECTION_DONE))
- return False
- # XXX: not handling WSAEWOULDBLOCK
- # ("too many outstanding overlapped I/O requests")
- elif rc:
- self.connectionLost(failure.Failure(
- error.ConnectionLost("read error -- %s (%s)" %
- (errno.errorcode.get(rc, 'unknown'), rc))))
- return False
- else:
- assert self._readSize == 0
- assert self._readNextBuffer == 0
- self._readSize = data
- return self._dispatchData()
- def doRead(self):
- evt = _iocp.Event(self._cbRead, self)
- evt.buff = buff = self._readBuffers
- rc, numBytesRead = self.readFromHandle(buff, evt)
- if not rc or rc == ERROR_IO_PENDING:
- self._readScheduledInOS = True
- else:
- self._handleRead(rc, numBytesRead, evt)
- def readFromHandle(self, bufflist, evt):
- raise NotImplementedError() # TODO: this should default to ReadFile
- def dataReceived(self, data):
- raise NotImplementedError
- def readConnectionLost(self, reason):
- self.connectionLost(reason)
- # write stuff
- dataBuffer = b''
- offset = 0
- writing = False
- _writeScheduled = None
- _writeDisconnecting = False
- _writeDisconnected = False
- writeBufferSize = 2**2**2**2
- def loseWriteConnection(self):
- self._writeDisconnecting = True
- self.startWriting()
- def _closeWriteConnection(self):
- # override in subclasses
- pass
- def writeConnectionLost(self, reason):
- # in current code should never be called
- self.connectionLost(reason)
- def startWriting(self):
- self.reactor.addActiveHandle(self)
- if not self._writeScheduled and not self.writing:
- self.writing = True
- self._writeScheduled = self.reactor.callLater(0,
- self._resumeWriting)
- def stopWriting(self):
- if self._writeScheduled:
- self._writeScheduled.cancel()
- self._writeScheduled = None
- self.writing = False
- def _resumeWriting(self):
- self._writeScheduled = None
- self.doWrite()
- def _cbWrite(self, rc, numBytesWritten, evt):
- if self._handleWrite(rc, numBytesWritten, evt):
- self.doWrite()
- def _handleWrite(self, rc, numBytesWritten, evt):
- """
- Returns false if we should stop writing for now
- """
- if self.disconnected or self._writeDisconnected:
- return False
- # XXX: not handling WSAEWOULDBLOCK
- # ("too many outstanding overlapped I/O requests")
- if rc:
- self.connectionLost(failure.Failure(
- error.ConnectionLost("write error -- %s (%s)" %
- (errno.errorcode.get(rc, 'unknown'), rc))))
- return False
- else:
- self.offset += numBytesWritten
- # If there is nothing left to send,
- if self.offset == len(self.dataBuffer) and not self._tempDataLen:
- self.dataBuffer = b""
- self.offset = 0
- # stop writing
- self.stopWriting()
- # If I've got a producer who is supposed to supply me with data
- if self.producer is not None and ((not self.streamingProducer)
- or self.producerPaused):
- # tell them to supply some more.
- self.producerPaused = True
- self.producer.resumeProducing()
- elif self.disconnecting:
- # But if I was previously asked to let the connection die,
- # do so.
- self.connectionLost(failure.Failure(main.CONNECTION_DONE))
- elif self._writeDisconnecting:
- # I was previously asked to half-close the connection.
- self._writeDisconnected = True
- self._closeWriteConnection()
- return False
- else:
- return True
- def doWrite(self):
- if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
- # If there is currently less than SEND_LIMIT bytes left to send
- # in the string, extend it with the array data.
- self.dataBuffer = (self.dataBuffer[self.offset:] +
- b"".join(self._tempDataBuffer))
- self.offset = 0
- self._tempDataBuffer = []
- self._tempDataLen = 0
- evt = _iocp.Event(self._cbWrite, self)
- # Send as much data as you can.
- if self.offset:
- sendView = memoryview(self.dataBuffer)
- evt.buff = buff = sendView[self.offset:]
- else:
- evt.buff = buff = self.dataBuffer
- rc, data = self.writeToHandle(buff, evt)
- if rc and rc != ERROR_IO_PENDING:
- self._handleWrite(rc, data, evt)
- def writeToHandle(self, buff, evt):
- raise NotImplementedError() # TODO: this should default to WriteFile
- def write(self, data):
- """Reliably write some data.
- The data is buffered until his file descriptor is ready for writing.
- """
- if isinstance(data, unicode): # no, really, I mean it
- raise TypeError("Data must not be unicode")
- if not self.connected or self._writeDisconnected:
- return
- if data:
- self._tempDataBuffer.append(data)
- self._tempDataLen += len(data)
- if self.producer is not None and self.streamingProducer:
- if (len(self.dataBuffer) + self._tempDataLen
- > self.writeBufferSize):
- self.producerPaused = True
- self.producer.pauseProducing()
- self.startWriting()
- def writeSequence(self, iovec):
- for i in iovec:
- if isinstance(i, unicode): # no, really, I mean it
- raise TypeError("Data must not be unicode")
- if not self.connected or not iovec or self._writeDisconnected:
- return
- self._tempDataBuffer.extend(iovec)
- for i in iovec:
- self._tempDataLen += len(i)
- if self.producer is not None and self.streamingProducer:
- if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize:
- self.producerPaused = True
- self.producer.pauseProducing()
- self.startWriting()
- # general stuff
- connected = False
- disconnected = False
- disconnecting = False
- logstr = "Uninitialized"
- SEND_LIMIT = 128*1024
- def __init__(self, reactor = None):
- if not reactor:
- from twisted.internet import reactor
- self.reactor = reactor
- self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
- self._tempDataLen = 0
- self._readBuffers = [bytearray(self.readBufferSize)]
- def connectionLost(self, reason):
- """
- The connection was lost.
- This is called when the connection on a selectable object has been
- lost. It will be called whether the connection was closed explicitly,
- an exception occurred in an event handler, or the other end of the
- connection closed it first.
- Clean up state here, but make sure to call back up to FileDescriptor.
- """
- self.disconnected = True
- self.connected = False
- if self.producer is not None:
- self.producer.stopProducing()
- self.producer = None
- self.stopReading()
- self.stopWriting()
- self.reactor.removeActiveHandle(self)
- def getFileHandle(self):
- return -1
- def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
- """
- Close the connection at the next available opportunity.
- Call this to cause this FileDescriptor to lose its connection. It will
- first write any data that it has buffered.
- If there is data buffered yet to be written, this method will cause the
- transport to lose its connection as soon as it's done flushing its
- write buffer. If you have a producer registered, the connection won't
- be closed until the producer is finished. Therefore, make sure you
- unregister your producer when it's finished, or the connection will
- never close.
- """
- if self.connected and not self.disconnecting:
- if self._writeDisconnected:
- # doWrite won't trigger the connection close anymore
- self.stopReading()
- self.stopWriting
- self.connectionLost(_connDone)
- else:
- self.stopReading()
- self.startWriting()
- self.disconnecting = 1
- # Producer/consumer implementation
- def stopConsuming(self):
- """
- Stop consuming data.
- This is called when a producer has lost its connection, to tell the
- consumer to go lose its connection (and break potential circular
- references).
- """
- self.unregisterProducer()
- self.loseConnection()
- # producer interface implementation
- def resumeProducing(self):
- if self.connected and not self.disconnecting:
- self.startReading()
- def pauseProducing(self):
- self.stopReading()
- def stopProducing(self):
- self.loseConnection()
- __all__ = ['FileHandle']
|