|
@@ -15,17 +15,17 @@ from urllib3.response import HTTPResponse
|
|
|
from clickhouse_connect import common
|
|
|
from clickhouse_connect.datatypes import registry
|
|
|
from clickhouse_connect.datatypes.base import ClickHouseType
|
|
|
-from clickhouse_connect.driver.ctypes import RespBuffCls
|
|
|
from clickhouse_connect.driver.client import Client
|
|
|
from clickhouse_connect.driver.common import dict_copy, coerce_bool, coerce_int
|
|
|
from clickhouse_connect.driver.compression import available_compression
|
|
|
+from clickhouse_connect.driver.ctypes import RespBuffCls
|
|
|
from clickhouse_connect.driver.exceptions import DatabaseError, OperationalError, ProgrammingError
|
|
|
from clickhouse_connect.driver.external import ExternalData
|
|
|
from clickhouse_connect.driver.httputil import ResponseSource, get_pool_manager, get_response_data, \
|
|
|
default_pool_manager, get_proxy_manager, all_managers, check_env_proxy, check_conn_expiration
|
|
|
from clickhouse_connect.driver.insert import InsertContext
|
|
|
-from clickhouse_connect.driver.summary import QuerySummary
|
|
|
from clickhouse_connect.driver.query import QueryResult, QueryContext, quote_identifier, bind_query
|
|
|
+from clickhouse_connect.driver.summary import QuerySummary
|
|
|
from clickhouse_connect.driver.transform import NativeTransform
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
@@ -173,7 +173,10 @@ class HttpClient(Client):
|
|
|
final_query = super()._prep_query(context)
|
|
|
if context.is_insert:
|
|
|
return final_query
|
|
|
- return f'{final_query}\n FORMAT {self._read_format}'
|
|
|
+ fmt = f'\n FORMAT {self._read_format}'
|
|
|
+ if isinstance(final_query, bytes):
|
|
|
+ return final_query + fmt.encode()
|
|
|
+ return final_query + fmt
|
|
|
|
|
|
def _query_with_context(self, context: QueryContext) -> QueryResult:
|
|
|
headers = {}
|
|
@@ -349,20 +352,21 @@ class HttpClient(Client):
|
|
|
return QuerySummary(self._summary(response))
|
|
|
|
|
|
def _error_handler(self, response: HTTPResponse, retried: bool = False) -> None:
|
|
|
- err_str = f'HTTPDriver for {self.url} returned response code {response.status})'
|
|
|
- try:
|
|
|
- err_content = get_response_data(response)
|
|
|
- except Exception: # pylint: disable=broad-except
|
|
|
- err_content = None
|
|
|
- finally:
|
|
|
- response.close()
|
|
|
-
|
|
|
- if err_content:
|
|
|
- if self.show_clickhouse_errors:
|
|
|
+ if self.show_clickhouse_errors:
|
|
|
+ try:
|
|
|
+ err_content = get_response_data(response)
|
|
|
+ except Exception: # pylint: disable=broad-except
|
|
|
+ err_content = None
|
|
|
+ finally:
|
|
|
+ response.close()
|
|
|
+
|
|
|
+ err_str = f'HTTPDriver for {self.url} returned response code {response.status})'
|
|
|
+ if err_content:
|
|
|
err_msg = common.format_error(err_content.decode(errors='backslashreplace'))
|
|
|
- err_str = f':{err_str}\n {err_msg}'
|
|
|
- else:
|
|
|
- err_str = 'The ClickHouse server returned an error.'
|
|
|
+ err_str = f'{err_str}\n {err_msg}'
|
|
|
+ else:
|
|
|
+ err_str = 'The ClickHouse server returned an error.'
|
|
|
+
|
|
|
raise OperationalError(err_str) if retried else DatabaseError(err_str) from None
|
|
|
|
|
|
def _raw_request(self,
|
|
@@ -426,7 +430,8 @@ class HttpClient(Client):
|
|
|
logger.debug('Retrying remotely closed connection')
|
|
|
continue
|
|
|
logger.warning('Unexpected Http Driver Exception')
|
|
|
- raise OperationalError(f'Error {ex} executing HTTP request attempt {attempts} {self.url}') from ex
|
|
|
+ err_url = f' ({self.url})' if self.show_clickhouse_errors else ''
|
|
|
+ raise OperationalError(f'Error {ex} executing HTTP request attempt {attempts}{err_url}') from ex
|
|
|
finally:
|
|
|
if query_session:
|
|
|
self._active_session = None # Make sure we always clear this
|
|
@@ -471,14 +476,16 @@ class HttpClient(Client):
|
|
|
fmt: str,
|
|
|
use_database: bool,
|
|
|
external_data: Optional[ExternalData]):
|
|
|
- final_query, bind_params = bind_query(query, parameters, self.server_tz)
|
|
|
if fmt:
|
|
|
- final_query += f'\n FORMAT {fmt}'
|
|
|
+ query += f'\n FORMAT {fmt}'
|
|
|
+ final_query, bind_params = bind_query(query, parameters, self.server_tz)
|
|
|
params = self._validate_settings(settings or {})
|
|
|
if use_database and self.database:
|
|
|
params['database'] = self.database
|
|
|
params.update(bind_params)
|
|
|
if external_data:
|
|
|
+ if isinstance(final_query, bytes):
|
|
|
+ raise ProgrammingError('Cannot combine binary query data with `External Data`')
|
|
|
body = bytes()
|
|
|
params['query'] = final_query
|
|
|
params.update(external_data.query_params)
|