123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648 |
- import re
- import asyncio
- import tokenize
- from io import StringIO
- from typing import Callable, List, Optional, Union, Generator, Tuple, ClassVar, Any
- import warnings
- import prompt_toolkit
- from prompt_toolkit.buffer import Buffer
- from prompt_toolkit.key_binding import KeyPressEvent
- from prompt_toolkit.key_binding.bindings import named_commands as nc
- from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion, AutoSuggest
- from prompt_toolkit.document import Document
- from prompt_toolkit.history import History
- from prompt_toolkit.shortcuts import PromptSession
- from prompt_toolkit.layout.processors import (
- Processor,
- Transformation,
- TransformationInput,
- )
- from IPython.core.getipython import get_ipython
- from IPython.utils.tokenutil import generate_tokens
- from .filters import pass_through
- try:
- import jupyter_ai_magics
- import jupyter_ai.completions.models as jai_models
- except ModuleNotFoundError:
- jai_models = None
- def _get_query(document: Document):
- return document.lines[document.cursor_position_row]
- class AppendAutoSuggestionInAnyLine(Processor):
- """
- Append the auto suggestion to lines other than the last (appending to the
- last line is natively supported by the prompt toolkit).
- This has a private `_debug` attribute that can be set to True to display
- debug information as virtual suggestion on the end of any line. You can do
- so with:
- >>> from IPython.terminal.shortcuts.auto_suggest import AppendAutoSuggestionInAnyLine
- >>> AppendAutoSuggestionInAnyLine._debug = True
- """
- _debug: ClassVar[bool] = False
- def __init__(self, style: str = "class:auto-suggestion") -> None:
- self.style = style
- def apply_transformation(self, ti: TransformationInput) -> Transformation:
- """
- Apply transformation to the line that is currently being edited.
- This is a variation of the original implementation in prompt toolkit
- that allows to not only append suggestions to any line, but also to show
- multi-line suggestions.
- As transformation are applied on a line-by-line basis; we need to trick
- a bit, and elide any line that is after the line we are currently
- editing, until we run out of completions. We cannot shift the existing
- lines
- There are multiple cases to handle:
- The completions ends before the end of the buffer:
- We can resume showing the normal line, and say that some code may
- be hidden.
- The completions ends at the end of the buffer
- We can just say that some code may be hidden.
- And separately:
- The completions ends beyond the end of the buffer
- We need to both say that some code may be hidden, and that some
- lines are not shown.
- """
- last_line_number = ti.document.line_count - 1
- is_last_line = ti.lineno == last_line_number
- noop = lambda text: Transformation(
- fragments=ti.fragments + [(self.style, " " + text if self._debug else "")]
- )
- if ti.document.line_count == 1:
- return noop("noop:oneline")
- if ti.document.cursor_position_row == last_line_number and is_last_line:
- # prompt toolkit already appends something; just leave it be
- return noop("noop:last line and cursor")
- # first everything before the current line is unchanged.
- if ti.lineno < ti.document.cursor_position_row:
- return noop("noop:before cursor")
- buffer = ti.buffer_control.buffer
- if not buffer.suggestion or not ti.document.is_cursor_at_the_end_of_line:
- return noop("noop:not eol")
- delta = ti.lineno - ti.document.cursor_position_row
- suggestions = buffer.suggestion.text.splitlines()
- if len(suggestions) == 0:
- return noop("noop: no suggestions")
- suggestions_longer_than_buffer: bool = (
- len(suggestions) + ti.document.cursor_position_row > ti.document.line_count
- )
- if len(suggestions) >= 1 and prompt_toolkit.VERSION < (3, 0, 49):
- if ti.lineno == ti.document.cursor_position_row:
- return Transformation(
- fragments=ti.fragments
- + [
- (
- "red",
- "(Cannot show multiline suggestion; requires prompt_toolkit > 3.0.49)",
- )
- ]
- )
- else:
- return Transformation(fragments=ti.fragments)
- if delta == 0:
- suggestion = suggestions[0]
- return Transformation(fragments=ti.fragments + [(self.style, suggestion)])
- if is_last_line:
- if delta < len(suggestions):
- extra = f"; {len(suggestions) - delta} line(s) hidden"
- suggestion = f"… rest of suggestion ({len(suggestions) - delta} lines) and code hidden"
- return Transformation([(self.style, suggestion)])
- n_elided = len(suggestions)
- for i in range(len(suggestions)):
- ll = ti.get_line(last_line_number - i)
- el = "".join(l[1] for l in ll).strip()
- if el:
- break
- else:
- n_elided -= 1
- if n_elided:
- return Transformation([(self.style, f"… {n_elided} line(s) hidden")])
- else:
- return Transformation(
- ti.get_line(last_line_number - len(suggestions) + 1)
- + ([(self.style, "shift-last-line")] if self._debug else [])
- )
- elif delta < len(suggestions):
- suggestion = suggestions[delta]
- return Transformation([(self.style, suggestion)])
- else:
- shift = ti.lineno - len(suggestions) + 1
- return Transformation(ti.get_line(shift))
- class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory):
- """
- A subclass of AutoSuggestFromHistory that allow navigation to next/previous
- suggestion from history. To do so it remembers the current position, but it
- state need to carefully be cleared on the right events.
- """
- skip_lines: int
- _connected_apps: list[PromptSession]
- # handle to the currently running llm task that appends suggestions to the
- # current buffer; we keep a handle to it in order to cancell it when there is a cursor movement, or
- # another request.
- _llm_task: asyncio.Task | None = None
- # This is the instance of the LLM provider from jupyter-ai to which we forward the request
- # to generate inline completions.
- _llm_provider: Any | None
- def __init__(self):
- super().__init__()
- self.skip_lines = 0
- self._connected_apps = []
- self._llm_provider = None
- def reset_history_position(self, _: Buffer):
- self.skip_lines = 0
- def disconnect(self) -> None:
- self._cancel_running_llm_task()
- for pt_app in self._connected_apps:
- text_insert_event = pt_app.default_buffer.on_text_insert
- text_insert_event.remove_handler(self.reset_history_position)
- def connect(self, pt_app: PromptSession):
- self._connected_apps.append(pt_app)
- # note: `on_text_changed` could be used for a bit different behaviour
- # on character deletion (i.e. resetting history position on backspace)
- pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position)
- pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss)
- def get_suggestion(
- self, buffer: Buffer, document: Document
- ) -> Optional[Suggestion]:
- text = _get_query(document)
- if text.strip():
- for suggestion, _ in self._find_next_match(
- text, self.skip_lines, buffer.history
- ):
- return Suggestion(suggestion)
- return None
- def _dismiss(self, buffer, *args, **kwargs) -> None:
- self._cancel_running_llm_task()
- buffer.suggestion = None
- def _find_match(
- self, text: str, skip_lines: float, history: History, previous: bool
- ) -> Generator[Tuple[str, float], None, None]:
- """
- text : str
- Text content to find a match for, the user cursor is most of the
- time at the end of this text.
- skip_lines : float
- number of items to skip in the search, this is used to indicate how
- far in the list the user has navigated by pressing up or down.
- The float type is used as the base value is +inf
- history : History
- prompt_toolkit History instance to fetch previous entries from.
- previous : bool
- Direction of the search, whether we are looking previous match
- (True), or next match (False).
- Yields
- ------
- Tuple with:
- str:
- current suggestion.
- float:
- will actually yield only ints, which is passed back via skip_lines,
- which may be a +inf (float)
- """
- line_number = -1
- for string in reversed(list(history.get_strings())):
- for line in reversed(string.splitlines()):
- line_number += 1
- if not previous and line_number < skip_lines:
- continue
- # do not return empty suggestions as these
- # close the auto-suggestion overlay (and are useless)
- if line.startswith(text) and len(line) > len(text):
- yield line[len(text) :], line_number
- if previous and line_number >= skip_lines:
- return
- def _find_next_match(
- self, text: str, skip_lines: float, history: History
- ) -> Generator[Tuple[str, float], None, None]:
- return self._find_match(text, skip_lines, history, previous=False)
- def _find_previous_match(self, text: str, skip_lines: float, history: History):
- return reversed(
- list(self._find_match(text, skip_lines, history, previous=True))
- )
- def up(self, query: str, other_than: str, history: History) -> None:
- self._cancel_running_llm_task()
- for suggestion, line_number in self._find_next_match(
- query, self.skip_lines, history
- ):
- # if user has history ['very.a', 'very', 'very.b'] and typed 'very'
- # we want to switch from 'very.b' to 'very.a' because a) if the
- # suggestion equals current text, prompt-toolkit aborts suggesting
- # b) user likely would not be interested in 'very' anyways (they
- # already typed it).
- if query + suggestion != other_than:
- self.skip_lines = line_number
- break
- else:
- # no matches found, cycle back to beginning
- self.skip_lines = 0
- def down(self, query: str, other_than: str, history: History) -> None:
- self._cancel_running_llm_task()
- for suggestion, line_number in self._find_previous_match(
- query, self.skip_lines, history
- ):
- if query + suggestion != other_than:
- self.skip_lines = line_number
- break
- else:
- # no matches found, cycle to end
- for suggestion, line_number in self._find_previous_match(
- query, float("Inf"), history
- ):
- if query + suggestion != other_than:
- self.skip_lines = line_number
- break
- def _cancel_running_llm_task(self) -> None:
- """
- Try to cancell the currently running llm_task if exists, and set it to None.
- """
- if self._llm_task is not None:
- if self._llm_task.done():
- self._llm_task = None
- return
- cancelled = self._llm_task.cancel()
- if cancelled:
- self._llm_task = None
- if not cancelled:
- warnings.warn(
- "LLM task not cancelled, does your provider support cancellation?"
- )
- async def _trigger_llm(self, buffer) -> None:
- """
- This will ask the current llm provider a suggestion for the current buffer.
- If there is a currently running llm task, it will cancel it.
- """
- # we likely want to store the current cursor position, and cancel if the cursor has moved.
- if not self._llm_provider:
- warnings.warn("No LLM provider found, cannot trigger LLM completions")
- return
- if jai_models is None:
- warnings.warn(
- "LLM Completion requires `jupyter_ai_magics` and `jupyter_ai` to be installed"
- )
- self._cancel_running_llm_task()
- async def error_catcher(buffer):
- """
- This catches and log any errors, as otherwise this is just
- lost in the void of the future running task.
- """
- try:
- await self._trigger_llm_core(buffer)
- except Exception as e:
- get_ipython().log.error("error")
- raise
- # here we need a cancellable task so we can't just await the error catched
- self._llm_task = asyncio.create_task(error_catcher(buffer))
- await self._llm_task
- async def _trigger_llm_core(self, buffer: Buffer):
- """
- This is the core of the current llm request.
- Here we build a compatible `InlineCompletionRequest` and ask the llm
- provider to stream it's response back to us iteratively setting it as
- the suggestion on the current buffer.
- Unlike with JupyterAi, as we do not have multiple cell, the cell number
- is always set to `0`, note that we _could_ set it to a new number each
- time and ignore threply from past numbers.
- We set the prefix to the current cell content, but could also inset the
- rest of the history or even just the non-fail history.
- In the same way, we do not have cell id.
- LLM provider may return multiple suggestion stream, but for the time
- being we only support one.
- Here we make the assumption that the provider will have
- stream_inline_completions, I'm not sure it is the case for all
- providers.
- """
- request = jai_models.InlineCompletionRequest(
- number=0,
- prefix=buffer.document.text,
- suffix="",
- mime="text/x-python",
- stream=True,
- path=None,
- language="python",
- cell_id=None,
- )
- async for reply_and_chunks in self._llm_provider.stream_inline_completions(
- request
- ):
- if isinstance(reply_and_chunks, jai_models.InlineCompletionReply):
- if len(reply_and_chunks.list.items) > 1:
- raise ValueError(
- "Terminal IPython cannot deal with multiple LLM suggestions at once"
- )
- buffer.suggestion = Suggestion(
- reply_and_chunks.list.items[0].insertText
- )
- buffer.on_suggestion_set.fire()
- elif isinstance(reply_and_chunks, jai_models.InlineCompletionStreamChunk):
- buffer.suggestion = Suggestion(reply_and_chunks.response.insertText)
- buffer.on_suggestion_set.fire()
- return
- _MIN_LINES = 5
- async def llm_autosuggestion(event: KeyPressEvent):
- """
- Ask the AutoSuggester from history to delegate to ask an LLM for completion
- This will first make sure that the current buffer have _MIN_LINES (7)
- available lines to insert the LLM completion
- Provisional as of 8.32, may change without warnigns
- """
- provider = get_ipython().auto_suggest
- if not isinstance(provider, NavigableAutoSuggestFromHistory):
- return
- doc = event.current_buffer.document
- lines_to_insert = max(0, _MIN_LINES - doc.line_count + doc.cursor_position_row)
- for _ in range(lines_to_insert):
- event.current_buffer.insert_text("\n", move_cursor=False)
- await provider._trigger_llm(event.current_buffer)
- def accept_or_jump_to_end(event: KeyPressEvent):
- """Apply autosuggestion or jump to end of line."""
- buffer = event.current_buffer
- d = buffer.document
- after_cursor = d.text[d.cursor_position :]
- lines = after_cursor.split("\n")
- end_of_current_line = lines[0].strip()
- suggestion = buffer.suggestion
- if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""):
- buffer.insert_text(suggestion.text)
- else:
- nc.end_of_line(event)
- def _deprected_accept_in_vi_insert_mode(event: KeyPressEvent):
- """Accept autosuggestion or jump to end of line.
- .. deprecated:: 8.12
- Use `accept_or_jump_to_end` instead.
- """
- return accept_or_jump_to_end(event)
- def accept(event: KeyPressEvent):
- """Accept autosuggestion"""
- buffer = event.current_buffer
- suggestion = buffer.suggestion
- if suggestion:
- buffer.insert_text(suggestion.text)
- else:
- nc.forward_char(event)
- def discard(event: KeyPressEvent):
- """Discard autosuggestion"""
- buffer = event.current_buffer
- buffer.suggestion = None
- def accept_word(event: KeyPressEvent):
- """Fill partial autosuggestion by word"""
- buffer = event.current_buffer
- suggestion = buffer.suggestion
- if suggestion:
- t = re.split(r"(\S+\s+)", suggestion.text)
- buffer.insert_text(next((x for x in t if x), ""))
- else:
- nc.forward_word(event)
- def accept_character(event: KeyPressEvent):
- """Fill partial autosuggestion by character"""
- b = event.current_buffer
- suggestion = b.suggestion
- if suggestion and suggestion.text:
- b.insert_text(suggestion.text[0])
- def accept_and_keep_cursor(event: KeyPressEvent):
- """Accept autosuggestion and keep cursor in place"""
- buffer = event.current_buffer
- old_position = buffer.cursor_position
- suggestion = buffer.suggestion
- if suggestion:
- buffer.insert_text(suggestion.text)
- buffer.cursor_position = old_position
- def accept_and_move_cursor_left(event: KeyPressEvent):
- """Accept autosuggestion and move cursor left in place"""
- accept_and_keep_cursor(event)
- nc.backward_char(event)
- def _update_hint(buffer: Buffer):
- if buffer.auto_suggest:
- suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document)
- buffer.suggestion = suggestion
- def backspace_and_resume_hint(event: KeyPressEvent):
- """Resume autosuggestions after deleting last character"""
- nc.backward_delete_char(event)
- _update_hint(event.current_buffer)
- def resume_hinting(event: KeyPressEvent):
- """Resume autosuggestions"""
- pass_through.reply(event)
- # Order matters: if update happened first and event reply second, the
- # suggestion would be auto-accepted if both actions are bound to same key.
- _update_hint(event.current_buffer)
- def up_and_update_hint(event: KeyPressEvent):
- """Go up and update hint"""
- current_buffer = event.current_buffer
- current_buffer.auto_up(count=event.arg)
- _update_hint(current_buffer)
- def down_and_update_hint(event: KeyPressEvent):
- """Go down and update hint"""
- current_buffer = event.current_buffer
- current_buffer.auto_down(count=event.arg)
- _update_hint(current_buffer)
- def accept_token(event: KeyPressEvent):
- """Fill partial autosuggestion by token"""
- b = event.current_buffer
- suggestion = b.suggestion
- if suggestion:
- prefix = _get_query(b.document)
- text = prefix + suggestion.text
- tokens: List[Optional[str]] = [None, None, None]
- substrings = [""]
- i = 0
- for token in generate_tokens(StringIO(text).readline):
- if token.type == tokenize.NEWLINE:
- index = len(text)
- else:
- index = text.index(token[1], len(substrings[-1]))
- substrings.append(text[:index])
- tokenized_so_far = substrings[-1]
- if tokenized_so_far.startswith(prefix):
- if i == 0 and len(tokenized_so_far) > len(prefix):
- tokens[0] = tokenized_so_far[len(prefix) :]
- substrings.append(tokenized_so_far)
- i += 1
- tokens[i] = token[1]
- if i == 2:
- break
- i += 1
- if tokens[0]:
- to_insert: str
- insert_text = substrings[-2]
- if tokens[1] and len(tokens[1]) == 1:
- insert_text = substrings[-1]
- to_insert = insert_text[len(prefix) :]
- b.insert_text(to_insert)
- return
- nc.forward_word(event)
- Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None]
- def _swap_autosuggestion(
- buffer: Buffer,
- provider: NavigableAutoSuggestFromHistory,
- direction_method: Callable,
- ):
- """
- We skip most recent history entry (in either direction) if it equals the
- current autosuggestion because if user cycles when auto-suggestion is shown
- they most likely want something else than what was suggested (otherwise
- they would have accepted the suggestion).
- """
- suggestion = buffer.suggestion
- if not suggestion:
- return
- query = _get_query(buffer.document)
- current = query + suggestion.text
- direction_method(query=query, other_than=current, history=buffer.history)
- new_suggestion = provider.get_suggestion(buffer, buffer.document)
- buffer.suggestion = new_suggestion
- def swap_autosuggestion_up(event: KeyPressEvent):
- """Get next autosuggestion from history."""
- shell = get_ipython()
- provider = shell.auto_suggest
- if not isinstance(provider, NavigableAutoSuggestFromHistory):
- return
- return _swap_autosuggestion(
- buffer=event.current_buffer, provider=provider, direction_method=provider.up
- )
- def swap_autosuggestion_down(event: KeyPressEvent):
- """Get previous autosuggestion from history."""
- shell = get_ipython()
- provider = shell.auto_suggest
- if not isinstance(provider, NavigableAutoSuggestFromHistory):
- return
- return _swap_autosuggestion(
- buffer=event.current_buffer,
- provider=provider,
- direction_method=provider.down,
- )
- def __getattr__(key):
- if key == "accept_in_vi_insert_mode":
- warnings.warn(
- "`accept_in_vi_insert_mode` is deprecated since IPython 8.12 and "
- "renamed to `accept_or_jump_to_end`. Please update your configuration "
- "accordingly",
- DeprecationWarning,
- stacklevel=2,
- )
- return _deprected_accept_in_vi_insert_mode
- raise AttributeError
|