common.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. import array
  2. import struct
  3. import sys
  4. from typing import Sequence, MutableSequence, Dict, Optional, Union, Generator
  5. from clickhouse_connect.driver.exceptions import ProgrammingError, StreamClosedError, DataError
  6. from clickhouse_connect.driver.types import Closable
  7. # pylint: disable=invalid-name
  8. must_swap = sys.byteorder == 'big'
  9. int_size = array.array('i').itemsize
  10. low_card_version = 1
  11. array_map = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
  12. decimal_prec = {32: 9, 64: 18, 128: 38, 256: 79}
  13. if int_size == 2:
  14. array_map[4] = 'l'
  15. array_sizes = {v: k for k, v in array_map.items()}
  16. array_sizes['f'] = 4
  17. array_sizes['d'] = 8
  18. np_date_types = {0: '[s]', 3: '[ms]', 6: '[us]', 9: '[ns]'}
  19. def array_type(size: int, signed: bool):
  20. """
  21. Determines the Python array.array code for the requested byte size
  22. :param size: byte size
  23. :param signed: whether int types should be signed or unsigned
  24. :return: Python array.array code
  25. """
  26. try:
  27. code = array_map[size]
  28. except KeyError:
  29. return None
  30. return code if signed else code.upper()
  31. def write_array(code: str, column: Sequence, dest: MutableSequence):
  32. """
  33. Write a column of native Python data matching the array.array code
  34. :param code: Python array.array code matching the column data type
  35. :param column: Column of native Python values
  36. :param dest: Destination byte buffer
  37. """
  38. if len(column) and not isinstance(column[0], (int, float)):
  39. if code in ('f', 'F', 'd', 'D'):
  40. column = [float(x) for x in column]
  41. else:
  42. column = [int(x) for x in column]
  43. try:
  44. buff = struct.Struct(f'<{len(column)}{code}')
  45. dest += buff.pack(*column)
  46. except (TypeError, OverflowError, struct.error) as ex:
  47. raise DataError('Unable to create Python array. This is usually caused by trying to insert None ' +
  48. 'values into a ClickHouse column that is not Nullable') from ex
  49. def write_uint64(value: int, dest: MutableSequence):
  50. """
  51. Write a single UInt64 value to a binary write buffer
  52. :param value: UInt64 value to write
  53. :param dest: Destination byte buffer
  54. """
  55. dest.extend(value.to_bytes(8, 'little'))
  56. def write_leb128(value: int, dest: MutableSequence):
  57. """
  58. Write a LEB128 encoded integer to a target binary buffer
  59. :param value: Integer value (positive only)
  60. :param dest: Target buffer
  61. """
  62. while True:
  63. b = value & 0x7f
  64. value >>= 7
  65. if value == 0:
  66. dest.append(b)
  67. return
  68. dest.append(0x80 | b)
  69. def decimal_size(prec: int):
  70. """
  71. Determine the bit size of a ClickHouse or Python Decimal needed to store a value of the requested precision
  72. :param prec: Precision of the Decimal in total number of base 10 digits
  73. :return: Required bit size
  74. """
  75. if prec < 1 or prec > 79:
  76. raise ArithmeticError(f'Invalid precision {prec} for ClickHouse Decimal type')
  77. if prec < 10:
  78. return 32
  79. if prec < 19:
  80. return 64
  81. if prec < 39:
  82. return 128
  83. return 256
  84. def unescape_identifier(x: str) -> str:
  85. if x.startswith('`') and x.endswith('`'):
  86. return x[1:-1]
  87. return x
  88. def dict_copy(source: Dict = None, update: Optional[Dict] = None) -> Dict:
  89. copy = source.copy() if source else {}
  90. if update:
  91. copy.update(update)
  92. return copy
  93. def empty_gen():
  94. yield from ()
  95. def coerce_int(val: Optional[Union[str, int]]) -> int:
  96. if not val:
  97. return 0
  98. return int(val)
  99. def coerce_bool(val: Optional[Union[str, bool]]):
  100. if not val:
  101. return False
  102. return val in (True, 'True', 'true', '1')
  103. class SliceView(Sequence):
  104. """
  105. Provides a view into a sequence rather than copying. Borrows liberally from
  106. https://gist.github.com/mathieucaroff/0cf094325fb5294fb54c6a577f05a2c1
  107. Also see the discussion on SO: https://stackoverflow.com/questions/3485475/can-i-create-a-view-on-a-python-list
  108. """
  109. slots = ('_source', '_range')
  110. def __init__(self, source: Sequence, source_slice: Optional[slice] = None):
  111. if isinstance(source, SliceView):
  112. self._source = source._source
  113. self._range = source._range[source_slice]
  114. else:
  115. self._source = source
  116. if source_slice is None:
  117. self._range = range(len(source))
  118. else:
  119. self._range = range(len(source))[source_slice]
  120. def __len__(self):
  121. return len(self._range)
  122. def __getitem__(self, i):
  123. if isinstance(i, slice):
  124. return SliceView(self._source, i)
  125. return self._source[self._range[i]]
  126. def __str__(self):
  127. r = self._range
  128. return str(self._source[slice(r.start, r.stop, r.step)])
  129. def __repr__(self):
  130. r = self._range
  131. return f'SliceView({self._source[slice(r.start, r.stop, r.step)]})'
  132. def __eq__(self, other):
  133. if self is other:
  134. return True
  135. if len(self) != len(other):
  136. return False
  137. for v, w in zip(self, other):
  138. if v != w:
  139. return False
  140. return True
  141. class StreamContext:
  142. """
  143. Wraps a generator and its "source" in a Context. This ensures that the source will be "closed" even if the
  144. generator is not fully consumed or there is an exception during consumption
  145. """
  146. __slots__ = 'source', 'gen', '_in_context'
  147. def __init__(self, source: Closable, gen: Generator):
  148. self.source = source
  149. self.gen = gen
  150. self._in_context = False
  151. def __iter__(self):
  152. return self
  153. def __next__(self):
  154. if not self._in_context:
  155. raise ProgrammingError('Stream should be used within a context')
  156. return next(self.gen)
  157. def __enter__(self):
  158. if not self.gen:
  159. raise StreamClosedError
  160. self._in_context = True
  161. return self
  162. def __exit__(self, exc_type, exc_val, exc_tb):
  163. self._in_context = False
  164. self.source.close()
  165. self.gen = None