insert.py

import logging
from math import log
from typing import Iterable, Sequence, Optional, Any, Dict, NamedTuple, Generator, Union, TYPE_CHECKING

from clickhouse_connect.driver.query import quote_identifier
from clickhouse_connect.driver.ctypes import data_conv
from clickhouse_connect.driver.context import BaseQueryContext
from clickhouse_connect.driver.options import np, pd, pd_time_test
from clickhouse_connect.driver.exceptions import ProgrammingError

if TYPE_CHECKING:
    from clickhouse_connect.datatypes.base import ClickHouseType

logger = logging.getLogger(__name__)

DEFAULT_BLOCK_BYTES = 1 << 21  # Try to generate blocks between 1MB and 2MB in raw size


class InsertBlock(NamedTuple):
    prefix: bytes
    column_count: int
    row_count: int
    column_names: Iterable[str]
    column_types: Iterable['ClickHouseType']
    column_data: Iterable[Sequence[Any]]


# pylint: disable=too-many-instance-attributes
class InsertContext(BaseQueryContext):
    """
    Reusable Argument/parameter object for inserts.
    """

    # pylint: disable=too-many-arguments
    def __init__(self,
                 table: str,
                 column_names: Sequence[str],
                 column_types: Sequence['ClickHouseType'],
                 data: Any = None,
                 column_oriented: Optional[bool] = None,
                 settings: Optional[Dict[str, Any]] = None,
                 compression: Optional[Union[str, bool]] = None,
                 query_formats: Optional[Dict[str, str]] = None,
                 column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
                 block_size: Optional[int] = None):
        super().__init__(settings, query_formats, column_formats)
        self.table = table
        self.column_names = column_names
        self.column_types = column_types
        self.column_oriented = False if column_oriented is None else column_oriented
        self.compression = compression
        self.req_block_size = block_size
        self.block_row_count = DEFAULT_BLOCK_BYTES
        self.data = data
        self.insert_exception = None

    @property
    def empty(self) -> bool:
        return self._data is None

    @property
    def data(self):
        return self._raw_data

    @data.setter
    def data(self, data: Any):
        self._raw_data = data
        self.current_block = 0
        self.current_row = 0
        self.row_count = 0
        self.column_count = 0
        self._data = None
        if data is None or len(data) == 0:
            return
        if pd and isinstance(data, pd.DataFrame):
            data = self._convert_pandas(data)
            self.column_oriented = True
        if np and isinstance(data, np.ndarray):
            data = self._convert_numpy(data)
        if self.column_oriented:
            self._next_block_data = self._column_block_data
            self._block_columns = data  # [SliceView(column) for column in data]
            self._block_rows = None
            self.column_count = len(data)
            self.row_count = len(data[0])
        else:
            self._next_block_data = self._row_block_data
            self._block_rows = data
            self._block_columns = None
            self.row_count = len(data)
            self.column_count = len(data[0])
        if self.row_count and self.column_count:
            if self.column_count != len(self.column_names):
                raise ProgrammingError('Insert data column count does not match column names')
            self._data = data
            self.block_row_count = self._calc_block_size()

    def _calc_block_size(self) -> int:
        if self.req_block_size:
            return self.req_block_size
        row_size = 0
        sample_size = min((log(self.row_count) + 1) * 2, 64)
        sample_freq = max(1, int(self.row_count / sample_size))
        for i, d_type in enumerate(self.column_types):
            if d_type.byte_size:
                row_size += d_type.byte_size
                continue
            if self.column_oriented:
                col_data = self._data[i]
                if sample_freq == 1:
                    d_size = d_type.data_size(col_data)
                else:
                    sample = [col_data[j] for j in range(0, self.row_count, sample_freq)]
                    d_size = d_type.data_size(sample)
            else:
                data = self._data
                sample = [data[j][i] for j in range(0, self.row_count, sample_freq)]
                d_size = d_type.data_size(sample)
            row_size += d_size
        return 1 << (21 - int(log(row_size, 2)))
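
    # Worked example of the formula above (illustrative only): for an estimated
    # row_size of 64 bytes, int(log(64, 2)) == 6, so block_row_count becomes
    # 1 << (21 - 6) == 32768 rows, i.e. 64 * 32768 == 2MB of raw data per block,
    # matching DEFAULT_BLOCK_BYTES.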

    def next_block(self) -> Generator[InsertBlock, None, None]:
        while True:
            block_end = min(self.current_row + self.block_row_count, self.row_count)
            row_count = block_end - self.current_row
            if row_count <= 0:
                return
            if self.current_block == 0:
                cols = f" ({', '.join([quote_identifier(x) for x in self.column_names])})"
                prefix = f'INSERT INTO {self.table}{cols} FORMAT Native\n'.encode()
            else:
                prefix = bytes()
            self.current_block += 1
            data = self._next_block_data(self.current_row, block_end)
            yield InsertBlock(prefix, self.column_count, row_count, self.column_names, self.column_types, data)
            self.current_row = block_end
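
    # Illustration (assuming quote_identifier wraps names in backticks): for
    # table 'demo' with columns ('id', 'name'), the first block's prefix is
    # b'INSERT INTO demo (`id`, `name`) FORMAT Native\n'; subsequent blocks use
    # an empty prefix and are streamed as additional Native blocks.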

    def _column_block_data(self, block_start, block_end):
        if block_start == 0 and self.row_count <= block_end:
            return self._block_columns  # Optimization if we don't need to break up the block
        return [col[block_start: block_end] for col in self._block_columns]

    def _row_block_data(self, block_start, block_end):
        return data_conv.pivot(self._block_rows, block_start, block_end)

    def _convert_pandas(self, df):
        data = []
        for df_col_name, col_name, ch_type in zip(df.columns, self.column_names, self.column_types):
            df_col = df[df_col_name]
            d_type = str(df_col.dtype)
            if ch_type.python_type == int:
                if 'float' in d_type:
                    df_col = df_col.round().astype(ch_type.base_type, copy=False)
                else:
                    df_col = df_col.astype(ch_type.base_type, copy=False)
            elif 'datetime' in ch_type.np_type and (pd_time_test(df_col) or 'datetime64[ns' in d_type):
                div = ch_type.nano_divisor
                data.append([None if pd.isnull(x) else x.value // div for x in df_col])
                self.column_formats[col_name] = 'int'
                continue
            if ch_type.nullable:
                if d_type == 'object':
                    # This is ugly, but the multiple replaces seem required as a result of this bug:
                    # https://github.com/pandas-dev/pandas/issues/29024
                    df_col = df_col.replace({pd.NaT: None}).replace({np.nan: None})
                elif 'Float' in ch_type.base_type:
                    # This seems to be the only way to convert any null looking things to nan
                    df_col = df_col.astype(ch_type.np_type)
                else:
                    df_col = df_col.replace({np.nan: None})
            data.append(df_col.to_numpy(copy=False))
        return data
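
    # Example of the datetime branch above (hedged, assuming nano_divisor is
    # 10**6 for a DateTime64(3) column): a datetime64[ns] Series is emitted as
    # integer ticks via x.value // 1_000_000, and the column format is set to
    # 'int' so the values are serialized as integers rather than datetime objects.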

    def _convert_numpy(self, np_array):
        if np_array.dtype.names is None:
            if 'date' in str(np_array.dtype):
                for col_name, col_type in zip(self.column_names, self.column_types):
                    if 'date' in col_type.np_type:
                        self.column_formats[col_name] = 'int'
                return np_array.astype('int').tolist()
            for col_type in self.column_types:
                if col_type.byte_size == 0 or col_type.byte_size > np_array.dtype.itemsize:
                    return np_array.tolist()
            return np_array
        if set(self.column_names).issubset(set(np_array.dtype.names)):
            data = [np_array[col_name] for col_name in self.column_names]
        else:
            # Column names don't match, so we have to assume they are in order
            data = [np_array[col_name] for col_name in np_array.dtype.names]
        for ix, (col_name, col_type) in enumerate(zip(self.column_names, self.column_types)):
            d_type = data[ix].dtype
            if 'date' in str(d_type) and 'date' in col_type.np_type:
                self.column_formats[col_name] = 'int'
                data[ix] = data[ix].astype(int).tolist()
            elif col_type.byte_size == 0 or col_type.byte_size > d_type.itemsize:
                data[ix] = data[ix].tolist()
        self.column_oriented = True
        return data
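

if __name__ == '__main__':
    # Minimal usage sketch, assuming get_from_name from
    # clickhouse_connect.datatypes.registry resolves the ClickHouse type names
    # used here; in normal use Client.insert() builds and drives this context.
    from clickhouse_connect.datatypes.registry import get_from_name

    ctx = InsertContext('demo_table',
                        ['id', 'name'],
                        [get_from_name('UInt32'), get_from_name('String')],
                        data=[[1, 'one'], [2, 'two']])
    for block in ctx.next_block():
        # Only the first InsertBlock carries the INSERT ... FORMAT Native prefix
        print(block.prefix, block.row_count, block.column_count)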