history.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. """
  2. Implementations for the history of a `Buffer`.
  3. NOTE: There is no `DynamicHistory`:
  4. This doesn't work well, because the `Buffer` needs to be able to attach
  5. an event handler to the event when a history entry is loaded. This
  6. loading can be done asynchronously and making the history swappable would
  7. probably break this.
  8. """
  9. from __future__ import annotations
  10. import datetime
  11. import os
  12. import threading
  13. from abc import ABCMeta, abstractmethod
  14. from asyncio import get_running_loop
  15. from typing import AsyncGenerator, Iterable, Sequence, Union
# Public names of this module (this is what `from ... import *` exports).
__all__ = [
    "History",
    "ThreadedHistory",
    "DummyHistory",
    "FileHistory",
    "InMemoryHistory",
]
  23. class History(metaclass=ABCMeta):
  24. """
  25. Base ``History`` class.
  26. This also includes abstract methods for loading/storing history.
  27. """
  28. def __init__(self) -> None:
  29. # In memory storage for strings.
  30. self._loaded = False
  31. # History that's loaded already, in reverse order. Latest, most recent
  32. # item first.
  33. self._loaded_strings: list[str] = []
  34. #
  35. # Methods expected by `Buffer`.
  36. #
  37. async def load(self) -> AsyncGenerator[str, None]:
  38. """
  39. Load the history and yield all the entries in reverse order (latest,
  40. most recent history entry first).
  41. This method can be called multiple times from the `Buffer` to
  42. repopulate the history when prompting for a new input. So we are
  43. responsible here for both caching, and making sure that strings that
  44. were were appended to the history will be incorporated next time this
  45. method is called.
  46. """
  47. if not self._loaded:
  48. self._loaded_strings = list(self.load_history_strings())
  49. self._loaded = True
  50. for item in self._loaded_strings:
  51. yield item
  52. def get_strings(self) -> list[str]:
  53. """
  54. Get the strings from the history that are loaded so far.
  55. (In order. Oldest item first.)
  56. """
  57. return self._loaded_strings[::-1]
  58. def append_string(self, string: str) -> None:
  59. "Add string to the history."
  60. self._loaded_strings.insert(0, string)
  61. self.store_string(string)
  62. #
  63. # Implementation for specific backends.
  64. #
  65. @abstractmethod
  66. def load_history_strings(self) -> Iterable[str]:
  67. """
  68. This should be a generator that yields `str` instances.
  69. It should yield the most recent items first, because they are the most
  70. important. (The history can already be used, even when it's only
  71. partially loaded.)
  72. """
  73. while False:
  74. yield
  75. @abstractmethod
  76. def store_string(self, string: str) -> None:
  77. """
  78. Store the string in persistent storage.
  79. """
class ThreadedHistory(History):
    """
    Wrapper around a `History` implementation that runs the wrapped history's
    `load_history_strings()` in a background thread.

    Use this to improve the start-up time of prompt_toolkit applications.
    History entries are available as soon as they are loaded. We don't have to
    wait for everything to be loaded.

    :param history: The `History` instance to wrap. Loading and storing of
        strings is delegated to it.
    """

    def __init__(self, history: History) -> None:
        super().__init__()

        # The wrapped history; does the actual loading/storing.
        self.history = history

        # Loader thread. Created and started by the first `load()` call.
        self._load_thread: threading.Thread | None = None

        # Lock for accessing/manipulating `_loaded_strings` and `_loaded`
        # together in a consistent state.
        self._lock = threading.Lock()

        # Events created by each `load()` call. Used to wait for new history
        # entries from the loader thread.
        self._string_load_events: list[threading.Event] = []

    async def load(self) -> AsyncGenerator[str, None]:
        """
        Like `History.load()`, but the entries are produced by a background
        thread (see `_in_load_thread`) and yielded here as they come in.
        """
        # Start the load thread, if this is called for the first time.
        if not self._load_thread:
            self._load_thread = threading.Thread(
                target=self._in_load_thread,
                daemon=True,
            )
            self._load_thread.start()

        # Consume the `_loaded_strings` list, using asyncio.
        loop = get_running_loop()

        # Create threading Event so that we can wait for new items.
        # It starts in the "set" state, so that the first wait below returns
        # right away and whatever is in `_loaded_strings` already gets read.
        event = threading.Event()
        event.set()
        self._string_load_events.append(event)

        # Number of entries of `_loaded_strings` this call has consumed so
        # far; used as the start offset for the next slice.
        items_yielded = 0

        try:
            while True:
                # Wait for new items to be available.
                # (Use a timeout, because the executor thread is not a daemon
                # thread. The "slow-history.py" example would otherwise hang if
                # Control-C is pressed before the history is fully loaded,
                # because there's still this non-daemon executor thread waiting
                # for this event.)
                # NOTE: `Event.wait()` returns True when the event is set and
                # False when the timeout expired. So, despite its name,
                # `got_timeout` is truthy when there is something to read.
                got_timeout = await loop.run_in_executor(
                    None, lambda: event.wait(timeout=0.5)
                )
                if not got_timeout:
                    # Timeout expired without the event being set: wait again.
                    continue

                # Read new items (in lock).
                # The slice, the `_loaded` flag and the clearing of the event
                # are all done while holding the lock.
                def in_executor() -> tuple[list[str], bool]:
                    with self._lock:
                        new_items = self._loaded_strings[items_yielded:]
                        done = self._loaded
                        event.clear()
                    return new_items, done

                new_items, done = await loop.run_in_executor(None, in_executor)

                items_yielded += len(new_items)

                for item in new_items:
                    yield item

                # `done` was read together with the slice above, so everything
                # that was loaded at that point has been yielded.
                if done:
                    break
        finally:
            # Unregister our event, also when the generator is closed early
            # or an exception passes through.
            self._string_load_events.remove(event)

    def _in_load_thread(self) -> None:
        """
        Target of the loader thread: copy all entries of the wrapped history
        into `_loaded_strings` and set the events of the `load()` calls that
        are waiting for them.
        """
        try:
            # Start with an empty list. In case `append_string()` was called
            # before `load()` happened. Then `.store_string()` will have
            # written these entries back to disk and we will reload it.
            # NOTE(review): this reset is not done while holding `self._lock`,
            # and `_string_load_events` is iterated below without a lock --
            # confirm that this is intended.
            self._loaded_strings = []

            for item in self.history.load_history_strings():
                with self._lock:
                    self._loaded_strings.append(item)

                # Signal every registered `load()` call that there is a new
                # item.
                for event in self._string_load_events:
                    event.set()
        finally:
            # Mark the history as loaded, also when loading raised, and set
            # the events one more time so that the `load()` loops see
            # `_loaded` being True and stop.
            with self._lock:
                self._loaded = True
            for event in self._string_load_events:
                event.set()

    def append_string(self, string: str) -> None:
        """
        Add ``string`` as the newest entry. The in-memory list is updated
        while holding the lock; the wrapped history stores the string
        afterwards, outside the lock.
        """
        with self._lock:
            self._loaded_strings.insert(0, string)
        self.store_string(string)

    # All of the following are proxied to `self.history`.

    def load_history_strings(self) -> Iterable[str]:
        """Return `load_history_strings()` of the wrapped history."""
        return self.history.load_history_strings()

    def store_string(self, string: str) -> None:
        """Store ``string`` through the wrapped history."""
        self.history.store_string(string)

    def __repr__(self) -> str:
        return f"ThreadedHistory({self.history!r})"
  172. class InMemoryHistory(History):
  173. """
  174. :class:`.History` class that keeps a list of all strings in memory.
  175. In order to prepopulate the history, it's possible to call either
  176. `append_string` for all items or pass a list of strings to `__init__` here.
  177. """
  178. def __init__(self, history_strings: Sequence[str] | None = None) -> None:
  179. super().__init__()
  180. # Emulating disk storage.
  181. if history_strings is None:
  182. self._storage = []
  183. else:
  184. self._storage = list(history_strings)
  185. def load_history_strings(self) -> Iterable[str]:
  186. yield from self._storage[::-1]
  187. def store_string(self, string: str) -> None:
  188. self._storage.append(string)
  189. class DummyHistory(History):
  190. """
  191. :class:`.History` object that doesn't remember anything.
  192. """
  193. def load_history_strings(self) -> Iterable[str]:
  194. return []
  195. def store_string(self, string: str) -> None:
  196. pass
  197. def append_string(self, string: str) -> None:
  198. # Don't remember this.
  199. pass
  200. _StrOrBytesPath = Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"]
  201. class FileHistory(History):
  202. """
  203. :class:`.History` class that stores all strings in a file.
  204. """
  205. def __init__(self, filename: _StrOrBytesPath) -> None:
  206. self.filename = filename
  207. super().__init__()
  208. def load_history_strings(self) -> Iterable[str]:
  209. strings: list[str] = []
  210. lines: list[str] = []
  211. def add() -> None:
  212. if lines:
  213. # Join and drop trailing newline.
  214. string = "".join(lines)[:-1]
  215. strings.append(string)
  216. if os.path.exists(self.filename):
  217. with open(self.filename, "rb") as f:
  218. for line_bytes in f:
  219. line = line_bytes.decode("utf-8", errors="replace")
  220. if line.startswith("+"):
  221. lines.append(line[1:])
  222. else:
  223. add()
  224. lines = []
  225. add()
  226. # Reverse the order, because newest items have to go first.
  227. return reversed(strings)
  228. def store_string(self, string: str) -> None:
  229. # Save to file.
  230. with open(self.filename, "ab") as f:
  231. def write(t: str) -> None:
  232. f.write(t.encode("utf-8"))
  233. write(f"\n# {datetime.datetime.now()}\n")
  234. for line in string.split("\n"):
  235. write(f"+{line}\n")