npquery.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. import logging
  2. import itertools
  3. from typing import Generator, Sequence, Tuple
  4. from clickhouse_connect.driver.common import empty_gen, StreamContext
  5. from clickhouse_connect.driver.exceptions import StreamClosedError
  6. from clickhouse_connect.driver.types import Closable
  7. from clickhouse_connect.driver.options import np, pd
  # Module-level logger; its name comes from __name__, so emitted records identify this module.
  logger: logging.Logger = logging.getLogger(name=__name__)


  # pylint: disable=too-many-instance-attributes
  class NumpyResult(Closable):
      """Query result exposed as NumPy arrays or pandas DataFrames.

      Wraps a generator of column-oriented blocks: as consumed below, each block is a sequence
      holding one data sequence per column (presumably all of equal length -- confirm with the
      producer).  The blocks can be read exactly once, either incrementally (``np_stream`` /
      ``df_stream``) or fully materialized (``np_result`` / ``df_result``).  Any later attempt to
      read the blocks again raises ``StreamClosedError``.
      """

      def __init__(self,
                   block_gen: Generator[Sequence, None, None] = None,
                   column_names: Tuple = (),
                   column_types: Tuple = (),
                   d_types: Sequence = (),
                   source: Closable = None):
          """
          :param block_gen: Generator of column-oriented blocks; falls back to ``empty_gen()``
          :param column_names: Column names, in the same order as the columns of each block
          :param column_types: Column types; stored for callers, not used within this class
          :param d_types: One NumPy dtype per column; if empty, ``np_stream`` yields raw blocks
          :param source: Resource closed together with this result (see ``close``)
          """
          self.column_names = column_names
          self.column_types = column_types
          self.np_types = d_types
          self.source = source
          self.query_id = ''
          self.summary = {}
          self._block_gen = block_gen or empty_gen()
          self._numpy_result = None
          self._df_result = None

      def _np_stream(self) -> Generator:
          """Detach the block generator and return a generator of NumPy arrays.

          If every column shares one non-object dtype, each block becomes a 2-D array of shape
          (rows, columns).  Otherwise each block becomes a 1-D structured array with one named
          field per column.  As a side effect ``self.np_types`` is replaced by the dtype actually
          used, which ``close_numpy`` relies on when concatenating.  Without dtypes the raw block
          generator is returned unchanged.

          :raises StreamClosedError: if the blocks were already consumed or the result was closed
          """
          if self._block_gen is None:
              raise StreamClosedError
          block_gen = self._block_gen
          self._block_gen = None  # the blocks may only be consumed once
          if not self.np_types:
              return block_gen

          d_types = self.np_types
          first_type = d_types[0]
          if first_type != np.object_ and all(np.dtype(np_type) == first_type for np_type in d_types):
              # Homogeneous, non-object data: use a plain 2-D array instead of records
              self.np_types = first_type

              def numpy_blocks():
                  for block in block_gen:
                      # Blocks are column-major; transpose to (rows, columns)
                      yield np.array(block, first_type).transpose()
          else:
              # Mixed dtypes or object columns: one structured record per row, one field per column.
              # NOTE(review): fields keep their own dtypes even when some column is object-typed.
              self.np_types = np.dtype(list(zip(self.column_names, d_types)))
              row_dtype = self.np_types

              def numpy_blocks():
                  for block in block_gen:
                      np_array = np.empty(len(block[0]), dtype=row_dtype)
                      for col_name, data in zip(self.column_names, block):
                          np_array[col_name] = data
                      yield np_array

          return numpy_blocks()

      def _df_stream(self) -> Generator:
          """Detach the block generator and return a generator yielding one DataFrame per block.

          :raises StreamClosedError: if the blocks were already consumed or the result was closed
          """
          if self._block_gen is None:
              raise StreamClosedError
          block_gen = self._block_gen
          self._block_gen = None  # the blocks may only be consumed once

          def pd_blocks():
              for block in block_gen:
                  yield pd.DataFrame(dict(zip(self.column_names, block)))

          return pd_blocks()

      def _empty_np_result(self, has_types: bool):
          """Return a zero-row array typed and shaped like a non-empty ``close_numpy`` result.

          :param has_types: whether column dtypes were supplied (evaluated before ``_np_stream``)
          """
          if not has_types:
              return np.empty((0,))
          np_type = self.np_types
          if getattr(np_type, 'names', None):
              # Structured dtype: 1-D array of records
              return np.empty((0,), dtype=np_type)
          # Homogeneous dtype: 2-D (rows, columns); assumes one block column per column name
          return np.empty((0, len(self.column_names)), dtype=np_type)

      def close_numpy(self):
          """Consume every remaining block into one NumPy array, available as ``np_result``.

          Blocks are first merged in batches whose size doubles (4, 8, 16, ...).  Those batches,
          plus any final partial batch of individual blocks, are then merged once more.  This
          object and its source are closed even if reading a block fails.

          :return: self
          :raises StreamClosedError: if the blocks were already consumed or the result was closed
          """
          if self._block_gen is None:
              raise StreamClosedError
          try:
              # Must be read before _np_stream(), which replaces np_types with the final dtype
              has_types = bool(self.np_types)
              stream = iter(self._np_stream())
              # Without dtype information, let NumPy infer rather than passing an empty sequence
              concat_dtype = self.np_types if has_types else None
              pieces = []
              batch_size = 4
              while True:
                  batch = list(itertools.islice(stream, batch_size))
                  if len(batch) < batch_size:
                      # Final, partial batch: keep individual blocks for the last concatenation
                      pieces.extend(batch)
                      break
                  pieces.append(np.concatenate(batch, dtype=concat_dtype))
                  batch_size *= 2
              if len(pieces) > 1:
                  self._numpy_result = np.concatenate(pieces, dtype=concat_dtype)
              elif pieces:
                  self._numpy_result = pieces[0]
              else:
                  self._numpy_result = self._empty_np_result(has_types)
          finally:
              self.close()
          return self

      def close_df(self):
          """Consume every remaining block into one DataFrame, available as ``df_result``.

          Each column's per-block pieces are wrapped in Series and concatenated once per column.
          Columns are keyed by name, so names cannot shift onto the wrong data.  A zero-row result
          still carries the column names.  This object and its source are closed even on error.

          :return: self
          :raises StreamClosedError: if the blocks were already consumed or the result was closed
          """
          if self._block_gen is None:
              raise StreamClosedError
          try:
              columns = {}
              # zip(*blocks) reads every block and regroups the pieces column by column
              for col_name, pieces in zip(self.column_names, zip(*self._block_gen)):
                  series = [pd.Series(piece, copy=False) for piece in pieces if len(piece) > 0]
                  if series:
                      columns[col_name] = pd.concat(series, copy=False, ignore_index=True)
              if columns:
                  self._df_result = pd.DataFrame(columns)
              else:
                  self._df_result = pd.DataFrame(columns=list(self.column_names))
          finally:
              self.close()
          return self

      @property
      def np_result(self):
          """The materialized NumPy result; reads all blocks via ``close_numpy`` on first access."""
          if self._numpy_result is None:
              self.close_numpy()
          return self._numpy_result

      @property
      def df_result(self):
          """The materialized DataFrame result; reads all blocks via ``close_df`` on first access."""
          if self._df_result is None:
              self.close_df()
          return self._df_result

      @property
      def np_stream(self) -> StreamContext:
          """A ``StreamContext`` over per-block NumPy arrays (see ``_np_stream``)."""
          return StreamContext(self, self._np_stream())

      @property
      def df_stream(self) -> StreamContext:
          """A ``StreamContext`` over per-block DataFrames (see ``_df_stream``)."""
          return StreamContext(self, self._df_stream())

      def close(self):
          """Close the attached block generator, if any, and then the underlying source."""
          if self._block_gen is not None:
              self._block_gen.close()
              self._block_gen = None
          if self.source:
              self.source.close()
              self.source = None