# npquery.py
import itertools
import logging
from typing import Generator, Optional, Sequence, Tuple

from clickhouse_connect.driver.common import empty_gen, StreamContext
from clickhouse_connect.driver.exceptions import StreamClosedError
from clickhouse_connect.driver.options import np, pd
from clickhouse_connect.driver.types import Closable
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
  9. # pylint: disable=too-many-instance-attributes
  10. class NumpyResult(Closable):
  11. def __init__(self,
  12. block_gen: Generator[Sequence, None, None] = None,
  13. column_names: Tuple = (),
  14. column_types: Tuple = (),
  15. d_types: Sequence = (),
  16. source: Closable = None):
  17. self.column_names = column_names
  18. self.column_types = column_types
  19. self.np_types = d_types
  20. self.source = source
  21. self.query_id = ''
  22. self.summary = {}
  23. self._block_gen = block_gen or empty_gen()
  24. self._numpy_result = None
  25. self._df_result = None
  26. def _np_stream(self) -> Generator:
  27. if self._block_gen is None:
  28. raise StreamClosedError
  29. block_gen = self._block_gen
  30. self._block_gen = None
  31. if not self.np_types:
  32. return block_gen
  33. d_types = self.np_types
  34. first_type = d_types[0]
  35. if first_type != np.object_ and all(np.dtype(np_type) == first_type for np_type in d_types):
  36. self.np_types = first_type
  37. def numpy_blocks():
  38. for block in block_gen:
  39. yield np.array(block, first_type).transpose()
  40. else:
  41. if any(x == np.object_ for x in d_types):
  42. self.np_types = [np.object_] * len(self.np_types)
  43. self.np_types = np.dtype(list(zip(self.column_names, d_types)))
  44. def numpy_blocks():
  45. for block in block_gen:
  46. np_array = np.empty(len(block[0]), dtype=self.np_types)
  47. for col_name, data in zip(self.column_names, block):
  48. np_array[col_name] = data
  49. yield np_array
  50. return numpy_blocks()
  51. def _df_stream(self) -> Generator:
  52. if self._block_gen is None:
  53. raise StreamClosedError
  54. block_gen = self._block_gen
  55. def pd_blocks():
  56. for block in block_gen:
  57. yield pd.DataFrame(dict(zip(self.column_names, block)))
  58. self._block_gen = None
  59. return pd_blocks()
  60. def close_numpy(self):
  61. if not self._block_gen:
  62. raise StreamClosedError
  63. chunk_size = 4
  64. pieces = []
  65. blocks = []
  66. for block in self._np_stream():
  67. blocks.append(block)
  68. if len(blocks) == chunk_size:
  69. pieces.append(np.concatenate(blocks, dtype=self.np_types))
  70. chunk_size *= 2
  71. blocks = []
  72. pieces.extend(blocks)
  73. if len(pieces) > 1:
  74. self._numpy_result = np.concatenate(pieces, dtype=self.np_types)
  75. elif len(pieces) == 1:
  76. self._numpy_result = pieces[0]
  77. else:
  78. self._numpy_result = np.empty((0,))
  79. self.close()
  80. return self
  81. def close_df(self):
  82. if self._block_gen is None:
  83. raise StreamClosedError
  84. chains = [itertools.chain(b) for b in zip(*self._block_gen)]
  85. new_df_series = []
  86. for c in chains:
  87. new_df_series.append(pd.concat([pd.Series(piece, copy=False) for piece in c], copy=False))
  88. self._df_result = pd.DataFrame(dict(zip(self.column_names, new_df_series)))
  89. self.close()
  90. return self
  91. @property
  92. def np_result(self):
  93. if self._numpy_result is None:
  94. self.close_numpy()
  95. return self._numpy_result
  96. @property
  97. def df_result(self):
  98. if self._df_result is None:
  99. self.close_df()
  100. return self._df_result
  101. @property
  102. def np_stream(self) -> StreamContext:
  103. return StreamContext(self, self._np_stream())
  104. @property
  105. def df_stream(self) -> StreamContext:
  106. return StreamContext(self, self._df_stream())
  107. def close(self):
  108. if self._block_gen is not None:
  109. self._block_gen.close()
  110. self._block_gen = None
  111. if self.source:
  112. self.source.close()
  113. self.source = None