common.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. import array
  2. import struct
  3. import sys
  4. from typing import Sequence, MutableSequence, Dict, Optional, Union, Generator
  5. from clickhouse_connect.driver.exceptions import ProgrammingError, StreamClosedError, DataError
  6. from clickhouse_connect.driver.types import Closable
  7. # pylint: disable=invalid-name
  8. must_swap = sys.byteorder == 'big'
  9. int_size = array.array('i').itemsize
  10. low_card_version = 1
  11. array_map = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
  12. decimal_prec = {32: 9, 64: 18, 128: 38, 256: 79}
  13. if int_size == 2:
  14. array_map[4] = 'l'
  15. array_sizes = {v: k for k, v in array_map.items()}
  16. array_sizes['f'] = 4
  17. array_sizes['d'] = 8
  18. np_date_types = {0: '[s]', 3: '[ms]', 6: '[us]', 9: '[ns]'}
  19. def array_type(size: int, signed: bool):
  20. """
  21. Determines the Python array.array code for the requested byte size
  22. :param size: byte size
  23. :param signed: whether int types should be signed or unsigned
  24. :return: Python array.array code
  25. """
  26. try:
  27. code = array_map[size]
  28. except KeyError:
  29. return None
  30. return code if signed else code.upper()
  31. def write_array(code: str, column: Sequence, dest: MutableSequence):
  32. """
  33. Write a column of native Python data matching the array.array code
  34. :param code: Python array.array code matching the column data type
  35. :param column: Column of native Python values
  36. :param dest: Destination byte buffer
  37. """
  38. if len(column) and not isinstance(column[0], (int, float)):
  39. if code in ('f', 'F', 'd', 'D'):
  40. column = [float(x) for x in column]
  41. else:
  42. column = [int(x) for x in column]
  43. try:
  44. buff = struct.Struct(f'<{len(column)}{code}')
  45. dest += buff.pack(*column)
  46. except (TypeError, OverflowError, struct.error) as ex:
  47. raise DataError('Unable to create Python array. This is usually caused by trying to insert None ' +
  48. 'values into a ClickHouse column that is not Nullable') from ex
  49. def write_uint64(value: int, dest: MutableSequence):
  50. """
  51. Write a single UInt64 value to a binary write buffer
  52. :param value: UInt64 value to write
  53. :param dest: Destination byte buffer
  54. """
  55. dest.extend(value.to_bytes(8, 'little'))
  56. def write_leb128(value: int, dest: MutableSequence):
  57. """
  58. Write a LEB128 encoded integer to a target binary buffer
  59. :param value: Integer value (positive only)
  60. :param dest: Target buffer
  61. """
  62. while True:
  63. b = value & 0x7f
  64. value >>= 7
  65. if value == 0:
  66. dest.append(b)
  67. return
  68. dest.append(0x80 | b)
  69. def decimal_size(prec: int):
  70. """
  71. Determine the bit size of a ClickHouse or Python Decimal needed to store a value of the requested precision
  72. :param prec: Precision of the Decimal in total number of base 10 digits
  73. :return: Required bit size
  74. """
  75. if prec < 1 or prec > 79:
  76. raise ArithmeticError(f'Invalid precision {prec} for ClickHouse Decimal type')
  77. if prec < 10:
  78. return 32
  79. if prec < 19:
  80. return 64
  81. if prec < 39:
  82. return 128
  83. return 256
  84. def unescape_identifier(x: str) -> str:
  85. if x.startswith('`') and x.endswith('`'):
  86. return x[1:-1]
  87. return x
  88. def dict_copy(source: Dict = None, update: Optional[Dict] = None) -> Dict:
  89. copy = source.copy() if source else {}
  90. if update:
  91. copy.update(update)
  92. return copy
  93. def dict_add(source: Dict, key: str, value: any) -> Dict:
  94. if value is not None:
  95. source[key] = value
  96. return source
  97. def empty_gen():
  98. yield from ()
  99. def coerce_int(val: Optional[Union[str, int]]) -> int:
  100. if not val:
  101. return 0
  102. return int(val)
  103. def coerce_bool(val: Optional[Union[str, bool]]):
  104. if not val:
  105. return False
  106. return val is True or (isinstance(val, str) and val.lower() in ('true', '1', 'y', 'yes'))
  107. class SliceView(Sequence):
  108. """
  109. Provides a view into a sequence rather than copying. Borrows liberally from
  110. https://gist.github.com/mathieucaroff/0cf094325fb5294fb54c6a577f05a2c1
  111. Also see the discussion on SO: https://stackoverflow.com/questions/3485475/can-i-create-a-view-on-a-python-list
  112. """
  113. slots = ('_source', '_range')
  114. def __init__(self, source: Sequence, source_slice: Optional[slice] = None):
  115. if isinstance(source, SliceView):
  116. self._source = source._source
  117. self._range = source._range[source_slice]
  118. else:
  119. self._source = source
  120. if source_slice is None:
  121. self._range = range(len(source))
  122. else:
  123. self._range = range(len(source))[source_slice]
  124. def __len__(self):
  125. return len(self._range)
  126. def __getitem__(self, i):
  127. if isinstance(i, slice):
  128. return SliceView(self._source, i)
  129. return self._source[self._range[i]]
  130. def __str__(self):
  131. r = self._range
  132. return str(self._source[slice(r.start, r.stop, r.step)])
  133. def __repr__(self):
  134. r = self._range
  135. return f'SliceView({self._source[slice(r.start, r.stop, r.step)]})'
  136. def __eq__(self, other):
  137. if self is other:
  138. return True
  139. if len(self) != len(other):
  140. return False
  141. for v, w in zip(self, other):
  142. if v != w:
  143. return False
  144. return True
  145. class StreamContext:
  146. """
  147. Wraps a generator and its "source" in a Context. This ensures that the source will be "closed" even if the
  148. generator is not fully consumed or there is an exception during consumption
  149. """
  150. __slots__ = 'source', 'gen', '_in_context'
  151. def __init__(self, source: Closable, gen: Generator):
  152. self.source = source
  153. self.gen = gen
  154. self._in_context = False
  155. def __iter__(self):
  156. return self
  157. def __next__(self):
  158. if not self._in_context:
  159. raise ProgrammingError('Stream should be used within a context')
  160. return next(self.gen)
  161. def __enter__(self):
  162. if not self.gen:
  163. raise StreamClosedError
  164. self._in_context = True
  165. return self
  166. def __exit__(self, exc_type, exc_val, exc_tb):
  167. self._in_context = False
  168. self.source.close()
  169. self.gen = None