asyncioreactor.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. # -*- test-case-name: twisted.test.test_internet -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. asyncio-based reactor implementation.
  6. """
  7. from __future__ import absolute_import, division
  8. import errno
  9. from zope.interface import implementer
  10. from twisted.logger import Logger
  11. from twisted.internet.base import DelayedCall
  12. from twisted.internet.posixbase import (PosixReactorBase, _NO_FILEDESC,
  13. _ContinuousPolling)
  14. from twisted.python.log import callWithLogger
  15. from twisted.internet.interfaces import IReactorFDSet
  16. try:
  17. from asyncio import get_event_loop
  18. except ImportError:
  19. raise ImportError("Requires asyncio.")
  20. # As per ImportError above, this module is never imported on python 2, but
  21. # pyflakes still runs on python 2, so let's tell it where the errors come from.
  22. from builtins import PermissionError, BrokenPipeError
  23. class _DCHandle(object):
  24. """
  25. Wraps ephemeral L{asyncio.Handle} instances. Callbacks can close
  26. over this and use it as a mutable reference to asyncio C{Handles}.
  27. @ivar handle: The current L{asyncio.Handle}
  28. """
  29. def __init__(self, handle):
  30. self.handle = handle
  31. def cancel(self):
  32. """
  33. Cancel the inner L{asyncio.Handle}.
  34. """
  35. self.handle.cancel()
  36. @implementer(IReactorFDSet)
  37. class AsyncioSelectorReactor(PosixReactorBase):
  38. """
  39. Reactor running on top of L{asyncio.SelectorEventLoop}.
  40. """
  41. _asyncClosed = False
  42. _log = Logger()
  43. def __init__(self, eventloop=None):
  44. if eventloop is None:
  45. eventloop = get_event_loop()
  46. self._asyncioEventloop = eventloop
  47. self._writers = {}
  48. self._readers = {}
  49. self._delayedCalls = set()
  50. self._continuousPolling = _ContinuousPolling(self)
  51. super().__init__()
  52. def _unregisterFDInAsyncio(self, fd):
  53. """
  54. Compensate for a bug in asyncio where it will not unregister a FD that
  55. it cannot handle in the epoll loop. It touches internal asyncio code.
  56. A description of the bug by markrwilliams:
  57. The C{add_writer} method of asyncio event loops isn't atomic because
  58. all the Selector classes in the selector module internally record a
  59. file object before passing it to the platform's selector
  60. implementation. If the platform's selector decides the file object
  61. isn't acceptable, the resulting exception doesn't cause the Selector to
  62. un-track the file object.
  63. The failing/hanging stdio test goes through the following sequence of
  64. events (roughly):
  65. * The first C{connection.write(intToByte(value))} call hits the asyncio
  66. reactor's C{addWriter} method.
  67. * C{addWriter} calls the asyncio loop's C{add_writer} method, which
  68. happens to live on C{_BaseSelectorEventLoop}.
  69. * The asyncio loop's C{add_writer} method checks if the file object has
  70. been registered before via the selector's C{get_key} method.
  71. * It hasn't, so the KeyError block runs and calls the selector's
  72. register method
  73. * Code examples that follow use EpollSelector, but the code flow holds
  74. true for any other selector implementation. The selector's register
  75. method first calls through to the next register method in the MRO
  76. * That next method is always C{_BaseSelectorImpl.register} which
  77. creates a C{SelectorKey} instance for the file object, stores it under
  78. the file object's file descriptor, and then returns it.
  79. * Control returns to the concrete selector implementation, which asks
  80. the operating system to track the file descriptor using the right API.
  81. * The operating system refuses! An exception is raised that, in this
  82. case, the asyncio reactor handles by creating a C{_ContinuousPolling}
  83. object to watch the file descriptor.
  84. * The second C{connection.write(intToByte(value))} call hits the
  85. asyncio reactor's C{addWriter} method, which hits the C{add_writer}
  86. method. But the loop's selector's get_key method now returns a
  87. C{SelectorKey}! Now the asyncio reactor's C{addWriter} method thinks
  88. the asyncio loop will watch the file descriptor, even though it won't.
  89. """
  90. try:
  91. self._asyncioEventloop._selector.unregister(fd)
  92. except:
  93. pass
  94. def _readOrWrite(self, selectable, read):
  95. method = selectable.doRead if read else selectable.doWrite
  96. if selectable.fileno() == -1:
  97. self._disconnectSelectable(selectable, _NO_FILEDESC, read)
  98. return
  99. try:
  100. why = method()
  101. except Exception as e:
  102. why = e
  103. self._log.failure(None)
  104. if why:
  105. self._disconnectSelectable(selectable, why, read)
  106. def addReader(self, reader):
  107. if reader in self._readers.keys() or \
  108. reader in self._continuousPolling._readers:
  109. return
  110. fd = reader.fileno()
  111. try:
  112. self._asyncioEventloop.add_reader(fd, callWithLogger, reader,
  113. self._readOrWrite, reader,
  114. True)
  115. self._readers[reader] = fd
  116. except IOError as e:
  117. self._unregisterFDInAsyncio(fd)
  118. if e.errno == errno.EPERM:
  119. # epoll(7) doesn't support certain file descriptors,
  120. # e.g. filesystem files, so for those we just poll
  121. # continuously:
  122. self._continuousPolling.addReader(reader)
  123. else:
  124. raise
  125. def addWriter(self, writer):
  126. if writer in self._writers.keys() or \
  127. writer in self._continuousPolling._writers:
  128. return
  129. fd = writer.fileno()
  130. try:
  131. self._asyncioEventloop.add_writer(fd, callWithLogger, writer,
  132. self._readOrWrite, writer,
  133. False)
  134. self._writers[writer] = fd
  135. except PermissionError:
  136. self._unregisterFDInAsyncio(fd)
  137. # epoll(7) doesn't support certain file descriptors,
  138. # e.g. filesystem files, so for those we just poll
  139. # continuously:
  140. self._continuousPolling.addWriter(writer)
  141. except BrokenPipeError:
  142. # The kqueuereactor will raise this if there is a broken pipe
  143. self._unregisterFDInAsyncio(fd)
  144. except:
  145. self._unregisterFDInAsyncio(fd)
  146. raise
  147. def removeReader(self, reader):
  148. # First, see if they're trying to remove a reader that we don't have.
  149. if not (reader in self._readers.keys() \
  150. or self._continuousPolling.isReading(reader)):
  151. # We don't have it, so just return OK.
  152. return
  153. # If it was a cont. polling reader, check there first.
  154. if self._continuousPolling.isReading(reader):
  155. self._continuousPolling.removeReader(reader)
  156. return
  157. fd = reader.fileno()
  158. if fd == -1:
  159. # If the FD is -1, we want to know what its original FD was, to
  160. # remove it.
  161. fd = self._readers.pop(reader)
  162. else:
  163. self._readers.pop(reader)
  164. self._asyncioEventloop.remove_reader(fd)
  165. def removeWriter(self, writer):
  166. # First, see if they're trying to remove a writer that we don't have.
  167. if not (writer in self._writers.keys() \
  168. or self._continuousPolling.isWriting(writer)):
  169. # We don't have it, so just return OK.
  170. return
  171. # If it was a cont. polling writer, check there first.
  172. if self._continuousPolling.isWriting(writer):
  173. self._continuousPolling.removeWriter(writer)
  174. return
  175. fd = writer.fileno()
  176. if fd == -1:
  177. # If the FD is -1, we want to know what its original FD was, to
  178. # remove it.
  179. fd = self._writers.pop(writer)
  180. else:
  181. self._writers.pop(writer)
  182. self._asyncioEventloop.remove_writer(fd)
  183. def removeAll(self):
  184. return (self._removeAll(self._readers.keys(), self._writers.keys()) +
  185. self._continuousPolling.removeAll())
  186. def getReaders(self):
  187. return (list(self._readers.keys()) +
  188. self._continuousPolling.getReaders())
  189. def getWriters(self):
  190. return (list(self._writers.keys()) +
  191. self._continuousPolling.getWriters())
  192. def getDelayedCalls(self):
  193. return list(self._delayedCalls)
  194. def iterate(self, timeout):
  195. self._asyncioEventloop.call_later(timeout + 0.01,
  196. self._asyncioEventloop.stop)
  197. self._asyncioEventloop.run_forever()
  198. def run(self, installSignalHandlers=True):
  199. self.startRunning(installSignalHandlers=installSignalHandlers)
  200. self._asyncioEventloop.run_forever()
  201. if self._justStopped:
  202. self._justStopped = False
  203. def stop(self):
  204. super().stop()
  205. self.callLater(0, self.fireSystemEvent, "shutdown")
  206. def crash(self):
  207. super().crash()
  208. self._asyncioEventloop.stop()
  209. def seconds(self):
  210. return self._asyncioEventloop.time()
  211. def callLater(self, seconds, f, *args, **kwargs):
  212. def run():
  213. dc.called = True
  214. self._delayedCalls.remove(dc)
  215. f(*args, **kwargs)
  216. handle = self._asyncioEventloop.call_later(seconds, run)
  217. dchandle = _DCHandle(handle)
  218. def cancel(dc):
  219. self._delayedCalls.remove(dc)
  220. dchandle.cancel()
  221. def reset(dc):
  222. dchandle.handle = self._asyncioEventloop.call_at(dc.time, run)
  223. dc = DelayedCall(self.seconds() + seconds, run, (), {},
  224. cancel, reset, seconds=self.seconds)
  225. self._delayedCalls.add(dc)
  226. return dc
  227. def callFromThread(self, f, *args, **kwargs):
  228. g = lambda: self.callLater(0, f, *args, **kwargs)
  229. self._asyncioEventloop.call_soon_threadsafe(g)
  230. def install(eventloop=None):
  231. """
  232. Install an asyncio-based reactor.
  233. @param eventloop: The asyncio eventloop to wrap. If default, the global one
  234. is selected.
  235. """
  236. reactor = AsyncioSelectorReactor(eventloop)
  237. from twisted.internet.main import installReactor
  238. installReactor(reactor)