123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- """A simple SQLite CLI for the sqlite3 module.
- Apart from using 'argparse' for the command-line interface,
- this module implements the REPL as a thin wrapper around
- the InteractiveConsole class from the 'code' stdlib module.
- """
- import sqlite3
- import sys
- from argparse import ArgumentParser
- from code import InteractiveConsole
- from textwrap import dedent
def execute(c, sql, suppress_errors=True):
    """Run one SQL string and print every row it yields.

    Shared by the REPL and by one-shot execution from the command line.

    'c' is any object with an execute() method (a cursor or a connection).
    'sql' is the SQL text handed to c.execute().
    'suppress_errors' controls what happens after a sqlite3.Error has been
    reported on stderr: when true the function simply returns, when false
    the process exits with status 1.
    """
    try:
        rows = c.execute(sql)
        for record in rows:
            print(record)
    except sqlite3.Error as exc:
        kind = type(exc).__name__
        # Not every sqlite3.Error carries 'sqlite_errorname'; fall back to
        # the short form when the attribute is missing.
        try:
            report = f"{kind} ({exc.sqlite_errorname}): {exc}"
        except AttributeError:
            report = f"{kind}: {exc}"
        print(report, file=sys.stderr)
        if suppress_errors:
            return
        sys.exit(1)
- class SqliteInteractiveConsole(InteractiveConsole):
- """A simple SQLite REPL."""
- def __init__(self, connection):
- super().__init__()
- self._con = connection
- self._cur = connection.cursor()
- def runsource(self, source, filename="<input>", symbol="single"):
- """Override runsource, the core of the InteractiveConsole REPL.
- Return True if more input is needed; buffering is done automatically.
- Return False is input is a complete statement ready for execution.
- """
- match source:
- case ".version":
- print(f"{sqlite3.sqlite_version}")
- case ".help":
- print("Enter SQL code and press enter.")
- case ".quit":
- sys.exit(0)
- case _:
- if not sqlite3.complete_statement(source):
- return True
- execute(self._cur, source)
- return False
def main(*args):
    """Entry point of the sqlite3 command-line interface.

    'args' is forwarded unchanged to ArgumentParser.parse_args(); callers
    pass a single list of command-line arguments.

    With an SQL argument the statement is executed directly on the
    connection (errors are not suppressed); without one the interactive
    console is started. The connection is always closed afterwards and the
    function finishes by calling sys.exit(0).
    """
    cli = ArgumentParser(
        prog="python -m sqlite3",
        description="Python sqlite3 CLI",
    )
    cli.add_argument(
        "filename",
        type=str,
        nargs="?",
        default=":memory:",
        help=(
            "SQLite database to open (defaults to ':memory:'). "
            "A new database is created if the file does not previously exist."
        ),
    )
    cli.add_argument(
        "sql",
        type=str,
        nargs="?",
        help=(
            "An SQL query to execute. "
            "Any returned rows are printed to stdout."
        ),
    )
    cli.add_argument(
        "-v", "--version",
        action="version",
        version=f"SQLite version {sqlite3.sqlite_version}",
        help="Print underlying SQLite library version",
    )
    options = cli.parse_args(*args)

    # Human-readable description of the database for the banner.
    in_memory = options.filename == ":memory:"
    target = (
        "a transient in-memory database" if in_memory
        else repr(options.filename)
    )

    # Prepare REPL banner and prompts. The end-of-file key shown is CTRL-Z
    # on win32 unless 'idlelib.run' has been loaded, CTRL-D otherwise.
    plain_windows = (
        sys.platform == "win32" and "idlelib.run" not in sys.modules
    )
    eof_hint = "CTRL-Z" if plain_windows else "CTRL-D"
    greeting = "\n".join((
        f"sqlite3 shell, running on SQLite version {sqlite3.sqlite_version}",
        f"Connected to {target}",
        "Each command will be run using execute() on the cursor.",
        f'Type ".help" for more information; '
        f'type ".quit" or {eof_hint} to quit.',
    ))
    sys.ps1 = "sqlite> "
    sys.ps2 = " ... "

    db = sqlite3.connect(options.filename, isolation_level=None)
    try:
        if options.sql:
            # SQL statement provided on the command-line; execute it directly.
            execute(db, options.sql, suppress_errors=False)
        else:
            # No SQL provided; start the REPL.
            SqliteInteractiveConsole(db).interact(greeting, exitmsg="")
    finally:
        db.close()

    sys.exit(0)
# Script entry point: hand the command-line arguments (without the program
# name) to main() as a single list.
if __name__ == "__main__":
    cli_arguments = sys.argv[1:]
    main(cli_arguments)
|