npquery.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import logging
  2. from typing import Generator, Sequence, Tuple
  3. from clickhouse_connect.driver.common import empty_gen, StreamContext
  4. from clickhouse_connect.driver.exceptions import StreamClosedError
  5. from clickhouse_connect.driver.types import Closable
  6. from clickhouse_connect.driver.options import np, pd
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
  8. # pylint: disable=too-many-instance-attributes
  9. class NumpyResult(Closable):
  10. def __init__(self,
  11. block_gen: Generator[Sequence, None, None] = None,
  12. column_names: Tuple = (),
  13. column_types: Tuple = (),
  14. d_types: Sequence = (),
  15. source: Closable = None):
  16. self.column_names = column_names
  17. self.column_types = column_types
  18. self.np_types = d_types
  19. self.source = source
  20. self.query_id = ''
  21. self.summary = {}
  22. self._block_gen = block_gen or empty_gen()
  23. self._numpy_result = None
  24. self._df_result = None
  25. def _np_stream(self) -> Generator:
  26. if self._block_gen is None:
  27. raise StreamClosedError
  28. block_gen = self._block_gen
  29. self._block_gen = None
  30. if not self.np_types:
  31. return block_gen
  32. d_types = self.np_types
  33. first_type = d_types[0]
  34. if first_type != np.object_ and all(np.dtype(np_type) == first_type for np_type in d_types):
  35. self.np_types = first_type
  36. def numpy_blocks():
  37. for block in block_gen:
  38. yield np.array(block, first_type).transpose()
  39. else:
  40. if any(x == np.object_ for x in d_types):
  41. self.np_types = [np.object_] * len(self.np_types)
  42. self.np_types = np.dtype(list(zip(self.column_names, d_types)))
  43. def numpy_blocks():
  44. for block in block_gen:
  45. np_array = np.empty(len(block[0]), dtype=self.np_types)
  46. for col_name, data in zip(self.column_names, block):
  47. np_array[col_name] = data
  48. yield np_array
  49. return numpy_blocks()
  50. def _df_stream(self) -> Generator:
  51. if self._block_gen is None:
  52. raise StreamClosedError
  53. block_gen = self._block_gen
  54. def pd_blocks():
  55. for block in block_gen:
  56. yield pd.DataFrame(dict(zip(self.column_names, block)))
  57. self._block_gen = None
  58. return pd_blocks()
  59. def close_numpy(self):
  60. if not self._block_gen:
  61. raise StreamClosedError
  62. chunk_size = 4
  63. pieces = []
  64. blocks = []
  65. for block in self._np_stream():
  66. blocks.append(block)
  67. if len(blocks) == chunk_size:
  68. pieces.append(np.concatenate(blocks, dtype=self.np_types))
  69. chunk_size *= 2
  70. blocks = []
  71. pieces.extend(blocks)
  72. if len(pieces) > 1:
  73. self._numpy_result = np.concatenate(pieces, dtype=self.np_types)
  74. elif len(pieces) == 1:
  75. self._numpy_result = pieces[0]
  76. else:
  77. self._numpy_result = np.empty((0,))
  78. self.close()
  79. return self
  80. def close_df(self):
  81. pieces = list(self._df_stream())
  82. pieces = [piece for piece in pieces if not piece.empty]
  83. if len(pieces) > 1:
  84. self._df_result = pd.concat(pieces, ignore_index=True)
  85. elif len(pieces) == 1:
  86. self._df_result = pieces[0]
  87. else:
  88. self._df_result = pd.DataFrame()
  89. self.close()
  90. return self
  91. @property
  92. def np_result(self):
  93. if self._numpy_result is None:
  94. self.close_numpy()
  95. return self._numpy_result
  96. @property
  97. def df_result(self):
  98. if self._df_result is None:
  99. self.close_df()
  100. return self._df_result
  101. @property
  102. def np_stream(self) -> StreamContext:
  103. return StreamContext(self, self._np_stream())
  104. @property
  105. def df_stream(self) -> StreamContext:
  106. return StreamContext(self, self._df_stream())
  107. def close(self):
  108. if self._block_gen is not None:
  109. self._block_gen.close()
  110. self._block_gen = None
  111. if self.source:
  112. self.source.close()
  113. self.source = None