asyncio_posix.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. """
  2. Posix asyncio event loop.
  3. """
  4. from __future__ import unicode_literals
  5. from ..terminal.vt100_input import InputStream
  6. from .asyncio_base import AsyncioTimeout
  7. from .base import EventLoop, INPUT_TIMEOUT
  8. from .callbacks import EventLoopCallbacks
  9. from .posix_utils import PosixStdinReader
  10. import asyncio
  11. import signal
  12. __all__ = (
  13. 'PosixAsyncioEventLoop',
  14. )
  15. class PosixAsyncioEventLoop(EventLoop):
  16. def __init__(self, loop=None):
  17. self.loop = loop or asyncio.get_event_loop()
  18. self.closed = False
  19. self._stopped_f = asyncio.Future(loop=self.loop)
  20. @asyncio.coroutine
  21. def run_as_coroutine(self, stdin, callbacks):
  22. """
  23. The input 'event loop'.
  24. """
  25. assert isinstance(callbacks, EventLoopCallbacks)
  26. # Create reader class.
  27. stdin_reader = PosixStdinReader(stdin.fileno())
  28. if self.closed:
  29. raise Exception('Event loop already closed.')
  30. inputstream = InputStream(callbacks.feed_key)
  31. try:
  32. # Create a new Future every time.
  33. self._stopped_f = asyncio.Future(loop=self.loop)
  34. # Handle input timouts
  35. def timeout_handler():
  36. """
  37. When no input has been received for INPUT_TIMEOUT seconds,
  38. flush the input stream and fire the timeout event.
  39. """
  40. inputstream.flush()
  41. callbacks.input_timeout()
  42. timeout = AsyncioTimeout(INPUT_TIMEOUT, timeout_handler, self.loop)
  43. # Catch sigwinch
  44. def received_winch():
  45. self.call_from_executor(callbacks.terminal_size_changed)
  46. self.loop.add_signal_handler(signal.SIGWINCH, received_winch)
  47. # Read input data.
  48. def stdin_ready():
  49. data = stdin_reader.read()
  50. inputstream.feed(data)
  51. timeout.reset()
  52. # Quit when the input stream was closed.
  53. if stdin_reader.closed:
  54. self.stop()
  55. self.loop.add_reader(stdin.fileno(), stdin_ready)
  56. # Block this coroutine until stop() has been called.
  57. for f in self._stopped_f:
  58. yield f
  59. finally:
  60. # Clean up.
  61. self.loop.remove_reader(stdin.fileno())
  62. self.loop.remove_signal_handler(signal.SIGWINCH)
  63. # Don't trigger any timeout events anymore.
  64. timeout.stop()
  65. def stop(self):
  66. # Trigger the 'Stop' future.
  67. self._stopped_f.set_result(True)
  68. def close(self):
  69. # Note: we should not close the asyncio loop itself, because that one
  70. # was not created here.
  71. self.closed = True
  72. def run_in_executor(self, callback):
  73. self.loop.run_in_executor(None, callback)
  74. def call_from_executor(self, callback, _max_postpone_until=None):
  75. """
  76. Call this function in the main event loop.
  77. Similar to Twisted's ``callFromThread``.
  78. """
  79. self.loop.call_soon_threadsafe(callback)
  80. def add_reader(self, fd, callback):
  81. " Start watching the file descriptor for read availability. "
  82. self.loop.add_reader(fd, callback)
  83. def remove_reader(self, fd):
  84. " Stop watching the file descriptor for read availability. "
  85. self.loop.remove_reader(fd)