client.py 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816
  1. import io
  2. import logging
  3. from datetime import tzinfo
  4. import pytz
  5. from abc import ABC, abstractmethod
  6. from typing import Iterable, Optional, Any, Union, Sequence, Dict, Generator, BinaryIO
  7. from pytz.exceptions import UnknownTimeZoneError
  8. from clickhouse_connect import common
  9. from clickhouse_connect.common import version
  10. from clickhouse_connect.datatypes.registry import get_from_name
  11. from clickhouse_connect.datatypes.base import ClickHouseType
  12. from clickhouse_connect.datatypes import dynamic as dynamic_module
  13. from clickhouse_connect.driver import tzutil
  14. from clickhouse_connect.driver.common import dict_copy, StreamContext, coerce_int, coerce_bool
  15. from clickhouse_connect.driver.constants import CH_VERSION_WITH_PROTOCOL, PROTOCOL_VERSION_WITH_LOW_CARD
  16. from clickhouse_connect.driver.exceptions import ProgrammingError, OperationalError
  17. from clickhouse_connect.driver.external import ExternalData
  18. from clickhouse_connect.driver.insert import InsertContext
  19. from clickhouse_connect.driver.summary import QuerySummary
  20. from clickhouse_connect.driver.models import ColumnDef, SettingDef, SettingStatus
  21. from clickhouse_connect.driver.query import QueryResult, to_arrow, to_arrow_batches, QueryContext, arrow_buffer
  22. from clickhouse_connect.driver.binding import quote_identifier
  # Set the io module's DEFAULT_BUFFER_SIZE attribute to 256 KiB (a module-wide side effect of importing this file).
  io.DEFAULT_BUFFER_SIZE = 256 * 1024
  # Module-level logger named after this module.
  logger = logging.getLogger(__name__)
  # Name of the ClickHouse setting that the Arrow query helpers in this file inspect and send.
  arrow_str_setting = 'output_format_arrow_string_as_string'
  26. # pylint: disable=too-many-public-methods,too-many-arguments,too-many-positional-arguments,too-many-instance-attributes
  27. class Client(ABC):
  28. """
  29. Base ClickHouse Connect client
  30. """
  31. compression: str = None
  32. write_compression: str = None
  33. protocol_version = 0
  34. valid_transport_settings = set()
  35. optional_transport_settings = set()
  36. database = None
  37. max_error_message = 0
  38. apply_server_timezone = False
  39. show_clickhouse_errors = True
  def __init__(
          self,
          database: str,
          query_limit: int,
          uri: str,
          query_retries: int,
          server_host_name: Optional[str],
          apply_server_timezone: Optional[Union[str, bool]],
          show_clickhouse_errors: Optional[bool],
  ):
      """
      Shared initialization of ClickHouse Connect client

      :param database: database name.  A falsy value or the literal '__default__' leaves the class-level
        default (None) in place
      :param query_limit: default LIMIT for queries (coerced to int)
      :param uri: uri for error messages
      :param query_retries: stored on the client after coercion to int
      :param server_host_name: stored unchanged on the client
      :param apply_server_timezone: forwarded to _init_common_settings
      :param show_clickhouse_errors: when not None, coerced to bool and stored in place of the class default
      """
      self.query_limit = coerce_int(query_limit)
      self.query_retries = coerce_int(query_retries)
      # '__default__' is a placeholder meaning "no explicit database"
      if database and database != '__default__':
          self.database = database
      if show_clickhouse_errors is not None:
          self.show_clickhouse_errors = coerce_bool(show_clickhouse_errors)
      self.server_host_name = server_host_name
      self.uri = uri
      # Must run last: it issues commands/queries through this (partially initialized) client
      self._init_common_settings(apply_server_timezone)
  def _init_common_settings(self, apply_server_timezone: Optional[Union[str, bool]]):
      """
      Query the server for its version, timezone and settings, then configure client defaults accordingly.

      Steps, in order:
        * read version() and timezone() from the server (without a database context)
        * resolve the server timezone; an unrecognized timezone is logged and UTC is kept
        * load system.settings into self.server_settings
        * probe whether the client protocol version parameter is honored by the server
        * apply default client settings that the server allows to be changed

      :param apply_server_timezone: 'always', a boolean-like value, or None.  When None, the "DST safe" flag
        returned by tzutil.normalize_timezone is used instead
      """
      self.server_tz, dst_safe = pytz.UTC, True
      self.server_version, server_tz = \
          tuple(self.command('SELECT version(), timezone()', use_database=False))
      try:
          server_tz = pytz.timezone(server_tz)
          server_tz, dst_safe = tzutil.normalize_timezone(server_tz)
          if apply_server_timezone is None:
              apply_server_timezone = dst_safe
          self.apply_server_timezone = apply_server_timezone == 'always' or coerce_bool(apply_server_timezone)
          self.server_tz = server_tz
      except UnknownTimeZoneError:
          # self.server_tz stays UTC and self.apply_server_timezone keeps its class-level default
          logger.warning('Warning, server is using an unrecognized timezone %s, will use UTC default', server_tz)

      if not self.apply_server_timezone and not tzutil.local_tz_dst_safe:
          logger.warning('local timezone %s may return unexpected times due to Daylight Savings Time/' +
                         'Summer Time differences', tzutil.local_tz.tzname(None))

      # When min_version('19.17') does not hold, the readonly expression comes from the common 'readonly' setting
      readonly = 'readonly'
      if not self.min_version('19.17'):
          readonly = common.get_setting('readonly')
      server_settings = self.query(f'SELECT name, value, {readonly} as readonly FROM system.settings LIMIT 10000')
      self.server_settings = {row['name']: SettingDef(**row) for row in server_settings.named_results()}

      if self.min_version(CH_VERSION_WITH_PROTOCOL) and common.get_setting('use_protocol_version'):
          # Unfortunately we have to validate that the client protocol version is actually used by ClickHouse
          # since the query parameter could be stripped off (in particular, by CHProxy)
          test_data = self.raw_query('SELECT 1 AS check', fmt='Native', settings={
              'client_protocol_version': PROTOCOL_VERSION_WITH_LOW_CARD
          })
          if test_data[8:16] == b'\x01\x01\x05check':
              self.protocol_version = PROTOCOL_VERSION_WITH_LOW_CARD

      if self._setting_status('date_time_input_format').is_writable:
          self.set_client_setting('date_time_input_format', 'best_effort')

      # The writability check must use the same setting name that is subsequently set.  The previous
      # misspelling ('..._user_inference') never matched a server setting, so this branch could not run.
      if self._setting_status('allow_experimental_json_type').is_set and \
              self._setting_status('cast_string_to_dynamic_use_inference').is_writable:
          self.set_client_setting('cast_string_to_dynamic_use_inference', '1')

      if self.min_version('24.8') and not self.min_version('24.10'):
          dynamic_module.json_serialization_format = 0
  def _validate_settings(self, settings: Optional[Dict[str, Any]]) -> Dict[str, str]:
      """
      This strips any ClickHouse settings that are not recognized or are read only.

      :param settings: Dictionary of setting name and values.  None is treated as "no settings"
      :return: A filtered dictionary of settings with values rendered as strings
      :raises ProgrammingError: propagated from _validate_setting when a setting is rejected
      """
      # The parameter is declared Optional, so tolerate None instead of failing on .items()
      if settings is None:
          return {}
      validated = {}
      invalid_action = common.get_setting('invalid_setting_action')
      for key, value in settings.items():
          str_value = self._validate_setting(key, value, invalid_action)
          if str_value is not None:
              # Keep the rendered string (e.g. '1'/'0' for booleans), not the raw Python value
              validated[key] = str_value
      return validated
  def _validate_setting(self, key: str, value: Any, invalid_action: str) -> Optional[str]:
      """
      Validate one setting and render its value as a string.

      Keys listed in valid_transport_settings skip validation.  Any other key must be present in
      self.server_settings and not be flagged readonly; otherwise the outcome depends on invalid_action.

      :param key: setting name
      :param value: setting value; booleans are rendered as '1'/'0', everything else with str()
      :param invalid_action: 'send' (warn and send anyway), 'drop' (warn and skip); any other value raises
      :return: the string form of the value, or None if the setting should not be sent
      :raises ProgrammingError: for an unknown/readonly setting when invalid_action is neither 'send' nor 'drop'
      """
      if key not in self.valid_transport_settings:
          known = self.server_settings.get(key)
          if known is None or known.readonly:
              # Optional transport settings are skipped quietly
              if key in self.optional_transport_settings:
                  return None
              if invalid_action == 'drop':
                  logger.warning('Dropping unrecognized or readonly settings %s', key)
                  return None
              if invalid_action != 'send':
                  raise ProgrammingError(f'Setting {key} is unknown or readonly') from None
              logger.warning('Attempting to send unrecognized or readonly setting %s', key)
      if isinstance(value, bool):
          return '1' if value else '0'
      return str(value)
  def _setting_status(self, key: str) -> SettingStatus:
      """
      Summarize a server setting as a SettingStatus.

      :param key: setting name to look up in self.server_settings
      :return: SettingStatus(False, False) when the lookup result is falsy; otherwise a SettingStatus whose
        first field is "value is not the string '0'" and whose second field is "readonly is not 1"
      """
      setting_def = self.server_settings.get(key)
      if setting_def:
          value_not_zero = setting_def.value != '0'
          not_readonly = setting_def.readonly != 1
          return SettingStatus(value_not_zero, not_readonly)
      return SettingStatus(False, False)
  def _prep_query(self, context: QueryContext):
      """
      Return the final query text for the context, appending the client default LIMIT when applicable.

      The LIMIT clause is added only if the context is a select, has no limit of its own, and the client
      query_limit is truthy.  The clause is encoded to bytes when context.query is bytes.

      :param context: query context supplying is_select, has_limit, query and final_query
      :return: context.final_query, with the LIMIT clause appended when applicable
      """
      if not context.is_select or context.has_limit or not self.query_limit:
          return context.final_query
      limit_clause = f'\n LIMIT {self.query_limit}'
      if isinstance(context.query, bytes):
          limit_clause = limit_clause.encode()
      return context.final_query + limit_clause
  def _check_tz_change(self, new_tz) -> Optional[tzinfo]:
      """
      Resolve a timezone name and report it only if it differs from the stored server timezone.

      :param new_tz: timezone name; a falsy value yields None
      :return: the pytz timezone when it differs from self.server_tz; None when it is equal, when
        new_tz is falsy, or when the name is unrecognized (a warning is logged in that case)
      """
      if not new_tz:
          return None
      try:
          candidate = pytz.timezone(new_tz)
      except UnknownTimeZoneError:
          logger.warning('Unrecognized timezone %s received from ClickHouse', new_tz)
          return None
      return candidate if candidate != self.server_tz else None
  @abstractmethod
  def _query_with_context(self, context: QueryContext):
      """
      Abstract hook: run the query described by the context.  The query method of this class returns
      whatever this hook returns for non-command queries.

      :param context: the QueryContext to execute
      """
  @abstractmethod
  def set_client_setting(self, key, value):
      """
      Set a ClickHouse setting on the client after initialization.

      When ClickHouse does not recognize the setting, or it is flagged "read_only", the call either raises
      a Programming exception or tries to send the setting anyway, as governed by the common setting
      'invalid_setting_action'.

      :param key: ClickHouse setting name
      :param value: ClickHouse setting value
      """
  @abstractmethod
  def get_client_setting(self, key) -> Optional[str]:
      """
      Look up a client setting.

      :param key: The setting key
      :return: The setting's string value if it exists, otherwise None
      """
  167. # pylint: disable=unused-argument,too-many-locals
  def query(
          self,
          query: Optional[str] = None,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          query_formats: Optional[Dict[str, str]] = None,
          column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
          encoding: Optional[str] = None,
          use_none: Optional[bool] = None,
          column_oriented: Optional[bool] = None,
          use_numpy: Optional[bool] = None,
          max_str_len: Optional[int] = None,
          context: QueryContext = None,
          query_tz: Optional[Union[str, tzinfo]] = None,
          column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
          external_data: Optional[ExternalData] = None,
  ) -> QueryResult:
      """
      Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix.  For
      parameters, see the create_query_context method.

      A query beginning with 'select __connect_version__' (case-insensitive, surrounding whitespace
      ignored) is answered locally with the library version.  Queries whose context is flagged as a
      command are routed through the command method and the response is wrapped in a QueryResult.

      :return: QueryResult -- data and metadata from response
      """
      if query and query.lower().strip().startswith('select __connect_version__'):
          return QueryResult([[f'ClickHouse Connect v.{version()} ⓒ ClickHouse Inc.']], None,
                             ('connect_version',), (get_from_name('String'),))
      # Snapshot the call arguments; no other local may be bound before this line
      kwargs = dict(locals())
      kwargs.pop('self')
      query_context = self.create_query_context(**kwargs)
      if not query_context.is_command:
          return self._query_with_context(query_context)
      response = self.command(query,
                              parameters=query_context.parameters,
                              settings=query_context.settings,
                              external_data=query_context.external_data)
      if isinstance(response, QuerySummary):
          return response.as_query_result()
      # A list response becomes a single row; any other value becomes a single cell
      rows = [response] if isinstance(response, list) else [[response]]
      return QueryResult(rows)
  def query_column_block_stream(
          self,
          query: Optional[str] = None,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          query_formats: Optional[Dict[str, str]] = None,
          column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
          encoding: Optional[str] = None,
          use_none: Optional[bool] = None,
          context: QueryContext = None,
          query_tz: Optional[Union[str, tzinfo]] = None,
          column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
          external_data: Optional[ExternalData] = None,
  ) -> StreamContext:
      """
      Streaming variant of the main query method that yields column oriented blocks.  For parameters,
      see the create_query_context method.

      :return: StreamContext -- Iterable stream context that returns column oriented blocks
      """
      # locals() must be evaluated inline so that it holds only the call arguments
      streamed = self._context_query(locals(), use_numpy=False, streaming=True)
      return streamed.column_block_stream
  def query_row_block_stream(
          self,
          query: Optional[str] = None,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          query_formats: Optional[Dict[str, str]] = None,
          column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
          encoding: Optional[str] = None,
          use_none: Optional[bool] = None,
          context: QueryContext = None,
          query_tz: Optional[Union[str, tzinfo]] = None,
          column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
          external_data: Optional[ExternalData] = None,
  ) -> StreamContext:
      """
      Streaming variant of the main query method that yields row oriented blocks.  For parameters,
      see the create_query_context method.

      :return: StreamContext -- Iterable stream context that returns blocks of rows
      """
      # locals() must be evaluated inline so that it holds only the call arguments
      streamed = self._context_query(locals(), use_numpy=False, streaming=True)
      return streamed.row_block_stream
  def query_rows_stream(
          self,
          query: Optional[str] = None,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          query_formats: Optional[Dict[str, str]] = None,
          column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
          encoding: Optional[str] = None,
          use_none: Optional[bool] = None,
          context: QueryContext = None,
          query_tz: Optional[Union[str, tzinfo]] = None,
          column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
          external_data: Optional[ExternalData] = None,
  ) -> StreamContext:
      """
      Streaming variant of the main query method that exposes the result's rows_stream.  For parameters,
      see the create_query_context method.

      :return: StreamContext -- the rows_stream of the streaming query result (presumably yields
        individual rows rather than blocks -- confirm against the QueryResult implementation)
      """
      # locals() must be evaluated inline so that it holds only the call arguments
      streamed = self._context_query(locals(), use_numpy=False, streaming=True)
      return streamed.rows_stream
  @abstractmethod
  def raw_query(
          self,
          query: str,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          fmt: str = None,
          use_database: bool = True,
          external_data: Optional[ExternalData] = None,
  ) -> bytes:
      """
      Run a query and hand back the unprocessed ClickHouse response bytes.

      :param query: Query statement/format string
      :param parameters: Optional dictionary used to format the query
      :param settings: Optional dictionary of ClickHouse settings (key/string values)
      :param fmt: ClickHouse output format
      :param use_database: Send the database parameter to ClickHouse so the command will be executed in
        the client database context.
      :param external_data: External data to send with the query
      :return: bytes representing raw ClickHouse return value based on format
      """
  @abstractmethod
  def raw_stream(
          self,
          query: str,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          fmt: str = None,
          use_database: bool = True,
          external_data: Optional[ExternalData] = None,
  ) -> io.IOBase:
      """
      Run a query and expose the response as an io.IOBase iterator.

      :param query: Query statement/format string
      :param parameters: Optional dictionary used to format the query
      :param settings: Optional dictionary of ClickHouse settings (key/string values)
      :param fmt: ClickHouse output format
      :param use_database: Send the database parameter to ClickHouse so the command will be executed in
        the client database context.
      :param external_data: External data to send with the query
      :return: io.IOBase stream/iterator for the result
      """
  293. # pylint: disable=duplicate-code,unused-argument
  def query_np(
          self,
          query: Optional[str] = None,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          query_formats: Optional[Dict[str, str]] = None,
          column_formats: Optional[Dict[str, str]] = None,
          encoding: Optional[str] = None,
          use_none: Optional[bool] = None,
          max_str_len: Optional[int] = None,
          context: QueryContext = None,
          external_data: Optional[ExternalData] = None,
  ):
      """
      Run a query with use_numpy enabled and return the result's np_result.  For parameter values, see
      the create_query_context method.

      :return: Numpy array representing the result set
      """
      # locals() must be evaluated inline so that it holds only the call arguments
      numpy_result = self._context_query(locals(), use_numpy=True)
      return numpy_result.np_result
  311. # pylint: disable=duplicate-code,too-many-arguments,unused-argument
  def query_np_stream(
          self,
          query: Optional[str] = None,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          query_formats: Optional[Dict[str, str]] = None,
          column_formats: Optional[Dict[str, str]] = None,
          encoding: Optional[str] = None,
          use_none: Optional[bool] = None,
          max_str_len: Optional[int] = None,
          context: QueryContext = None,
          external_data: Optional[ExternalData] = None,
  ) -> StreamContext:
      """
      Run a streaming query with use_numpy enabled and return the result's np_stream.  For parameter
      values, see the create_query_context method.

      :return: Generator that yield a numpy array per block representing the result set
      """
      # locals() must be evaluated inline so that it holds only the call arguments
      streamed = self._context_query(locals(), use_numpy=True, streaming=True)
      return streamed.np_stream
  329. # pylint: disable=duplicate-code,unused-argument
  def query_df(
          self,
          query: Optional[str] = None,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          query_formats: Optional[Dict[str, str]] = None,
          column_formats: Optional[Dict[str, str]] = None,
          encoding: Optional[str] = None,
          use_none: Optional[bool] = None,
          max_str_len: Optional[int] = None,
          use_na_values: Optional[bool] = None,
          query_tz: Optional[str] = None,
          column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
          context: QueryContext = None,
          external_data: Optional[ExternalData] = None,
          use_extended_dtypes: Optional[bool] = None,
  ):
      """
      Run a query with use_numpy and as_pandas enabled and return the result's df_result.  For parameter
      values, see the create_query_context method.

      :return: Pandas dataframe representing the result set
      """
      # locals() must be evaluated inline so that it holds only the call arguments
      frame_result = self._context_query(locals(), use_numpy=True, as_pandas=True)
      return frame_result.df_result
  351. # pylint: disable=duplicate-code,unused-argument
  def query_df_stream(
          self,
          query: Optional[str] = None,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          query_formats: Optional[Dict[str, str]] = None,
          column_formats: Optional[Dict[str, str]] = None,
          encoding: Optional[str] = None,
          use_none: Optional[bool] = None,
          max_str_len: Optional[int] = None,
          use_na_values: Optional[bool] = None,
          query_tz: Optional[str] = None,
          column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
          context: QueryContext = None,
          external_data: Optional[ExternalData] = None,
          use_extended_dtypes: Optional[bool] = None,
  ) -> StreamContext:
      """
      Run a streaming query with use_numpy and as_pandas enabled and return the result's df_stream.  For
      parameter values, see the create_query_context method.

      :return: Generator that yields a Pandas dataframe per block representing the result set
      """
      # locals() must be evaluated inline so that it holds only the call arguments
      streamed = self._context_query(locals(), use_numpy=True, as_pandas=True, streaming=True)
      return streamed.df_stream
  def create_query_context(
          self,
          query: Optional[Union[str, bytes]] = None,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          query_formats: Optional[Dict[str, str]] = None,
          column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
          encoding: Optional[str] = None,
          use_none: Optional[bool] = None,
          column_oriented: Optional[bool] = None,
          use_numpy: Optional[bool] = False,
          max_str_len: Optional[int] = 0,
          context: Optional[QueryContext] = None,
          query_tz: Optional[Union[str, tzinfo]] = None,
          column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
          use_na_values: Optional[bool] = None,
          streaming: bool = False,
          as_pandas: bool = False,
          external_data: Optional[ExternalData] = None,
          use_extended_dtypes: Optional[bool] = None,
  ) -> QueryContext:
      """
      Creates or updates a reusable QueryContext object.

      When an existing context is supplied, the result is context.updated_copy(...) called with the
      arguments exactly as given; the defaulting of max_str_len and use_extended_dtypes described below,
      and the use_na_values alias, apply only when a new QueryContext is built.

      :param query: Query statement/format string
      :param parameters: Optional dictionary used to format the query
      :param settings: Optional dictionary of ClickHouse settings (key/string values)
      :param query_formats: See QueryContext __init__ docstring
      :param column_formats: See QueryContext __init__ docstring
      :param encoding: See QueryContext __init__ docstring
      :param use_none: Use None for ClickHouse NULL instead of default values.  Note that using None in Numpy
        arrays will force the numpy array dtype to 'object', which is often inefficient.  This effect also
        will impact the performance of Pandas dataframes.
      :param column_oriented: Deprecated.  Controls orientation of the QueryResult result_set property
      :param use_numpy: Return QueryResult columns as one-dimensional numpy arrays
      :param max_str_len: Limit returned ClickHouse String values to this length, which allows a Numpy
        structured array even with ClickHouse variable length String columns.  If 0, Numpy arrays for
        String columns will always be object arrays.  None is replaced by 0 when use_numpy is truthy
      :param context: An existing QueryContext to be updated with any provided parameter values
      :param query_tz: Either a string or a pytz tzinfo object.  (Strings will be converted to tzinfo objects).
        Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime
        objects with the selected timezone.
      :param column_tzs: A dictionary of column names to tzinfo objects (or strings that will be converted to
        tzinfo objects).  The timezone will be applied to datetime objects returned in the query
      :param use_na_values: Deprecated alias for use_extended_dtypes (used only when that argument is None)
      :param as_pandas: Return the result columns as pandas.Series objects
      :param streaming: Marker used to correctly configure streaming queries
      :param external_data: ClickHouse "external data" to send with query
      :param use_extended_dtypes: Only relevant to Pandas Dataframe queries.  Use Pandas "missing types", such as
        pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray
        and StringArray.  Defaulted to True for query_df methods
      :return: Reusable QueryContext
      """
      # Arguments common to both the "update existing" and "build new" paths
      shared = {
          'query': query,
          'parameters': parameters,
          'settings': settings,
          'query_formats': query_formats,
          'column_formats': column_formats,
          'encoding': encoding,
          'server_tz': self.server_tz,
          'use_none': use_none,
          'column_oriented': column_oriented,
          'use_numpy': use_numpy,
          'max_str_len': max_str_len,
          'query_tz': query_tz,
          'column_tzs': column_tzs,
          'use_extended_dtypes': use_extended_dtypes,
          'as_pandas': as_pandas,
          'streaming': streaming,
          'external_data': external_data,
      }
      if context:
          return context.updated_copy(**shared)

      if use_numpy and max_str_len is None:
          shared['max_str_len'] = 0
      # Fall back to the deprecated alias, then to True for pandas results
      if use_extended_dtypes is None:
          use_extended_dtypes = use_na_values
      if as_pandas and use_extended_dtypes is None:
          use_extended_dtypes = True
      shared['use_extended_dtypes'] = use_extended_dtypes
      return QueryContext(apply_server_tz=self.apply_server_timezone, **shared)
  def query_arrow(
          self,
          query: str,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          use_strings: Optional[bool] = None,
          external_data: Optional[ExternalData] = None,
  ):
      """
      Run a query in the ClickHouse Arrow format and convert the raw response with to_arrow.

      :param query: Query statement/format string
      :param parameters: Optional dictionary used to format the query
      :param settings: Optional dictionary of ClickHouse settings (key/string values)
      :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary)
      :param external_data: ClickHouse "external data" to send with query
      :return: PyArrow.Table
      """
      arrow_settings = self._update_arrow_settings(settings, use_strings)
      raw_bytes = self.raw_query(query,
                                 parameters,
                                 arrow_settings,
                                 fmt='Arrow',
                                 external_data=external_data)
      return to_arrow(raw_bytes)
  def query_arrow_stream(
          self,
          query: str,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          settings: Optional[Dict[str, Any]] = None,
          use_strings: Optional[bool] = None,
          external_data: Optional[ExternalData] = None,
  ) -> StreamContext:
      """
      Run a query in the ClickHouse ArrowStream format and wrap the raw stream with to_arrow_batches.

      :param query: Query statement/format string
      :param parameters: Optional dictionary used to format the query
      :param settings: Optional dictionary of ClickHouse settings (key/string values)
      :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary)
      :param external_data: ClickHouse "external data" to send with query
      :return: Generator that yields a PyArrow.Table for per block representing the result set
      """
      arrow_settings = self._update_arrow_settings(settings, use_strings)
      raw_source = self.raw_stream(query,
                                   parameters,
                                   arrow_settings,
                                   fmt='ArrowStream',
                                   external_data=external_data)
      return to_arrow_batches(raw_source)
  def _update_arrow_settings(
          self,
          settings: Optional[Dict[str, Any]],
          use_strings: Optional[bool],
  ) -> Dict[str, Any]:
      """
      Build the settings dictionary for an Arrow query.

      Works on the result of dict_copy(settings), adds the client database when one is set, and decides
      whether to send the Arrow "string as string" setting.

      :param settings: caller supplied settings, passed through dict_copy
      :param use_strings: None to enable string output only if the server setting is writable and not
        already set; otherwise the desired state of the setting
      :return: the settings dictionary to send with the Arrow query
      :raises OperationalError: if use_strings differs from the server state and the setting is not writable
      """
      merged = dict_copy(settings)
      if self.database:
          merged['database'] = self.database
      status = self._setting_status(arrow_str_setting)
      if use_strings is None:
          if status.is_writable and not status.is_set:
              merged[arrow_str_setting] = '1'  # Default to returning strings if possible
          return merged
      if use_strings != status.is_set:
          if not status.is_writable:
              raise OperationalError(f'Cannot change readonly {arrow_str_setting} to {use_strings}')
          merged[arrow_str_setting] = '1' if use_strings else '0'
      return merged
  @abstractmethod
  def command(
          self,
          cmd: str,
          parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
          data: Union[str, bytes] = None,
          settings: Dict[str, Any] = None,
          use_database: bool = True,
          external_data: Optional[ExternalData] = None,
  ) -> Union[str, int, Sequence[str], QuerySummary]:
      """
      Client method that returns a single value instead of a result set.

      :param cmd: ClickHouse query/command as a python format string
      :param parameters: Optional dictionary of key/values pairs to be formatted
      :param data: Optional 'data' for the command (for INSERT INTO in particular)
      :param settings: Optional dictionary of ClickHouse settings (key/string values)
      :param use_database: Send the database parameter to ClickHouse so the command will be executed in the
        client database context.  Otherwise, no database will be specified with the command.  This is useful
        for determining the default user database
      :param external_data: ClickHouse "external data" to send with command/query
      :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or
        QuerySummary if no data returned
      """
  @abstractmethod
  def ping(self) -> bool:
      """
      Validate the connection.  Does not throw an Exception (see debug logs).

      :return: True if the ClickHouse server is up and reachable
      """
  def insert(
          self,
          table: Optional[str] = None,
          data: Sequence[Sequence[Any]] = None,
          column_names: Union[str, Iterable[str]] = '*',
          database: Optional[str] = None,
          column_types: Sequence[ClickHouseType] = None,
          column_type_names: Sequence[str] = None,
          column_oriented: bool = False,
          settings: Optional[Dict[str, Any]] = None,
          context: InsertContext = None,
  ) -> QuerySummary:
      """
      Insert multiple rows/a data matrix of native Python objects.  If context is specified, arguments
      other than data are ignored.

      :param table: Target table
      :param data: Sequence of sequences of Python data
      :param column_names: Ordered list of column names or '*' if column types should be retrieved from the
        ClickHouse table definition
      :param database: Target database -- will use client default database if not specified.
      :param column_types: ClickHouse column types.  If set then column data does not need to be retrieved
        from the server
      :param column_type_names: ClickHouse column type names.  If set then column data does not need to be
        retrieved from the server
      :param column_oriented: If true the data is already "pivoted" in column form
      :param settings: Optional dictionary of ClickHouse settings (key/string values)
      :param context: Optional reusable insert context to allow repeated inserts into the same table with
        different data batches
      :return: QuerySummary with summary information, throws exception if insert fails
      :raises ProgrammingError: if there is no data to insert, or if new data is supplied together with a
        context that is not empty
      """
      no_context = context is None
      if (no_context or context.empty) and data is None:
          raise ProgrammingError('No data specified for insert') from None
      if no_context:
          context = self.create_insert_context(table,
                                               column_names,
                                               database,
                                               column_types,
                                               column_type_names,
                                               column_oriented,
                                               settings)
      if data is not None:
          # A context that still holds data must not be silently overwritten
          if not context.empty:
              raise ProgrammingError('Attempting to insert new data with non-empty insert context') from None
          context.data = data
      return self.data_insert(context)
  def insert_df(self, table: str = None,
                df=None,
                database: Optional[str] = None,
                settings: Optional[Dict] = None,
                column_names: Optional[Sequence[str]] = None,
                column_types: Sequence[ClickHouseType] = None,
                column_type_names: Sequence[str] = None,
                context: InsertContext = None) -> QuerySummary:
      """
      Insert a pandas DataFrame into ClickHouse.  If context is specified arguments other than df are ignored
      :param table: ClickHouse table
      :param df: two-dimensional pandas dataframe
      :param database: Optional ClickHouse database
      :param settings: Optional dictionary of ClickHouse settings (key/string values)
      :param column_names: An optional list of ClickHouse column names.  If not set, the DataFrame column names
        will be used
      :param column_types: ClickHouse column types.  If set then column data does not need to be retrieved from
        the server
      :param column_type_names: ClickHouse column type names.  If set then column data does not need to be
        retrieved from the server
      :param context: Optional reusable insert context to allow repeated inserts into the same table with
        different data batches
      :return: QuerySummary with summary information, throws exception if insert fails
      :raises ProgrammingError: if explicit column_names do not match the DataFrame column count
      """
      # Column names can only be derived from (or checked against) an actual DataFrame.  With no DataFrame the
      # call is passed straight to insert(), which reports missing data itself, instead of failing here with an
      # AttributeError on None
      if context is None and df is not None:
          if column_names is None:
              column_names = df.columns
          elif len(column_names) != len(df.columns):
              raise ProgrammingError('DataFrame column count does not match insert_columns') from None
      return self.insert(table,
                         df,
                         column_names,
                         database,
                         column_types=column_types,
                         column_type_names=column_type_names,
                         settings=settings,
                         context=context)
  def insert_arrow(self, table: str,
                   arrow_table,
                   database: str = None,
                   settings: Optional[Dict] = None) -> QuerySummary:
      """
      Insert a PyArrow Table into ClickHouse using the raw Arrow format
      :param table: ClickHouse table.  A name that already contains a '.' is used as is
      :param arrow_table: PyArrow Table object
      :param database: Optional ClickHouse database, prepended to an unqualified table name
      :param settings: Optional dictionary of ClickHouse settings (key/string values)
      :return: QuerySummary with summary information, throws exception if insert fails
      """
      if '.' not in table and database:
          full_table = f'{database}.{table}'
      else:
          full_table = table
      # Only zstd and lz4 are handed to arrow_buffer; any other write compression setting is dropped here
      compression = None
      if self.write_compression in ('zstd', 'lz4'):
          compression = self.write_compression
      column_names, insert_block = arrow_buffer(arrow_table, compression)
      return self.raw_insert(full_table, column_names, insert_block, settings, 'Arrow')
  def create_insert_context(self,
                            table: str,
                            column_names: Optional[Union[str, Sequence[str]]] = None,
                            database: Optional[str] = None,
                            column_types: Sequence[ClickHouseType] = None,
                            column_type_names: Sequence[str] = None,
                            column_oriented: bool = False,
                            settings: Optional[Dict[str, Any]] = None,
                            data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext:
      """
      Builds a reusable insert context to hold state for a duration of an insert
      :param table: Target table.  A name containing a '.' is used unchanged, otherwise it is quoted
      :param column_names: Optional ordered list of column names.  If not set, all columns ('*') will be assumed
        in the order specified by the table definition
      :param database: Target database -- will use client default database if not specified
      :param column_types: ClickHouse column types.  Optional Sequence of ClickHouseType objects.  If neither
        column types nor column type names are set, actual column types will be retrieved from the server.
      :param column_type_names: ClickHouse column type names.  Specified column types by name string
      :param column_oriented: If true the data is already "pivoted" in column form
      :param settings: Optional dictionary of ClickHouse settings (key/string values)
      :param data: Initial dataset for insert
      :return: Reusable insert context
      :raises ValueError: if no column names could be determined
      :raises ProgrammingError: if a column name is not found in the table definition or the number of column
        names differs from the number of column types
      """
      if '.' in table:
          full_table = table
      elif database:
          full_table = f'{quote_identifier(database)}.{quote_identifier(table)}'
      else:
          full_table = quote_identifier(table)

      # The server is only asked for the table definition when the caller supplied no type information
      column_defs = []
      if column_types is None and column_type_names is None:
          described = self.query(f'DESCRIBE TABLE {full_table}')
          for row in described.named_results():
              if row['default_type'] in ('ALIAS', 'MATERIALIZED'):
                  continue
              column_defs.append(ColumnDef(**row))

      wants_all_columns = column_names is None or (isinstance(column_names, str) and column_names == '*')
      if wants_all_columns:
          column_names = [col.name for col in column_defs]
          column_types = [col.ch_type for col in column_defs]
      elif isinstance(column_names, str):
          column_names = [column_names]
      # Explicit length test (not truthiness) because column_names is not necessarily a plain list
      if len(column_names) == 0:
          raise ValueError('Column names must be specified for insert')

      if not column_types:
          if column_type_names:
              column_types = [get_from_name(type_name) for type_name in column_type_names]
          else:
              # Keep the definitions themselves in the map so ch_type is only read for the requested columns
              defs_by_name = {col.name: col for col in column_defs}
              try:
                  column_types = [defs_by_name[name].ch_type for name in column_names]
              except KeyError as ex:
                  raise ProgrammingError(f'Unrecognized column {ex} in table {table}') from None
      if len(column_names) != len(column_types):
          raise ProgrammingError('Column names do not match column types') from None
      return InsertContext(full_table,
                           column_names,
                           column_types,
                           column_oriented=column_oriented,
                           settings=settings,
                           data=data)
  def min_version(self, version_str: str) -> bool:
      """
      Determine whether the connected server is at least the submitted version.  Non-numeric components of the
      server version are skipped (so Altinity Stable versions like 22.8.15.25.altinitystable can be compared)
      and both versions are zero padded to four components
      :param version_str: A version string consisting of up to 4 integers delimited by dots
      :return: True if the server version is greater than or equal to version_str, False if it is lower.  False
        is also returned, with a logged warning, when a version can not be parsed
      """
      try:
          server = [int(part) for part in self.server_version.split('.') if part.isnumeric()]
          server += [0] * (4 - len(server))
          requested = [int(part) for part in version_str.split('.')]
          requested += [0] * (4 - len(requested))
      except ValueError:
          logger.warning('Server %s or requested version %s does not match format of numbers separated by dots',
                         self.server_version, version_str)
          return False
      # Only components present in both versions take part in the comparison; identical prefixes count as a match
      depth = min(len(server), len(requested))
      return server[:depth] >= requested[:depth]
  @abstractmethod
  def data_insert(self, context: InsertContext) -> QuerySummary:
      """
      Subclass implementation of the data insert described by an InsertContext
      :param context: InsertContext parameter object
      :return: Annotated as QuerySummary (the value is handed back to callers of insert); throws an exception
        if the insert fails
      """
  @abstractmethod
  def raw_insert(self, table: str,
                 column_names: Optional[Sequence[str]] = None,
                 insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None,
                 settings: Optional[Dict] = None,
                 fmt: Optional[str] = None,
                 compression: Optional[str] = None) -> QuerySummary:
      """
      Insert data that has already been formatted by the caller (string, bytes, bytes generator or binary stream)
      :param table: Table name (whether qualified with the database name or not)
      :param column_names: Sequence of column names
      :param insert_block: Binary or string data already in a recognized ClickHouse format
      :param settings: Optional dictionary of ClickHouse settings (key/string values)
      :param fmt: Valid clickhouse format
      :param compression: Recognized ClickHouse `Accept-Encoding` header compression value
      :return: QuerySummary (per the signature annotation)
      """
  @abstractmethod
  def close(self):
      """
      Subclass implementation that closes the connection to the server and deallocates the client.  Also
      called when the client is used as a context manager and the `with` block exits
      """
  @abstractmethod
  def close_connections(self):
      """
      Subclass implementation that disconnects all "re-used" client connections
      """
  def _context_query(self, lcls: dict, **overrides):
      """
      Build a query context from a dictionary of arguments and execute it
      :param lcls: Dictionary of query arguments that must contain a 'self' entry (presumably the locals() of
        the calling query method -- confirm against callers).  The dictionary itself is not modified
      :param overrides: Keyword values that replace or extend the entries of lcls
      :return: Result of _query_with_context for the created query context
      """
      context_args = {**lcls, **overrides}
      # 'self' is not an argument of create_query_context; a missing entry raises KeyError
      del context_args['self']
      query_context = self.create_query_context(**context_args)
      return self._query_with_context(query_context)
  def __enter__(self):
      """
      Context manager entry
      :return: This client, so it can be bound by `with ... as client`
      """
      return self
  def __exit__(self, exc_type, exc_value, exc_traceback):
      """
      Context manager exit.  Closes the client; the exception details are not inspected and nothing is
      returned, so an exception raised inside the `with` block continues to propagate
      """
      self.close()