kqreactor.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. # -*- test-case-name: twisted.test.test_kqueuereactor -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
A kqueue()/kevent() based implementation of the Twisted main loop.

To use this reactor, start your application specifying the kqueue reactor::

    twistd --reactor kqueue ...

To install the event loop from code (and you should do this before any
connections, listeners or connectors are added)::

    from twisted.internet import kqreactor
    kqreactor.install()
  12. """
  13. from __future__ import division, absolute_import
  14. import errno
  15. import select
  16. from select import KQ_FILTER_READ, KQ_FILTER_WRITE
  17. from select import KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF
  18. from zope.interface import implementer, declarations, Interface, Attribute
  19. from twisted.internet import main, posixbase
  20. from twisted.internet.interfaces import IReactorFDSet, IReactorDaemonize
  21. from twisted.python import log, failure
class _IKQueue(Interface):
    """
    An interface for KQueue implementations.

    A provider exposes the two names that L{KQueueReactor} looks up on its
    implementation object: C{kqueue}, called with no arguments to create the
    queue, and C{kevent}, called as C{kevent(fd, filter, op)} to build a
    single registration change.
    """
    # Factory for the queue object the reactor polls.
    kqueue = Attribute("An implementation of kqueue(2).")
    # Factory for the change records passed to the queue's control() method.
    kevent = Attribute("An implementation of kevent(2).")
# Declare that the standard library ``select`` module itself provides
# _IKQueue; it is the default ``_kqueueImpl`` used by KQueueReactor.
declarations.directlyProvides(select, _IKQueue)
  29. @implementer(IReactorFDSet, IReactorDaemonize)
  30. class KQueueReactor(posixbase.PosixReactorBase):
  31. """
  32. A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher
  33. which has built in support for kqueue in the select module.
  34. @ivar _kq: A C{kqueue} which will be used to check for I/O readiness.
  35. @ivar _impl: The implementation of L{_IKQueue} to use.
  36. @ivar _selectables: A dictionary mapping integer file descriptors to
  37. instances of L{FileDescriptor} which have been registered with the
  38. reactor. All L{FileDescriptor}s which are currently receiving read or
  39. write readiness notifications will be present as values in this
  40. dictionary.
  41. @ivar _reads: A set containing integer file descriptors. Values in this
  42. set will be registered with C{_kq} for read readiness notifications
  43. which will be dispatched to the corresponding L{FileDescriptor}
  44. instances in C{_selectables}.
  45. @ivar _writes: A set containing integer file descriptors. Values in this
  46. set will be registered with C{_kq} for write readiness notifications
  47. which will be dispatched to the corresponding L{FileDescriptor}
  48. instances in C{_selectables}.
  49. """
  50. def __init__(self, _kqueueImpl=select):
  51. """
  52. Initialize kqueue object, file descriptor tracking dictionaries, and
  53. the base class.
  54. See:
  55. - http://docs.python.org/library/select.html
  56. - www.freebsd.org/cgi/man.cgi?query=kqueue
  57. - people.freebsd.org/~jlemon/papers/kqueue.pdf
  58. @param _kqueueImpl: The implementation of L{_IKQueue} to use. A
  59. hook for testing.
  60. """
  61. self._impl = _kqueueImpl
  62. self._kq = self._impl.kqueue()
  63. self._reads = set()
  64. self._writes = set()
  65. self._selectables = {}
  66. posixbase.PosixReactorBase.__init__(self)
  67. def _updateRegistration(self, fd, filter, op):
  68. """
  69. Private method for changing kqueue registration on a given FD
  70. filtering for events given filter/op. This will never block and
  71. returns nothing.
  72. """
  73. self._kq.control([self._impl.kevent(fd, filter, op)], 0, 0)
  74. def beforeDaemonize(self):
  75. """
  76. Implement L{IReactorDaemonize.beforeDaemonize}.
  77. """
  78. # Twisted-internal method called during daemonization (when application
  79. # is started via twistd). This is called right before the magic double
  80. # forking done for daemonization. We cleanly close the kqueue() and later
  81. # recreate it. This is needed since a) kqueue() are not inherited across
  82. # forks and b) twistd will create the reactor already before daemonization
  83. # (and will also add at least 1 reader to the reactor, an instance of
  84. # twisted.internet.posixbase._UnixWaker).
  85. #
  86. # See: twisted.scripts._twistd_unix.daemonize()
  87. self._kq.close()
  88. self._kq = None
  89. def afterDaemonize(self):
  90. """
  91. Implement L{IReactorDaemonize.afterDaemonize}.
  92. """
  93. # Twisted-internal method called during daemonization. This is called right
  94. # after daemonization and recreates the kqueue() and any readers/writers
  95. # that were added before. Note that you MUST NOT call any reactor methods
  96. # in between beforeDaemonize() and afterDaemonize()!
  97. self._kq = self._impl.kqueue()
  98. for fd in self._reads:
  99. self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
  100. for fd in self._writes:
  101. self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
  102. def addReader(self, reader):
  103. """
  104. Implement L{IReactorFDSet.addReader}.
  105. """
  106. fd = reader.fileno()
  107. if fd not in self._reads:
  108. try:
  109. self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD)
  110. except OSError:
  111. pass
  112. finally:
  113. self._selectables[fd] = reader
  114. self._reads.add(fd)
  115. def addWriter(self, writer):
  116. """
  117. Implement L{IReactorFDSet.addWriter}.
  118. """
  119. fd = writer.fileno()
  120. if fd not in self._writes:
  121. try:
  122. self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD)
  123. except OSError:
  124. pass
  125. finally:
  126. self._selectables[fd] = writer
  127. self._writes.add(fd)
  128. def removeReader(self, reader):
  129. """
  130. Implement L{IReactorFDSet.removeReader}.
  131. """
  132. wasLost = False
  133. try:
  134. fd = reader.fileno()
  135. except:
  136. fd = -1
  137. if fd == -1:
  138. for fd, fdes in self._selectables.items():
  139. if reader is fdes:
  140. wasLost = True
  141. break
  142. else:
  143. return
  144. if fd in self._reads:
  145. self._reads.remove(fd)
  146. if fd not in self._writes:
  147. del self._selectables[fd]
  148. if not wasLost:
  149. try:
  150. self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE)
  151. except OSError:
  152. pass
  153. def removeWriter(self, writer):
  154. """
  155. Implement L{IReactorFDSet.removeWriter}.
  156. """
  157. wasLost = False
  158. try:
  159. fd = writer.fileno()
  160. except:
  161. fd = -1
  162. if fd == -1:
  163. for fd, fdes in self._selectables.items():
  164. if writer is fdes:
  165. wasLost = True
  166. break
  167. else:
  168. return
  169. if fd in self._writes:
  170. self._writes.remove(fd)
  171. if fd not in self._reads:
  172. del self._selectables[fd]
  173. if not wasLost:
  174. try:
  175. self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE)
  176. except OSError:
  177. pass
  178. def removeAll(self):
  179. """
  180. Implement L{IReactorFDSet.removeAll}.
  181. """
  182. return self._removeAll(
  183. [self._selectables[fd] for fd in self._reads],
  184. [self._selectables[fd] for fd in self._writes])
  185. def getReaders(self):
  186. """
  187. Implement L{IReactorFDSet.getReaders}.
  188. """
  189. return [self._selectables[fd] for fd in self._reads]
  190. def getWriters(self):
  191. """
  192. Implement L{IReactorFDSet.getWriters}.
  193. """
  194. return [self._selectables[fd] for fd in self._writes]
  195. def doKEvent(self, timeout):
  196. """
  197. Poll the kqueue for new events.
  198. """
  199. if timeout is None:
  200. timeout = 1
  201. try:
  202. events = self._kq.control([], len(self._selectables), timeout)
  203. except OSError as e:
  204. # Since this command blocks for potentially a while, it's possible
  205. # EINTR can be raised for various reasons (for example, if the user
  206. # hits ^C).
  207. if e.errno == errno.EINTR:
  208. return
  209. else:
  210. raise
  211. _drdw = self._doWriteOrRead
  212. for event in events:
  213. fd = event.ident
  214. try:
  215. selectable = self._selectables[fd]
  216. except KeyError:
  217. # Handles the infrequent case where one selectable's
  218. # handler disconnects another.
  219. continue
  220. else:
  221. log.callWithLogger(selectable, _drdw, selectable, fd, event)
  222. def _doWriteOrRead(self, selectable, fd, event):
  223. """
  224. Private method called when a FD is ready for reading, writing or was
  225. lost. Do the work and raise errors where necessary.
  226. """
  227. why = None
  228. inRead = False
  229. (filter, flags, data, fflags) = (
  230. event.filter, event.flags, event.data, event.fflags)
  231. if flags & KQ_EV_EOF and data and fflags:
  232. why = main.CONNECTION_LOST
  233. else:
  234. try:
  235. if selectable.fileno() == -1:
  236. inRead = False
  237. why = posixbase._NO_FILEDESC
  238. else:
  239. if filter == KQ_FILTER_READ:
  240. inRead = True
  241. why = selectable.doRead()
  242. if filter == KQ_FILTER_WRITE:
  243. inRead = False
  244. why = selectable.doWrite()
  245. except:
  246. # Any exception from application code gets logged and will
  247. # cause us to disconnect the selectable.
  248. why = failure.Failure()
  249. log.err(why, "An exception was raised from application code" \
  250. " while processing a reactor selectable")
  251. if why:
  252. self._disconnectSelectable(selectable, why, inRead)
  253. doIteration = doKEvent
  254. def install():
  255. """
  256. Install the kqueue() reactor.
  257. """
  258. p = KQueueReactor()
  259. from twisted.internet.main import installReactor
  260. installReactor(p)
# Public names exported by this module.
__all__ = ["KQueueReactor", "install"]