# dataconv.pyx
  1. import struct
  2. from typing import Sequence, Optional
  3. import array
  4. from datetime import datetime, date
  5. import cython
  6. from .buffer cimport ResponseBuffer
  7. from cpython cimport Py_INCREF, Py_DECREF
  8. from cpython.buffer cimport PyBUF_READ
  9. from cpython.mem cimport PyMem_Free, PyMem_Malloc
  10. from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM
  11. from cpython.bytearray cimport PyByteArray_GET_SIZE, PyByteArray_Resize
  12. from cpython.memoryview cimport PyMemoryView_FromMemory
  13. from cython.view cimport array as cvarray
  14. from ipaddress import IPv4Address
  15. from uuid import UUID, SafeUUID
  16. from libc.string cimport memcpy
  17. from datetime import tzinfo
  18. from clickhouse_connect.driver.errors import NONE_IN_NULLABLE_COLUMN
  19. @cython.boundscheck(False)
  20. @cython.wraparound(False)
  21. def pivot(data: Sequence, unsigned long long start, unsigned long long end):
  22. cdef unsigned long long row_count = end - start
  23. cdef unsigned long long col_count = len(data[0])
  24. cdef object result = PyTuple_New(col_count)
  25. cdef object col, v
  26. for x in range(col_count):
  27. col = PyTuple_New(row_count)
  28. PyTuple_SET_ITEM(result, x, col)
  29. Py_INCREF(col)
  30. for y in range(row_count):
  31. v = data[y + start][x]
  32. PyTuple_SET_ITEM(col, y, v)
  33. Py_INCREF(v)
  34. return result
  35. @cython.wraparound(False)
  36. @cython.boundscheck(False)
  37. def read_ipv4_col(ResponseBuffer buffer, unsigned long long num_rows):
  38. cdef unsigned long long x = 0
  39. cdef char* loc = buffer.read_bytes_c(4 * num_rows)
  40. cdef object column = PyTuple_New(num_rows), v
  41. ip_new = IPv4Address.__new__
  42. while x < num_rows:
  43. v = ip_new(IPv4Address)
  44. v._ip = (<unsigned int*>loc)[0]
  45. PyTuple_SET_ITEM(column, x, v)
  46. Py_INCREF(v)
  47. loc += 4
  48. x += 1
  49. return column
  50. @cython.boundscheck(False)
  51. @cython.wraparound(False)
  52. def read_datetime_col(ResponseBuffer buffer, unsigned long long num_rows, tzinfo: tzinfo):
  53. cdef unsigned long long x = 0
  54. cdef char * loc = buffer.read_bytes_c(4 * num_rows)
  55. cdef object column = PyTuple_New(num_rows), v
  56. if tzinfo is None:
  57. fts = datetime.utcfromtimestamp
  58. while x < num_rows:
  59. v = fts((<unsigned int*>loc)[0])
  60. PyTuple_SET_ITEM(column, x, v)
  61. Py_INCREF(v)
  62. loc += 4
  63. x += 1
  64. else:
  65. fts = datetime.fromtimestamp
  66. while x < num_rows:
  67. v = fts((<unsigned int*>loc)[0], tzinfo)
  68. PyTuple_SET_ITEM(column, x, v)
  69. Py_INCREF(v)
  70. loc += 4
  71. x += 1
  72. return column
  73. @cython.boundscheck(False)
  74. @cython.wraparound(False)
  75. def read_date_col(ResponseBuffer buffer, unsigned long long num_rows):
  76. cdef unsigned long long x = 0
  77. cdef char * loc = buffer.read_bytes_c(2 * num_rows)
  78. cdef object column = PyTuple_New(num_rows), v
  79. while x < num_rows:
  80. v = epoch_days_to_date((<unsigned short*>loc)[0])
  81. PyTuple_SET_ITEM(column, x, v)
  82. Py_INCREF(v)
  83. loc += 2
  84. x += 1
  85. return column
  86. @cython.boundscheck(False)
  87. @cython.wraparound(False)
  88. def read_date32_col(ResponseBuffer buffer, unsigned long long num_rows):
  89. cdef unsigned long long x = 0
  90. cdef char * loc = buffer.read_bytes_c(4 * num_rows)
  91. cdef object column = PyTuple_New(num_rows), v
  92. while x < num_rows:
  93. v = epoch_days_to_date((<int*>loc)[0])
  94. PyTuple_SET_ITEM(column, x, v)
  95. Py_INCREF(v)
  96. loc += 4
  97. x += 1
  98. return column
  99. cdef unsigned short* MONTH_DAYS = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334, 365]
  100. cdef unsigned short* MONTH_DAYS_LEAP = [0, 31, 60, 91, 121, 152, 182, 213, 244, 274, 305, 335, 366]
  101. # Constants used in epoch_days_to_date
  102. # 47482 -- Jan 1, 2100 -- Because all years 1970-2099 divisible by 4 are leap years, some extra division can be avoided
  103. # 134774 -- Number of days between Jan 1 1601 and Jan 1 1970. Adding this starts all calculations at 1601-01-01
  104. # 1461 -- Number of days in a 4-year cycle (365 * 4) + 1 leap day
  105. # 36524 -- Number of days in a 100-year cycle. 25 4-year cycles - 1 leap day for the year 100
  106. # 146097 -- Number of days in a 400-year cycle. 4 100 year cycles + 1 leap day for the year 400
# The year and the offset within the year are determined by factoring out the largest "known" year blocks in
# descending order (400/100/4/1 years). The month is then over-estimated as (day_of_year + 24) / 32, looked up in
# the cumulative day arrays, and adjusted down while too large (logic originally in the Python standard library)
  110. @cython.cdivision(True)
  111. @cython.boundscheck(False)
  112. @cython.wraparound(False)
  113. cpdef inline object epoch_days_to_date(int days):
  114. cdef int years, month, year, cycles400, cycles100, cycles, rem
  115. cdef unsigned short prev
  116. cdef unsigned short* m_list
  117. if 0 <= days < 47482:
  118. cycles = (days + 365) // 1461
  119. rem = (days + 365) - cycles * 1461
  120. years = rem // 365
  121. rem -= years * 365
  122. year = (cycles << 2) + years + 1969
  123. if years == 4:
  124. return date(year - 1, 12, 31)
  125. if years == 3:
  126. m_list = MONTH_DAYS_LEAP
  127. else:
  128. m_list = MONTH_DAYS
  129. else:
  130. cycles400 = (days + 134774) // 146097
  131. rem = days + 134774 - (cycles400 * 146097)
  132. cycles100 = rem // 36524
  133. rem -= cycles100 * 36524
  134. cycles = rem // 1461
  135. rem -= cycles * 1461
  136. years = rem // 365
  137. rem -= years * 365
  138. year = (cycles << 2) + cycles400 * 400 + cycles100 * 100 + years + 1601
  139. if years == 4 or cycles100 == 4:
  140. return date(year - 1, 12, 31)
  141. if years == 3 and (year == 2000 or year % 100 != 0):
  142. m_list = MONTH_DAYS_LEAP
  143. else:
  144. m_list = MONTH_DAYS
  145. month = (rem + 24) >> 5
  146. prev = m_list[month]
  147. while rem < prev:
  148. month -= 1
  149. prev = m_list[month]
  150. return date(year, month + 1, rem + 1 - prev)
  151. @cython.boundscheck(False)
  152. @cython.wraparound(False)
  153. def read_uuid_col(ResponseBuffer buffer, unsigned long long num_rows):
  154. cdef unsigned long long x = 0
  155. cdef char * loc = buffer.read_bytes_c(16 * num_rows)
  156. cdef char[16] temp
  157. cdef object column = PyTuple_New(num_rows), v
  158. new_uuid = UUID.__new__
  159. unsafe = SafeUUID.unsafe
  160. oset = object.__setattr__
  161. for x in range(num_rows):
  162. memcpy (<void *>temp, <void *>(loc + 8), 8)
  163. memcpy (<void *>(temp + 8), <void *>loc, 8)
  164. v = new_uuid(UUID)
  165. oset(v, 'int', int.from_bytes(temp[:16], 'little'))
  166. oset(v, 'is_safe', unsafe)
  167. PyTuple_SET_ITEM(column, x, v)
  168. Py_INCREF(v)
  169. loc += 16
  170. return column
  171. @cython.boundscheck(False)
  172. @cython.wraparound(False)
  173. def read_nullable_array(ResponseBuffer buffer, array_type: str, unsigned long long num_rows, object null_obj):
  174. if num_rows == 0:
  175. return ()
  176. cdef unsigned long long x = 0
  177. cdef size_t item_size = struct.calcsize(array_type)
  178. cdef cvarray cy_array = cvarray((num_rows,), item_size, array_type, mode='c', allocate_buffer=False)
  179. # We have to make a copy of the incoming null map because the next
  180. # "read_byes_c" call could invalidate our pointer by replacing the underlying buffer
  181. cdef char * null_map = <char *>PyMem_Malloc(<size_t>num_rows)
  182. memcpy(<void *>null_map, <void *>buffer.read_bytes_c(num_rows), num_rows)
  183. cy_array.data = buffer.read_bytes_c(num_rows * item_size)
  184. cdef object column = tuple(memoryview(cy_array))
  185. for x in range(num_rows):
  186. if null_map[x] != 0:
  187. Py_DECREF(column[x])
  188. Py_INCREF(null_obj)
  189. PyTuple_SET_ITEM(column, x, null_obj)
  190. PyMem_Free(<void *>null_map)
  191. return column
  192. @cython.boundscheck(False)
  193. @cython.wraparound(False)
  194. def build_nullable_column(source: Sequence, char * null_map, object null_obj):
  195. cdef unsigned long long num_rows = len(source), x
  196. cdef object column = PyTuple_New(num_rows), v
  197. for x in range(num_rows):
  198. if null_map[x] == 0:
  199. v = source[x]
  200. else:
  201. v = null_obj
  202. Py_INCREF(v)
  203. PyTuple_SET_ITEM(column, x, v)
  204. return column
  205. @cython.boundscheck(False)
  206. @cython.wraparound(False)
  207. def build_lc_nullable_column(index: Sequence, keys: array.array, object null_obj):
  208. cdef unsigned long long num_rows = len(keys), x, y
  209. cdef object column = PyTuple_New(num_rows), v
  210. for x in range(num_rows):
  211. y = keys[x]
  212. if y == 0:
  213. v = null_obj
  214. else:
  215. v = index[y]
  216. Py_INCREF(v)
  217. PyTuple_SET_ITEM(column, x, v)
  218. return column
  219. @cython.boundscheck(False)
  220. @cython.wraparound(False)
  221. cdef inline extend_byte_array(target: bytearray, int start, object source, Py_ssize_t sz):
  222. PyByteArray_Resize(target, start + sz)
  223. target[start:start + sz] = source[0:sz]
  224. @cython.boundscheck(False)
  225. @cython.wraparound(False)
  226. def write_str_col(column: Sequence, nullable: bool, encoding: Optional[str], dest: bytearray) -> int:
  227. cdef unsigned long long buff_size = len(column) << 5
  228. cdef unsigned long long buff_loc = 0, sz = 0, dsz = 0
  229. cdef unsigned long long array_size = PyByteArray_GET_SIZE(dest)
  230. cdef char * temp_buff = <char *>PyMem_Malloc(<size_t>buff_size)
  231. cdef object mv = PyMemoryView_FromMemory(temp_buff, buff_size, PyBUF_READ)
  232. cdef object encoded
  233. cdef char b
  234. cdef char * data
  235. try:
  236. for x in column:
  237. if not x:
  238. if not nullable and x is None:
  239. return NONE_IN_NULLABLE_COLUMN
  240. temp_buff[buff_loc] = 0
  241. buff_loc += 1
  242. if buff_loc == buff_size:
  243. extend_byte_array(dest, array_size, mv, buff_loc)
  244. array_size += buff_loc
  245. buff_loc = 0
  246. else:
  247. if not encoding:
  248. data = x
  249. dsz = len(x)
  250. else:
  251. encoded = x.encode(encoding)
  252. dsz = len(encoded)
  253. data = encoded
  254. sz = dsz
  255. while True:
  256. b = sz & 0x7f
  257. sz >>= 7
  258. if sz != 0:
  259. b |= 0x80
  260. temp_buff[buff_loc] = b
  261. buff_loc += 1
  262. if buff_loc == buff_size:
  263. extend_byte_array(dest, array_size, mv, buff_loc)
  264. array_size += buff_loc
  265. buff_loc = 0
  266. if sz == 0:
  267. break
  268. if dsz + buff_loc >= buff_size:
  269. if buff_loc > 0: # Write what we have so far
  270. extend_byte_array(dest, array_size, mv, buff_loc)
  271. array_size += buff_loc
  272. buff_loc = 0
  273. if (dsz << 4) > buff_size: # resize our buffer for very large strings
  274. PyMem_Free(<void *> temp_buff)
  275. mv.release()
  276. buff_size = dsz << 6
  277. temp_buff = <char *> PyMem_Malloc(<size_t> buff_size)
  278. mv = PyMemoryView_FromMemory(temp_buff, buff_size, PyBUF_READ)
  279. memcpy(temp_buff + buff_loc, data, dsz)
  280. buff_loc += dsz
  281. if buff_loc > 0:
  282. extend_byte_array(dest, array_size, mv, buff_loc)
  283. finally:
  284. mv.release()
  285. PyMem_Free(<void *>temp_buff)
  286. return 0