1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- from typing import Union
- from clickhouse_connect.dbapi.cursor import Cursor
- from clickhouse_connect.driver import create_client
- from clickhouse_connect.driver.query import QueryResult
- class Connection:
- """
- See :ref:`https://peps.python.org/pep-0249/`
- """
- # pylint: disable=too-many-arguments
- def __init__(self,
- dsn: str = None,
- username: str = '',
- password: str = '',
- host: str = None,
- database: str = None,
- interface: str = None,
- port: int = 0,
- secure: Union[bool, str] = False,
- **kwargs):
- self.client = create_client(host=host,
- username=username,
- password=password,
- database=database,
- interface=interface,
- port=port,
- secure=secure,
- dsn=dsn,
- generic_args=kwargs)
- self.timezone = self.client.server_tz
- def close(self):
- self.client.close()
- def commit(self):
- pass
- def rollback(self):
- pass
- def command(self, cmd: str):
- return self.client.command(cmd)
- def raw_query(self, query: str) -> QueryResult:
- return self.client.query(query)
- def cursor(self):
- return Cursor(self.client)
|