windows_events.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901
  1. """Selector and proactor event loops for Windows."""
  2. import sys
  3. if sys.platform != 'win32': # pragma: no cover
  4. raise ImportError('win32 only')
  5. import _overlapped
  6. import _winapi
  7. import errno
  8. from functools import partial
  9. import math
  10. import msvcrt
  11. import socket
  12. import struct
  13. import time
  14. import weakref
  15. from . import events
  16. from . import base_subprocess
  17. from . import futures
  18. from . import exceptions
  19. from . import proactor_events
  20. from . import selector_events
  21. from . import tasks
  22. from . import windows_utils
  23. from .log import logger
  24. __all__ = (
  25. 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
  26. 'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
  27. 'WindowsProactorEventLoopPolicy',
  28. )
  29. NULL = _winapi.NULL
  30. INFINITE = _winapi.INFINITE
  31. ERROR_CONNECTION_REFUSED = 1225
  32. ERROR_CONNECTION_ABORTED = 1236
  33. # Initial delay in seconds for connect_pipe() before retrying to connect
  34. CONNECT_PIPE_INIT_DELAY = 0.001
  35. # Maximum delay in seconds for connect_pipe() before retrying to connect
  36. CONNECT_PIPE_MAX_DELAY = 0.100
  37. class _OverlappedFuture(futures.Future):
  38. """Subclass of Future which represents an overlapped operation.
  39. Cancelling it will immediately cancel the overlapped operation.
  40. """
  41. def __init__(self, ov, *, loop=None):
  42. super().__init__(loop=loop)
  43. if self._source_traceback:
  44. del self._source_traceback[-1]
  45. self._ov = ov
  46. def _repr_info(self):
  47. info = super()._repr_info()
  48. if self._ov is not None:
  49. state = 'pending' if self._ov.pending else 'completed'
  50. info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
  51. return info
  52. def _cancel_overlapped(self):
  53. if self._ov is None:
  54. return
  55. try:
  56. self._ov.cancel()
  57. except OSError as exc:
  58. context = {
  59. 'message': 'Cancelling an overlapped future failed',
  60. 'exception': exc,
  61. 'future': self,
  62. }
  63. if self._source_traceback:
  64. context['source_traceback'] = self._source_traceback
  65. self._loop.call_exception_handler(context)
  66. self._ov = None
  67. def cancel(self, msg=None):
  68. self._cancel_overlapped()
  69. return super().cancel(msg=msg)
  70. def set_exception(self, exception):
  71. super().set_exception(exception)
  72. self._cancel_overlapped()
  73. def set_result(self, result):
  74. super().set_result(result)
  75. self._ov = None
  76. class _BaseWaitHandleFuture(futures.Future):
  77. """Subclass of Future which represents a wait handle."""
  78. def __init__(self, ov, handle, wait_handle, *, loop=None):
  79. super().__init__(loop=loop)
  80. if self._source_traceback:
  81. del self._source_traceback[-1]
  82. # Keep a reference to the Overlapped object to keep it alive until the
  83. # wait is unregistered
  84. self._ov = ov
  85. self._handle = handle
  86. self._wait_handle = wait_handle
  87. # Should we call UnregisterWaitEx() if the wait completes
  88. # or is cancelled?
  89. self._registered = True
  90. def _poll(self):
  91. # non-blocking wait: use a timeout of 0 millisecond
  92. return (_winapi.WaitForSingleObject(self._handle, 0) ==
  93. _winapi.WAIT_OBJECT_0)
  94. def _repr_info(self):
  95. info = super()._repr_info()
  96. info.append(f'handle={self._handle:#x}')
  97. if self._handle is not None:
  98. state = 'signaled' if self._poll() else 'waiting'
  99. info.append(state)
  100. if self._wait_handle is not None:
  101. info.append(f'wait_handle={self._wait_handle:#x}')
  102. return info
  103. def _unregister_wait_cb(self, fut):
  104. # The wait was unregistered: it's not safe to destroy the Overlapped
  105. # object
  106. self._ov = None
  107. def _unregister_wait(self):
  108. if not self._registered:
  109. return
  110. self._registered = False
  111. wait_handle = self._wait_handle
  112. self._wait_handle = None
  113. try:
  114. _overlapped.UnregisterWait(wait_handle)
  115. except OSError as exc:
  116. if exc.winerror != _overlapped.ERROR_IO_PENDING:
  117. context = {
  118. 'message': 'Failed to unregister the wait handle',
  119. 'exception': exc,
  120. 'future': self,
  121. }
  122. if self._source_traceback:
  123. context['source_traceback'] = self._source_traceback
  124. self._loop.call_exception_handler(context)
  125. return
  126. # ERROR_IO_PENDING means that the unregister is pending
  127. self._unregister_wait_cb(None)
  128. def cancel(self, msg=None):
  129. self._unregister_wait()
  130. return super().cancel(msg=msg)
  131. def set_exception(self, exception):
  132. self._unregister_wait()
  133. super().set_exception(exception)
  134. def set_result(self, result):
  135. self._unregister_wait()
  136. super().set_result(result)
  137. class _WaitCancelFuture(_BaseWaitHandleFuture):
  138. """Subclass of Future which represents a wait for the cancellation of a
  139. _WaitHandleFuture using an event.
  140. """
  141. def __init__(self, ov, event, wait_handle, *, loop=None):
  142. super().__init__(ov, event, wait_handle, loop=loop)
  143. self._done_callback = None
  144. def cancel(self):
  145. raise RuntimeError("_WaitCancelFuture must not be cancelled")
  146. def set_result(self, result):
  147. super().set_result(result)
  148. if self._done_callback is not None:
  149. self._done_callback(self)
  150. def set_exception(self, exception):
  151. super().set_exception(exception)
  152. if self._done_callback is not None:
  153. self._done_callback(self)
  154. class _WaitHandleFuture(_BaseWaitHandleFuture):
  155. def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
  156. super().__init__(ov, handle, wait_handle, loop=loop)
  157. self._proactor = proactor
  158. self._unregister_proactor = True
  159. self._event = _overlapped.CreateEvent(None, True, False, None)
  160. self._event_fut = None
  161. def _unregister_wait_cb(self, fut):
  162. if self._event is not None:
  163. _winapi.CloseHandle(self._event)
  164. self._event = None
  165. self._event_fut = None
  166. # If the wait was cancelled, the wait may never be signalled, so
  167. # it's required to unregister it. Otherwise, IocpProactor.close() will
  168. # wait forever for an event which will never come.
  169. #
  170. # If the IocpProactor already received the event, it's safe to call
  171. # _unregister() because we kept a reference to the Overlapped object
  172. # which is used as a unique key.
  173. self._proactor._unregister(self._ov)
  174. self._proactor = None
  175. super()._unregister_wait_cb(fut)
  176. def _unregister_wait(self):
  177. if not self._registered:
  178. return
  179. self._registered = False
  180. wait_handle = self._wait_handle
  181. self._wait_handle = None
  182. try:
  183. _overlapped.UnregisterWaitEx(wait_handle, self._event)
  184. except OSError as exc:
  185. if exc.winerror != _overlapped.ERROR_IO_PENDING:
  186. context = {
  187. 'message': 'Failed to unregister the wait handle',
  188. 'exception': exc,
  189. 'future': self,
  190. }
  191. if self._source_traceback:
  192. context['source_traceback'] = self._source_traceback
  193. self._loop.call_exception_handler(context)
  194. return
  195. # ERROR_IO_PENDING is not an error, the wait was unregistered
  196. self._event_fut = self._proactor._wait_cancel(self._event,
  197. self._unregister_wait_cb)
  198. class PipeServer(object):
  199. """Class representing a pipe server.
  200. This is much like a bound, listening socket.
  201. """
  202. def __init__(self, address):
  203. self._address = address
  204. self._free_instances = weakref.WeakSet()
  205. # initialize the pipe attribute before calling _server_pipe_handle()
  206. # because this function can raise an exception and the destructor calls
  207. # the close() method
  208. self._pipe = None
  209. self._accept_pipe_future = None
  210. self._pipe = self._server_pipe_handle(True)
  211. def _get_unconnected_pipe(self):
  212. # Create new instance and return previous one. This ensures
  213. # that (until the server is closed) there is always at least
  214. # one pipe handle for address. Therefore if a client attempt
  215. # to connect it will not fail with FileNotFoundError.
  216. tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
  217. return tmp
  218. def _server_pipe_handle(self, first):
  219. # Return a wrapper for a new pipe handle.
  220. if self.closed():
  221. return None
  222. flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
  223. if first:
  224. flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
  225. h = _winapi.CreateNamedPipe(
  226. self._address, flags,
  227. _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
  228. _winapi.PIPE_WAIT,
  229. _winapi.PIPE_UNLIMITED_INSTANCES,
  230. windows_utils.BUFSIZE, windows_utils.BUFSIZE,
  231. _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
  232. pipe = windows_utils.PipeHandle(h)
  233. self._free_instances.add(pipe)
  234. return pipe
  235. def closed(self):
  236. return (self._address is None)
  237. def close(self):
  238. if self._accept_pipe_future is not None:
  239. self._accept_pipe_future.cancel()
  240. self._accept_pipe_future = None
  241. # Close all instances which have not been connected to by a client.
  242. if self._address is not None:
  243. for pipe in self._free_instances:
  244. pipe.close()
  245. self._pipe = None
  246. self._address = None
  247. self._free_instances.clear()
  248. __del__ = close
  249. class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
  250. """Windows version of selector event loop."""
  251. class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
  252. """Windows version of proactor event loop using IOCP."""
  253. def __init__(self, proactor=None):
  254. if proactor is None:
  255. proactor = IocpProactor()
  256. super().__init__(proactor)
  257. def run_forever(self):
  258. try:
  259. assert self._self_reading_future is None
  260. self.call_soon(self._loop_self_reading)
  261. super().run_forever()
  262. finally:
  263. if self._self_reading_future is not None:
  264. ov = self._self_reading_future._ov
  265. self._self_reading_future.cancel()
  266. # self_reading_future always uses IOCP, so even though it's
  267. # been cancelled, we need to make sure that the IOCP message
  268. # is received so that the kernel is not holding on to the
  269. # memory, possibly causing memory corruption later. Only
  270. # unregister it if IO is complete in all respects. Otherwise
  271. # we need another _poll() later to complete the IO.
  272. if ov is not None and not ov.pending:
  273. self._proactor._unregister(ov)
  274. self._self_reading_future = None
  275. async def create_pipe_connection(self, protocol_factory, address):
  276. f = self._proactor.connect_pipe(address)
  277. pipe = await f
  278. protocol = protocol_factory()
  279. trans = self._make_duplex_pipe_transport(pipe, protocol,
  280. extra={'addr': address})
  281. return trans, protocol
  282. async def start_serving_pipe(self, protocol_factory, address):
  283. server = PipeServer(address)
  284. def loop_accept_pipe(f=None):
  285. pipe = None
  286. try:
  287. if f:
  288. pipe = f.result()
  289. server._free_instances.discard(pipe)
  290. if server.closed():
  291. # A client connected before the server was closed:
  292. # drop the client (close the pipe) and exit
  293. pipe.close()
  294. return
  295. protocol = protocol_factory()
  296. self._make_duplex_pipe_transport(
  297. pipe, protocol, extra={'addr': address})
  298. pipe = server._get_unconnected_pipe()
  299. if pipe is None:
  300. return
  301. f = self._proactor.accept_pipe(pipe)
  302. except BrokenPipeError:
  303. if pipe and pipe.fileno() != -1:
  304. pipe.close()
  305. self.call_soon(loop_accept_pipe)
  306. except OSError as exc:
  307. if pipe and pipe.fileno() != -1:
  308. self.call_exception_handler({
  309. 'message': 'Pipe accept failed',
  310. 'exception': exc,
  311. 'pipe': pipe,
  312. })
  313. pipe.close()
  314. elif self._debug:
  315. logger.warning("Accept pipe failed on pipe %r",
  316. pipe, exc_info=True)
  317. self.call_soon(loop_accept_pipe)
  318. except exceptions.CancelledError:
  319. if pipe:
  320. pipe.close()
  321. else:
  322. server._accept_pipe_future = f
  323. f.add_done_callback(loop_accept_pipe)
  324. self.call_soon(loop_accept_pipe)
  325. return [server]
  326. async def _make_subprocess_transport(self, protocol, args, shell,
  327. stdin, stdout, stderr, bufsize,
  328. extra=None, **kwargs):
  329. waiter = self.create_future()
  330. transp = _WindowsSubprocessTransport(self, protocol, args, shell,
  331. stdin, stdout, stderr, bufsize,
  332. waiter=waiter, extra=extra,
  333. **kwargs)
  334. try:
  335. await waiter
  336. except (SystemExit, KeyboardInterrupt):
  337. raise
  338. except BaseException:
  339. transp.close()
  340. await transp._wait()
  341. raise
  342. return transp
  343. class IocpProactor:
  344. """Proactor implementation using IOCP."""
  345. def __init__(self, concurrency=INFINITE):
  346. self._loop = None
  347. self._results = []
  348. self._iocp = _overlapped.CreateIoCompletionPort(
  349. _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
  350. self._cache = {}
  351. self._registered = weakref.WeakSet()
  352. self._unregistered = []
  353. self._stopped_serving = weakref.WeakSet()
  354. def _check_closed(self):
  355. if self._iocp is None:
  356. raise RuntimeError('IocpProactor is closed')
  357. def __repr__(self):
  358. info = ['overlapped#=%s' % len(self._cache),
  359. 'result#=%s' % len(self._results)]
  360. if self._iocp is None:
  361. info.append('closed')
  362. return '<%s %s>' % (self.__class__.__name__, " ".join(info))
  363. def set_loop(self, loop):
  364. self._loop = loop
  365. def select(self, timeout=None):
  366. if not self._results:
  367. self._poll(timeout)
  368. tmp = self._results
  369. self._results = []
  370. try:
  371. return tmp
  372. finally:
  373. # Needed to break cycles when an exception occurs.
  374. tmp = None
  375. def _result(self, value):
  376. fut = self._loop.create_future()
  377. fut.set_result(value)
  378. return fut
  379. @staticmethod
  380. def finish_socket_func(trans, key, ov):
  381. try:
  382. return ov.getresult()
  383. except OSError as exc:
  384. if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
  385. _overlapped.ERROR_OPERATION_ABORTED):
  386. raise ConnectionResetError(*exc.args)
  387. else:
  388. raise
  389. @classmethod
  390. def _finish_recvfrom(cls, trans, key, ov, *, empty_result):
  391. try:
  392. return cls.finish_socket_func(trans, key, ov)
  393. except OSError as exc:
  394. # WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same
  395. # socket is used to send to an address that is not listening.
  396. if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE:
  397. return empty_result, None
  398. else:
  399. raise
  400. def recv(self, conn, nbytes, flags=0):
  401. self._register_with_iocp(conn)
  402. ov = _overlapped.Overlapped(NULL)
  403. try:
  404. if isinstance(conn, socket.socket):
  405. ov.WSARecv(conn.fileno(), nbytes, flags)
  406. else:
  407. ov.ReadFile(conn.fileno(), nbytes)
  408. except BrokenPipeError:
  409. return self._result(b'')
  410. return self._register(ov, conn, self.finish_socket_func)
  411. def recv_into(self, conn, buf, flags=0):
  412. self._register_with_iocp(conn)
  413. ov = _overlapped.Overlapped(NULL)
  414. try:
  415. if isinstance(conn, socket.socket):
  416. ov.WSARecvInto(conn.fileno(), buf, flags)
  417. else:
  418. ov.ReadFileInto(conn.fileno(), buf)
  419. except BrokenPipeError:
  420. return self._result(0)
  421. return self._register(ov, conn, self.finish_socket_func)
  422. def recvfrom(self, conn, nbytes, flags=0):
  423. self._register_with_iocp(conn)
  424. ov = _overlapped.Overlapped(NULL)
  425. try:
  426. ov.WSARecvFrom(conn.fileno(), nbytes, flags)
  427. except BrokenPipeError:
  428. return self._result((b'', None))
  429. return self._register(ov, conn, partial(self._finish_recvfrom,
  430. empty_result=b''))
  431. def recvfrom_into(self, conn, buf, flags=0):
  432. self._register_with_iocp(conn)
  433. ov = _overlapped.Overlapped(NULL)
  434. try:
  435. ov.WSARecvFromInto(conn.fileno(), buf, flags)
  436. except BrokenPipeError:
  437. return self._result((0, None))
  438. return self._register(ov, conn, partial(self._finish_recvfrom,
  439. empty_result=0))
  440. def sendto(self, conn, buf, flags=0, addr=None):
  441. self._register_with_iocp(conn)
  442. ov = _overlapped.Overlapped(NULL)
  443. ov.WSASendTo(conn.fileno(), buf, flags, addr)
  444. return self._register(ov, conn, self.finish_socket_func)
  445. def send(self, conn, buf, flags=0):
  446. self._register_with_iocp(conn)
  447. ov = _overlapped.Overlapped(NULL)
  448. if isinstance(conn, socket.socket):
  449. ov.WSASend(conn.fileno(), buf, flags)
  450. else:
  451. ov.WriteFile(conn.fileno(), buf)
  452. return self._register(ov, conn, self.finish_socket_func)
  453. def accept(self, listener):
  454. self._register_with_iocp(listener)
  455. conn = self._get_accept_socket(listener.family)
  456. ov = _overlapped.Overlapped(NULL)
  457. ov.AcceptEx(listener.fileno(), conn.fileno())
  458. def finish_accept(trans, key, ov):
  459. ov.getresult()
  460. # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
  461. buf = struct.pack('@P', listener.fileno())
  462. conn.setsockopt(socket.SOL_SOCKET,
  463. _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
  464. conn.settimeout(listener.gettimeout())
  465. return conn, conn.getpeername()
  466. async def accept_coro(future, conn):
  467. # Coroutine closing the accept socket if the future is cancelled
  468. try:
  469. await future
  470. except exceptions.CancelledError:
  471. conn.close()
  472. raise
  473. future = self._register(ov, listener, finish_accept)
  474. coro = accept_coro(future, conn)
  475. tasks.ensure_future(coro, loop=self._loop)
  476. return future
  477. def connect(self, conn, address):
  478. if conn.type == socket.SOCK_DGRAM:
  479. # WSAConnect will complete immediately for UDP sockets so we don't
  480. # need to register any IOCP operation
  481. _overlapped.WSAConnect(conn.fileno(), address)
  482. fut = self._loop.create_future()
  483. fut.set_result(None)
  484. return fut
  485. self._register_with_iocp(conn)
  486. # The socket needs to be locally bound before we call ConnectEx().
  487. try:
  488. _overlapped.BindLocal(conn.fileno(), conn.family)
  489. except OSError as e:
  490. if e.winerror != errno.WSAEINVAL:
  491. raise
  492. # Probably already locally bound; check using getsockname().
  493. if conn.getsockname()[1] == 0:
  494. raise
  495. ov = _overlapped.Overlapped(NULL)
  496. ov.ConnectEx(conn.fileno(), address)
  497. def finish_connect(trans, key, ov):
  498. ov.getresult()
  499. # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
  500. conn.setsockopt(socket.SOL_SOCKET,
  501. _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
  502. return conn
  503. return self._register(ov, conn, finish_connect)
  504. def sendfile(self, sock, file, offset, count):
  505. self._register_with_iocp(sock)
  506. ov = _overlapped.Overlapped(NULL)
  507. offset_low = offset & 0xffff_ffff
  508. offset_high = (offset >> 32) & 0xffff_ffff
  509. ov.TransmitFile(sock.fileno(),
  510. msvcrt.get_osfhandle(file.fileno()),
  511. offset_low, offset_high,
  512. count, 0, 0)
  513. return self._register(ov, sock, self.finish_socket_func)
  514. def accept_pipe(self, pipe):
  515. self._register_with_iocp(pipe)
  516. ov = _overlapped.Overlapped(NULL)
  517. connected = ov.ConnectNamedPipe(pipe.fileno())
  518. if connected:
  519. # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
  520. # that the pipe is connected. There is no need to wait for the
  521. # completion of the connection.
  522. return self._result(pipe)
  523. def finish_accept_pipe(trans, key, ov):
  524. ov.getresult()
  525. return pipe
  526. return self._register(ov, pipe, finish_accept_pipe)
  527. async def connect_pipe(self, address):
  528. delay = CONNECT_PIPE_INIT_DELAY
  529. while True:
  530. # Unfortunately there is no way to do an overlapped connect to
  531. # a pipe. Call CreateFile() in a loop until it doesn't fail with
  532. # ERROR_PIPE_BUSY.
  533. try:
  534. handle = _overlapped.ConnectPipe(address)
  535. break
  536. except OSError as exc:
  537. if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
  538. raise
  539. # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
  540. delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
  541. await tasks.sleep(delay)
  542. return windows_utils.PipeHandle(handle)
  543. def wait_for_handle(self, handle, timeout=None):
  544. """Wait for a handle.
  545. Return a Future object. The result of the future is True if the wait
  546. completed, or False if the wait did not complete (on timeout).
  547. """
  548. return self._wait_for_handle(handle, timeout, False)
  549. def _wait_cancel(self, event, done_callback):
  550. fut = self._wait_for_handle(event, None, True)
  551. # add_done_callback() cannot be used because the wait may only complete
  552. # in IocpProactor.close(), while the event loop is not running.
  553. fut._done_callback = done_callback
  554. return fut
  555. def _wait_for_handle(self, handle, timeout, _is_cancel):
  556. self._check_closed()
  557. if timeout is None:
  558. ms = _winapi.INFINITE
  559. else:
  560. # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
  561. # round away from zero to wait *at least* timeout seconds.
  562. ms = math.ceil(timeout * 1e3)
  563. # We only create ov so we can use ov.address as a key for the cache.
  564. ov = _overlapped.Overlapped(NULL)
  565. wait_handle = _overlapped.RegisterWaitWithQueue(
  566. handle, self._iocp, ov.address, ms)
  567. if _is_cancel:
  568. f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
  569. else:
  570. f = _WaitHandleFuture(ov, handle, wait_handle, self,
  571. loop=self._loop)
  572. if f._source_traceback:
  573. del f._source_traceback[-1]
  574. def finish_wait_for_handle(trans, key, ov):
  575. # Note that this second wait means that we should only use
  576. # this with handles types where a successful wait has no
  577. # effect. So events or processes are all right, but locks
  578. # or semaphores are not. Also note if the handle is
  579. # signalled and then quickly reset, then we may return
  580. # False even though we have not timed out.
  581. return f._poll()
  582. self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
  583. return f
  584. def _register_with_iocp(self, obj):
  585. # To get notifications of finished ops on this objects sent to the
  586. # completion port, were must register the handle.
  587. if obj not in self._registered:
  588. self._registered.add(obj)
  589. _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
  590. # XXX We could also use SetFileCompletionNotificationModes()
  591. # to avoid sending notifications to completion port of ops
  592. # that succeed immediately.
  593. def _register(self, ov, obj, callback):
  594. self._check_closed()
  595. # Return a future which will be set with the result of the
  596. # operation when it completes. The future's value is actually
  597. # the value returned by callback().
  598. f = _OverlappedFuture(ov, loop=self._loop)
  599. if f._source_traceback:
  600. del f._source_traceback[-1]
  601. if not ov.pending:
  602. # The operation has completed, so no need to postpone the
  603. # work. We cannot take this short cut if we need the
  604. # NumberOfBytes, CompletionKey values returned by
  605. # PostQueuedCompletionStatus().
  606. try:
  607. value = callback(None, None, ov)
  608. except OSError as e:
  609. f.set_exception(e)
  610. else:
  611. f.set_result(value)
  612. # Even if GetOverlappedResult() was called, we have to wait for the
  613. # notification of the completion in GetQueuedCompletionStatus().
  614. # Register the overlapped operation to keep a reference to the
  615. # OVERLAPPED object, otherwise the memory is freed and Windows may
  616. # read uninitialized memory.
  617. # Register the overlapped operation for later. Note that
  618. # we only store obj to prevent it from being garbage
  619. # collected too early.
  620. self._cache[ov.address] = (f, ov, obj, callback)
  621. return f
  622. def _unregister(self, ov):
  623. """Unregister an overlapped object.
  624. Call this method when its future has been cancelled. The event can
  625. already be signalled (pending in the proactor event queue). It is also
  626. safe if the event is never signalled (because it was cancelled).
  627. """
  628. self._check_closed()
  629. self._unregistered.append(ov)
  630. def _get_accept_socket(self, family):
  631. s = socket.socket(family)
  632. s.settimeout(0)
  633. return s
  634. def _poll(self, timeout=None):
  635. if timeout is None:
  636. ms = INFINITE
  637. elif timeout < 0:
  638. raise ValueError("negative timeout")
  639. else:
  640. # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
  641. # round away from zero to wait *at least* timeout seconds.
  642. ms = math.ceil(timeout * 1e3)
  643. if ms >= INFINITE:
  644. raise ValueError("timeout too big")
  645. while True:
  646. status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
  647. if status is None:
  648. break
  649. ms = 0
  650. err, transferred, key, address = status
  651. try:
  652. f, ov, obj, callback = self._cache.pop(address)
  653. except KeyError:
  654. if self._loop.get_debug():
  655. self._loop.call_exception_handler({
  656. 'message': ('GetQueuedCompletionStatus() returned an '
  657. 'unexpected event'),
  658. 'status': ('err=%s transferred=%s key=%#x address=%#x'
  659. % (err, transferred, key, address)),
  660. })
  661. # key is either zero, or it is used to return a pipe
  662. # handle which should be closed to avoid a leak.
  663. if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
  664. _winapi.CloseHandle(key)
  665. continue
  666. if obj in self._stopped_serving:
  667. f.cancel()
  668. # Don't call the callback if _register() already read the result or
  669. # if the overlapped has been cancelled
  670. elif not f.done():
  671. try:
  672. value = callback(transferred, key, ov)
  673. except OSError as e:
  674. f.set_exception(e)
  675. self._results.append(f)
  676. else:
  677. f.set_result(value)
  678. self._results.append(f)
  679. finally:
  680. f = None
  681. # Remove unregistered futures
  682. for ov in self._unregistered:
  683. self._cache.pop(ov.address, None)
  684. self._unregistered.clear()
  685. def _stop_serving(self, obj):
  686. # obj is a socket or pipe handle. It will be closed in
  687. # BaseProactorEventLoop._stop_serving() which will make any
  688. # pending operations fail quickly.
  689. self._stopped_serving.add(obj)
  690. def close(self):
  691. if self._iocp is None:
  692. # already closed
  693. return
  694. # Cancel remaining registered operations.
  695. for fut, ov, obj, callback in list(self._cache.values()):
  696. if fut.cancelled():
  697. # Nothing to do with cancelled futures
  698. pass
  699. elif isinstance(fut, _WaitCancelFuture):
  700. # _WaitCancelFuture must not be cancelled
  701. pass
  702. else:
  703. try:
  704. fut.cancel()
  705. except OSError as exc:
  706. if self._loop is not None:
  707. context = {
  708. 'message': 'Cancelling a future failed',
  709. 'exception': exc,
  710. 'future': fut,
  711. }
  712. if fut._source_traceback:
  713. context['source_traceback'] = fut._source_traceback
  714. self._loop.call_exception_handler(context)
  715. # Wait until all cancelled overlapped complete: don't exit with running
  716. # overlapped to prevent a crash. Display progress every second if the
  717. # loop is still running.
  718. msg_update = 1.0
  719. start_time = time.monotonic()
  720. next_msg = start_time + msg_update
  721. while self._cache:
  722. if next_msg <= time.monotonic():
  723. logger.debug('%r is running after closing for %.1f seconds',
  724. self, time.monotonic() - start_time)
  725. next_msg = time.monotonic() + msg_update
  726. # handle a few events, or timeout
  727. self._poll(msg_update)
  728. self._results = []
  729. _winapi.CloseHandle(self._iocp)
  730. self._iocp = None
  731. def __del__(self):
  732. self.close()
  733. class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
  734. def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
  735. self._proc = windows_utils.Popen(
  736. args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
  737. bufsize=bufsize, **kwargs)
  738. def callback(f):
  739. returncode = self._proc.poll()
  740. self._process_exited(returncode)
  741. f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
  742. f.add_done_callback(callback)
  743. SelectorEventLoop = _WindowsSelectorEventLoop
  744. class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
  745. _loop_factory = SelectorEventLoop
  746. class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
  747. _loop_factory = ProactorEventLoop
  748. DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy