httpclient.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. import json
  2. import logging
  3. import re
  4. import uuid
  5. from base64 import b64encode
  6. from typing import Optional, Dict, Any, Sequence, Union, List, Callable, Generator, BinaryIO
  7. from urllib.parse import urlencode
  8. from urllib3 import Timeout
  9. from urllib3.exceptions import HTTPError
  10. from urllib3.poolmanager import PoolManager
  11. from urllib3.response import HTTPResponse
  12. from clickhouse_connect import common
  13. from clickhouse_connect.datatypes import registry
  14. from clickhouse_connect.datatypes.base import ClickHouseType
  15. from clickhouse_connect.driver.ctypes import RespBuffCls
  16. from clickhouse_connect.driver.client import Client
  17. from clickhouse_connect.driver.common import dict_copy, coerce_bool, coerce_int
  18. from clickhouse_connect.driver.compression import available_compression
  19. from clickhouse_connect.driver.exceptions import DatabaseError, OperationalError, ProgrammingError
  20. from clickhouse_connect.driver.external import ExternalData
  21. from clickhouse_connect.driver.httputil import ResponseSource, get_pool_manager, get_response_data, \
  22. default_pool_manager, get_proxy_manager, all_managers, check_env_proxy, check_conn_reset
  23. from clickhouse_connect.driver.insert import InsertContext
  24. from clickhouse_connect.driver.summary import QuerySummary
  25. from clickhouse_connect.driver.query import QueryResult, QueryContext, quote_identifier, bind_query
  26. from clickhouse_connect.driver.transform import NativeTransform
logger = logging.getLogger(__name__)

# Detects queries ending in 'LIMIT 0': such queries return no data rows, so
# _query_with_context short-circuits them into a lightweight FORMAT JSON request
# and reads only the column names/types from the response metadata.
columns_only_re = re.compile(r'LIMIT 0\s*$', re.IGNORECASE)
# pylint: disable=too-many-instance-attributes
class HttpClient(Client):
    # Permanent query parameters sent with every HTTP request.  The class-level
    # dict is only a placeholder -- __init__ rebinds it per instance from the
    # validated client settings.
    params = {}
    # ClickHouse settings that may always be sent as HTTP query parameters
    valid_transport_settings = {'database', 'buffer_size', 'session_id',
                                'compress', 'decompress', 'session_timeout',
                                'session_check', 'query_id', 'quota_key',
                                'wait_end_of_query', 'client_protocol_version'}
    # Settings only sent when the server reports them writable
    # (checked via _setting_status in __init__)
    optional_transport_settings = {'send_progress_in_http_headers',
                                   'http_headers_progress_interval_ms',
                                   'enable_http_compression'}
    # True when this client created its own PoolManager; close() then clears it
    _owns_pool_manager = False

    # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-statements,unused-argument
    def __init__(self,
                 interface: str,
                 host: str,
                 port: int,
                 username: str,
                 password: str,
                 database: str,
                 compress: Union[bool, str] = True,
                 query_limit: int = 0,
                 query_retries: int = 2,
                 connect_timeout: int = 10,
                 send_receive_timeout: int = 300,
                 client_name: Optional[str] = None,
                 verify: bool = True,
                 ca_cert: Optional[str] = None,
                 client_cert: Optional[str] = None,
                 client_cert_key: Optional[str] = None,
                 session_id: Optional[str] = None,
                 settings: Optional[Dict[str, Any]] = None,
                 pool_mgr: Optional[PoolManager] = None,
                 http_proxy: Optional[str] = None,
                 https_proxy: Optional[str] = None,
                 server_host_name: Optional[str] = None,
                 apply_server_timezone: Optional[Union[str, bool]] = True):
        """
        Create an HTTP ClickHouse Connect client
        See clickhouse_connect.get_client for parameters
        """
        self.url = f'{interface}://{host}:{port}'
        self.headers = {}
        ch_settings = settings or {}
        # A caller-supplied pool manager takes precedence over anything built below
        self.http = pool_mgr
        if interface == 'https':
            if not https_proxy:
                https_proxy = check_env_proxy('https', host, port)
            if client_cert:
                # Mutual TLS: the user is identified by certificate, so the username
                # is sent as a header instead of Basic auth credentials
                if not username:
                    raise ProgrammingError('username parameter is required for Mutual TLS authentication')
                self.headers['X-ClickHouse-User'] = username
                self.headers['X-ClickHouse-SSL-Certificate-Auth'] = 'on'
            verify = coerce_bool(verify)
            # Build a custom pool manager only when TLS options require one;
            # otherwise fall through to the shared/default managers below
            # pylint: disable=too-many-boolean-expressions
            if not self.http and (server_host_name or ca_cert or client_cert or not verify or https_proxy):
                options = {
                    'ca_cert': ca_cert,
                    'client_cert': client_cert,
                    'verify': verify,
                    'client_cert_key': client_cert_key
                }
                if server_host_name:
                    if verify:
                        # Check the certificate against the real server name, not the URL host
                        options['assert_hostname'] = server_host_name
                    options['server_hostname'] = server_host_name
                self.http = get_pool_manager(https_proxy=https_proxy, **options)
                self._owns_pool_manager = True
        if not self.http:
            if not http_proxy:
                http_proxy = check_env_proxy('http', host, port)
            if http_proxy:
                self.http = get_proxy_manager(host, http_proxy)
            else:
                self.http = default_pool_manager()
        # Basic auth only when not using certificate (Mutual TLS) authentication
        if not client_cert and username:
            self.headers['Authorization'] = 'Basic ' + b64encode(f'{username}:{password}'.encode()).decode()
        self.headers['User-Agent'] = common.build_client_name(client_name)
        self._read_format = self._write_format = 'Native'
        self._transform = NativeTransform()

        connect_timeout, send_receive_timeout = coerce_int(connect_timeout), coerce_int(send_receive_timeout)
        self.timeout = Timeout(connect=connect_timeout, read=send_receive_timeout)
        self.http_retries = 1
        self._send_progress = None
        self._send_comp_setting = False
        self._progress_interval = None
        self._active_session = None

        if session_id:
            ch_settings['session_id'] = session_id
        elif 'session_id' not in ch_settings and common.get_setting('autogenerate_session_id'):
            ch_settings['session_id'] = str(uuid.uuid4())

        # compress may be a bool-ish value (enable all supported codecs) or the
        # name of a specific compression method
        if coerce_bool(compress):
            compression = ','.join(available_compression)
            self.write_compression = available_compression[0]
        elif compress and compress not in ('False', 'false', '0'):
            if compress not in available_compression:
                raise ProgrammingError(f'Unsupported compression method {compress}')
            compression = compress
            self.write_compression = compress
        else:
            compression = None

        super().__init__(database=database,
                         uri=self.url,
                         query_limit=query_limit,
                         query_retries=query_retries,
                         server_host_name=server_host_name,
                         apply_server_timezone=apply_server_timezone)
        self.params = self._validate_settings(ch_settings)
        # Only enable compression-related settings the server actually permits
        comp_setting = self._setting_status('enable_http_compression')
        self._send_comp_setting = not comp_setting.is_set and comp_setting.is_writable
        if comp_setting.is_set or comp_setting.is_writable:
            self.compression = compression
        send_setting = self._setting_status('send_progress_in_http_headers')
        self._send_progress = not send_setting.is_set and send_setting.is_writable
        if (send_setting.is_set or send_setting.is_writable) and \
                self._setting_status('http_headers_progress_interval_ms').is_writable:
            # Clamp the progress header interval to 10s..120s, keeping it under the
            # read timeout so progress headers keep long-running connections alive
            self._progress_interval = str(min(120000, max(10000, (send_receive_timeout - 5) * 1000)))
  145. def set_client_setting(self, key, value):
  146. str_value = self._validate_setting(key, value, common.get_setting('invalid_setting_action'))
  147. if str_value is not None:
  148. self.params[key] = str_value
  149. def get_client_setting(self, key) -> Optional[str]:
  150. values = self.params.get(key)
  151. return values[0] if values else None
  152. def _prep_query(self, context: QueryContext):
  153. final_query = super()._prep_query(context)
  154. if context.is_insert:
  155. return final_query
  156. return f'{final_query}\n FORMAT {self._write_format}'
    def _query_with_context(self, context: QueryContext) -> QueryResult:
        """
        Execute a prepared query over HTTP and return the parsed QueryResult.

        'Columns only' queries (ending in LIMIT 0) are short-circuited to a JSON
        metadata request; everything else streams a Native-format response through
        the transform layer.
        """
        headers = {}
        params = {}
        if self.database:
            params['database'] = self.database
        if self.protocol_version:
            params['client_protocol_version'] = self.protocol_version
            context.block_info = True
        params.update(context.bind_params)
        params.update(self._validate_settings(context.settings))
        if columns_only_re.search(context.uncommented_query):
            # No data rows are possible -- fetch just the metadata as JSON
            response = self._raw_request(f'{context.final_query}\n FORMAT JSON',
                                         params, headers, retries=self.query_retries)
            json_result = json.loads(response.data)
            # ClickHouse will respond with a JSON object of meta, data, and some other objects
            # We just grab the column names and column types from the metadata sub object
            names: List[str] = []
            types: List[ClickHouseType] = []
            for col in json_result['meta']:
                names.append(col['name'])
                types.append(registry.get_from_name(col['type']))
            return QueryResult([], None, tuple(names), tuple(types))
        if self.compression:
            headers['Accept-Encoding'] = self.compression
            if self._send_comp_setting:
                params['enable_http_compression'] = '1'
        final_query = self._prep_query(context)
        if context.external_data:
            # With external data the query goes in the URL parameters and the
            # request body carries the multipart form fields
            body = bytes()
            params['query'] = final_query
            params.update(context.external_data.query_params)
            fields = context.external_data.form_data
        else:
            body = final_query
            fields = None
            headers['Content-Type'] = 'text/plain; charset=utf-8'
        response = self._raw_request(body,
                                     params,
                                     headers,
                                     stream=True,
                                     retries=self.query_retries,
                                     fields=fields,
                                     server_wait=not context.streaming)
        byte_source = RespBuffCls(ResponseSource(response))  # pylint: disable=not-callable
        # Apply the server-reported timezone (if changed) before parsing values
        context.set_response_tz(self._check_tz_change(response.headers.get('X-ClickHouse-Timezone')))
        query_result = self._transform.parse_response(byte_source, context)
        query_result.summary = self._summary(response)
        return query_result
    def data_insert(self, context: InsertContext) -> QuerySummary:
        """
        See BaseClient doc_string for this method
        """
        if context.empty:
            logger.debug('No data included in insert, skipping')
            return QuerySummary()

        def error_handler(resp: HTTPResponse):
            # If we actually had a local exception when building the insert, throw that instead
            if context.insert_exception:
                ex = context.insert_exception
                context.insert_exception = None
                raise ex
            self._error_handler(resp)

        headers = {'Content-Type': 'application/octet-stream'}
        # Fall back to the client-wide write compression when the context has none
        if context.compression is None:
            context.compression = self.write_compression
        if context.compression:
            headers['Content-Encoding'] = context.compression
        # Generator of Native-format blocks; streamed as the request body
        block_gen = self._transform.build_insert(context)
        params = {}
        if self.database:
            params['database'] = self.database
        params.update(self._validate_settings(context.settings))
        response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False)
        logger.debug('Context insert response code: %d, content: %s', response.status, response.data)
        # Release the (possibly large) source data once the insert has completed
        context.data = None
        return QuerySummary(self._summary(response))
  233. def raw_insert(self, table: str = None,
  234. column_names: Optional[Sequence[str]] = None,
  235. insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None,
  236. settings: Optional[Dict] = None,
  237. fmt: Optional[str] = None,
  238. compression: Optional[str] = None) -> QuerySummary:
  239. """
  240. See BaseClient doc_string for this method
  241. """
  242. params = {}
  243. headers = {'Content-Type': 'application/octet-stream'}
  244. if compression:
  245. headers['Content-Encoding'] = compression
  246. if table:
  247. cols = f" ({', '.join([quote_identifier(x) for x in column_names])})" if column_names is not None else ''
  248. query = f'INSERT INTO {table}{cols} FORMAT {fmt if fmt else self._write_format}'
  249. if not compression and isinstance(insert_block, str):
  250. insert_block = query + '\n' + insert_block
  251. elif not compression and isinstance(insert_block, (bytes, bytearray, BinaryIO)):
  252. insert_block = (query + '\n').encode() + insert_block
  253. else:
  254. params['query'] = query
  255. if self.database:
  256. params['database'] = self.database
  257. params.update(self._validate_settings(settings or {}))
  258. response = self._raw_request(insert_block, params, headers, server_wait=False)
  259. logger.debug('Raw insert response code: %d, content: %s', response.status, response.data)
  260. return QuerySummary(self._summary(response))
  261. @staticmethod
  262. def _summary(response: HTTPResponse):
  263. summary = {}
  264. if 'X-ClickHouse-Summary' in response.headers:
  265. try:
  266. summary = json.loads(response.headers['X-ClickHouse-Summary'])
  267. except json.JSONDecodeError:
  268. pass
  269. summary['query_id'] = response.headers.get('X-ClickHouse-Query-Id', '')
  270. return summary
    def command(self,
                cmd,
                parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
                data: Union[str, bytes] = None,
                settings: Optional[Dict] = None,
                use_database: int = True,
                external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]:
        """
        See BaseClient doc_string for this method
        """
        cmd, params = bind_query(cmd, parameters, self.server_tz)
        headers = {}
        payload = None
        fields = None
        if external_data:
            if data:
                raise ProgrammingError('Cannot combine command data with external data') from None
            fields = external_data.form_data
            params.update(external_data.query_params)
        elif isinstance(data, str):
            headers['Content-Type'] = 'text/plain; charset=utf-8'
            payload = data.encode()
        elif isinstance(data, bytes):
            headers['Content-Type'] = 'application/octet-stream'
            payload = data
        if payload is None and not cmd:
            raise ProgrammingError('Command sent without query or recognized data') from None
        # When there is a body (data or form fields), the command itself travels
        # as the 'query' URL parameter; otherwise the command is the body
        if payload or fields:
            params['query'] = cmd
        else:
            payload = cmd
        if use_database and self.database:
            params['database'] = self.database
        params.update(self._validate_settings(settings or {}))
        method = 'POST' if payload or fields else 'GET'
        response = self._raw_request(payload, params, headers, method, fields=fields)
        if response.data:
            try:
                # [:-1] strips the trailing newline; TSV rows are tab separated
                result = response.data.decode()[:-1].split('\t')
                if len(result) == 1:
                    # A single value is coerced to int when possible
                    try:
                        return int(result[0])
                    except ValueError:
                        return result[0]
                return result
            except UnicodeDecodeError:
                # Non-text payload -- return the repr of the raw bytes
                return str(response.data)
        return QuerySummary(self._summary(response))
  319. def _error_handler(self, response: HTTPResponse, retried: bool = False) -> None:
  320. err_str = f'HTTPDriver for {self.url} returned response code {response.status})'
  321. try:
  322. err_content = get_response_data(response)
  323. except Exception: # pylint: disable=broad-except
  324. err_content = None
  325. finally:
  326. response.close()
  327. if err_content:
  328. err_msg = common.format_error(err_content.decode(errors='backslashreplace'))
  329. err_str = f':{err_str}\n {err_msg}'
  330. raise OperationalError(err_str) if retried else DatabaseError(err_str) from None
    def _raw_request(self,
                     data,
                     params: Dict[str, str],
                     headers: Optional[Dict[str, Any]] = None,
                     method: str = 'POST',
                     retries: int = 0,
                     stream: bool = False,
                     server_wait: bool = True,
                     fields: Optional[Dict[str, tuple]] = None,
                     error_handler: Callable = None) -> HTTPResponse:
        """
        Send an HTTP request to the ClickHouse server and return the raw urllib3 response.

        Handles session-concurrency protection, a single retry for remotely closed
        (keep-alive expired) connections, and bounded retries for 429/503/504 status
        codes.  Non-2xx responses are routed to error_handler (when given) or to
        self._error_handler, both of which raise.
        """
        if isinstance(data, str):
            data = data.encode()
        headers = dict_copy(self.headers, headers)
        attempts = 0
        if server_wait:
            params['wait_end_of_query'] = '1'
        # We can't actually read the progress headers, but we enable them so ClickHouse sends something
        # to keep the connection alive when waiting for long-running queries and (2) to get summary information
        # if not streaming
        if self._send_progress:
            params['send_progress_in_http_headers'] = '1'
        if self._progress_interval:
            params['http_headers_progress_interval_ms'] = self._progress_interval
        final_params = dict_copy(self.params, params)
        url = f'{self.url}?{urlencode(final_params)}'
        kwargs = {
            'headers': headers,
            'timeout': self.timeout,
            'retries': self.http_retries,
            'preload_content': not stream
        }
        if self.server_host_name:
            # The URL host differs from the real server name (e.g. behind a proxy),
            # so relax urllib3's host check and send the true name as the Host header
            kwargs['assert_same_host'] = False
            kwargs['headers'].update({'Host': self.server_host_name})
        if fields:
            kwargs['fields'] = fields
        else:
            kwargs['body'] = data
        check_conn_reset(self.http)
        query_session = final_params.get('session_id')
        while True:
            attempts += 1
            if query_session:
                if query_session == self._active_session:
                    raise ProgrammingError('Attempt to execute concurrent queries within the same session.' +
                                           'Please use a separate client instance per thread/process.')
                # There is a race condition here when using multiprocessing -- in that case the server will
                # throw an error instead, but in most cases this more helpful error will be thrown first
                self._active_session = query_session
            try:
                response = self.http.request(method, url, **kwargs)
            except HTTPError as ex:
                if isinstance(ex.__context__, ConnectionResetError):
                    # The server closed the connection, probably because the Keep Alive has expired
                    # We should be safe to retry, as ClickHouse should not have processed anything on a connection
                    # that it killed.  We also only retry this once, as multiple disconnects are unlikely to be
                    # related to the Keep Alive settings
                    if attempts == 1:
                        logger.debug('Retrying remotely closed connection')
                        continue
                logger.warning('Unexpected Http Driver Exception')
                raise OperationalError(f'Error {ex} executing HTTP request attempt {attempts} {self.url}') from ex
            finally:
                if query_session:
                    self._active_session = None  # Make sure we always clear this
            if 200 <= response.status < 300:
                return response
            if response.status in (429, 503, 504):
                # Transient server conditions -- retry until the retry budget runs out
                if attempts > retries:
                    self._error_handler(response, True)
                logger.debug('Retrying requests with status code %d', response.status)
            elif error_handler:
                error_handler(response)
            else:
                self._error_handler(response)
  406. def ping(self):
  407. """
  408. See BaseClient doc_string for this method
  409. """
  410. try:
  411. response = self.http.request('GET', f'{self.url}/ping', timeout=3)
  412. return 200 <= response.status < 300
  413. except HTTPError:
  414. logger.debug('ping failed', exc_info=True)
  415. return False
    def raw_query(self, query: str,
                  parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
                  settings: Optional[Dict[str, Any]] = None, fmt: str = None,
                  use_database: bool = True, external_data: Optional[ExternalData] = None) -> bytes:
        """
        See BaseClient doc_string for this method
        """
        final_query, bind_params = bind_query(query, parameters, self.server_tz)
        if fmt:
            final_query += f'\n FORMAT {fmt}'
        params = self._validate_settings(settings or {})
        if use_database and self.database:
            params['database'] = self.database
        params.update(bind_params)
        if external_data:
            # With external data the query travels as a URL parameter and the body
            # carries the multipart form fields
            body = bytes()
            params['query'] = final_query
            params.update(external_data.query_params)
            fields = external_data.form_data
        else:
            body = final_query
            fields = None
        # Return the raw, undecoded response body
        return self._raw_request(body, params, fields=fields).data
  439. def close(self):
  440. if self._owns_pool_manager:
  441. self.http.clear()
  442. all_managers.pop(self.http, None)