buffer.pyx 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. import sys
  2. from typing import Iterable, Any, Optional
  3. import cython
  4. from cpython cimport Py_INCREF, array
  5. import array
  6. from cpython.unicode cimport PyUnicode_Decode
  7. from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM
  8. from cpython.bytes cimport PyBytes_FromStringAndSize
  9. from cpython.buffer cimport PyObject_GetBuffer, PyBuffer_Release, PyBUF_ANY_CONTIGUOUS, PyBUF_SIMPLE
  10. from cpython.mem cimport PyMem_Free, PyMem_Malloc
  11. from libc.string cimport memcpy
  12. from clickhouse_connect.driver.exceptions import StreamCompleteException
  cdef union ull_wrapper:
      # A char pointer and an unsigned 64-bit integer sharing the same storage
      char* source
      unsigned long long int_value


  cdef:
      # Error handler and default codec names handed to PyUnicode_Decode
      char * errors = 'strict'
      char * utf8 = 'utf8'
      # Empty array.array instances keyed by typecode, used as clone templates by read_array
      dict array_templates = {}
      # The byte-swapping branches in this module run only when the host is big-endian
      bint must_swap = sys.byteorder == 'big'
      # One-element unsigned 64-bit array used as scratch space when a single value must be swapped
      array.array swapper = array.array('Q', [0])

  for c in 'bBuhHiIlLqQfd':
      array_templates[c] = array.array(c)
  23. cdef class ResponseBuffer:
  def __init__(self, source):
      """
      Set up an empty read buffer over a response source.

      :param source: object providing a ``gen`` attribute (consumed with ``next`` to obtain byte
          chunks) and a ``close`` method
      :raises MemoryError: if the initial scratch slice cannot be allocated
      """
      self.slice_sz = 4096
      self.buf_loc = 0
      self.buf_sz = 0
      self.source = source
      self.gen = source.gen
      # No chunk is attached yet; buffer stays NULL until the first chunk is pulled from the stream
      self.buffer = NULL
      self.slice = <char*>PyMem_Malloc(self.slice_sz)
      if self.slice == NULL:
          # PyMem_Malloc signals failure by returning NULL without setting a Python exception
          raise MemoryError()
  # NOTE: the char * returned by this method is only valid until the next call to read_bytes_c or
  # _read_byte_load.  It points either into self.buffer, which is replaced when the next chunk is pulled
  # from the stream, or into self.slice, which is overwritten (and possibly reallocated) by later reads.
  # Accordingly, that memory MUST be copied/processed into another buffer/PyObject immediately
  @cython.boundscheck(False)
  @cython.wraparound(False)
  cdef char * read_bytes_c(self, unsigned long long sz) except NULL:
      """
      Return a pointer to the next sz unread bytes of the stream and advance past them.

      :param sz: number of bytes requested
      :return: pointer into the current chunk when it already holds sz unread bytes, otherwise a
          pointer to self.slice holding bytes stitched together from successive chunks
      :raises StreamCompleteException: if the generator yields nothing (or an empty chunk) before
          sz bytes have been collected
      :raises MemoryError: if the scratch slice cannot be grown
      """
      cdef unsigned long long x, e, tail, cur_len, temp
      cdef char* ptr
      cdef char* new_slice
      e = self.buf_sz

      if self.buf_loc + sz <= e:
          if self.buffer == NULL:
              # No chunk has been attached yet, so buf_loc and buf_sz are both 0 and this can only be
              # a zero length request.  Returning self.buffer here would return NULL, which the
              # "except NULL" contract reports as an error with no exception set; hand back the
              # scratch slice instead since the caller will read zero bytes from it
              return self.slice
          # We still have "sz" unread bytes available in the buffer, return the current loc and advance it
          temp = self.buf_loc
          self.buf_loc += sz
          return self.buffer + temp

      # We need more data than is currently in the buffer, copy what's left into the temporary slice,
      # get a new buffer, and append what we need from the new buffer into that slice
      cur_len = e - self.buf_loc
      # Grow by doubling until the slice is at least twice the requested size
      temp = self.slice_sz
      while temp < sz * 2:
          temp <<= 1
      if temp > self.slice_sz:
          # Allocate the replacement before freeing the old slice so a failed allocation leaves
          # self.slice valid (and is reported as MemoryError rather than a write through NULL)
          new_slice = <char*>PyMem_Malloc(temp)
          if new_slice == NULL:
              raise MemoryError()
          PyMem_Free(self.slice)
          self.slice = new_slice
          self.slice_sz = temp
      if cur_len > 0:
          memcpy(self.slice, self.buffer + self.buf_loc, cur_len)
      self.buf_loc = 0
      self.buf_sz = 0

      # Loop until we've read enough chunks to fill the requested size
      while cur_len < sz:
          chunk = next(self.gen, None)
          if not chunk:
              raise StreamCompleteException
          x = len(chunk)
          ptr = <char *> chunk
          if cur_len + x <= sz:
              # We need this whole chunk for the requested size, copy it into the temporary slice and get the next one
              memcpy(self.slice + cur_len, ptr, x)
              cur_len += x
          else:
              # We need just the beginning of this chunk to finish the temporary, copy that and set
              # the pointer into our stored buffer to the first unread data
              tail = sz - cur_len
              memcpy(self.slice + cur_len, ptr, tail)
              # Swap the held buffer view from the previous chunk to this one
              PyBuffer_Release(&self.buff_source)
              PyObject_GetBuffer(chunk, &self.buff_source, PyBUF_SIMPLE | PyBUF_ANY_CONTIGUOUS)
              self.buffer = <char *> self.buff_source.buf
              self.buf_sz = x
              self.buf_loc = tail
              cur_len += tail
      return self.slice
  @cython.boundscheck(False)
  @cython.wraparound(False)
  cdef inline unsigned char _read_byte_load(self) except ?255:
      """
      Pull the next chunk from the stream and return its first byte.

      Callers use this when the current chunk has no unread bytes left.  If the new chunk holds more
      than one byte it becomes the current buffer with the read position just past the returned
      byte; a one byte chunk is consumed entirely and the buffer is left marked empty.

      :raises StreamCompleteException: if the generator yields nothing or an empty chunk
      """
      self.buf_loc = 0
      self.buf_sz = 0
      chunk = next(self.gen, None)
      if not chunk:
          raise StreamCompleteException
      x = len(chunk)
      if x > 1:
          # Swap the held buffer view from the previous chunk to this one
          PyBuffer_Release(&self.buff_source)
          PyObject_GetBuffer(chunk, &self.buff_source, PyBUF_SIMPLE | PyBUF_ANY_CONTIGUOUS)
          self.buffer = <char *> self.buff_source.buf
          self.buf_loc = 1
          self.buf_sz = x
      return <unsigned char>chunk[0]
  @cython.boundscheck(False)
  @cython.wraparound(False)
  cdef inline object _read_str_col(self, unsigned long long num_rows, char * encoding):
      """
      Read num_rows length-prefixed values and return them as a tuple.

      Each value is a LEB128 length (7 payload bits per byte, low bits first, high bit set while more
      bytes follow) and then that many bytes.

      :param num_rows: number of values to read
      :param encoding: codec name used to decode each value to str, or NULL to keep raw bytes
      :return: tuple of str (or bytes when encoding is NULL); a value that fails to decode is stored
          as the hex string of its raw bytes
      """
      cdef object column = PyTuple_New(num_rows), v
      cdef unsigned long long x = 0, sz, shift
      cdef unsigned char b
      cdef char* buf
      while x < num_rows:
          sz = 0
          shift = 0
          while 1:
              if self.buf_loc < self.buf_sz:
                  b = self.buffer[self.buf_loc]
                  self.buf_loc += 1
              else:
                  b = self._read_byte_load()
              # Widen before shifting: in C the left operand of << is only promoted to int, so
              # without the cast any length of 2**31 or more would overflow the shift
              sz += (<unsigned long long> (b & 0x7f)) << shift
              if (b & 0x80) == 0:
                  break
              shift += 7
          # buf is only valid until the next read, so it is converted to a Python object right away
          buf = self.read_bytes_c(sz)
          if encoding:
              try:
                  v = PyUnicode_Decode(buf, sz, encoding, errors)
              except UnicodeDecodeError:
                  v = PyBytes_FromStringAndSize(buf, sz).hex()
          else:
              v = PyBytes_FromStringAndSize(buf, sz)
          # PyTuple_SET_ITEM steals a reference, so add one for the tuple to own
          PyTuple_SET_ITEM(column, x, v)
          Py_INCREF(v)
          x += 1
      return column
  @cython.boundscheck(False)
  @cython.wraparound(False)
  cdef inline object _read_nullable_str_col(self, unsigned long long num_rows, char * encoding, object null_obj):
      """
      Read a nullable column of num_rows length-prefixed values and return them as a tuple.

      The column starts with num_rows null-map bytes (one per row), followed by one LEB128 length
      plus payload per row.  A row's payload is always consumed, even when the row is null.

      :param num_rows: number of values to read
      :param encoding: codec name used to decode each value to str, or NULL to keep raw bytes
      :param null_obj: object stored for every row whose null-map byte is non-zero
      :return: tuple of null_obj / str / bytes; a value that fails to decode is stored as the hex
          string of its raw bytes
      :raises MemoryError: if the temporary null map cannot be allocated
      """
      cdef object column = PyTuple_New(num_rows)
      cdef object v
      cdef unsigned long long x = 0, sz, shift
      cdef unsigned char b
      cdef char * buf
      cdef char * null_map = <char *> PyMem_Malloc(<size_t> num_rows)
      if null_map == NULL:
          raise MemoryError()
      try:
          # The pointer returned by read_bytes_c is invalidated by the reads below, so the null map
          # is copied into its own allocation first
          memcpy(<void *> null_map, <void *> self.read_bytes_c(num_rows), num_rows)
          for x in range(num_rows):
              if self.buf_loc < self.buf_sz:
                  b = self.buffer[self.buf_loc]
                  self.buf_loc += 1
              else:
                  b = self._read_byte_load()
              shift = 0
              sz = b & 0x7f
              while b & 0x80:
                  shift += 7
                  if self.buf_loc < self.buf_sz:
                      b = self.buffer[self.buf_loc]
                      self.buf_loc += 1
                  else:
                      b = self._read_byte_load()
                  # Widen before shifting: in C the left operand of << is only promoted to int, so
                  # without the cast any length of 2**31 or more would overflow the shift
                  sz += (<unsigned long long> (b & 0x7f)) << shift
              buf = self.read_bytes_c(sz)
              if null_map[x]:
                  v = null_obj
              elif encoding:
                  try:
                      v = PyUnicode_Decode(buf, sz, encoding, errors)
                  except UnicodeDecodeError:
                      v = PyBytes_FromStringAndSize(buf, sz).hex()
              else:
                  v = PyBytes_FromStringAndSize(buf, sz)
              # PyTuple_SET_ITEM steals a reference, so add one for the tuple to own
              PyTuple_SET_ITEM(column, x, v)
              Py_INCREF(v)
      finally:
          # Always release the null map, including when a read raises part way through the column
          PyMem_Free(<void *> null_map)
      return column
  @cython.boundscheck(False)
  @cython.wraparound(False)
  def read_byte(self) -> int:
      """
      Return the next byte of the stream as an integer in the range 0-255.

      :raises StreamCompleteException: if the stream has no more data
      """
      # Declared unsigned so the value taken straight from the buffer matches the unsigned char
      # returned by _read_byte_load, as in the other readers in this class.
      # NOTE(review): assumes self.buffer is declared char * (as its casts suggest) - confirm in the .pxd
      cdef unsigned char b
      if self.buf_loc < self.buf_sz:
          b = self.buffer[self.buf_loc]
          self.buf_loc += 1
          return b
      b = self._read_byte_load()
      return b
  def read_leb128_str(self) -> str:
      """
      Read a LEB128 length followed by that many bytes and return them decoded as UTF-8.

      Decoding uses the 'strict' error handler, so invalid UTF-8 raises UnicodeDecodeError.
      """
      cdef unsigned long long length = self.read_leb128()
      cdef char * raw = self.read_bytes_c(length)
      return PyUnicode_Decode(raw, length, utf8, errors)
  @cython.boundscheck(False)
  @cython.wraparound(False)
  def read_leb128(self) -> int:
      """
      Read one unsigned LEB128 integer from the stream.

      Bytes are consumed until one with a clear high bit is found; each contributes its low 7 bits,
      least significant group first.

      :raises StreamCompleteException: if the stream ends before the value is complete
      """
      cdef:
          unsigned long long sz = 0, shift = 0
          unsigned char b
      while 1:
          if self.buf_loc < self.buf_sz:
              b = self.buffer[self.buf_loc]
              self.buf_loc += 1
          else:
              b = self._read_byte_load()
          # Widen before shifting: in C the left operand of << is only promoted to int, so
          # without the cast any value of 2**31 or more would overflow the shift
          sz += (<unsigned long long> (b & 0x7f)) << shift
          if (b & 0x80) == 0:
              return sz
          shift += 7
  @cython.boundscheck(False)
  @cython.wraparound(False)
  def read_uint64(self) -> int:
      """
      Read 8 bytes from the stream and return them as an unsigned 64-bit integer.

      The bytes are taken in host order unless the host is big-endian, in which case they are
      byte-swapped first.

      :raises StreamCompleteException: if fewer than 8 bytes remain in the stream
      """
      cdef unsigned long long value
      cdef char* b = self.read_bytes_c(8)
      if must_swap:
          memcpy(swapper.data.as_voidptr, b, 8)
          swapper.byteswap()
          return swapper[0]
      # Copy into a local instead of dereferencing a cast pointer: b can sit at any byte offset in
      # the chunk, so reading an unsigned long long through it directly is an unaligned access
      memcpy(&value, b, 8)
      return value
  @cython.boundscheck(False)
  @cython.wraparound(False)
  def read_bytes(self, unsigned long long sz) -> bytes:
      """
      Read the next sz bytes of the stream and return them as a new bytes object.

      :raises StreamCompleteException: if the stream ends before sz bytes are available
      """
      cdef char* start = self.read_bytes_c(sz)
      # Copy out immediately; the pointer is only valid until the next read
      return PyBytes_FromStringAndSize(start, sz)
  def read_str_col(self,
                   unsigned long long num_rows,
                   encoding: Optional[str],
                   nullable: bool = False,
                   null_object: Any = None) -> Iterable[str]:
      """
      Read a column of num_rows length-prefixed strings.

      :param num_rows: number of values to read
      :param encoding: codec used to decode each value; a falsy value leaves the values as bytes
      :param nullable: when true, the column is read through the nullable reader
      :param null_object: value stored for null rows (only used when nullable is true)
      :return: tuple of column values
      """
      cdef char * enc = NULL
      if encoding:
          # Keep the encoded name bound to a local so the char * taken from it stays valid
          codec_name = encoding.encode()
          enc = codec_name
      if not nullable:
          return self._read_str_col(num_rows, enc)
      return self._read_nullable_str_col(num_rows, enc, null_object)
  @cython.boundscheck(False)
  @cython.wraparound(False)
  def read_array(self, t: str, unsigned long long num_rows) -> Iterable[Any]:
      """
      Read num_rows fixed-width values into a new array.array of typecode t.

      :param t: array typecode; must be a key of array_templates (KeyError otherwise)
      :param num_rows: number of items to read
      :return: array.array filled with the raw stream bytes, byte-swapped on big-endian hosts
      """
      cdef array.array prototype = array_templates[t]
      # Clone without zero filling; every item is overwritten by the memcpy below
      cdef array.array result = array.clone(prototype, num_rows, 0)
      cdef unsigned long long nbytes = result.itemsize * num_rows
      memcpy(result.data.as_voidptr, self.read_bytes_c(nbytes), nbytes)
      if must_swap:
          result.byteswap()
      return result
  @cython.boundscheck(False)
  @cython.wraparound(False)
  def read_bytes_col(self, unsigned long long sz, unsigned long long num_rows) -> Iterable[Any]:
      """
      Read num_rows consecutive fixed-width values of sz bytes each.

      :param sz: width of every value in bytes
      :param num_rows: number of values to read
      :return: tuple of bytes objects, one per row
      """
      cdef object column = PyTuple_New(num_rows)
      # All rows are fetched with a single read, then carved up from this cursor
      cdef char * cursor = self.read_bytes_c(sz * num_rows)
      cdef unsigned long long row = 0
      while row < num_rows:
          item = PyBytes_FromStringAndSize(cursor, sz)
          # PyTuple_SET_ITEM steals a reference, so add one for the tuple to own
          PyTuple_SET_ITEM(column, row, item)
          Py_INCREF(item)
          cursor += sz
          row += 1
      return column
  @cython.boundscheck(False)
  @cython.wraparound(False)
  def read_fixed_str_col(self, unsigned long long sz, unsigned long long num_rows,
                         encoding:str ='utf8') -> Iterable[str]:
      """
      Read num_rows consecutive fixed-width values of sz bytes each and decode them to str.

      :param sz: width of every value in bytes
      :param num_rows: number of values to read
      :param encoding: codec used to decode each value
      :return: tuple of str; a value that fails to decode is stored as the hex string of its bytes
      """
      cdef object column = PyTuple_New(num_rows)
      cdef char * enc
      # All rows are fetched with a single read, then carved up from this cursor
      cdef char * cursor = self.read_bytes_c(sz * num_rows)
      cdef object value
      cdef unsigned long long row = 0
      # Keep the encoded name bound to a local so the char * taken from it stays valid
      codec_name = encoding.encode()
      enc = codec_name
      while row < num_rows:
          try:
              value = PyUnicode_Decode(cursor, sz, enc, errors)
          except UnicodeDecodeError:
              value = PyBytes_FromStringAndSize(cursor, sz).hex()
          # PyTuple_SET_ITEM steals a reference, so add one for the tuple to own
          PyTuple_SET_ITEM(column, row, value)
          Py_INCREF(value)
          cursor += sz
          row += 1
      return column
  def close(self):
      """Close the underlying source, if one is still attached, and drop the reference to it."""
      if not self.source:
          # Already closed (or never had a usable source); nothing to do
          return
      self.source.close()
      self.source = None
  @property
  def last_message(self):
      """
      Return text decoded from the current chunk, or None if no chunk has ever been attached.

      The text starts at offset buf_sz of the current chunk and is decoded with the default codec.
      """
      if self.buffer == NULL:
          return None
      # NOTE(review): the slice starts at buf_sz and has no end index, so its extent presumably
      # depends on a NUL terminator in the underlying chunk - confirm.  The readers reset buf_sz to
      # 0 before pulling a new chunk, so after a failed pull this looks like it returns the whole
      # previous chunk - confirm that is the intent.
      return self.buffer[self.buf_sz:].decode()
  def __dealloc__(self):
      # Final cleanup when the object is destroyed: close the source, then release the C level
      # resources this object holds.
      # NOTE(review): Cython's guidance for __dealloc__ is to avoid calling other methods of the
      # object; self.close() is kept as-is here, confirm it is safe for every source type.
      self.close()
      # Release the buffer view taken on the most recent chunk
      PyBuffer_Release(&self.buff_source)
      # Free the scratch slice allocated with PyMem_Malloc
      PyMem_Free(self.slice)