epollreactor.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. An epoll() based implementation of the twisted main loop.
  5. To install the event loop (and you should do this before any connections,
  6. listeners or connectors are added)::
  7. from twisted.internet import epollreactor
  8. epollreactor.install()
  9. """
  10. import errno
  11. import select
  12. from zope.interface import implementer
  13. from twisted.internet import posixbase
  14. from twisted.internet.interfaces import IReactorFDSet
  15. from twisted.python import log
  16. try:
  17. # This is to keep mypy from complaining
  18. # We don't use type: ignore[attr-defined] on import, because mypy only complains
  19. # on on some platforms, and then the unused ignore is an issue if the undefined
  20. # attribute isn't.
  21. epoll = getattr(select, "epoll")
  22. EPOLLHUP = getattr(select, "EPOLLHUP")
  23. EPOLLERR = getattr(select, "EPOLLERR")
  24. EPOLLIN = getattr(select, "EPOLLIN")
  25. EPOLLOUT = getattr(select, "EPOLLOUT")
  26. except AttributeError as e:
  27. raise ImportError(e)
  28. @implementer(IReactorFDSet)
  29. class EPollReactor(posixbase.PosixReactorBase, posixbase._PollLikeMixin):
  30. """
  31. A reactor that uses epoll(7).
  32. @ivar _poller: A C{epoll} which will be used to check for I/O
  33. readiness.
  34. @ivar _selectables: A dictionary mapping integer file descriptors to
  35. instances of C{FileDescriptor} which have been registered with the
  36. reactor. All C{FileDescriptors} which are currently receiving read or
  37. write readiness notifications will be present as values in this
  38. dictionary.
  39. @ivar _reads: A set containing integer file descriptors. Values in this
  40. set will be registered with C{_poller} for read readiness notifications
  41. which will be dispatched to the corresponding C{FileDescriptor}
  42. instances in C{_selectables}.
  43. @ivar _writes: A set containing integer file descriptors. Values in this
  44. set will be registered with C{_poller} for write readiness
  45. notifications which will be dispatched to the corresponding
  46. C{FileDescriptor} instances in C{_selectables}.
  47. @ivar _continuousPolling: A L{_ContinuousPolling} instance, used to handle
  48. file descriptors (e.g. filesystem files) that are not supported by
  49. C{epoll(7)}.
  50. """
  51. # Attributes for _PollLikeMixin
  52. _POLL_DISCONNECTED = EPOLLHUP | EPOLLERR
  53. _POLL_IN = EPOLLIN
  54. _POLL_OUT = EPOLLOUT
  55. def __init__(self):
  56. """
  57. Initialize epoll object, file descriptor tracking dictionaries, and the
  58. base class.
  59. """
  60. # Create the poller we're going to use. The 1024 here is just a hint
  61. # to the kernel, it is not a hard maximum. After Linux 2.6.8, the size
  62. # argument is completely ignored.
  63. self._poller = epoll(1024)
  64. self._reads = set()
  65. self._writes = set()
  66. self._selectables = {}
  67. self._continuousPolling = posixbase._ContinuousPolling(self)
  68. posixbase.PosixReactorBase.__init__(self)
  69. def _add(self, xer, primary, other, selectables, event, antievent):
  70. """
  71. Private method for adding a descriptor from the event loop.
  72. It takes care of adding it if new or modifying it if already added
  73. for another state (read -> read/write for example).
  74. """
  75. fd = xer.fileno()
  76. if fd not in primary:
  77. flags = event
  78. # epoll_ctl can raise all kinds of IOErrors, and every one
  79. # indicates a bug either in the reactor or application-code.
  80. # Let them all through so someone sees a traceback and fixes
  81. # something. We'll do the same thing for every other call to
  82. # this method in this file.
  83. if fd in other:
  84. flags |= antievent
  85. self._poller.modify(fd, flags)
  86. else:
  87. self._poller.register(fd, flags)
  88. # Update our own tracking state *only* after the epoll call has
  89. # succeeded. Otherwise we may get out of sync.
  90. primary.add(fd)
  91. selectables[fd] = xer
  92. def addReader(self, reader):
  93. """
  94. Add a FileDescriptor for notification of data available to read.
  95. """
  96. try:
  97. self._add(
  98. reader, self._reads, self._writes, self._selectables, EPOLLIN, EPOLLOUT
  99. )
  100. except OSError as e:
  101. if e.errno == errno.EPERM:
  102. # epoll(7) doesn't support certain file descriptors,
  103. # e.g. filesystem files, so for those we just poll
  104. # continuously:
  105. self._continuousPolling.addReader(reader)
  106. else:
  107. raise
  108. def addWriter(self, writer):
  109. """
  110. Add a FileDescriptor for notification of data available to write.
  111. """
  112. try:
  113. self._add(
  114. writer, self._writes, self._reads, self._selectables, EPOLLOUT, EPOLLIN
  115. )
  116. except OSError as e:
  117. if e.errno == errno.EPERM:
  118. # epoll(7) doesn't support certain file descriptors,
  119. # e.g. filesystem files, so for those we just poll
  120. # continuously:
  121. self._continuousPolling.addWriter(writer)
  122. else:
  123. raise
  124. def _remove(self, xer, primary, other, selectables, event, antievent):
  125. """
  126. Private method for removing a descriptor from the event loop.
  127. It does the inverse job of _add, and also add a check in case of the fd
  128. has gone away.
  129. """
  130. fd = xer.fileno()
  131. if fd == -1:
  132. for fd, fdes in selectables.items():
  133. if xer is fdes:
  134. break
  135. else:
  136. return
  137. if fd in primary:
  138. if fd in other:
  139. flags = antievent
  140. # See comment above modify call in _add.
  141. self._poller.modify(fd, flags)
  142. else:
  143. del selectables[fd]
  144. # See comment above _control call in _add.
  145. self._poller.unregister(fd)
  146. primary.remove(fd)
  147. def removeReader(self, reader):
  148. """
  149. Remove a Selectable for notification of data available to read.
  150. """
  151. if self._continuousPolling.isReading(reader):
  152. self._continuousPolling.removeReader(reader)
  153. return
  154. self._remove(
  155. reader, self._reads, self._writes, self._selectables, EPOLLIN, EPOLLOUT
  156. )
  157. def removeWriter(self, writer):
  158. """
  159. Remove a Selectable for notification of data available to write.
  160. """
  161. if self._continuousPolling.isWriting(writer):
  162. self._continuousPolling.removeWriter(writer)
  163. return
  164. self._remove(
  165. writer, self._writes, self._reads, self._selectables, EPOLLOUT, EPOLLIN
  166. )
  167. def removeAll(self):
  168. """
  169. Remove all selectables, and return a list of them.
  170. """
  171. return (
  172. self._removeAll(
  173. [self._selectables[fd] for fd in self._reads],
  174. [self._selectables[fd] for fd in self._writes],
  175. )
  176. + self._continuousPolling.removeAll()
  177. )
  178. def getReaders(self):
  179. return [
  180. self._selectables[fd] for fd in self._reads
  181. ] + self._continuousPolling.getReaders()
  182. def getWriters(self):
  183. return [
  184. self._selectables[fd] for fd in self._writes
  185. ] + self._continuousPolling.getWriters()
  186. def doPoll(self, timeout):
  187. """
  188. Poll the poller for new events.
  189. """
  190. if timeout is None:
  191. timeout = -1 # Wait indefinitely.
  192. try:
  193. # Limit the number of events to the number of io objects we're
  194. # currently tracking (because that's maybe a good heuristic) and
  195. # the amount of time we block to the value specified by our
  196. # caller.
  197. l = self._poller.poll(timeout, len(self._selectables))
  198. except OSError as err:
  199. if err.errno == errno.EINTR:
  200. return
  201. # See epoll_wait(2) for documentation on the other conditions
  202. # under which this can fail. They can only be due to a serious
  203. # programming error on our part, so let's just announce them
  204. # loudly.
  205. raise
  206. _drdw = self._doReadOrWrite
  207. for fd, event in l:
  208. try:
  209. selectable = self._selectables[fd]
  210. except KeyError:
  211. pass
  212. else:
  213. log.callWithLogger(selectable, _drdw, selectable, fd, event)
  214. doIteration = doPoll
  215. def install():
  216. """
  217. Install the epoll() reactor.
  218. """
  219. p = EPollReactor()
  220. from twisted.internet.main import installReactor
  221. installReactor(p)
  222. __all__ = ["EPollReactor", "install"]