asyncioreactor.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. # -*- test-case-name: twisted.test.test_internet -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. asyncio-based reactor implementation.
  6. """
  7. import errno
  8. import sys
  9. from asyncio import AbstractEventLoop, get_event_loop
  10. from typing import Dict, Optional, Type
  11. from zope.interface import implementer
  12. from twisted.internet.abstract import FileDescriptor
  13. from twisted.internet.interfaces import IReactorFDSet
  14. from twisted.internet.posixbase import (
  15. _NO_FILEDESC,
  16. PosixReactorBase,
  17. _ContinuousPolling,
  18. )
  19. from twisted.logger import Logger
  20. from twisted.python.log import callWithLogger
  21. @implementer(IReactorFDSet)
  22. class AsyncioSelectorReactor(PosixReactorBase):
  23. """
  24. Reactor running on top of L{asyncio.SelectorEventLoop}.
  25. On POSIX platforms, the default event loop is
  26. L{asyncio.SelectorEventLoop}.
  27. On Windows, the default event loop on Python 3.7 and older
  28. is C{asyncio.WindowsSelectorEventLoop}, but on Python 3.8 and newer
  29. the default event loop is C{asyncio.WindowsProactorEventLoop} which
  30. is incompatible with L{AsyncioSelectorReactor}.
  31. Applications that use L{AsyncioSelectorReactor} on Windows
  32. with Python 3.8+ must call
  33. C{asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())}
  34. before instantiating and running L{AsyncioSelectorReactor}.
  35. """
  36. _asyncClosed = False
  37. _log = Logger()
  38. def __init__(self, eventloop: Optional[AbstractEventLoop] = None):
  39. if eventloop is None:
  40. _eventloop: AbstractEventLoop = get_event_loop()
  41. else:
  42. _eventloop = eventloop
  43. # On Python 3.8+, asyncio.get_event_loop() on
  44. # Windows was changed to return a ProactorEventLoop
  45. # unless the loop policy has been changed.
  46. if sys.platform == "win32":
  47. from asyncio import ProactorEventLoop
  48. if isinstance(_eventloop, ProactorEventLoop):
  49. raise TypeError(
  50. f"ProactorEventLoop is not supported, got: {_eventloop}"
  51. )
  52. self._asyncioEventloop: AbstractEventLoop = _eventloop
  53. self._writers: Dict[Type[FileDescriptor], int] = {}
  54. self._readers: Dict[Type[FileDescriptor], int] = {}
  55. self._continuousPolling = _ContinuousPolling(self)
  56. self._scheduledAt = None
  57. self._timerHandle = None
  58. super().__init__()
  59. def _unregisterFDInAsyncio(self, fd):
  60. """
  61. Compensate for a bug in asyncio where it will not unregister a FD that
  62. it cannot handle in the epoll loop. It touches internal asyncio code.
  63. A description of the bug by markrwilliams:
  64. The C{add_writer} method of asyncio event loops isn't atomic because
  65. all the Selector classes in the selector module internally record a
  66. file object before passing it to the platform's selector
  67. implementation. If the platform's selector decides the file object
  68. isn't acceptable, the resulting exception doesn't cause the Selector to
  69. un-track the file object.
  70. The failing/hanging stdio test goes through the following sequence of
  71. events (roughly):
  72. * The first C{connection.write(intToByte(value))} call hits the asyncio
  73. reactor's C{addWriter} method.
  74. * C{addWriter} calls the asyncio loop's C{add_writer} method, which
  75. happens to live on C{_BaseSelectorEventLoop}.
  76. * The asyncio loop's C{add_writer} method checks if the file object has
  77. been registered before via the selector's C{get_key} method.
  78. * It hasn't, so the KeyError block runs and calls the selector's
  79. register method
  80. * Code examples that follow use EpollSelector, but the code flow holds
  81. true for any other selector implementation. The selector's register
  82. method first calls through to the next register method in the MRO
  83. * That next method is always C{_BaseSelectorImpl.register} which
  84. creates a C{SelectorKey} instance for the file object, stores it under
  85. the file object's file descriptor, and then returns it.
  86. * Control returns to the concrete selector implementation, which asks
  87. the operating system to track the file descriptor using the right API.
  88. * The operating system refuses! An exception is raised that, in this
  89. case, the asyncio reactor handles by creating a C{_ContinuousPolling}
  90. object to watch the file descriptor.
  91. * The second C{connection.write(intToByte(value))} call hits the
  92. asyncio reactor's C{addWriter} method, which hits the C{add_writer}
  93. method. But the loop's selector's get_key method now returns a
  94. C{SelectorKey}! Now the asyncio reactor's C{addWriter} method thinks
  95. the asyncio loop will watch the file descriptor, even though it won't.
  96. """
  97. try:
  98. self._asyncioEventloop._selector.unregister(fd)
  99. except BaseException:
  100. pass
  101. def _readOrWrite(self, selectable, read):
  102. method = selectable.doRead if read else selectable.doWrite
  103. if selectable.fileno() == -1:
  104. self._disconnectSelectable(selectable, _NO_FILEDESC, read)
  105. return
  106. try:
  107. why = method()
  108. except Exception as e:
  109. why = e
  110. self._log.failure(None)
  111. if why:
  112. self._disconnectSelectable(selectable, why, read)
  113. def addReader(self, reader):
  114. if reader in self._readers.keys() or reader in self._continuousPolling._readers:
  115. return
  116. fd = reader.fileno()
  117. try:
  118. self._asyncioEventloop.add_reader(
  119. fd, callWithLogger, reader, self._readOrWrite, reader, True
  120. )
  121. self._readers[reader] = fd
  122. except OSError as e:
  123. self._unregisterFDInAsyncio(fd)
  124. if e.errno == errno.EPERM:
  125. # epoll(7) doesn't support certain file descriptors,
  126. # e.g. filesystem files, so for those we just poll
  127. # continuously:
  128. self._continuousPolling.addReader(reader)
  129. else:
  130. raise
  131. def addWriter(self, writer):
  132. if writer in self._writers.keys() or writer in self._continuousPolling._writers:
  133. return
  134. fd = writer.fileno()
  135. try:
  136. self._asyncioEventloop.add_writer(
  137. fd, callWithLogger, writer, self._readOrWrite, writer, False
  138. )
  139. self._writers[writer] = fd
  140. except PermissionError:
  141. self._unregisterFDInAsyncio(fd)
  142. # epoll(7) doesn't support certain file descriptors,
  143. # e.g. filesystem files, so for those we just poll
  144. # continuously:
  145. self._continuousPolling.addWriter(writer)
  146. except BrokenPipeError:
  147. # The kqueuereactor will raise this if there is a broken pipe
  148. self._unregisterFDInAsyncio(fd)
  149. except BaseException:
  150. self._unregisterFDInAsyncio(fd)
  151. raise
  152. def removeReader(self, reader):
  153. # First, see if they're trying to remove a reader that we don't have.
  154. if not (
  155. reader in self._readers.keys() or self._continuousPolling.isReading(reader)
  156. ):
  157. # We don't have it, so just return OK.
  158. return
  159. # If it was a cont. polling reader, check there first.
  160. if self._continuousPolling.isReading(reader):
  161. self._continuousPolling.removeReader(reader)
  162. return
  163. fd = reader.fileno()
  164. if fd == -1:
  165. # If the FD is -1, we want to know what its original FD was, to
  166. # remove it.
  167. fd = self._readers.pop(reader)
  168. else:
  169. self._readers.pop(reader)
  170. self._asyncioEventloop.remove_reader(fd)
  171. def removeWriter(self, writer):
  172. # First, see if they're trying to remove a writer that we don't have.
  173. if not (
  174. writer in self._writers.keys() or self._continuousPolling.isWriting(writer)
  175. ):
  176. # We don't have it, so just return OK.
  177. return
  178. # If it was a cont. polling writer, check there first.
  179. if self._continuousPolling.isWriting(writer):
  180. self._continuousPolling.removeWriter(writer)
  181. return
  182. fd = writer.fileno()
  183. if fd == -1:
  184. # If the FD is -1, we want to know what its original FD was, to
  185. # remove it.
  186. fd = self._writers.pop(writer)
  187. else:
  188. self._writers.pop(writer)
  189. self._asyncioEventloop.remove_writer(fd)
  190. def removeAll(self):
  191. return (
  192. self._removeAll(self._readers.keys(), self._writers.keys())
  193. + self._continuousPolling.removeAll()
  194. )
  195. def getReaders(self):
  196. return list(self._readers.keys()) + self._continuousPolling.getReaders()
  197. def getWriters(self):
  198. return list(self._writers.keys()) + self._continuousPolling.getWriters()
  199. def iterate(self, timeout):
  200. self._asyncioEventloop.call_later(timeout + 0.01, self._asyncioEventloop.stop)
  201. self._asyncioEventloop.run_forever()
  202. def run(self, installSignalHandlers=True):
  203. self.startRunning(installSignalHandlers=installSignalHandlers)
  204. self._asyncioEventloop.run_forever()
  205. if self._justStopped:
  206. self._justStopped = False
  207. def stop(self):
  208. super().stop()
  209. # This will cause runUntilCurrent which in its turn
  210. # will call fireSystemEvent("shutdown")
  211. self.callLater(0, lambda: None)
  212. def crash(self):
  213. super().crash()
  214. self._asyncioEventloop.stop()
  215. def _onTimer(self):
  216. self._scheduledAt = None
  217. self.runUntilCurrent()
  218. self._reschedule()
  219. def _reschedule(self):
  220. timeout = self.timeout()
  221. if timeout is not None:
  222. abs_time = self._asyncioEventloop.time() + timeout
  223. self._scheduledAt = abs_time
  224. if self._timerHandle is not None:
  225. self._timerHandle.cancel()
  226. self._timerHandle = self._asyncioEventloop.call_at(abs_time, self._onTimer)
  227. def _moveCallLaterSooner(self, tple):
  228. PosixReactorBase._moveCallLaterSooner(self, tple)
  229. self._reschedule()
  230. def callLater(self, seconds, f, *args, **kwargs):
  231. dc = PosixReactorBase.callLater(self, seconds, f, *args, **kwargs)
  232. abs_time = self._asyncioEventloop.time() + self.timeout()
  233. if self._scheduledAt is None or abs_time < self._scheduledAt:
  234. self._reschedule()
  235. return dc
  236. def callFromThread(self, f, *args, **kwargs):
  237. g = lambda: self.callLater(0, f, *args, **kwargs)
  238. self._asyncioEventloop.call_soon_threadsafe(g)
  239. def install(eventloop=None):
  240. """
  241. Install an asyncio-based reactor.
  242. @param eventloop: The asyncio eventloop to wrap. If default, the global one
  243. is selected.
  244. """
  245. reactor = AsyncioSelectorReactor(eventloop)
  246. from twisted.internet.main import installReactor
  247. installReactor(reactor)