# auto_suggest.py — key-binding helpers and suggestion providers for IPython
# terminal auto-suggestions (history-backed and, optionally, LLM-backed).
  1. import re
  2. import asyncio
  3. import tokenize
  4. from io import StringIO
  5. from typing import Callable, List, Optional, Union, Generator, Tuple, ClassVar, Any
  6. import warnings
  7. import prompt_toolkit
  8. from prompt_toolkit.buffer import Buffer
  9. from prompt_toolkit.key_binding import KeyPressEvent
  10. from prompt_toolkit.key_binding.bindings import named_commands as nc
  11. from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion, AutoSuggest
  12. from prompt_toolkit.document import Document
  13. from prompt_toolkit.history import History
  14. from prompt_toolkit.shortcuts import PromptSession
  15. from prompt_toolkit.layout.processors import (
  16. Processor,
  17. Transformation,
  18. TransformationInput,
  19. )
  20. from IPython.core.getipython import get_ipython
  21. from IPython.utils.tokenutil import generate_tokens
  22. from .filters import pass_through
  23. try:
  24. import jupyter_ai_magics
  25. import jupyter_ai.completions.models as jai_models
  26. except ModuleNotFoundError:
  27. jai_models = None
  28. def _get_query(document: Document):
  29. return document.lines[document.cursor_position_row]
  30. class AppendAutoSuggestionInAnyLine(Processor):
  31. """
  32. Append the auto suggestion to lines other than the last (appending to the
  33. last line is natively supported by the prompt toolkit).
  34. This has a private `_debug` attribute that can be set to True to display
  35. debug information as virtual suggestion on the end of any line. You can do
  36. so with:
  37. >>> from IPython.terminal.shortcuts.auto_suggest import AppendAutoSuggestionInAnyLine
  38. >>> AppendAutoSuggestionInAnyLine._debug = True
  39. """
  40. _debug: ClassVar[bool] = False
  41. def __init__(self, style: str = "class:auto-suggestion") -> None:
  42. self.style = style
  43. def apply_transformation(self, ti: TransformationInput) -> Transformation:
  44. """
  45. Apply transformation to the line that is currently being edited.
  46. This is a variation of the original implementation in prompt toolkit
  47. that allows to not only append suggestions to any line, but also to show
  48. multi-line suggestions.
  49. As transformation are applied on a line-by-line basis; we need to trick
  50. a bit, and elide any line that is after the line we are currently
  51. editing, until we run out of completions. We cannot shift the existing
  52. lines
  53. There are multiple cases to handle:
  54. The completions ends before the end of the buffer:
  55. We can resume showing the normal line, and say that some code may
  56. be hidden.
  57. The completions ends at the end of the buffer
  58. We can just say that some code may be hidden.
  59. And separately:
  60. The completions ends beyond the end of the buffer
  61. We need to both say that some code may be hidden, and that some
  62. lines are not shown.
  63. """
  64. last_line_number = ti.document.line_count - 1
  65. is_last_line = ti.lineno == last_line_number
  66. noop = lambda text: Transformation(
  67. fragments=ti.fragments + [(self.style, " " + text if self._debug else "")]
  68. )
  69. if ti.document.line_count == 1:
  70. return noop("noop:oneline")
  71. if ti.document.cursor_position_row == last_line_number and is_last_line:
  72. # prompt toolkit already appends something; just leave it be
  73. return noop("noop:last line and cursor")
  74. # first everything before the current line is unchanged.
  75. if ti.lineno < ti.document.cursor_position_row:
  76. return noop("noop:before cursor")
  77. buffer = ti.buffer_control.buffer
  78. if not buffer.suggestion or not ti.document.is_cursor_at_the_end_of_line:
  79. return noop("noop:not eol")
  80. delta = ti.lineno - ti.document.cursor_position_row
  81. suggestions = buffer.suggestion.text.splitlines()
  82. if len(suggestions) == 0:
  83. return noop("noop: no suggestions")
  84. suggestions_longer_than_buffer: bool = (
  85. len(suggestions) + ti.document.cursor_position_row > ti.document.line_count
  86. )
  87. if len(suggestions) >= 1 and prompt_toolkit.VERSION < (3, 0, 49):
  88. if ti.lineno == ti.document.cursor_position_row:
  89. return Transformation(
  90. fragments=ti.fragments
  91. + [
  92. (
  93. "red",
  94. "(Cannot show multiline suggestion; requires prompt_toolkit > 3.0.49)",
  95. )
  96. ]
  97. )
  98. else:
  99. return Transformation(fragments=ti.fragments)
  100. if delta == 0:
  101. suggestion = suggestions[0]
  102. return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
  103. if is_last_line:
  104. if delta < len(suggestions):
  105. extra = f"; {len(suggestions) - delta} line(s) hidden"
  106. suggestion = f"… rest of suggestion ({len(suggestions) - delta} lines) and code hidden"
  107. return Transformation([(self.style, suggestion)])
  108. n_elided = len(suggestions)
  109. for i in range(len(suggestions)):
  110. ll = ti.get_line(last_line_number - i)
  111. el = "".join(l[1] for l in ll).strip()
  112. if el:
  113. break
  114. else:
  115. n_elided -= 1
  116. if n_elided:
  117. return Transformation([(self.style, f"… {n_elided} line(s) hidden")])
  118. else:
  119. return Transformation(
  120. ti.get_line(last_line_number - len(suggestions) + 1)
  121. + ([(self.style, "shift-last-line")] if self._debug else [])
  122. )
  123. elif delta < len(suggestions):
  124. suggestion = suggestions[delta]
  125. return Transformation([(self.style, suggestion)])
  126. else:
  127. shift = ti.lineno - len(suggestions) + 1
  128. return Transformation(ti.get_line(shift))
  129. class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory):
  130. """
  131. A subclass of AutoSuggestFromHistory that allow navigation to next/previous
  132. suggestion from history. To do so it remembers the current position, but it
  133. state need to carefully be cleared on the right events.
  134. """
  135. skip_lines: int
  136. _connected_apps: list[PromptSession]
  137. # handle to the currently running llm task that appends suggestions to the
  138. # current buffer; we keep a handle to it in order to cancell it when there is a cursor movement, or
  139. # another request.
  140. _llm_task: asyncio.Task | None = None
  141. # This is the instance of the LLM provider from jupyter-ai to which we forward the request
  142. # to generate inline completions.
  143. _llm_provider: Any | None
  144. def __init__(self):
  145. super().__init__()
  146. self.skip_lines = 0
  147. self._connected_apps = []
  148. self._llm_provider = None
  149. def reset_history_position(self, _: Buffer):
  150. self.skip_lines = 0
  151. def disconnect(self) -> None:
  152. self._cancel_running_llm_task()
  153. for pt_app in self._connected_apps:
  154. text_insert_event = pt_app.default_buffer.on_text_insert
  155. text_insert_event.remove_handler(self.reset_history_position)
  156. def connect(self, pt_app: PromptSession):
  157. self._connected_apps.append(pt_app)
  158. # note: `on_text_changed` could be used for a bit different behaviour
  159. # on character deletion (i.e. resetting history position on backspace)
  160. pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position)
  161. pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss)
  162. def get_suggestion(
  163. self, buffer: Buffer, document: Document
  164. ) -> Optional[Suggestion]:
  165. text = _get_query(document)
  166. if text.strip():
  167. for suggestion, _ in self._find_next_match(
  168. text, self.skip_lines, buffer.history
  169. ):
  170. return Suggestion(suggestion)
  171. return None
  172. def _dismiss(self, buffer, *args, **kwargs) -> None:
  173. self._cancel_running_llm_task()
  174. buffer.suggestion = None
  175. def _find_match(
  176. self, text: str, skip_lines: float, history: History, previous: bool
  177. ) -> Generator[Tuple[str, float], None, None]:
  178. """
  179. text : str
  180. Text content to find a match for, the user cursor is most of the
  181. time at the end of this text.
  182. skip_lines : float
  183. number of items to skip in the search, this is used to indicate how
  184. far in the list the user has navigated by pressing up or down.
  185. The float type is used as the base value is +inf
  186. history : History
  187. prompt_toolkit History instance to fetch previous entries from.
  188. previous : bool
  189. Direction of the search, whether we are looking previous match
  190. (True), or next match (False).
  191. Yields
  192. ------
  193. Tuple with:
  194. str:
  195. current suggestion.
  196. float:
  197. will actually yield only ints, which is passed back via skip_lines,
  198. which may be a +inf (float)
  199. """
  200. line_number = -1
  201. for string in reversed(list(history.get_strings())):
  202. for line in reversed(string.splitlines()):
  203. line_number += 1
  204. if not previous and line_number < skip_lines:
  205. continue
  206. # do not return empty suggestions as these
  207. # close the auto-suggestion overlay (and are useless)
  208. if line.startswith(text) and len(line) > len(text):
  209. yield line[len(text) :], line_number
  210. if previous and line_number >= skip_lines:
  211. return
  212. def _find_next_match(
  213. self, text: str, skip_lines: float, history: History
  214. ) -> Generator[Tuple[str, float], None, None]:
  215. return self._find_match(text, skip_lines, history, previous=False)
  216. def _find_previous_match(self, text: str, skip_lines: float, history: History):
  217. return reversed(
  218. list(self._find_match(text, skip_lines, history, previous=True))
  219. )
  220. def up(self, query: str, other_than: str, history: History) -> None:
  221. self._cancel_running_llm_task()
  222. for suggestion, line_number in self._find_next_match(
  223. query, self.skip_lines, history
  224. ):
  225. # if user has history ['very.a', 'very', 'very.b'] and typed 'very'
  226. # we want to switch from 'very.b' to 'very.a' because a) if the
  227. # suggestion equals current text, prompt-toolkit aborts suggesting
  228. # b) user likely would not be interested in 'very' anyways (they
  229. # already typed it).
  230. if query + suggestion != other_than:
  231. self.skip_lines = line_number
  232. break
  233. else:
  234. # no matches found, cycle back to beginning
  235. self.skip_lines = 0
  236. def down(self, query: str, other_than: str, history: History) -> None:
  237. self._cancel_running_llm_task()
  238. for suggestion, line_number in self._find_previous_match(
  239. query, self.skip_lines, history
  240. ):
  241. if query + suggestion != other_than:
  242. self.skip_lines = line_number
  243. break
  244. else:
  245. # no matches found, cycle to end
  246. for suggestion, line_number in self._find_previous_match(
  247. query, float("Inf"), history
  248. ):
  249. if query + suggestion != other_than:
  250. self.skip_lines = line_number
  251. break
  252. def _cancel_running_llm_task(self) -> None:
  253. """
  254. Try to cancell the currently running llm_task if exists, and set it to None.
  255. """
  256. if self._llm_task is not None:
  257. if self._llm_task.done():
  258. self._llm_task = None
  259. return
  260. cancelled = self._llm_task.cancel()
  261. if cancelled:
  262. self._llm_task = None
  263. if not cancelled:
  264. warnings.warn(
  265. "LLM task not cancelled, does your provider support cancellation?"
  266. )
  267. async def _trigger_llm(self, buffer) -> None:
  268. """
  269. This will ask the current llm provider a suggestion for the current buffer.
  270. If there is a currently running llm task, it will cancel it.
  271. """
  272. # we likely want to store the current cursor position, and cancel if the cursor has moved.
  273. if not self._llm_provider:
  274. warnings.warn("No LLM provider found, cannot trigger LLM completions")
  275. return
  276. if jai_models is None:
  277. warnings.warn(
  278. "LLM Completion requires `jupyter_ai_magics` and `jupyter_ai` to be installed"
  279. )
  280. self._cancel_running_llm_task()
  281. async def error_catcher(buffer):
  282. """
  283. This catches and log any errors, as otherwise this is just
  284. lost in the void of the future running task.
  285. """
  286. try:
  287. await self._trigger_llm_core(buffer)
  288. except Exception as e:
  289. get_ipython().log.error("error")
  290. raise
  291. # here we need a cancellable task so we can't just await the error catched
  292. self._llm_task = asyncio.create_task(error_catcher(buffer))
  293. await self._llm_task
  294. async def _trigger_llm_core(self, buffer: Buffer):
  295. """
  296. This is the core of the current llm request.
  297. Here we build a compatible `InlineCompletionRequest` and ask the llm
  298. provider to stream it's response back to us iteratively setting it as
  299. the suggestion on the current buffer.
  300. Unlike with JupyterAi, as we do not have multiple cell, the cell number
  301. is always set to `0`, note that we _could_ set it to a new number each
  302. time and ignore threply from past numbers.
  303. We set the prefix to the current cell content, but could also inset the
  304. rest of the history or even just the non-fail history.
  305. In the same way, we do not have cell id.
  306. LLM provider may return multiple suggestion stream, but for the time
  307. being we only support one.
  308. Here we make the assumption that the provider will have
  309. stream_inline_completions, I'm not sure it is the case for all
  310. providers.
  311. """
  312. request = jai_models.InlineCompletionRequest(
  313. number=0,
  314. prefix=buffer.document.text,
  315. suffix="",
  316. mime="text/x-python",
  317. stream=True,
  318. path=None,
  319. language="python",
  320. cell_id=None,
  321. )
  322. async for reply_and_chunks in self._llm_provider.stream_inline_completions(
  323. request
  324. ):
  325. if isinstance(reply_and_chunks, jai_models.InlineCompletionReply):
  326. if len(reply_and_chunks.list.items) > 1:
  327. raise ValueError(
  328. "Terminal IPython cannot deal with multiple LLM suggestions at once"
  329. )
  330. buffer.suggestion = Suggestion(
  331. reply_and_chunks.list.items[0].insertText
  332. )
  333. buffer.on_suggestion_set.fire()
  334. elif isinstance(reply_and_chunks, jai_models.InlineCompletionStreamChunk):
  335. buffer.suggestion = Suggestion(reply_and_chunks.response.insertText)
  336. buffer.on_suggestion_set.fire()
  337. return
# Minimum number of buffer lines that must be available below the cursor
# before an LLM completion is requested (blank lines are inserted if needed).
_MIN_LINES = 5


async def llm_autosuggestion(event: KeyPressEvent):
    """
    Ask the AutoSuggester from history to delegate to ask an LLM for completion

    This will first make sure that the current buffer has at least
    ``_MIN_LINES`` (5) available lines below the cursor to insert the LLM
    completion.

    Provisional as of 8.32, may change without warnings
    """
    provider = get_ipython().auto_suggest
    if not isinstance(provider, NavigableAutoSuggestFromHistory):
        return
    doc = event.current_buffer.document
    # pad the buffer with blank lines (cursor stays put) so that a multi-line
    # completion has room to be rendered
    lines_to_insert = max(0, _MIN_LINES - doc.line_count + doc.cursor_position_row)
    for _ in range(lines_to_insert):
        event.current_buffer.insert_text("\n", move_cursor=False)

    await provider._trigger_llm(event.current_buffer)
  354. def accept_or_jump_to_end(event: KeyPressEvent):
  355. """Apply autosuggestion or jump to end of line."""
  356. buffer = event.current_buffer
  357. d = buffer.document
  358. after_cursor = d.text[d.cursor_position :]
  359. lines = after_cursor.split("\n")
  360. end_of_current_line = lines[0].strip()
  361. suggestion = buffer.suggestion
  362. if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""):
  363. buffer.insert_text(suggestion.text)
  364. else:
  365. nc.end_of_line(event)
def _deprected_accept_in_vi_insert_mode(event: KeyPressEvent):
    """Accept autosuggestion or jump to end of line.

    .. deprecated:: 8.12
        Use `accept_or_jump_to_end` instead.
    """
    # NOTE(review): the misspelled name ("deprected") is intentional legacy —
    # the module-level `__getattr__` returns this function under the old
    # public name `accept_in_vi_insert_mode`, so it must not be renamed.
    return accept_or_jump_to_end(event)
  372. def accept(event: KeyPressEvent):
  373. """Accept autosuggestion"""
  374. buffer = event.current_buffer
  375. suggestion = buffer.suggestion
  376. if suggestion:
  377. buffer.insert_text(suggestion.text)
  378. else:
  379. nc.forward_char(event)
  380. def discard(event: KeyPressEvent):
  381. """Discard autosuggestion"""
  382. buffer = event.current_buffer
  383. buffer.suggestion = None
  384. def accept_word(event: KeyPressEvent):
  385. """Fill partial autosuggestion by word"""
  386. buffer = event.current_buffer
  387. suggestion = buffer.suggestion
  388. if suggestion:
  389. t = re.split(r"(\S+\s+)", suggestion.text)
  390. buffer.insert_text(next((x for x in t if x), ""))
  391. else:
  392. nc.forward_word(event)
  393. def accept_character(event: KeyPressEvent):
  394. """Fill partial autosuggestion by character"""
  395. b = event.current_buffer
  396. suggestion = b.suggestion
  397. if suggestion and suggestion.text:
  398. b.insert_text(suggestion.text[0])
  399. def accept_and_keep_cursor(event: KeyPressEvent):
  400. """Accept autosuggestion and keep cursor in place"""
  401. buffer = event.current_buffer
  402. old_position = buffer.cursor_position
  403. suggestion = buffer.suggestion
  404. if suggestion:
  405. buffer.insert_text(suggestion.text)
  406. buffer.cursor_position = old_position
def accept_and_move_cursor_left(event: KeyPressEvent):
    """Accept autosuggestion and move cursor left in place"""
    # Insert the suggestion with the cursor restored to its original spot,
    # then step one character back.
    accept_and_keep_cursor(event)
    nc.backward_char(event)
  411. def _update_hint(buffer: Buffer):
  412. if buffer.auto_suggest:
  413. suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document)
  414. buffer.suggestion = suggestion
def backspace_and_resume_hint(event: KeyPressEvent):
    """Resume autosuggestions after deleting last character"""
    # Delete first, then recompute the hint for the now-shorter input so the
    # suggestion overlay reappears immediately.
    nc.backward_delete_char(event)
    _update_hint(event.current_buffer)
def resume_hinting(event: KeyPressEvent):
    """Resume autosuggestions"""
    # Let the key press pass through to other handlers first.
    pass_through.reply(event)
    # Order matters: if update happened first and event reply second, the
    # suggestion would be auto-accepted if both actions are bound to same key.
    _update_hint(event.current_buffer)
  425. def up_and_update_hint(event: KeyPressEvent):
  426. """Go up and update hint"""
  427. current_buffer = event.current_buffer
  428. current_buffer.auto_up(count=event.arg)
  429. _update_hint(current_buffer)
  430. def down_and_update_hint(event: KeyPressEvent):
  431. """Go down and update hint"""
  432. current_buffer = event.current_buffer
  433. current_buffer.auto_down(count=event.arg)
  434. _update_hint(current_buffer)
def accept_token(event: KeyPressEvent):
    """Fill partial autosuggestion by token"""
    b = event.current_buffer
    suggestion = b.suggestion

    if suggestion:
        prefix = _get_query(b.document)
        # tokenize what the user typed plus the suggestion, so we can find
        # where the first suggested token ends
        text = prefix + suggestion.text

        # tokens[0..2]: up to three tokens starting at/after the end of `prefix`
        tokens: List[Optional[str]] = [None, None, None]
        # substrings[k]: prefix of `text` consumed after k tokens
        substrings = [""]
        i = 0

        for token in generate_tokens(StringIO(text).readline):
            if token.type == tokenize.NEWLINE:
                index = len(text)
            else:
                # locate this token in `text`, searching past what was consumed
                index = text.index(token[1], len(substrings[-1]))
            substrings.append(text[:index])
            tokenized_so_far = substrings[-1]
            if tokenized_so_far.startswith(prefix):
                if i == 0 and len(tokenized_so_far) > len(prefix):
                    # part of a token was already typed: the remainder counts
                    # as the first suggested "token"
                    tokens[0] = tokenized_so_far[len(prefix) :]
                    substrings.append(tokenized_so_far)
                    i += 1
                tokens[i] = token[1]
                if i == 2:
                    # collected the suggested token plus one lookahead token
                    break
                i += 1

        if tokens[0]:
            to_insert: str
            insert_text = substrings[-2]
            if tokens[1] and len(tokens[1]) == 1:
                # a single-character follow-up token (e.g. bracket or dot)
                # is included in the insertion
                insert_text = substrings[-1]
            to_insert = insert_text[len(prefix) :]
            b.insert_text(to_insert)
            return

    nc.forward_word(event)
  470. Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]
  471. def _swap_autosuggestion(
  472. buffer: Buffer,
  473. provider: NavigableAutoSuggestFromHistory,
  474. direction_method: Callable,
  475. ):
  476. """
  477. We skip most recent history entry (in either direction) if it equals the
  478. current autosuggestion because if user cycles when auto-suggestion is shown
  479. they most likely want something else than what was suggested (otherwise
  480. they would have accepted the suggestion).
  481. """
  482. suggestion = buffer.suggestion
  483. if not suggestion:
  484. return
  485. query = _get_query(buffer.document)
  486. current = query + suggestion.text
  487. direction_method(query=query, other_than=current, history=buffer.history)
  488. new_suggestion = provider.get_suggestion(buffer, buffer.document)
  489. buffer.suggestion = new_suggestion
  490. def swap_autosuggestion_up(event: KeyPressEvent):
  491. """Get next autosuggestion from history."""
  492. shell = get_ipython()
  493. provider = shell.auto_suggest
  494. if not isinstance(provider, NavigableAutoSuggestFromHistory):
  495. return
  496. return _swap_autosuggestion(
  497. buffer=event.current_buffer, provider=provider, direction_method=provider.up
  498. )
  499. def swap_autosuggestion_down(event: KeyPressEvent):
  500. """Get previous autosuggestion from history."""
  501. shell = get_ipython()
  502. provider = shell.auto_suggest
  503. if not isinstance(provider, NavigableAutoSuggestFromHistory):
  504. return
  505. return _swap_autosuggestion(
  506. buffer=event.current_buffer,
  507. provider=provider,
  508. direction_method=provider.down,
  509. )
  510. def __getattr__(key):
  511. if key == "accept_in_vi_insert_mode":
  512. warnings.warn(
  513. "`accept_in_vi_insert_mode` is deprecated since IPython 8.12 and "
  514. "renamed to `accept_or_jump_to_end`. Please update your configuration "
  515. "accordingly",
  516. DeprecationWarning,
  517. stacklevel=2,
  518. )
  519. return _deprected_accept_in_vi_insert_mode
  520. raise AttributeError