dataconv.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
import array
from datetime import datetime, date, timedelta, tzinfo
from ipaddress import IPv4Address
from typing import Sequence, Optional, Any
from uuid import UUID, SafeUUID
from clickhouse_connect.driver.common import int_size
from clickhouse_connect.driver.errors import NONE_IN_NULLABLE_COLUMN
from clickhouse_connect.driver.types import ByteSource
from clickhouse_connect.driver.options import np
  10. MONTH_DAYS = (0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334, 365)
  11. MONTH_DAYS_LEAP = (0, 31, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335, 366)
  12. def read_ipv4_col(source: ByteSource, num_rows: int):
  13. column = source.read_array('I', num_rows)
  14. fast_ip_v4 = IPv4Address.__new__
  15. new_col = []
  16. app = new_col.append
  17. for x in column:
  18. ipv4 = fast_ip_v4(IPv4Address)
  19. ipv4._ip = x # pylint: disable=protected-access
  20. app(ipv4)
  21. return new_col
  22. def read_datetime_col(source: ByteSource, num_rows: int, tz_info: Optional[tzinfo]):
  23. src_array = source.read_array('I', num_rows)
  24. if tz_info is None:
  25. fts = datetime.utcfromtimestamp
  26. return [fts(ts) for ts in src_array]
  27. fts = datetime.fromtimestamp
  28. return [fts(ts, tz_info) for ts in src_array]
  29. def epoch_days_to_date(days: int) -> date:
  30. cycles400, rem = divmod(days + 134774, 146097)
  31. cycles100, rem = divmod(rem, 36524)
  32. cycles, rem = divmod(rem, 1461)
  33. years, rem = divmod(rem, 365)
  34. year = (cycles << 2) + cycles400 * 400 + cycles100 * 100 + years + 1601
  35. if years == 4 or cycles100 == 4:
  36. return date(year - 1, 12, 31)
  37. m_list = MONTH_DAYS_LEAP if years == 3 and (year == 2000 or year % 100 != 0) else MONTH_DAYS
  38. month = (rem + 24) >> 5
  39. while rem < m_list[month]:
  40. month -= 1
  41. return date(year, month + 1, rem + 1 - m_list[month])
  42. def read_date_col(source: ByteSource, num_rows: int):
  43. column = source.read_array('H', num_rows)
  44. return [epoch_days_to_date(x) for x in column]
  45. def read_date32_col(source: ByteSource, num_rows: int):
  46. column = source.read_array('l' if int_size == 2 else 'i', num_rows)
  47. return [epoch_days_to_date(x) for x in column]
  48. def read_uuid_col(source: ByteSource, num_rows: int):
  49. v = source.read_array('Q', num_rows * 2)
  50. empty_uuid = UUID(int=0)
  51. new_uuid = UUID.__new__
  52. unsafe = SafeUUID.unsafe
  53. oset = object.__setattr__
  54. column = []
  55. app = column.append
  56. for i in range(num_rows):
  57. ix = i << 1
  58. int_value = v[ix] << 64 | v[ix + 1]
  59. if int_value == 0:
  60. app(empty_uuid)
  61. else:
  62. fast_uuid = new_uuid(UUID)
  63. oset(fast_uuid, 'int', int_value)
  64. oset(fast_uuid, 'is_safe', unsafe)
  65. app(fast_uuid)
  66. return column
  67. def read_nullable_array(source: ByteSource, array_type: str, num_rows: int, null_obj: Any):
  68. null_map = source.read_bytes(num_rows)
  69. column = source.read_array(array_type, num_rows)
  70. return [null_obj if null_map[ix] else column[ix] for ix in range(num_rows)]
  71. def build_nullable_column(source: Sequence, null_map: bytes, null_obj: Any):
  72. return [source[ix] if null_map[ix] == 0 else null_obj for ix in range(len(source))]
  73. def build_lc_nullable_column(index: Sequence, keys: array.array, null_obj: Any):
  74. column = []
  75. for key in keys:
  76. if key == 0:
  77. column.append(null_obj)
  78. else:
  79. column.append(index[key])
  80. return column
  81. def to_numpy_array(column: Sequence):
  82. arr = np.empty((len(column),), dtype=np.object)
  83. arr[:] = column
  84. return arr
  85. def pivot(data: Sequence[Sequence], start_row: int, end_row: int) -> Sequence[Sequence]:
  86. return tuple(zip(*data[start_row: end_row]))
  87. def write_str_col(column: Sequence, nullable: bool, encoding: Optional[str], dest: bytearray) -> int:
  88. app = dest.append
  89. for x in column:
  90. if not x:
  91. if not nullable and x is None:
  92. return NONE_IN_NULLABLE_COLUMN
  93. app(0)
  94. else:
  95. if encoding:
  96. x = x.encode(encoding)
  97. else:
  98. x = bytes(x)
  99. sz = len(x)
  100. while True:
  101. b = sz & 0x7f
  102. sz >>= 7
  103. if sz == 0:
  104. app(b)
  105. break
  106. app(0x80 | b)
  107. dest += x
  108. return 0