tcp.py 20 KB


  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. TCP support for IOCP reactor
  5. """
  6. import socket, operator, errno, struct
  7. from zope.interface import implementer, classImplements
  8. from twisted.internet import interfaces, error, address, main, defer
  9. from twisted.internet.protocol import Protocol
  10. from twisted.internet.abstract import _LogOwner, isIPv6Address
  11. from twisted.internet.tcp import (
  12. _SocketCloser, Connector as TCPConnector, _AbortingMixin, _BaseBaseClient,
  13. _BaseTCPClient, _resolveIPv6, _getsockname)
  14. from twisted.python import log, failure, reflect
  15. from twisted.python.compat import _PY3, nativeString
  16. from twisted.internet.iocpreactor import iocpsupport as _iocp, abstract
  17. from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
  18. from twisted.internet.iocpreactor.const import ERROR_IO_PENDING
  19. from twisted.internet.iocpreactor.const import SO_UPDATE_CONNECT_CONTEXT
  20. from twisted.internet.iocpreactor.const import SO_UPDATE_ACCEPT_CONTEXT
  21. from twisted.internet.iocpreactor.const import ERROR_CONNECTION_REFUSED
  22. from twisted.internet.iocpreactor.const import ERROR_NETWORK_UNREACHABLE
  23. try:
  24. from twisted.internet._newtls import startTLS as _startTLS
  25. except ImportError:
  26. _startTLS = None
  27. # ConnectEx returns these. XXX: find out what it does for timeout
  28. connectExErrors = {
  29. ERROR_CONNECTION_REFUSED: errno.WSAECONNREFUSED,
  30. ERROR_NETWORK_UNREACHABLE: errno.WSAENETUNREACH,
  31. }
  32. @implementer(IReadWriteHandle, interfaces.ITCPTransport,
  33. interfaces.ISystemHandle)
  34. class Connection(abstract.FileHandle, _SocketCloser, _AbortingMixin):
  35. """
  36. @ivar TLS: C{False} to indicate the connection is in normal TCP mode,
  37. C{True} to indicate that TLS has been started and that operations must
  38. be routed through the L{TLSMemoryBIOProtocol} instance.
  39. """
  40. TLS = False
  41. def __init__(self, sock, proto, reactor=None):
  42. abstract.FileHandle.__init__(self, reactor)
  43. self.socket = sock
  44. self.getFileHandle = sock.fileno
  45. self.protocol = proto
  46. def getHandle(self):
  47. return self.socket
  48. def dataReceived(self, rbuffer):
  49. """
  50. @param rbuffer: Data received.
  51. @type rbuffer: L{bytes} or L{bytearray}
  52. """
  53. if isinstance(rbuffer, bytes):
  54. pass
  55. elif isinstance(rbuffer, bytearray):
  56. # XXX: some day, we'll have protocols that can handle raw buffers
  57. rbuffer = bytes(rbuffer)
  58. else:
  59. raise TypeError("data must be bytes or bytearray, not " +
  60. type(rbuffer))
  61. self.protocol.dataReceived(rbuffer)
  62. def readFromHandle(self, bufflist, evt):
  63. return _iocp.recv(self.getFileHandle(), bufflist, evt)
  64. def writeToHandle(self, buff, evt):
  65. """
  66. Send C{buff} to current file handle using C{_iocp.send}. The buffer
  67. sent is limited to a size of C{self.SEND_LIMIT}.
  68. """
  69. writeView = memoryview(buff)
  70. return _iocp.send(self.getFileHandle(),
  71. writeView[0:self.SEND_LIMIT].tobytes(), evt)
  72. def _closeWriteConnection(self):
  73. try:
  74. self.socket.shutdown(1)
  75. except socket.error:
  76. pass
  77. p = interfaces.IHalfCloseableProtocol(self.protocol, None)
  78. if p:
  79. try:
  80. p.writeConnectionLost()
  81. except:
  82. f = failure.Failure()
  83. log.err()
  84. self.connectionLost(f)
  85. def readConnectionLost(self, reason):
  86. p = interfaces.IHalfCloseableProtocol(self.protocol, None)
  87. if p:
  88. try:
  89. p.readConnectionLost()
  90. except:
  91. log.err()
  92. self.connectionLost(failure.Failure())
  93. else:
  94. self.connectionLost(reason)
  95. def connectionLost(self, reason):
  96. if self.disconnected:
  97. return
  98. abstract.FileHandle.connectionLost(self, reason)
  99. isClean = (reason is None or
  100. not reason.check(error.ConnectionAborted))
  101. self._closeSocket(isClean)
  102. protocol = self.protocol
  103. del self.protocol
  104. del self.socket
  105. del self.getFileHandle
  106. protocol.connectionLost(reason)
  107. def logPrefix(self):
  108. """
  109. Return the prefix to log with when I own the logging thread.
  110. """
  111. return self.logstr
  112. def getTcpNoDelay(self):
  113. return operator.truth(self.socket.getsockopt(socket.IPPROTO_TCP,
  114. socket.TCP_NODELAY))
  115. def setTcpNoDelay(self, enabled):
  116. self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
  117. def getTcpKeepAlive(self):
  118. return operator.truth(self.socket.getsockopt(socket.SOL_SOCKET,
  119. socket.SO_KEEPALIVE))
  120. def setTcpKeepAlive(self, enabled):
  121. self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
  122. if _startTLS is not None:
  123. def startTLS(self, contextFactory, normal=True):
  124. """
  125. @see: L{ITLSTransport.startTLS}
  126. """
  127. _startTLS(self, contextFactory, normal, abstract.FileHandle)
  128. def write(self, data):
  129. """
  130. Write some data, either directly to the underlying handle or, if TLS
  131. has been started, to the L{TLSMemoryBIOProtocol} for it to encrypt and
  132. send.
  133. @see: L{twisted.internet.interfaces.ITransport.write}
  134. """
  135. if self.disconnected:
  136. return
  137. if self.TLS:
  138. self.protocol.write(data)
  139. else:
  140. abstract.FileHandle.write(self, data)
  141. def writeSequence(self, iovec):
  142. """
  143. Write some data, either directly to the underlying handle or, if TLS
  144. has been started, to the L{TLSMemoryBIOProtocol} for it to encrypt and
  145. send.
  146. @see: L{twisted.internet.interfaces.ITransport.writeSequence}
  147. """
  148. if self.disconnected:
  149. return
  150. if self.TLS:
  151. self.protocol.writeSequence(iovec)
  152. else:
  153. abstract.FileHandle.writeSequence(self, iovec)
  154. def loseConnection(self, reason=None):
  155. """
  156. Close the underlying handle or, if TLS has been started, first shut it
  157. down.
  158. @see: L{twisted.internet.interfaces.ITransport.loseConnection}
  159. """
  160. if self.TLS:
  161. if self.connected and not self.disconnecting:
  162. self.protocol.loseConnection()
  163. else:
  164. abstract.FileHandle.loseConnection(self, reason)
  165. def registerProducer(self, producer, streaming):
  166. """
  167. Register a producer.
  168. If TLS is enabled, the TLS connection handles this.
  169. """
  170. if self.TLS:
  171. # Registering a producer before we're connected shouldn't be a
  172. # problem. If we end up with a write(), that's already handled in
  173. # the write() code above, and there are no other potential
  174. # side-effects.
  175. self.protocol.registerProducer(producer, streaming)
  176. else:
  177. abstract.FileHandle.registerProducer(self, producer, streaming)
  178. def unregisterProducer(self):
  179. """
  180. Unregister a producer.
  181. If TLS is enabled, the TLS connection handles this.
  182. """
  183. if self.TLS:
  184. self.protocol.unregisterProducer()
  185. else:
  186. abstract.FileHandle.unregisterProducer(self)
  187. if _startTLS is not None:
  188. classImplements(Connection, interfaces.ITLSTransport)
  189. class Client(_BaseBaseClient, _BaseTCPClient, Connection):
  190. """
  191. @ivar _tlsClientDefault: Always C{True}, indicating that this is a client
  192. connection, and by default when TLS is negotiated this class will act as
  193. a TLS client.
  194. """
  195. addressFamily = socket.AF_INET
  196. socketType = socket.SOCK_STREAM
  197. _tlsClientDefault = True
  198. _commonConnection = Connection
  199. def __init__(self, host, port, bindAddress, connector, reactor):
  200. # ConnectEx documentation says socket _has_ to be bound
  201. if bindAddress is None:
  202. bindAddress = ('', 0)
  203. self.reactor = reactor # createInternetSocket needs this
  204. _BaseTCPClient.__init__(self, host, port, bindAddress, connector,
  205. reactor)
  206. def createInternetSocket(self):
  207. """
  208. Create a socket registered with the IOCP reactor.
  209. @see: L{_BaseTCPClient}
  210. """
  211. return self.reactor.createSocket(self.addressFamily, self.socketType)
  212. def _collectSocketDetails(self):
  213. """
  214. Clean up potentially circular references to the socket and to its
  215. C{getFileHandle} method.
  216. @see: L{_BaseBaseClient}
  217. """
  218. del self.socket, self.getFileHandle
  219. def _stopReadingAndWriting(self):
  220. """
  221. Remove the active handle from the reactor.
  222. @see: L{_BaseBaseClient}
  223. """
  224. self.reactor.removeActiveHandle(self)
  225. def cbConnect(self, rc, data, evt):
  226. if rc:
  227. rc = connectExErrors.get(rc, rc)
  228. self.failIfNotConnected(error.getConnectError((rc,
  229. errno.errorcode.get(rc, 'Unknown error'))))
  230. else:
  231. self.socket.setsockopt(
  232. socket.SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,
  233. struct.pack('P', self.socket.fileno()))
  234. self.protocol = self.connector.buildProtocol(self.getPeer())
  235. self.connected = True
  236. logPrefix = self._getLogPrefix(self.protocol)
  237. self.logstr = logPrefix + ",client"
  238. if self.protocol is None:
  239. # Factory.buildProtocol is allowed to return None. In that
  240. # case, make up a protocol to satisfy the rest of the
  241. # implementation; connectionLost is going to be called on
  242. # something, for example. This is easier than adding special
  243. # case support for a None protocol throughout the rest of the
  244. # transport implementation.
  245. self.protocol = Protocol()
  246. # But dispose of the connection quickly.
  247. self.loseConnection()
  248. else:
  249. self.protocol.makeConnection(self)
  250. self.startReading()
  251. def doConnect(self):
  252. if not hasattr(self, "connector"):
  253. # this happens if we connector.stopConnecting in
  254. # factory.startedConnecting
  255. return
  256. assert _iocp.have_connectex
  257. self.reactor.addActiveHandle(self)
  258. evt = _iocp.Event(self.cbConnect, self)
  259. rc = _iocp.connect(self.socket.fileno(), self.realAddress, evt)
  260. if rc and rc != ERROR_IO_PENDING:
  261. self.cbConnect(rc, 0, evt)
  262. class Server(Connection):
  263. """
  264. Serverside socket-stream connection class.
  265. I am a serverside network connection transport; a socket which came from an
  266. accept() on a server.
  267. @ivar _tlsClientDefault: Always C{False}, indicating that this is a server
  268. connection, and by default when TLS is negotiated this class will act as
  269. a TLS server.
  270. """
  271. _tlsClientDefault = False
  272. def __init__(self, sock, protocol, clientAddr, serverAddr, sessionno, reactor):
  273. """
  274. Server(sock, protocol, client, server, sessionno)
  275. Initialize me with a socket, a protocol, a descriptor for my peer (a
  276. tuple of host, port describing the other end of the connection), an
  277. instance of Port, and a session number.
  278. """
  279. Connection.__init__(self, sock, protocol, reactor)
  280. self.serverAddr = serverAddr
  281. self.clientAddr = clientAddr
  282. self.sessionno = sessionno
  283. logPrefix = self._getLogPrefix(self.protocol)
  284. self.logstr = "%s,%s,%s" % (logPrefix, sessionno, self.clientAddr.host)
  285. self.repstr = "<%s #%s on %s>" % (self.protocol.__class__.__name__,
  286. self.sessionno, self.serverAddr.port)
  287. self.connected = True
  288. self.startReading()
  289. def __repr__(self):
  290. """
  291. A string representation of this connection.
  292. """
  293. return self.repstr
  294. def getHost(self):
  295. """
  296. Returns an IPv4Address.
  297. This indicates the server's address.
  298. """
  299. return self.serverAddr
  300. def getPeer(self):
  301. """
  302. Returns an IPv4Address.
  303. This indicates the client's address.
  304. """
  305. return self.clientAddr
  306. class Connector(TCPConnector):
  307. def _makeTransport(self):
  308. return Client(self.host, self.port, self.bindAddress, self,
  309. self.reactor)
  310. @implementer(interfaces.IListeningPort)
  311. class Port(_SocketCloser, _LogOwner):
  312. connected = False
  313. disconnected = False
  314. disconnecting = False
  315. addressFamily = socket.AF_INET
  316. socketType = socket.SOCK_STREAM
  317. _addressType = address.IPv4Address
  318. sessionno = 0
  319. # Actual port number being listened on, only set to a non-None
  320. # value when we are actually listening.
  321. _realPortNumber = None
  322. # A string describing the connections which will be created by this port.
  323. # Normally this is C{"TCP"}, since this is a TCP port, but when the TLS
  324. # implementation re-uses this class it overrides the value with C{"TLS"}.
  325. # Only used for logging.
  326. _type = 'TCP'
  327. def __init__(self, port, factory, backlog=50, interface='', reactor=None):
  328. self.port = port
  329. self.factory = factory
  330. self.backlog = backlog
  331. self.interface = interface
  332. self.reactor = reactor
  333. if isIPv6Address(interface):
  334. self.addressFamily = socket.AF_INET6
  335. self._addressType = address.IPv6Address
  336. def __repr__(self):
  337. if self._realPortNumber is not None:
  338. return "<%s of %s on %s>" % (self.__class__,
  339. self.factory.__class__,
  340. self._realPortNumber)
  341. else:
  342. return "<%s of %s (not listening)>" % (self.__class__,
  343. self.factory.__class__)
  344. def startListening(self):
  345. try:
  346. skt = self.reactor.createSocket(self.addressFamily,
  347. self.socketType)
  348. # TODO: resolve self.interface if necessary
  349. if self.addressFamily == socket.AF_INET6:
  350. addr = _resolveIPv6(self.interface, self.port)
  351. else:
  352. addr = (self.interface, self.port)
  353. skt.bind(addr)
  354. except socket.error as le:
  355. raise error.CannotListenError(self.interface, self.port, le)
  356. self.addrLen = _iocp.maxAddrLen(skt.fileno())
  357. # Make sure that if we listened on port 0, we update that to
  358. # reflect what the OS actually assigned us.
  359. self._realPortNumber = skt.getsockname()[1]
  360. log.msg("%s starting on %s" % (self._getLogPrefix(self.factory),
  361. self._realPortNumber))
  362. self.factory.doStart()
  363. skt.listen(self.backlog)
  364. self.connected = True
  365. self.disconnected = False
  366. self.reactor.addActiveHandle(self)
  367. self.socket = skt
  368. self.getFileHandle = self.socket.fileno
  369. self.doAccept()
  370. def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
  371. """
  372. Stop accepting connections on this port.
  373. This will shut down my socket and call self.connectionLost().
  374. It returns a deferred which will fire successfully when the
  375. port is actually closed.
  376. """
  377. self.disconnecting = True
  378. if self.connected:
  379. self.deferred = defer.Deferred()
  380. self.reactor.callLater(0, self.connectionLost, connDone)
  381. return self.deferred
  382. stopListening = loseConnection
  383. def _logConnectionLostMsg(self):
  384. """
  385. Log message for closing port
  386. """
  387. log.msg('(%s Port %s Closed)' % (self._type, self._realPortNumber))
  388. def connectionLost(self, reason):
  389. """
  390. Cleans up the socket.
  391. """
  392. self._logConnectionLostMsg()
  393. self._realPortNumber = None
  394. d = None
  395. if hasattr(self, "deferred"):
  396. d = self.deferred
  397. del self.deferred
  398. self.disconnected = True
  399. self.reactor.removeActiveHandle(self)
  400. self.connected = False
  401. self._closeSocket(True)
  402. del self.socket
  403. del self.getFileHandle
  404. try:
  405. self.factory.doStop()
  406. except:
  407. self.disconnecting = False
  408. if d is not None:
  409. d.errback(failure.Failure())
  410. else:
  411. raise
  412. else:
  413. self.disconnecting = False
  414. if d is not None:
  415. d.callback(None)
  416. def logPrefix(self):
  417. """
  418. Returns the name of my class, to prefix log entries with.
  419. """
  420. return reflect.qual(self.factory.__class__)
  421. def getHost(self):
  422. """
  423. Returns an IPv4Address or IPv6Address.
  424. This indicates the server's address.
  425. """
  426. return self._addressType('TCP', *_getsockname(self.socket))
  427. def cbAccept(self, rc, data, evt):
  428. self.handleAccept(rc, evt)
  429. if not (self.disconnecting or self.disconnected):
  430. self.doAccept()
  431. def handleAccept(self, rc, evt):
  432. if self.disconnecting or self.disconnected:
  433. return False
  434. # possible errors:
  435. # (WSAEMFILE, WSAENOBUFS, WSAENFILE, WSAENOMEM, WSAECONNABORTED)
  436. if rc:
  437. log.msg("Could not accept new connection -- %s (%s)" %
  438. (errno.errorcode.get(rc, 'unknown error'), rc))
  439. return False
  440. else:
  441. evt.newskt.setsockopt(
  442. socket.SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
  443. struct.pack('P', self.socket.fileno()))
  444. family, lAddr, rAddr = _iocp.get_accept_addrs(evt.newskt.fileno(),
  445. evt.buff)
  446. if not _PY3:
  447. # In _makesockaddr(), we use the Win32 API which
  448. # gives us an address of the form: (unicode host, port).
  449. # Only on Python 2 do we need to convert it to a
  450. # non-unicode str.
  451. # On Python 3, we leave it alone as unicode.
  452. lAddr = (nativeString(lAddr[0]), lAddr[1])
  453. rAddr = (nativeString(rAddr[0]), rAddr[1])
  454. assert family == self.addressFamily
  455. # Build an IPv6 address that includes the scopeID, if necessary
  456. if "%" in lAddr[0]:
  457. scope = int(lAddr[0].split("%")[1])
  458. lAddr = (lAddr[0], lAddr[1], 0, scope)
  459. if "%" in rAddr[0]:
  460. scope = int(rAddr[0].split("%")[1])
  461. rAddr = (rAddr[0], rAddr[1], 0, scope)
  462. protocol = self.factory.buildProtocol(
  463. self._addressType('TCP', *rAddr))
  464. if protocol is None:
  465. evt.newskt.close()
  466. else:
  467. s = self.sessionno
  468. self.sessionno = s+1
  469. transport = Server(evt.newskt, protocol,
  470. self._addressType('TCP', *rAddr),
  471. self._addressType('TCP', *lAddr),
  472. s, self.reactor)
  473. protocol.makeConnection(transport)
  474. return True
  475. def doAccept(self):
  476. evt = _iocp.Event(self.cbAccept, self)
  477. # see AcceptEx documentation
  478. evt.buff = buff = bytearray(2 * (self.addrLen + 16))
  479. evt.newskt = newskt = self.reactor.createSocket(self.addressFamily,
  480. self.socketType)
  481. rc = _iocp.accept(self.socket.fileno(), newskt.fileno(), buff, evt)
  482. if rc and rc != ERROR_IO_PENDING:
  483. self.handleAccept(rc, evt)