tcp.py 20 KB


  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. TCP support for IOCP reactor
  5. """
  6. from __future__ import annotations
  7. import errno
  8. import socket
  9. import struct
  10. from typing import TYPE_CHECKING, Optional, Union
  11. from zope.interface import classImplements, implementer
  12. from twisted.internet import address, defer, error, interfaces, main
  13. from twisted.internet.abstract import _LogOwner, isIPv6Address
  14. from twisted.internet.address import IPv4Address, IPv6Address
  15. from twisted.internet.interfaces import IProtocol
  16. from twisted.internet.iocpreactor import abstract, iocpsupport as _iocp
  17. from twisted.internet.iocpreactor.const import (
  18. ERROR_CONNECTION_REFUSED,
  19. ERROR_IO_PENDING,
  20. ERROR_NETWORK_UNREACHABLE,
  21. SO_UPDATE_ACCEPT_CONTEXT,
  22. SO_UPDATE_CONNECT_CONTEXT,
  23. )
  24. from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
  25. from twisted.internet.protocol import Protocol
  26. from twisted.internet.tcp import (
  27. Connector as TCPConnector,
  28. _AbortingMixin,
  29. _BaseBaseClient,
  30. _BaseTCPClient,
  31. _getsockname,
  32. _resolveIPv6,
  33. _SocketCloser,
  34. )
  35. from twisted.python import failure, log, reflect
  36. try:
  37. from twisted.internet._newtls import startTLS as __startTLS
  38. except ImportError:
  39. _startTLS = None
  40. else:
  41. _startTLS = __startTLS
  42. if TYPE_CHECKING:
  43. # Circular import only to describe a type.
  44. from twisted.internet.iocpreactor.reactor import IOCPReactor
  45. # ConnectEx returns these. XXX: find out what it does for timeout
  46. connectExErrors = {
  47. ERROR_CONNECTION_REFUSED: errno.WSAECONNREFUSED, # type: ignore[attr-defined]
  48. ERROR_NETWORK_UNREACHABLE: errno.WSAENETUNREACH, # type: ignore[attr-defined]
  49. }
  50. @implementer(IReadWriteHandle, interfaces.ITCPTransport, interfaces.ISystemHandle)
  51. class Connection(abstract.FileHandle, _SocketCloser, _AbortingMixin):
  52. """
  53. @ivar TLS: C{False} to indicate the connection is in normal TCP mode,
  54. C{True} to indicate that TLS has been started and that operations must
  55. be routed through the L{TLSMemoryBIOProtocol} instance.
  56. """
  57. TLS = False
  58. def __init__(self, sock, proto, reactor=None):
  59. abstract.FileHandle.__init__(self, reactor)
  60. self.socket = sock
  61. self.getFileHandle = sock.fileno
  62. self.protocol = proto
  63. def getHandle(self):
  64. return self.socket
  65. def dataReceived(self, rbuffer):
  66. """
  67. @param rbuffer: Data received.
  68. @type rbuffer: L{bytes} or L{bytearray}
  69. """
  70. if isinstance(rbuffer, bytes):
  71. pass
  72. elif isinstance(rbuffer, bytearray):
  73. # XXX: some day, we'll have protocols that can handle raw buffers
  74. rbuffer = bytes(rbuffer)
  75. else:
  76. raise TypeError("data must be bytes or bytearray, not " + type(rbuffer))
  77. self.protocol.dataReceived(rbuffer)
  78. def readFromHandle(self, bufflist, evt):
  79. return _iocp.recv(self.getFileHandle(), bufflist, evt)
  80. def writeToHandle(self, buff, evt):
  81. """
  82. Send C{buff} to current file handle using C{_iocp.send}. The buffer
  83. sent is limited to a size of C{self.SEND_LIMIT}.
  84. """
  85. writeView = memoryview(buff)
  86. return _iocp.send(
  87. self.getFileHandle(), writeView[0 : self.SEND_LIMIT].tobytes(), evt
  88. )
  89. def _closeWriteConnection(self):
  90. try:
  91. self.socket.shutdown(1)
  92. except OSError:
  93. pass
  94. p = interfaces.IHalfCloseableProtocol(self.protocol, None)
  95. if p:
  96. try:
  97. p.writeConnectionLost()
  98. except BaseException:
  99. f = failure.Failure()
  100. log.err()
  101. self.connectionLost(f)
  102. def readConnectionLost(self, reason):
  103. p = interfaces.IHalfCloseableProtocol(self.protocol, None)
  104. if p:
  105. try:
  106. p.readConnectionLost()
  107. except BaseException:
  108. log.err()
  109. self.connectionLost(failure.Failure())
  110. else:
  111. self.connectionLost(reason)
  112. def connectionLost(self, reason):
  113. if self.disconnected:
  114. return
  115. abstract.FileHandle.connectionLost(self, reason)
  116. isClean = reason is None or not reason.check(error.ConnectionAborted)
  117. self._closeSocket(isClean)
  118. protocol = self.protocol
  119. del self.protocol
  120. del self.socket
  121. del self.getFileHandle
  122. protocol.connectionLost(reason)
  123. def logPrefix(self):
  124. """
  125. Return the prefix to log with when I own the logging thread.
  126. """
  127. return self.logstr
  128. def getTcpNoDelay(self):
  129. return bool(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY))
  130. def setTcpNoDelay(self, enabled):
  131. self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
  132. def getTcpKeepAlive(self):
  133. return bool(self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE))
  134. def setTcpKeepAlive(self, enabled):
  135. self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, enabled)
  136. if _startTLS is not None:
  137. def startTLS(self, contextFactory, normal=True):
  138. """
  139. @see: L{ITLSTransport.startTLS}
  140. """
  141. _startTLS(self, contextFactory, normal, abstract.FileHandle)
  142. def write(self, data):
  143. """
  144. Write some data, either directly to the underlying handle or, if TLS
  145. has been started, to the L{TLSMemoryBIOProtocol} for it to encrypt and
  146. send.
  147. @see: L{twisted.internet.interfaces.ITransport.write}
  148. """
  149. if self.disconnected:
  150. return
  151. if self.TLS:
  152. self.protocol.write(data)
  153. else:
  154. abstract.FileHandle.write(self, data)
  155. def writeSequence(self, iovec):
  156. """
  157. Write some data, either directly to the underlying handle or, if TLS
  158. has been started, to the L{TLSMemoryBIOProtocol} for it to encrypt and
  159. send.
  160. @see: L{twisted.internet.interfaces.ITransport.writeSequence}
  161. """
  162. if self.disconnected:
  163. return
  164. if self.TLS:
  165. self.protocol.writeSequence(iovec)
  166. else:
  167. abstract.FileHandle.writeSequence(self, iovec)
  168. def loseConnection(self, reason=None):
  169. """
  170. Close the underlying handle or, if TLS has been started, first shut it
  171. down.
  172. @see: L{twisted.internet.interfaces.ITransport.loseConnection}
  173. """
  174. if self.TLS:
  175. if self.connected and not self.disconnecting:
  176. self.protocol.loseConnection()
  177. else:
  178. abstract.FileHandle.loseConnection(self, reason)
  179. def registerProducer(self, producer, streaming):
  180. """
  181. Register a producer.
  182. If TLS is enabled, the TLS connection handles this.
  183. """
  184. if self.TLS:
  185. # Registering a producer before we're connected shouldn't be a
  186. # problem. If we end up with a write(), that's already handled in
  187. # the write() code above, and there are no other potential
  188. # side-effects.
  189. self.protocol.registerProducer(producer, streaming)
  190. else:
  191. abstract.FileHandle.registerProducer(self, producer, streaming)
  192. def unregisterProducer(self):
  193. """
  194. Unregister a producer.
  195. If TLS is enabled, the TLS connection handles this.
  196. """
  197. if self.TLS:
  198. self.protocol.unregisterProducer()
  199. else:
  200. abstract.FileHandle.unregisterProducer(self)
  201. def getHost(self):
  202. # ITCPTransport.getHost
  203. pass
  204. def getPeer(self):
  205. # ITCPTransport.getPeer
  206. pass
  207. if _startTLS is not None:
  208. classImplements(Connection, interfaces.ITLSTransport)
  209. class Client(_BaseBaseClient, _BaseTCPClient, Connection):
  210. """
  211. @ivar _tlsClientDefault: Always C{True}, indicating that this is a client
  212. connection, and by default when TLS is negotiated this class will act as
  213. a TLS client.
  214. """
  215. addressFamily = socket.AF_INET
  216. socketType = socket.SOCK_STREAM
  217. _tlsClientDefault = True
  218. _commonConnection = Connection
  219. def __init__(self, host, port, bindAddress, connector, reactor):
  220. # ConnectEx documentation says socket _has_ to be bound
  221. if bindAddress is None:
  222. bindAddress = ("", 0)
  223. self.reactor = reactor # createInternetSocket needs this
  224. _BaseTCPClient.__init__(self, host, port, bindAddress, connector, reactor)
  225. def createInternetSocket(self):
  226. """
  227. Create a socket registered with the IOCP reactor.
  228. @see: L{_BaseTCPClient}
  229. """
  230. return self.reactor.createSocket(self.addressFamily, self.socketType)
  231. def _collectSocketDetails(self):
  232. """
  233. Clean up potentially circular references to the socket and to its
  234. C{getFileHandle} method.
  235. @see: L{_BaseBaseClient}
  236. """
  237. del self.socket, self.getFileHandle
  238. def _stopReadingAndWriting(self):
  239. """
  240. Remove the active handle from the reactor.
  241. @see: L{_BaseBaseClient}
  242. """
  243. self.reactor.removeActiveHandle(self)
  244. def cbConnect(self, rc, data, evt):
  245. if rc:
  246. rc = connectExErrors.get(rc, rc)
  247. self.failIfNotConnected(
  248. error.getConnectError((rc, errno.errorcode.get(rc, "Unknown error")))
  249. )
  250. else:
  251. self.socket.setsockopt(
  252. socket.SOL_SOCKET,
  253. SO_UPDATE_CONNECT_CONTEXT,
  254. struct.pack("P", self.socket.fileno()),
  255. )
  256. self.protocol = self.connector.buildProtocol(self.getPeer())
  257. self.connected = True
  258. logPrefix = self._getLogPrefix(self.protocol)
  259. self.logstr = logPrefix + ",client"
  260. if self.protocol is None:
  261. # Factory.buildProtocol is allowed to return None. In that
  262. # case, make up a protocol to satisfy the rest of the
  263. # implementation; connectionLost is going to be called on
  264. # something, for example. This is easier than adding special
  265. # case support for a None protocol throughout the rest of the
  266. # transport implementation.
  267. self.protocol = Protocol()
  268. # But dispose of the connection quickly.
  269. self.loseConnection()
  270. else:
  271. self.protocol.makeConnection(self)
  272. self.startReading()
  273. def doConnect(self):
  274. if not hasattr(self, "connector"):
  275. # this happens if we connector.stopConnecting in
  276. # factory.startedConnecting
  277. return
  278. assert _iocp.have_connectex
  279. self.reactor.addActiveHandle(self)
  280. evt = _iocp.Event(self.cbConnect, self)
  281. rc = _iocp.connect(self.socket.fileno(), self.realAddress, evt)
  282. if rc and rc != ERROR_IO_PENDING:
  283. self.cbConnect(rc, 0, evt)
  284. class Server(Connection):
  285. """
  286. Serverside socket-stream connection class.
  287. I am a serverside network connection transport; a socket which came from an
  288. accept() on a server.
  289. @ivar _tlsClientDefault: Always C{False}, indicating that this is a server
  290. connection, and by default when TLS is negotiated this class will act as
  291. a TLS server.
  292. """
  293. _tlsClientDefault = False
  294. def __init__(
  295. self,
  296. sock: socket.socket,
  297. protocol: IProtocol,
  298. clientAddr: Union[IPv4Address, IPv6Address],
  299. serverAddr: Union[IPv4Address, IPv6Address],
  300. sessionno: int,
  301. reactor: IOCPReactor,
  302. ):
  303. """
  304. Server(sock, protocol, client, server, sessionno)
  305. Initialize me with a socket, a protocol, a descriptor for my peer (a
  306. tuple of host, port describing the other end of the connection), an
  307. instance of Port, and a session number.
  308. """
  309. Connection.__init__(self, sock, protocol, reactor)
  310. self.serverAddr = serverAddr
  311. self.clientAddr = clientAddr
  312. self.sessionno = sessionno
  313. logPrefix = self._getLogPrefix(self.protocol)
  314. self.logstr = f"{logPrefix},{sessionno},{self.clientAddr.host}"
  315. self.repstr: str = "<{} #{} on {}>".format(
  316. self.protocol.__class__.__name__,
  317. self.sessionno,
  318. self.serverAddr.port,
  319. )
  320. self.connected = True
  321. self.startReading()
  322. def __repr__(self) -> str:
  323. """
  324. A string representation of this connection.
  325. """
  326. return self.repstr
  327. def getHost(self):
  328. """
  329. Returns an IPv4Address.
  330. This indicates the server's address.
  331. """
  332. return self.serverAddr
  333. def getPeer(self):
  334. """
  335. Returns an IPv4Address.
  336. This indicates the client's address.
  337. """
  338. return self.clientAddr
  339. class Connector(TCPConnector):
  340. def _makeTransport(self):
  341. return Client(self.host, self.port, self.bindAddress, self, self.reactor)
  342. @implementer(interfaces.IListeningPort)
  343. class Port(_SocketCloser, _LogOwner):
  344. connected = False
  345. disconnected = False
  346. disconnecting = False
  347. addressFamily = socket.AF_INET
  348. socketType = socket.SOCK_STREAM
  349. _addressType = address.IPv4Address
  350. sessionno = 0
  351. # Actual port number being listened on, only set to a non-None
  352. # value when we are actually listening.
  353. _realPortNumber: Optional[int] = None
  354. # A string describing the connections which will be created by this port.
  355. # Normally this is C{"TCP"}, since this is a TCP port, but when the TLS
  356. # implementation re-uses this class it overrides the value with C{"TLS"}.
  357. # Only used for logging.
  358. _type = "TCP"
  359. def __init__(self, port, factory, backlog=50, interface="", reactor=None):
  360. self.port = port
  361. self.factory = factory
  362. self.backlog = backlog
  363. self.interface = interface
  364. self.reactor = reactor
  365. if isIPv6Address(interface):
  366. self.addressFamily = socket.AF_INET6
  367. self._addressType = address.IPv6Address
  368. def __repr__(self) -> str:
  369. if self._realPortNumber is not None:
  370. return "<{} of {} on {}>".format(
  371. self.__class__,
  372. self.factory.__class__,
  373. self._realPortNumber,
  374. )
  375. else:
  376. return "<{} of {} (not listening)>".format(
  377. self.__class__,
  378. self.factory.__class__,
  379. )
  380. def startListening(self):
  381. try:
  382. skt = self.reactor.createSocket(self.addressFamily, self.socketType)
  383. # TODO: resolve self.interface if necessary
  384. if self.addressFamily == socket.AF_INET6:
  385. addr = _resolveIPv6(self.interface, self.port)
  386. else:
  387. addr = (self.interface, self.port)
  388. skt.bind(addr)
  389. except OSError as le:
  390. raise error.CannotListenError(self.interface, self.port, le)
  391. self.addrLen = _iocp.maxAddrLen(skt.fileno())
  392. # Make sure that if we listened on port 0, we update that to
  393. # reflect what the OS actually assigned us.
  394. self._realPortNumber = skt.getsockname()[1]
  395. log.msg(
  396. "%s starting on %s"
  397. % (self._getLogPrefix(self.factory), self._realPortNumber)
  398. )
  399. self.factory.doStart()
  400. skt.listen(self.backlog)
  401. self.connected = True
  402. self.disconnected = False
  403. self.reactor.addActiveHandle(self)
  404. self.socket = skt
  405. self.getFileHandle = self.socket.fileno
  406. self.doAccept()
  407. def loseConnection(self, connDone=failure.Failure(main.CONNECTION_DONE)):
  408. """
  409. Stop accepting connections on this port.
  410. This will shut down my socket and call self.connectionLost().
  411. It returns a deferred which will fire successfully when the
  412. port is actually closed.
  413. """
  414. self.disconnecting = True
  415. if self.connected:
  416. self.deferred = defer.Deferred()
  417. self.reactor.callLater(0, self.connectionLost, connDone)
  418. return self.deferred
  419. stopListening = loseConnection
  420. def _logConnectionLostMsg(self):
  421. """
  422. Log message for closing port
  423. """
  424. log.msg(f"({self._type} Port {self._realPortNumber} Closed)")
  425. def connectionLost(self, reason):
  426. """
  427. Cleans up the socket.
  428. """
  429. self._logConnectionLostMsg()
  430. self._realPortNumber = None
  431. d = None
  432. if hasattr(self, "deferred"):
  433. d = self.deferred
  434. del self.deferred
  435. self.disconnected = True
  436. self.reactor.removeActiveHandle(self)
  437. self.connected = False
  438. self._closeSocket(True)
  439. del self.socket
  440. del self.getFileHandle
  441. try:
  442. self.factory.doStop()
  443. except BaseException:
  444. self.disconnecting = False
  445. if d is not None:
  446. d.errback(failure.Failure())
  447. else:
  448. raise
  449. else:
  450. self.disconnecting = False
  451. if d is not None:
  452. d.callback(None)
  453. def logPrefix(self):
  454. """
  455. Returns the name of my class, to prefix log entries with.
  456. """
  457. return reflect.qual(self.factory.__class__)
  458. def getHost(self):
  459. """
  460. Returns an IPv4Address or IPv6Address.
  461. This indicates the server's address.
  462. """
  463. return self._addressType("TCP", *_getsockname(self.socket))
  464. def cbAccept(self, rc, data, evt):
  465. self.handleAccept(rc, evt)
  466. if not (self.disconnecting or self.disconnected):
  467. self.doAccept()
  468. def handleAccept(self, rc, evt):
  469. if self.disconnecting or self.disconnected:
  470. return False
  471. # possible errors:
  472. # (WSAEMFILE, WSAENOBUFS, WSAENFILE, WSAENOMEM, WSAECONNABORTED)
  473. if rc:
  474. log.msg(
  475. "Could not accept new connection -- %s (%s)"
  476. % (errno.errorcode.get(rc, "unknown error"), rc)
  477. )
  478. return False
  479. else:
  480. # Inherit the properties from the listening port socket as
  481. # documented in the `Remarks` section of AcceptEx.
  482. # https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nf-mswsock-acceptex
  483. # In this way we can call getsockname and getpeername on the
  484. # accepted socket.
  485. evt.newskt.setsockopt(
  486. socket.SOL_SOCKET,
  487. SO_UPDATE_ACCEPT_CONTEXT,
  488. struct.pack("P", self.socket.fileno()),
  489. )
  490. family, lAddr, rAddr = _iocp.get_accept_addrs(evt.newskt.fileno(), evt.buff)
  491. assert family == self.addressFamily
  492. # Build an IPv6 address that includes the scopeID, if necessary
  493. if "%" in lAddr[0]:
  494. scope = int(lAddr[0].split("%")[1])
  495. lAddr = (lAddr[0], lAddr[1], 0, scope)
  496. if "%" in rAddr[0]:
  497. scope = int(rAddr[0].split("%")[1])
  498. rAddr = (rAddr[0], rAddr[1], 0, scope)
  499. protocol = self.factory.buildProtocol(self._addressType("TCP", *rAddr))
  500. if protocol is None:
  501. evt.newskt.close()
  502. else:
  503. s = self.sessionno
  504. self.sessionno = s + 1
  505. transport = Server(
  506. evt.newskt,
  507. protocol,
  508. self._addressType("TCP", *rAddr),
  509. self._addressType("TCP", *lAddr),
  510. s,
  511. self.reactor,
  512. )
  513. protocol.makeConnection(transport)
  514. return True
  515. def doAccept(self):
  516. evt = _iocp.Event(self.cbAccept, self)
  517. # see AcceptEx documentation
  518. evt.buff = buff = bytearray(2 * (self.addrLen + 16))
  519. evt.newskt = newskt = self.reactor.createSocket(
  520. self.addressFamily, self.socketType
  521. )
  522. rc = _iocp.accept(self.socket.fileno(), newskt.fileno(), buff, evt)
  523. if rc and rc != ERROR_IO_PENDING:
  524. self.handleAccept(rc, evt)