async_generator.py

  1. """
  2. Implementation for async generators.
  3. """
  4. from __future__ import annotations
  5. from asyncio import get_running_loop
  6. from contextlib import asynccontextmanager
  7. from queue import Empty, Full, Queue
  8. from typing import Any, AsyncGenerator, Callable, Iterable, TypeVar
  9. from .utils import run_in_executor_with_context
  10. __all__ = [
  11. "aclosing",
  12. "generator_to_async_generator",
  13. ]
  14. _T_Generator = TypeVar("_T_Generator", bound=AsyncGenerator[Any, None])
  15. @asynccontextmanager
  16. async def aclosing(
  17. thing: _T_Generator,
  18. ) -> AsyncGenerator[_T_Generator, None]:
  19. "Similar to `contextlib.aclosing`, in Python 3.10."
  20. try:
  21. yield thing
  22. finally:
  23. await thing.aclose()
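
# A minimal usage sketch (illustrative only; ``some_agen`` is a hypothetical
# async generator, not defined in this module):
#
#     async def consume() -> None:
#         async with aclosing(some_agen()) as agen:
#             async for item in agen:
#                 ...
#
# Wrapping the generator in ``aclosing()`` ensures ``aclose()`` is awaited
# even when the consumer stops iterating early.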

# By default, choose a buffer size that strikes a good balance between
# throughput and memory consumption. We use this to consume a sync generator
# of completions as an async generator. If the queue size is very small (like
# 1), consuming the completions becomes really slow when there are many items.
# If the queue size is unlimited or too big, memory consumption can grow too
# large, and CPU time is wasted producing items that are no longer needed
# (for example when consumption of the async generator stops at some point).
# We need a fixed size in order to get some back-pressure from the async
# consumer to the sync producer. We choose 1000 by default here. With around
# 50k completions, measurements show that 1000 is still significantly faster
# than a buffer of 100.
DEFAULT_BUFFER_SIZE: int = 1000

_T = TypeVar("_T")


class _Done:
    pass
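
# ``_Done`` is used below as a sentinel value: the background producer pushes
# an instance onto the queue to signal that the iterable has been exhausted.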

async def generator_to_async_generator(
    get_iterable: Callable[[], Iterable[_T]],
    buffer_size: int = DEFAULT_BUFFER_SIZE,
) -> AsyncGenerator[_T, None]:
    """
    Turn a generator or iterable into an async generator.

    This works by running the generator in a background thread.

    :param get_iterable: Function that returns a generator or iterable when
        called.
    :param buffer_size: Size of the queue between the async consumer and the
        synchronous generator that produces items.
    """
    quitting = False
    # NOTE: We are limiting the queue size in order to have back-pressure.
    q: Queue[_T | _Done] = Queue(maxsize=buffer_size)
    loop = get_running_loop()

    def runner() -> None:
        """
        Consume the generator in a background thread.
        When items are received, they'll be pushed to the queue.
        """
        try:
            for item in get_iterable():
                # When this async generator is cancelled (closed), stop this
                # thread.
                if quitting:
                    return

                while True:
                    try:
                        q.put(item, timeout=1)
                    except Full:
                        if quitting:
                            return
                        continue
                    else:
                        break
        finally:
            while True:
                try:
                    q.put(_Done(), timeout=1)
                except Full:
                    if quitting:
                        return
                    continue
                else:
                    break

    # Start the background thread.
    runner_f = run_in_executor_with_context(runner)

    try:
        while True:
            try:
                item = q.get_nowait()
            except Empty:
                item = await loop.run_in_executor(None, q.get)
            if isinstance(item, _Done):
                break
            else:
                yield item
    finally:
        # When this async generator is closed (GeneratorExit exception), stop
        # the background thread as well; we don't need it anymore.
        quitting = True

        # Wait for the background thread to finish. (This should happen right
        # after the last item is yielded.)
        await runner_f
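
# A minimal usage sketch (illustrative only; ``load_completions`` is a
# hypothetical blocking generator, not part of this module):
#
#     def load_completions() -> Iterable[str]:
#         for i in range(50_000):
#             yield f"completion-{i}"
#
#     async def main() -> None:
#         async for completion in generator_to_async_generator(load_completions):
#             print(completion)
#
# The blocking generator runs in a background thread, and items reach the
# event loop through the bounded queue, so a slow async consumer applies
# back-pressure to the synchronous producer.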