connection.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. from typing import Union
  2. from clickhouse_connect.dbapi.cursor import Cursor
  3. from clickhouse_connect.driver import create_client
  4. from clickhouse_connect.driver.query import QueryResult
  5. class Connection:
  6. """
  7. See :ref:`https://peps.python.org/pep-0249/`
  8. """
  9. # pylint: disable=too-many-arguments
  10. def __init__(self,
  11. dsn: str = None,
  12. username: str = '',
  13. password: str = '',
  14. host: str = None,
  15. database: str = None,
  16. interface: str = None,
  17. port: int = 0,
  18. secure: Union[bool, str] = False,
  19. **kwargs):
  20. self.client = create_client(host=host,
  21. username=username,
  22. password=password,
  23. database=database,
  24. interface=interface,
  25. port=port,
  26. secure=secure,
  27. dsn=dsn,
  28. generic_args=kwargs)
  29. self.timezone = self.client.server_tz
  30. def close(self):
  31. self.client.close()
  32. def commit(self):
  33. pass
  34. def rollback(self):
  35. pass
  36. def command(self, cmd: str):
  37. return self.client.command(cmd)
  38. def raw_query(self, query: str) -> QueryResult:
  39. return self.client.query(query)
  40. def cursor(self):
  41. return Cursor(self.client)