123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- import array
- import struct
- import sys
- from typing import Sequence, MutableSequence, Dict, Optional, Union, Generator
- from clickhouse_connect.driver.exceptions import ProgrammingError, StreamClosedError, DataError
- from clickhouse_connect.driver.types import Closable
- # pylint: disable=invalid-name
- must_swap = sys.byteorder == 'big'
- int_size = array.array('i').itemsize
- low_card_version = 1
- array_map = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
- decimal_prec = {32: 9, 64: 18, 128: 38, 256: 79}
- if int_size == 2:
- array_map[4] = 'l'
- array_sizes = {v: k for k, v in array_map.items()}
- array_sizes['f'] = 4
- array_sizes['d'] = 8
- np_date_types = {0: '[s]', 3: '[ms]', 6: '[us]', 9: '[ns]'}
- def array_type(size: int, signed: bool):
- """
- Determines the Python array.array code for the requested byte size
- :param size: byte size
- :param signed: whether int types should be signed or unsigned
- :return: Python array.array code
- """
- try:
- code = array_map[size]
- except KeyError:
- return None
- return code if signed else code.upper()
- def write_array(code: str, column: Sequence, dest: MutableSequence):
- """
- Write a column of native Python data matching the array.array code
- :param code: Python array.array code matching the column data type
- :param column: Column of native Python values
- :param dest: Destination byte buffer
- """
- if len(column) and not isinstance(column[0], (int, float)):
- if code in ('f', 'F', 'd', 'D'):
- column = [float(x) for x in column]
- else:
- column = [int(x) for x in column]
- try:
- buff = struct.Struct(f'<{len(column)}{code}')
- dest += buff.pack(*column)
- except (TypeError, OverflowError, struct.error) as ex:
- raise DataError('Unable to create Python array. This is usually caused by trying to insert None ' +
- 'values into a ClickHouse column that is not Nullable') from ex
- def write_uint64(value: int, dest: MutableSequence):
- """
- Write a single UInt64 value to a binary write buffer
- :param value: UInt64 value to write
- :param dest: Destination byte buffer
- """
- dest.extend(value.to_bytes(8, 'little'))
- def write_leb128(value: int, dest: MutableSequence):
- """
- Write a LEB128 encoded integer to a target binary buffer
- :param value: Integer value (positive only)
- :param dest: Target buffer
- """
- while True:
- b = value & 0x7f
- value >>= 7
- if value == 0:
- dest.append(b)
- return
- dest.append(0x80 | b)
- def decimal_size(prec: int):
- """
- Determine the bit size of a ClickHouse or Python Decimal needed to store a value of the requested precision
- :param prec: Precision of the Decimal in total number of base 10 digits
- :return: Required bit size
- """
- if prec < 1 or prec > 79:
- raise ArithmeticError(f'Invalid precision {prec} for ClickHouse Decimal type')
- if prec < 10:
- return 32
- if prec < 19:
- return 64
- if prec < 39:
- return 128
- return 256
- def unescape_identifier(x: str) -> str:
- if x.startswith('`') and x.endswith('`'):
- return x[1:-1]
- return x
- def dict_copy(source: Dict = None, update: Optional[Dict] = None) -> Dict:
- copy = source.copy() if source else {}
- if update:
- copy.update(update)
- return copy
- def empty_gen():
- yield from ()
- def coerce_int(val: Optional[Union[str, int]]) -> int:
- if not val:
- return 0
- return int(val)
- def coerce_bool(val: Optional[Union[str, bool]]):
- if not val:
- return False
- return val in (True, 'True', 'true', '1')
- class SliceView(Sequence):
- """
- Provides a view into a sequence rather than copying. Borrows liberally from
- https://gist.github.com/mathieucaroff/0cf094325fb5294fb54c6a577f05a2c1
- Also see the discussion on SO: https://stackoverflow.com/questions/3485475/can-i-create-a-view-on-a-python-list
- """
- slots = ('_source', '_range')
- def __init__(self, source: Sequence, source_slice: Optional[slice] = None):
- if isinstance(source, SliceView):
- self._source = source._source
- self._range = source._range[source_slice]
- else:
- self._source = source
- if source_slice is None:
- self._range = range(len(source))
- else:
- self._range = range(len(source))[source_slice]
- def __len__(self):
- return len(self._range)
- def __getitem__(self, i):
- if isinstance(i, slice):
- return SliceView(self._source, i)
- return self._source[self._range[i]]
- def __str__(self):
- r = self._range
- return str(self._source[slice(r.start, r.stop, r.step)])
- def __repr__(self):
- r = self._range
- return f'SliceView({self._source[slice(r.start, r.stop, r.step)]})'
- def __eq__(self, other):
- if self is other:
- return True
- if len(self) != len(other):
- return False
- for v, w in zip(self, other):
- if v != w:
- return False
- return True
- class StreamContext:
- """
- Wraps a generator and its "source" in a Context. This ensures that the source will be "closed" even if the
- generator is not fully consumed or there is an exception during consumption
- """
- __slots__ = 'source', 'gen', '_in_context'
- def __init__(self, source: Closable, gen: Generator):
- self.source = source
- self.gen = gen
- self._in_context = False
- def __iter__(self):
- return self
- def __next__(self):
- if not self._in_context:
- raise ProgrammingError('Stream should be used within a context')
- return next(self.gen)
- def __enter__(self):
- if not self.gen:
- raise StreamClosedError
- self._in_context = True
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._in_context = False
- self.source.close()
- self.gen = None
|