selector_events.py 47 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321
  1. """Event loop using a selector and related classes.
  2. A selector is a "notify-when-ready" multiplexer. For a subclass which
  3. also includes support for signal handling, see the unix_events sub-module.
  4. """
  5. __all__ = 'BaseSelectorEventLoop',
  6. import collections
  7. import errno
  8. import functools
  9. import itertools
  10. import os
  11. import selectors
  12. import socket
  13. import warnings
  14. import weakref
  15. try:
  16. import ssl
  17. except ImportError: # pragma: no cover
  18. ssl = None
  19. from . import base_events
  20. from . import constants
  21. from . import events
  22. from . import futures
  23. from . import protocols
  24. from . import sslproto
  25. from . import transports
  26. from . import trsock
  27. from .log import logger
  28. _HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')
  29. if _HAS_SENDMSG:
  30. try:
  31. SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
  32. except OSError:
  33. # Fallback to send
  34. _HAS_SENDMSG = False
  35. def _test_selector_event(selector, fd, event):
  36. # Test if the selector is monitoring 'event' events
  37. # for the file descriptor 'fd'.
  38. try:
  39. key = selector.get_key(fd)
  40. except KeyError:
  41. return False
  42. else:
  43. return bool(key.events & event)
  44. class BaseSelectorEventLoop(base_events.BaseEventLoop):
  45. """Selector event loop.
  46. See events.EventLoop for API specification.
  47. """
  48. def __init__(self, selector=None):
  49. super().__init__()
  50. if selector is None:
  51. selector = selectors.DefaultSelector()
  52. logger.debug('Using selector: %s', selector.__class__.__name__)
  53. self._selector = selector
  54. self._make_self_pipe()
  55. self._transports = weakref.WeakValueDictionary()
  56. def _make_socket_transport(self, sock, protocol, waiter=None, *,
  57. extra=None, server=None):
  58. self._ensure_fd_no_transport(sock)
  59. return _SelectorSocketTransport(self, sock, protocol, waiter,
  60. extra, server)
  61. def _make_ssl_transport(
  62. self, rawsock, protocol, sslcontext, waiter=None,
  63. *, server_side=False, server_hostname=None,
  64. extra=None, server=None,
  65. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
  66. ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
  67. ):
  68. self._ensure_fd_no_transport(rawsock)
  69. ssl_protocol = sslproto.SSLProtocol(
  70. self, protocol, sslcontext, waiter,
  71. server_side, server_hostname,
  72. ssl_handshake_timeout=ssl_handshake_timeout,
  73. ssl_shutdown_timeout=ssl_shutdown_timeout
  74. )
  75. _SelectorSocketTransport(self, rawsock, ssl_protocol,
  76. extra=extra, server=server)
  77. return ssl_protocol._app_transport
  78. def _make_datagram_transport(self, sock, protocol,
  79. address=None, waiter=None, extra=None):
  80. self._ensure_fd_no_transport(sock)
  81. return _SelectorDatagramTransport(self, sock, protocol,
  82. address, waiter, extra)
  83. def close(self):
  84. if self.is_running():
  85. raise RuntimeError("Cannot close a running event loop")
  86. if self.is_closed():
  87. return
  88. self._close_self_pipe()
  89. super().close()
  90. if self._selector is not None:
  91. self._selector.close()
  92. self._selector = None
  93. def _close_self_pipe(self):
  94. self._remove_reader(self._ssock.fileno())
  95. self._ssock.close()
  96. self._ssock = None
  97. self._csock.close()
  98. self._csock = None
  99. self._internal_fds -= 1
  100. def _make_self_pipe(self):
  101. # A self-socket, really. :-)
  102. self._ssock, self._csock = socket.socketpair()
  103. self._ssock.setblocking(False)
  104. self._csock.setblocking(False)
  105. self._internal_fds += 1
  106. self._add_reader(self._ssock.fileno(), self._read_from_self)
  107. def _process_self_data(self, data):
  108. pass
  109. def _read_from_self(self):
  110. while True:
  111. try:
  112. data = self._ssock.recv(4096)
  113. if not data:
  114. break
  115. self._process_self_data(data)
  116. except InterruptedError:
  117. continue
  118. except BlockingIOError:
  119. break
  120. def _write_to_self(self):
  121. # This may be called from a different thread, possibly after
  122. # _close_self_pipe() has been called or even while it is
  123. # running. Guard for self._csock being None or closed. When
  124. # a socket is closed, send() raises OSError (with errno set to
  125. # EBADF, but let's not rely on the exact error code).
  126. csock = self._csock
  127. if csock is None:
  128. return
  129. try:
  130. csock.send(b'\0')
  131. except OSError:
  132. if self._debug:
  133. logger.debug("Fail to write a null byte into the "
  134. "self-pipe socket",
  135. exc_info=True)
  136. def _start_serving(self, protocol_factory, sock,
  137. sslcontext=None, server=None, backlog=100,
  138. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
  139. ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
  140. self._add_reader(sock.fileno(), self._accept_connection,
  141. protocol_factory, sock, sslcontext, server, backlog,
  142. ssl_handshake_timeout, ssl_shutdown_timeout)
  143. def _accept_connection(
  144. self, protocol_factory, sock,
  145. sslcontext=None, server=None, backlog=100,
  146. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
  147. ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
  148. # This method is only called once for each event loop tick where the
  149. # listening socket has triggered an EVENT_READ. There may be multiple
  150. # connections waiting for an .accept() so it is called in a loop.
  151. # See https://bugs.python.org/issue27906 for more details.
  152. for _ in range(backlog):
  153. try:
  154. conn, addr = sock.accept()
  155. if self._debug:
  156. logger.debug("%r got a new connection from %r: %r",
  157. server, addr, conn)
  158. conn.setblocking(False)
  159. except (BlockingIOError, InterruptedError, ConnectionAbortedError):
  160. # Early exit because the socket accept buffer is empty.
  161. return None
  162. except OSError as exc:
  163. # There's nowhere to send the error, so just log it.
  164. if exc.errno in (errno.EMFILE, errno.ENFILE,
  165. errno.ENOBUFS, errno.ENOMEM):
  166. # Some platforms (e.g. Linux keep reporting the FD as
  167. # ready, so we remove the read handler temporarily.
  168. # We'll try again in a while.
  169. self.call_exception_handler({
  170. 'message': 'socket.accept() out of system resource',
  171. 'exception': exc,
  172. 'socket': trsock.TransportSocket(sock),
  173. })
  174. self._remove_reader(sock.fileno())
  175. self.call_later(constants.ACCEPT_RETRY_DELAY,
  176. self._start_serving,
  177. protocol_factory, sock, sslcontext, server,
  178. backlog, ssl_handshake_timeout,
  179. ssl_shutdown_timeout)
  180. else:
  181. raise # The event loop will catch, log and ignore it.
  182. else:
  183. extra = {'peername': addr}
  184. accept = self._accept_connection2(
  185. protocol_factory, conn, extra, sslcontext, server,
  186. ssl_handshake_timeout, ssl_shutdown_timeout)
  187. self.create_task(accept)
  188. async def _accept_connection2(
  189. self, protocol_factory, conn, extra,
  190. sslcontext=None, server=None,
  191. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
  192. ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
  193. protocol = None
  194. transport = None
  195. try:
  196. protocol = protocol_factory()
  197. waiter = self.create_future()
  198. if sslcontext:
  199. transport = self._make_ssl_transport(
  200. conn, protocol, sslcontext, waiter=waiter,
  201. server_side=True, extra=extra, server=server,
  202. ssl_handshake_timeout=ssl_handshake_timeout,
  203. ssl_shutdown_timeout=ssl_shutdown_timeout)
  204. else:
  205. transport = self._make_socket_transport(
  206. conn, protocol, waiter=waiter, extra=extra,
  207. server=server)
  208. try:
  209. await waiter
  210. except BaseException:
  211. transport.close()
  212. # gh-109534: When an exception is raised by the SSLProtocol object the
  213. # exception set in this future can keep the protocol object alive and
  214. # cause a reference cycle.
  215. waiter = None
  216. raise
  217. # It's now up to the protocol to handle the connection.
  218. except (SystemExit, KeyboardInterrupt):
  219. raise
  220. except BaseException as exc:
  221. if self._debug:
  222. context = {
  223. 'message':
  224. 'Error on transport creation for incoming connection',
  225. 'exception': exc,
  226. }
  227. if protocol is not None:
  228. context['protocol'] = protocol
  229. if transport is not None:
  230. context['transport'] = transport
  231. self.call_exception_handler(context)
  232. def _ensure_fd_no_transport(self, fd):
  233. fileno = fd
  234. if not isinstance(fileno, int):
  235. try:
  236. fileno = int(fileno.fileno())
  237. except (AttributeError, TypeError, ValueError):
  238. # This code matches selectors._fileobj_to_fd function.
  239. raise ValueError(f"Invalid file object: {fd!r}") from None
  240. try:
  241. transport = self._transports[fileno]
  242. except KeyError:
  243. pass
  244. else:
  245. if not transport.is_closing():
  246. raise RuntimeError(
  247. f'File descriptor {fd!r} is used by transport '
  248. f'{transport!r}')
  249. def _add_reader(self, fd, callback, *args):
  250. self._check_closed()
  251. handle = events.Handle(callback, args, self, None)
  252. try:
  253. key = self._selector.get_key(fd)
  254. except KeyError:
  255. self._selector.register(fd, selectors.EVENT_READ,
  256. (handle, None))
  257. else:
  258. mask, (reader, writer) = key.events, key.data
  259. self._selector.modify(fd, mask | selectors.EVENT_READ,
  260. (handle, writer))
  261. if reader is not None:
  262. reader.cancel()
  263. return handle
  264. def _remove_reader(self, fd):
  265. if self.is_closed():
  266. return False
  267. try:
  268. key = self._selector.get_key(fd)
  269. except KeyError:
  270. return False
  271. else:
  272. mask, (reader, writer) = key.events, key.data
  273. mask &= ~selectors.EVENT_READ
  274. if not mask:
  275. self._selector.unregister(fd)
  276. else:
  277. self._selector.modify(fd, mask, (None, writer))
  278. if reader is not None:
  279. reader.cancel()
  280. return True
  281. else:
  282. return False
  283. def _add_writer(self, fd, callback, *args):
  284. self._check_closed()
  285. handle = events.Handle(callback, args, self, None)
  286. try:
  287. key = self._selector.get_key(fd)
  288. except KeyError:
  289. self._selector.register(fd, selectors.EVENT_WRITE,
  290. (None, handle))
  291. else:
  292. mask, (reader, writer) = key.events, key.data
  293. self._selector.modify(fd, mask | selectors.EVENT_WRITE,
  294. (reader, handle))
  295. if writer is not None:
  296. writer.cancel()
  297. return handle
  298. def _remove_writer(self, fd):
  299. """Remove a writer callback."""
  300. if self.is_closed():
  301. return False
  302. try:
  303. key = self._selector.get_key(fd)
  304. except KeyError:
  305. return False
  306. else:
  307. mask, (reader, writer) = key.events, key.data
  308. # Remove both writer and connector.
  309. mask &= ~selectors.EVENT_WRITE
  310. if not mask:
  311. self._selector.unregister(fd)
  312. else:
  313. self._selector.modify(fd, mask, (reader, None))
  314. if writer is not None:
  315. writer.cancel()
  316. return True
  317. else:
  318. return False
  319. def add_reader(self, fd, callback, *args):
  320. """Add a reader callback."""
  321. self._ensure_fd_no_transport(fd)
  322. self._add_reader(fd, callback, *args)
  323. def remove_reader(self, fd):
  324. """Remove a reader callback."""
  325. self._ensure_fd_no_transport(fd)
  326. return self._remove_reader(fd)
  327. def add_writer(self, fd, callback, *args):
  328. """Add a writer callback.."""
  329. self._ensure_fd_no_transport(fd)
  330. self._add_writer(fd, callback, *args)
  331. def remove_writer(self, fd):
  332. """Remove a writer callback."""
  333. self._ensure_fd_no_transport(fd)
  334. return self._remove_writer(fd)
  335. async def sock_recv(self, sock, n):
  336. """Receive data from the socket.
  337. The return value is a bytes object representing the data received.
  338. The maximum amount of data to be received at once is specified by
  339. nbytes.
  340. """
  341. base_events._check_ssl_socket(sock)
  342. if self._debug and sock.gettimeout() != 0:
  343. raise ValueError("the socket must be non-blocking")
  344. try:
  345. return sock.recv(n)
  346. except (BlockingIOError, InterruptedError):
  347. pass
  348. fut = self.create_future()
  349. fd = sock.fileno()
  350. self._ensure_fd_no_transport(fd)
  351. handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
  352. fut.add_done_callback(
  353. functools.partial(self._sock_read_done, fd, handle=handle))
  354. return await fut
  355. def _sock_read_done(self, fd, fut, handle=None):
  356. if handle is None or not handle.cancelled():
  357. self.remove_reader(fd)
  358. def _sock_recv(self, fut, sock, n):
  359. # _sock_recv() can add itself as an I/O callback if the operation can't
  360. # be done immediately. Don't use it directly, call sock_recv().
  361. if fut.done():
  362. return
  363. try:
  364. data = sock.recv(n)
  365. except (BlockingIOError, InterruptedError):
  366. return # try again next time
  367. except (SystemExit, KeyboardInterrupt):
  368. raise
  369. except BaseException as exc:
  370. fut.set_exception(exc)
  371. else:
  372. fut.set_result(data)
  373. async def sock_recv_into(self, sock, buf):
  374. """Receive data from the socket.
  375. The received data is written into *buf* (a writable buffer).
  376. The return value is the number of bytes written.
  377. """
  378. base_events._check_ssl_socket(sock)
  379. if self._debug and sock.gettimeout() != 0:
  380. raise ValueError("the socket must be non-blocking")
  381. try:
  382. return sock.recv_into(buf)
  383. except (BlockingIOError, InterruptedError):
  384. pass
  385. fut = self.create_future()
  386. fd = sock.fileno()
  387. self._ensure_fd_no_transport(fd)
  388. handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
  389. fut.add_done_callback(
  390. functools.partial(self._sock_read_done, fd, handle=handle))
  391. return await fut
  392. def _sock_recv_into(self, fut, sock, buf):
  393. # _sock_recv_into() can add itself as an I/O callback if the operation
  394. # can't be done immediately. Don't use it directly, call
  395. # sock_recv_into().
  396. if fut.done():
  397. return
  398. try:
  399. nbytes = sock.recv_into(buf)
  400. except (BlockingIOError, InterruptedError):
  401. return # try again next time
  402. except (SystemExit, KeyboardInterrupt):
  403. raise
  404. except BaseException as exc:
  405. fut.set_exception(exc)
  406. else:
  407. fut.set_result(nbytes)
  408. async def sock_recvfrom(self, sock, bufsize):
  409. """Receive a datagram from a datagram socket.
  410. The return value is a tuple of (bytes, address) representing the
  411. datagram received and the address it came from.
  412. The maximum amount of data to be received at once is specified by
  413. nbytes.
  414. """
  415. base_events._check_ssl_socket(sock)
  416. if self._debug and sock.gettimeout() != 0:
  417. raise ValueError("the socket must be non-blocking")
  418. try:
  419. return sock.recvfrom(bufsize)
  420. except (BlockingIOError, InterruptedError):
  421. pass
  422. fut = self.create_future()
  423. fd = sock.fileno()
  424. self._ensure_fd_no_transport(fd)
  425. handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
  426. fut.add_done_callback(
  427. functools.partial(self._sock_read_done, fd, handle=handle))
  428. return await fut
  429. def _sock_recvfrom(self, fut, sock, bufsize):
  430. # _sock_recvfrom() can add itself as an I/O callback if the operation
  431. # can't be done immediately. Don't use it directly, call
  432. # sock_recvfrom().
  433. if fut.done():
  434. return
  435. try:
  436. result = sock.recvfrom(bufsize)
  437. except (BlockingIOError, InterruptedError):
  438. return # try again next time
  439. except (SystemExit, KeyboardInterrupt):
  440. raise
  441. except BaseException as exc:
  442. fut.set_exception(exc)
  443. else:
  444. fut.set_result(result)
  445. async def sock_recvfrom_into(self, sock, buf, nbytes=0):
  446. """Receive data from the socket.
  447. The received data is written into *buf* (a writable buffer).
  448. The return value is a tuple of (number of bytes written, address).
  449. """
  450. base_events._check_ssl_socket(sock)
  451. if self._debug and sock.gettimeout() != 0:
  452. raise ValueError("the socket must be non-blocking")
  453. if not nbytes:
  454. nbytes = len(buf)
  455. try:
  456. return sock.recvfrom_into(buf, nbytes)
  457. except (BlockingIOError, InterruptedError):
  458. pass
  459. fut = self.create_future()
  460. fd = sock.fileno()
  461. self._ensure_fd_no_transport(fd)
  462. handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
  463. nbytes)
  464. fut.add_done_callback(
  465. functools.partial(self._sock_read_done, fd, handle=handle))
  466. return await fut
  467. def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
  468. # _sock_recv_into() can add itself as an I/O callback if the operation
  469. # can't be done immediately. Don't use it directly, call
  470. # sock_recv_into().
  471. if fut.done():
  472. return
  473. try:
  474. result = sock.recvfrom_into(buf, bufsize)
  475. except (BlockingIOError, InterruptedError):
  476. return # try again next time
  477. except (SystemExit, KeyboardInterrupt):
  478. raise
  479. except BaseException as exc:
  480. fut.set_exception(exc)
  481. else:
  482. fut.set_result(result)
  483. async def sock_sendall(self, sock, data):
  484. """Send data to the socket.
  485. The socket must be connected to a remote socket. This method continues
  486. to send data from data until either all data has been sent or an
  487. error occurs. None is returned on success. On error, an exception is
  488. raised, and there is no way to determine how much data, if any, was
  489. successfully processed by the receiving end of the connection.
  490. """
  491. base_events._check_ssl_socket(sock)
  492. if self._debug and sock.gettimeout() != 0:
  493. raise ValueError("the socket must be non-blocking")
  494. try:
  495. n = sock.send(data)
  496. except (BlockingIOError, InterruptedError):
  497. n = 0
  498. if n == len(data):
  499. # all data sent
  500. return
  501. fut = self.create_future()
  502. fd = sock.fileno()
  503. self._ensure_fd_no_transport(fd)
  504. # use a trick with a list in closure to store a mutable state
  505. handle = self._add_writer(fd, self._sock_sendall, fut, sock,
  506. memoryview(data), [n])
  507. fut.add_done_callback(
  508. functools.partial(self._sock_write_done, fd, handle=handle))
  509. return await fut
  510. def _sock_sendall(self, fut, sock, view, pos):
  511. if fut.done():
  512. # Future cancellation can be scheduled on previous loop iteration
  513. return
  514. start = pos[0]
  515. try:
  516. n = sock.send(view[start:])
  517. except (BlockingIOError, InterruptedError):
  518. return
  519. except (SystemExit, KeyboardInterrupt):
  520. raise
  521. except BaseException as exc:
  522. fut.set_exception(exc)
  523. return
  524. start += n
  525. if start == len(view):
  526. fut.set_result(None)
  527. else:
  528. pos[0] = start
  529. async def sock_sendto(self, sock, data, address):
  530. """Send data to the socket.
  531. The socket must be connected to a remote socket. This method continues
  532. to send data from data until either all data has been sent or an
  533. error occurs. None is returned on success. On error, an exception is
  534. raised, and there is no way to determine how much data, if any, was
  535. successfully processed by the receiving end of the connection.
  536. """
  537. base_events._check_ssl_socket(sock)
  538. if self._debug and sock.gettimeout() != 0:
  539. raise ValueError("the socket must be non-blocking")
  540. try:
  541. return sock.sendto(data, address)
  542. except (BlockingIOError, InterruptedError):
  543. pass
  544. fut = self.create_future()
  545. fd = sock.fileno()
  546. self._ensure_fd_no_transport(fd)
  547. # use a trick with a list in closure to store a mutable state
  548. handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
  549. address)
  550. fut.add_done_callback(
  551. functools.partial(self._sock_write_done, fd, handle=handle))
  552. return await fut
  553. def _sock_sendto(self, fut, sock, data, address):
  554. if fut.done():
  555. # Future cancellation can be scheduled on previous loop iteration
  556. return
  557. try:
  558. n = sock.sendto(data, 0, address)
  559. except (BlockingIOError, InterruptedError):
  560. return
  561. except (SystemExit, KeyboardInterrupt):
  562. raise
  563. except BaseException as exc:
  564. fut.set_exception(exc)
  565. else:
  566. fut.set_result(n)
  567. async def sock_connect(self, sock, address):
  568. """Connect to a remote socket at address.
  569. This method is a coroutine.
  570. """
  571. base_events._check_ssl_socket(sock)
  572. if self._debug and sock.gettimeout() != 0:
  573. raise ValueError("the socket must be non-blocking")
  574. if sock.family == socket.AF_INET or (
  575. base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
  576. resolved = await self._ensure_resolved(
  577. address, family=sock.family, type=sock.type, proto=sock.proto,
  578. loop=self,
  579. )
  580. _, _, _, _, address = resolved[0]
  581. fut = self.create_future()
  582. self._sock_connect(fut, sock, address)
  583. try:
  584. return await fut
  585. finally:
  586. # Needed to break cycles when an exception occurs.
  587. fut = None
  588. def _sock_connect(self, fut, sock, address):
  589. fd = sock.fileno()
  590. try:
  591. sock.connect(address)
  592. except (BlockingIOError, InterruptedError):
  593. # Issue #23618: When the C function connect() fails with EINTR, the
  594. # connection runs in background. We have to wait until the socket
  595. # becomes writable to be notified when the connection succeed or
  596. # fails.
  597. self._ensure_fd_no_transport(fd)
  598. handle = self._add_writer(
  599. fd, self._sock_connect_cb, fut, sock, address)
  600. fut.add_done_callback(
  601. functools.partial(self._sock_write_done, fd, handle=handle))
  602. except (SystemExit, KeyboardInterrupt):
  603. raise
  604. except BaseException as exc:
  605. fut.set_exception(exc)
  606. else:
  607. fut.set_result(None)
  608. finally:
  609. fut = None
  610. def _sock_write_done(self, fd, fut, handle=None):
  611. if handle is None or not handle.cancelled():
  612. self.remove_writer(fd)
  613. def _sock_connect_cb(self, fut, sock, address):
  614. if fut.done():
  615. return
  616. try:
  617. err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
  618. if err != 0:
  619. # Jump to any except clause below.
  620. raise OSError(err, f'Connect call failed {address}')
  621. except (BlockingIOError, InterruptedError):
  622. # socket is still registered, the callback will be retried later
  623. pass
  624. except (SystemExit, KeyboardInterrupt):
  625. raise
  626. except BaseException as exc:
  627. fut.set_exception(exc)
  628. else:
  629. fut.set_result(None)
  630. finally:
  631. fut = None
  632. async def sock_accept(self, sock):
  633. """Accept a connection.
  634. The socket must be bound to an address and listening for connections.
  635. The return value is a pair (conn, address) where conn is a new socket
  636. object usable to send and receive data on the connection, and address
  637. is the address bound to the socket on the other end of the connection.
  638. """
  639. base_events._check_ssl_socket(sock)
  640. if self._debug and sock.gettimeout() != 0:
  641. raise ValueError("the socket must be non-blocking")
  642. fut = self.create_future()
  643. self._sock_accept(fut, sock)
  644. return await fut
  645. def _sock_accept(self, fut, sock):
  646. fd = sock.fileno()
  647. try:
  648. conn, address = sock.accept()
  649. conn.setblocking(False)
  650. except (BlockingIOError, InterruptedError):
  651. self._ensure_fd_no_transport(fd)
  652. handle = self._add_reader(fd, self._sock_accept, fut, sock)
  653. fut.add_done_callback(
  654. functools.partial(self._sock_read_done, fd, handle=handle))
  655. except (SystemExit, KeyboardInterrupt):
  656. raise
  657. except BaseException as exc:
  658. fut.set_exception(exc)
  659. else:
  660. fut.set_result((conn, address))
  661. async def _sendfile_native(self, transp, file, offset, count):
  662. del self._transports[transp._sock_fd]
  663. resume_reading = transp.is_reading()
  664. transp.pause_reading()
  665. await transp._make_empty_waiter()
  666. try:
  667. return await self.sock_sendfile(transp._sock, file, offset, count,
  668. fallback=False)
  669. finally:
  670. transp._reset_empty_waiter()
  671. if resume_reading:
  672. transp.resume_reading()
  673. self._transports[transp._sock_fd] = transp
  674. def _process_events(self, event_list):
  675. for key, mask in event_list:
  676. fileobj, (reader, writer) = key.fileobj, key.data
  677. if mask & selectors.EVENT_READ and reader is not None:
  678. if reader._cancelled:
  679. self._remove_reader(fileobj)
  680. else:
  681. self._add_callback(reader)
  682. if mask & selectors.EVENT_WRITE and writer is not None:
  683. if writer._cancelled:
  684. self._remove_writer(fileobj)
  685. else:
  686. self._add_callback(writer)
  687. def _stop_serving(self, sock):
  688. self._remove_reader(sock.fileno())
  689. sock.close()
  690. class _SelectorTransport(transports._FlowControlMixin,
  691. transports.Transport):
  692. max_size = 256 * 1024 # Buffer size passed to recv().
  693. # Attribute used in the destructor: it must be set even if the constructor
  694. # is not called (see _SelectorSslTransport which may start by raising an
  695. # exception)
  696. _sock = None
  697. def __init__(self, loop, sock, protocol, extra=None, server=None):
  698. super().__init__(extra, loop)
  699. self._extra['socket'] = trsock.TransportSocket(sock)
  700. try:
  701. self._extra['sockname'] = sock.getsockname()
  702. except OSError:
  703. self._extra['sockname'] = None
  704. if 'peername' not in self._extra:
  705. try:
  706. self._extra['peername'] = sock.getpeername()
  707. except socket.error:
  708. self._extra['peername'] = None
  709. self._sock = sock
  710. self._sock_fd = sock.fileno()
  711. self._protocol_connected = False
  712. self.set_protocol(protocol)
  713. self._server = server
  714. self._buffer = collections.deque()
  715. self._conn_lost = 0 # Set when call to connection_lost scheduled.
  716. self._closing = False # Set when close() called.
  717. self._paused = False # Set when pause_reading() called
  718. if self._server is not None:
  719. self._server._attach()
  720. loop._transports[self._sock_fd] = self
  721. def __repr__(self):
  722. info = [self.__class__.__name__]
  723. if self._sock is None:
  724. info.append('closed')
  725. elif self._closing:
  726. info.append('closing')
  727. info.append(f'fd={self._sock_fd}')
  728. # test if the transport was closed
  729. if self._loop is not None and not self._loop.is_closed():
  730. polling = _test_selector_event(self._loop._selector,
  731. self._sock_fd, selectors.EVENT_READ)
  732. if polling:
  733. info.append('read=polling')
  734. else:
  735. info.append('read=idle')
  736. polling = _test_selector_event(self._loop._selector,
  737. self._sock_fd,
  738. selectors.EVENT_WRITE)
  739. if polling:
  740. state = 'polling'
  741. else:
  742. state = 'idle'
  743. bufsize = self.get_write_buffer_size()
  744. info.append(f'write=<{state}, bufsize={bufsize}>')
  745. return '<{}>'.format(' '.join(info))
  746. def abort(self):
  747. self._force_close(None)
  748. def set_protocol(self, protocol):
  749. self._protocol = protocol
  750. self._protocol_connected = True
  751. def get_protocol(self):
  752. return self._protocol
  753. def is_closing(self):
  754. return self._closing
  755. def is_reading(self):
  756. return not self.is_closing() and not self._paused
  757. def pause_reading(self):
  758. if not self.is_reading():
  759. return
  760. self._paused = True
  761. self._loop._remove_reader(self._sock_fd)
  762. if self._loop.get_debug():
  763. logger.debug("%r pauses reading", self)
  764. def resume_reading(self):
  765. if self._closing or not self._paused:
  766. return
  767. self._paused = False
  768. self._add_reader(self._sock_fd, self._read_ready)
  769. if self._loop.get_debug():
  770. logger.debug("%r resumes reading", self)
  771. def close(self):
  772. if self._closing:
  773. return
  774. self._closing = True
  775. self._loop._remove_reader(self._sock_fd)
  776. if not self._buffer:
  777. self._conn_lost += 1
  778. self._loop._remove_writer(self._sock_fd)
  779. self._loop.call_soon(self._call_connection_lost, None)
  780. def __del__(self, _warn=warnings.warn):
  781. if self._sock is not None:
  782. _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
  783. self._sock.close()
  784. def _fatal_error(self, exc, message='Fatal error on transport'):
  785. # Should be called from exception handler only.
  786. if isinstance(exc, OSError):
  787. if self._loop.get_debug():
  788. logger.debug("%r: %s", self, message, exc_info=True)
  789. else:
  790. self._loop.call_exception_handler({
  791. 'message': message,
  792. 'exception': exc,
  793. 'transport': self,
  794. 'protocol': self._protocol,
  795. })
  796. self._force_close(exc)
  797. def _force_close(self, exc):
  798. if self._conn_lost:
  799. return
  800. if self._buffer:
  801. self._buffer.clear()
  802. self._loop._remove_writer(self._sock_fd)
  803. if not self._closing:
  804. self._closing = True
  805. self._loop._remove_reader(self._sock_fd)
  806. self._conn_lost += 1
  807. self._loop.call_soon(self._call_connection_lost, exc)
  808. def _call_connection_lost(self, exc):
  809. try:
  810. if self._protocol_connected:
  811. self._protocol.connection_lost(exc)
  812. finally:
  813. self._sock.close()
  814. self._sock = None
  815. self._protocol = None
  816. self._loop = None
  817. server = self._server
  818. if server is not None:
  819. server._detach()
  820. self._server = None
  821. def get_write_buffer_size(self):
  822. return sum(map(len, self._buffer))
  823. def _add_reader(self, fd, callback, *args):
  824. if not self.is_reading():
  825. return
  826. self._loop._add_reader(fd, callback, *args)
  827. class _SelectorSocketTransport(_SelectorTransport):
  828. _start_tls_compatible = True
  829. _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
  830. def __init__(self, loop, sock, protocol, waiter=None,
  831. extra=None, server=None):
  832. self._read_ready_cb = None
  833. super().__init__(loop, sock, protocol, extra, server)
  834. self._eof = False
  835. self._empty_waiter = None
  836. if _HAS_SENDMSG:
  837. self._write_ready = self._write_sendmsg
  838. else:
  839. self._write_ready = self._write_send
  840. # Disable the Nagle algorithm -- small writes will be
  841. # sent without waiting for the TCP ACK. This generally
  842. # decreases the latency (in some cases significantly.)
  843. base_events._set_nodelay(self._sock)
  844. self._loop.call_soon(self._protocol.connection_made, self)
  845. # only start reading when connection_made() has been called
  846. self._loop.call_soon(self._add_reader,
  847. self._sock_fd, self._read_ready)
  848. if waiter is not None:
  849. # only wake up the waiter when connection_made() has been called
  850. self._loop.call_soon(futures._set_result_unless_cancelled,
  851. waiter, None)
  852. def set_protocol(self, protocol):
  853. if isinstance(protocol, protocols.BufferedProtocol):
  854. self._read_ready_cb = self._read_ready__get_buffer
  855. else:
  856. self._read_ready_cb = self._read_ready__data_received
  857. super().set_protocol(protocol)
  858. def _read_ready(self):
  859. self._read_ready_cb()
  860. def _read_ready__get_buffer(self):
  861. if self._conn_lost:
  862. return
  863. try:
  864. buf = self._protocol.get_buffer(-1)
  865. if not len(buf):
  866. raise RuntimeError('get_buffer() returned an empty buffer')
  867. except (SystemExit, KeyboardInterrupt):
  868. raise
  869. except BaseException as exc:
  870. self._fatal_error(
  871. exc, 'Fatal error: protocol.get_buffer() call failed.')
  872. return
  873. try:
  874. nbytes = self._sock.recv_into(buf)
  875. except (BlockingIOError, InterruptedError):
  876. return
  877. except (SystemExit, KeyboardInterrupt):
  878. raise
  879. except BaseException as exc:
  880. self._fatal_error(exc, 'Fatal read error on socket transport')
  881. return
  882. if not nbytes:
  883. self._read_ready__on_eof()
  884. return
  885. try:
  886. self._protocol.buffer_updated(nbytes)
  887. except (SystemExit, KeyboardInterrupt):
  888. raise
  889. except BaseException as exc:
  890. self._fatal_error(
  891. exc, 'Fatal error: protocol.buffer_updated() call failed.')
  892. def _read_ready__data_received(self):
  893. if self._conn_lost:
  894. return
  895. try:
  896. data = self._sock.recv(self.max_size)
  897. except (BlockingIOError, InterruptedError):
  898. return
  899. except (SystemExit, KeyboardInterrupt):
  900. raise
  901. except BaseException as exc:
  902. self._fatal_error(exc, 'Fatal read error on socket transport')
  903. return
  904. if not data:
  905. self._read_ready__on_eof()
  906. return
  907. try:
  908. self._protocol.data_received(data)
  909. except (SystemExit, KeyboardInterrupt):
  910. raise
  911. except BaseException as exc:
  912. self._fatal_error(
  913. exc, 'Fatal error: protocol.data_received() call failed.')
  914. def _read_ready__on_eof(self):
  915. if self._loop.get_debug():
  916. logger.debug("%r received EOF", self)
  917. try:
  918. keep_open = self._protocol.eof_received()
  919. except (SystemExit, KeyboardInterrupt):
  920. raise
  921. except BaseException as exc:
  922. self._fatal_error(
  923. exc, 'Fatal error: protocol.eof_received() call failed.')
  924. return
  925. if keep_open:
  926. # We're keeping the connection open so the
  927. # protocol can write more, but we still can't
  928. # receive more, so remove the reader callback.
  929. self._loop._remove_reader(self._sock_fd)
  930. else:
  931. self.close()
  932. def write(self, data):
  933. if not isinstance(data, (bytes, bytearray, memoryview)):
  934. raise TypeError(f'data argument must be a bytes-like object, '
  935. f'not {type(data).__name__!r}')
  936. if self._eof:
  937. raise RuntimeError('Cannot call write() after write_eof()')
  938. if self._empty_waiter is not None:
  939. raise RuntimeError('unable to write; sendfile is in progress')
  940. if not data:
  941. return
  942. if self._conn_lost:
  943. if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
  944. logger.warning('socket.send() raised exception.')
  945. self._conn_lost += 1
  946. return
  947. if not self._buffer:
  948. # Optimization: try to send now.
  949. try:
  950. n = self._sock.send(data)
  951. except (BlockingIOError, InterruptedError):
  952. pass
  953. except (SystemExit, KeyboardInterrupt):
  954. raise
  955. except BaseException as exc:
  956. self._fatal_error(exc, 'Fatal write error on socket transport')
  957. return
  958. else:
  959. data = memoryview(data)[n:]
  960. if not data:
  961. return
  962. # Not all was written; register write handler.
  963. self._loop._add_writer(self._sock_fd, self._write_ready)
  964. # Add it to the buffer.
  965. self._buffer.append(data)
  966. self._maybe_pause_protocol()
  967. def _get_sendmsg_buffer(self):
  968. return itertools.islice(self._buffer, SC_IOV_MAX)
  969. def _write_sendmsg(self):
  970. assert self._buffer, 'Data should not be empty'
  971. if self._conn_lost:
  972. return
  973. try:
  974. nbytes = self._sock.sendmsg(self._get_sendmsg_buffer())
  975. self._adjust_leftover_buffer(nbytes)
  976. except (BlockingIOError, InterruptedError):
  977. pass
  978. except (SystemExit, KeyboardInterrupt):
  979. raise
  980. except BaseException as exc:
  981. self._loop._remove_writer(self._sock_fd)
  982. self._buffer.clear()
  983. self._fatal_error(exc, 'Fatal write error on socket transport')
  984. if self._empty_waiter is not None:
  985. self._empty_waiter.set_exception(exc)
  986. else:
  987. self._maybe_resume_protocol() # May append to buffer.
  988. if not self._buffer:
  989. self._loop._remove_writer(self._sock_fd)
  990. if self._empty_waiter is not None:
  991. self._empty_waiter.set_result(None)
  992. if self._closing:
  993. self._call_connection_lost(None)
  994. elif self._eof:
  995. self._sock.shutdown(socket.SHUT_WR)
  996. def _adjust_leftover_buffer(self, nbytes: int) -> None:
  997. buffer = self._buffer
  998. while nbytes:
  999. b = buffer.popleft()
  1000. b_len = len(b)
  1001. if b_len <= nbytes:
  1002. nbytes -= b_len
  1003. else:
  1004. buffer.appendleft(b[nbytes:])
  1005. break
  1006. def _write_send(self):
  1007. assert self._buffer, 'Data should not be empty'
  1008. if self._conn_lost:
  1009. return
  1010. try:
  1011. buffer = self._buffer.popleft()
  1012. n = self._sock.send(buffer)
  1013. if n != len(buffer):
  1014. # Not all data was written
  1015. self._buffer.appendleft(buffer[n:])
  1016. except (BlockingIOError, InterruptedError):
  1017. pass
  1018. except (SystemExit, KeyboardInterrupt):
  1019. raise
  1020. except BaseException as exc:
  1021. self._loop._remove_writer(self._sock_fd)
  1022. self._buffer.clear()
  1023. self._fatal_error(exc, 'Fatal write error on socket transport')
  1024. if self._empty_waiter is not None:
  1025. self._empty_waiter.set_exception(exc)
  1026. else:
  1027. self._maybe_resume_protocol() # May append to buffer.
  1028. if not self._buffer:
  1029. self._loop._remove_writer(self._sock_fd)
  1030. if self._empty_waiter is not None:
  1031. self._empty_waiter.set_result(None)
  1032. if self._closing:
  1033. self._call_connection_lost(None)
  1034. elif self._eof:
  1035. self._sock.shutdown(socket.SHUT_WR)
  1036. def write_eof(self):
  1037. if self._closing or self._eof:
  1038. return
  1039. self._eof = True
  1040. if not self._buffer:
  1041. self._sock.shutdown(socket.SHUT_WR)
  1042. def writelines(self, list_of_data):
  1043. if self._eof:
  1044. raise RuntimeError('Cannot call writelines() after write_eof()')
  1045. if self._empty_waiter is not None:
  1046. raise RuntimeError('unable to writelines; sendfile is in progress')
  1047. if not list_of_data:
  1048. return
  1049. self._buffer.extend([memoryview(data) for data in list_of_data])
  1050. self._write_ready()
  1051. # If the entire buffer couldn't be written, register a write handler
  1052. if self._buffer:
  1053. self._loop._add_writer(self._sock_fd, self._write_ready)
  1054. def can_write_eof(self):
  1055. return True
  1056. def _call_connection_lost(self, exc):
  1057. super()._call_connection_lost(exc)
  1058. if self._empty_waiter is not None:
  1059. self._empty_waiter.set_exception(
  1060. ConnectionError("Connection is closed by peer"))
  1061. def _make_empty_waiter(self):
  1062. if self._empty_waiter is not None:
  1063. raise RuntimeError("Empty waiter is already set")
  1064. self._empty_waiter = self._loop.create_future()
  1065. if not self._buffer:
  1066. self._empty_waiter.set_result(None)
  1067. return self._empty_waiter
  1068. def _reset_empty_waiter(self):
  1069. self._empty_waiter = None
  1070. def close(self):
  1071. self._read_ready_cb = None
  1072. self._write_ready = None
  1073. super().close()
  1074. class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
  1075. _buffer_factory = collections.deque
  1076. def __init__(self, loop, sock, protocol, address=None,
  1077. waiter=None, extra=None):
  1078. super().__init__(loop, sock, protocol, extra)
  1079. self._address = address
  1080. self._buffer_size = 0
  1081. self._loop.call_soon(self._protocol.connection_made, self)
  1082. # only start reading when connection_made() has been called
  1083. self._loop.call_soon(self._add_reader,
  1084. self._sock_fd, self._read_ready)
  1085. if waiter is not None:
  1086. # only wake up the waiter when connection_made() has been called
  1087. self._loop.call_soon(futures._set_result_unless_cancelled,
  1088. waiter, None)
  1089. def get_write_buffer_size(self):
  1090. return self._buffer_size
  1091. def _read_ready(self):
  1092. if self._conn_lost:
  1093. return
  1094. try:
  1095. data, addr = self._sock.recvfrom(self.max_size)
  1096. except (BlockingIOError, InterruptedError):
  1097. pass
  1098. except OSError as exc:
  1099. self._protocol.error_received(exc)
  1100. except (SystemExit, KeyboardInterrupt):
  1101. raise
  1102. except BaseException as exc:
  1103. self._fatal_error(exc, 'Fatal read error on datagram transport')
  1104. else:
  1105. self._protocol.datagram_received(data, addr)
  1106. def sendto(self, data, addr=None):
  1107. if not isinstance(data, (bytes, bytearray, memoryview)):
  1108. raise TypeError(f'data argument must be a bytes-like object, '
  1109. f'not {type(data).__name__!r}')
  1110. if not data:
  1111. return
  1112. if self._address:
  1113. if addr not in (None, self._address):
  1114. raise ValueError(
  1115. f'Invalid address: must be None or {self._address}')
  1116. addr = self._address
  1117. if self._conn_lost and self._address:
  1118. if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
  1119. logger.warning('socket.send() raised exception.')
  1120. self._conn_lost += 1
  1121. return
  1122. if not self._buffer:
  1123. # Attempt to send it right away first.
  1124. try:
  1125. if self._extra['peername']:
  1126. self._sock.send(data)
  1127. else:
  1128. self._sock.sendto(data, addr)
  1129. return
  1130. except (BlockingIOError, InterruptedError):
  1131. self._loop._add_writer(self._sock_fd, self._sendto_ready)
  1132. except OSError as exc:
  1133. self._protocol.error_received(exc)
  1134. return
  1135. except (SystemExit, KeyboardInterrupt):
  1136. raise
  1137. except BaseException as exc:
  1138. self._fatal_error(
  1139. exc, 'Fatal write error on datagram transport')
  1140. return
  1141. # Ensure that what we buffer is immutable.
  1142. self._buffer.append((bytes(data), addr))
  1143. self._buffer_size += len(data)
  1144. self._maybe_pause_protocol()
  1145. def _sendto_ready(self):
  1146. while self._buffer:
  1147. data, addr = self._buffer.popleft()
  1148. self._buffer_size -= len(data)
  1149. try:
  1150. if self._extra['peername']:
  1151. self._sock.send(data)
  1152. else:
  1153. self._sock.sendto(data, addr)
  1154. except (BlockingIOError, InterruptedError):
  1155. self._buffer.appendleft((data, addr)) # Try again later.
  1156. self._buffer_size += len(data)
  1157. break
  1158. except OSError as exc:
  1159. self._protocol.error_received(exc)
  1160. return
  1161. except (SystemExit, KeyboardInterrupt):
  1162. raise
  1163. except BaseException as exc:
  1164. self._fatal_error(
  1165. exc, 'Fatal write error on datagram transport')
  1166. return
  1167. self._maybe_resume_protocol() # May append to buffer.
  1168. if not self._buffer:
  1169. self._loop._remove_writer(self._sock_fd)
  1170. if self._closing:
  1171. self._call_connection_lost(None)