|
@@ -19,7 +19,8 @@ from clickhouse_connect.driver.external import ExternalData
|
|
|
from clickhouse_connect.driver.insert import InsertContext
|
|
|
from clickhouse_connect.driver.summary import QuerySummary
|
|
|
from clickhouse_connect.driver.models import ColumnDef, SettingDef, SettingStatus
|
|
|
-from clickhouse_connect.driver.query import QueryResult, to_arrow, QueryContext, arrow_buffer, quote_identifier
|
|
|
+from clickhouse_connect.driver.query import QueryResult, to_arrow, to_arrow_batches, QueryContext, arrow_buffer, \
|
|
|
+ quote_identifier
|
|
|
|
|
|
io.DEFAULT_BUFFER_SIZE = 1024 * 256
|
|
|
logger = logging.getLogger(__name__)
|
|
@@ -255,7 +256,8 @@ class Client(ABC):
|
|
|
settings: Optional[Dict[str, Any]] = None,
|
|
|
fmt: str = None,
|
|
|
use_database: bool = True,
|
|
|
- external_data: Optional[ExternalData] = None) -> bytes:
|
|
|
+ external_data: Optional[ExternalData] = None,
|
|
|
+ stream: bool = False) -> Union[bytes, io.IOBase]:
|
|
|
"""
|
|
|
Query method that simply returns the raw ClickHouse format bytes
|
|
|
:param query: Query statement/format string
|
|
@@ -348,7 +350,7 @@ class Client(ABC):
|
|
|
"""
|
|
|
Query method that returns the results as a StreamContext. For parameter values, see the
|
|
|
create_query_context method
|
|
|
- :return: Pandas dataframe representing the result set
|
|
|
+ :return: Generator that yields a Pandas dataframe per block representing the result set
|
|
|
"""
|
|
|
return self._context_query(locals(), use_numpy=True,
|
|
|
as_pandas=True,
|
|
@@ -462,6 +464,39 @@ class Client(ABC):
|
|
|
:param external_data ClickHouse "external data" to send with query
|
|
|
:return: PyArrow.Table
|
|
|
"""
|
|
|
+ settings = self._update_arrow_settings(settings, use_strings)
|
|
|
+ return to_arrow(self.raw_query(query,
|
|
|
+ parameters,
|
|
|
+ settings,
|
|
|
+ fmt='Arrow',
|
|
|
+ external_data=external_data))
|
|
|
+
|
|
|
+ def query_arrow_stream(self,
|
|
|
+ query: str,
|
|
|
+ parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
|
|
|
+ settings: Optional[Dict[str, Any]] = None,
|
|
|
+ use_strings: Optional[bool] = None,
|
|
|
+ external_data: Optional[ExternalData] = None) -> StreamContext:
|
|
|
+ """
|
|
|
+ Query method that returns the results as a stream of Arrow tables
|
|
|
+ :param query: Query statement/format string
|
|
|
+ :param parameters: Optional dictionary used to format the query
|
|
|
+ :param settings: Optional dictionary of ClickHouse settings (key/string values)
|
|
|
+ :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary)
|
|
|
+ :param external_data ClickHouse "external data" to send with query
|
|
|
+ :return: Generator that yields a PyArrow.Table for per block representing the result set
|
|
|
+ """
|
|
|
+ settings = self._update_arrow_settings(settings, use_strings)
|
|
|
+ return to_arrow_batches(self.raw_query(query,
|
|
|
+ parameters,
|
|
|
+ settings,
|
|
|
+ fmt='ArrowStream',
|
|
|
+ external_data=external_data,
|
|
|
+ stream=True))
|
|
|
+
|
|
|
+ def _update_arrow_settings(self,
|
|
|
+ settings: Optional[Dict[str, Any]],
|
|
|
+ use_strings: Optional[bool]) -> Dict[str, Any]:
|
|
|
settings = dict_copy(settings)
|
|
|
if self.database:
|
|
|
settings['database'] = self.database
|
|
@@ -473,11 +508,7 @@ class Client(ABC):
|
|
|
if not str_status.is_writable:
|
|
|
raise OperationalError(f'Cannot change readonly {arrow_str_setting} to {use_strings}')
|
|
|
settings[arrow_str_setting] = '1' if use_strings else '0'
|
|
|
- return to_arrow(self.raw_query(query,
|
|
|
- parameters,
|
|
|
- settings,
|
|
|
- fmt='Arrow',
|
|
|
- external_data=external_data))
|
|
|
+ return settings
|
|
|
|
|
|
@abstractmethod
|
|
|
def command(self,
|