buffer.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. import sys
  2. import array
  3. from typing import Any, Iterable
  4. from clickhouse_connect.driver.exceptions import StreamCompleteException
  5. from clickhouse_connect.driver.types import ByteSource
  6. must_swap = sys.byteorder == 'big'
  7. class ResponseBuffer(ByteSource):
  8. slots = 'slice_sz', 'buf_loc', 'end', 'gen', 'buffer', 'slice'
  9. def __init__(self, source):
  10. self.slice_sz = 4096
  11. self.buf_loc = 0
  12. self.buf_sz = 0
  13. self.source = source
  14. self.gen = source.gen
  15. self.buffer = bytes()
  16. def read_bytes(self, sz: int):
  17. if self.buf_loc + sz <= self.buf_sz:
  18. self.buf_loc += sz
  19. return self.buffer[self.buf_loc - sz: self.buf_loc]
  20. # Create a temporary buffer that bridges two or more source chunks
  21. bridge = bytearray(self.buffer[self.buf_loc: self.buf_sz])
  22. self.buf_loc = 0
  23. self.buf_sz = 0
  24. while len(bridge) < sz:
  25. chunk = next(self.gen, None)
  26. if not chunk:
  27. raise StreamCompleteException
  28. x = len(chunk)
  29. if len(bridge) + x <= sz:
  30. bridge.extend(chunk)
  31. else:
  32. tail = sz - len(bridge)
  33. bridge.extend(chunk[:tail])
  34. self.buffer = chunk
  35. self.buf_sz = x
  36. self.buf_loc = tail
  37. return bridge
  38. def read_byte(self) -> int:
  39. if self.buf_loc < self.buf_sz:
  40. self.buf_loc += 1
  41. return self.buffer[self.buf_loc - 1]
  42. self.buf_sz = 0
  43. self.buf_loc = 0
  44. chunk = next(self.gen, None)
  45. if not chunk:
  46. raise StreamCompleteException
  47. x = len(chunk)
  48. if x > 1:
  49. self.buffer = chunk
  50. self.buf_loc = 1
  51. self.buf_sz = x
  52. return chunk[0]
  53. def read_leb128(self) -> int:
  54. sz = 0
  55. shift = 0
  56. while True:
  57. b = self.read_byte()
  58. sz += ((b & 0x7f) << shift)
  59. if (b & 0x80) == 0:
  60. return sz
  61. shift += 7
  62. def read_leb128_str(self) -> str:
  63. sz = self.read_leb128()
  64. return self.read_bytes(sz).decode()
  65. def read_uint64(self) -> int:
  66. return int.from_bytes(self.read_bytes(8), 'little', signed=False)
  67. def read_str_col(self,
  68. num_rows: int,
  69. encoding: str,
  70. nullable: bool = False,
  71. null_obj: Any = None) -> Iterable[str]:
  72. column = []
  73. app = column.append
  74. null_map = self.read_bytes(num_rows) if nullable else None
  75. for ix in range(num_rows):
  76. sz = 0
  77. shift = 0
  78. while True:
  79. b = self.read_byte()
  80. sz += ((b & 0x7f) << shift)
  81. if (b & 0x80) == 0:
  82. break
  83. shift += 7
  84. x = self.read_bytes(sz)
  85. if null_map and null_map[ix]:
  86. app(null_obj)
  87. elif encoding:
  88. try:
  89. app(x.decode(encoding))
  90. except UnicodeDecodeError:
  91. app(x.hex())
  92. else:
  93. app(x)
  94. return column
  95. def read_bytes_col(self, sz: int, num_rows: int) -> Iterable[bytes]:
  96. source = self.read_bytes(sz * num_rows)
  97. return [bytes(source[x:x+sz]) for x in range(0, sz * num_rows, sz)]
  98. def read_fixed_str_col(self, sz: int, num_rows: int, encoding: str) -> Iterable[str]:
  99. source = self.read_bytes(sz * num_rows)
  100. column = []
  101. app = column.append
  102. for ix in range(0, sz * num_rows, sz):
  103. try:
  104. app(str(source[ix: ix + sz], encoding).rstrip('\x00'))
  105. except UnicodeDecodeError:
  106. app(source[ix: ix + sz].hex())
  107. return column
  108. def read_array(self, array_type: str, num_rows: int) -> Iterable[Any]:
  109. column = array.array(array_type)
  110. sz = column.itemsize * num_rows
  111. b = self.read_bytes(sz)
  112. column.frombytes(b)
  113. if must_swap:
  114. column.byteswap()
  115. return column
  116. @property
  117. def last_message(self):
  118. if len(self.buffer) == 0:
  119. return None
  120. return self.buffer.decode()
  121. def close(self):
  122. if self.source:
  123. self.source.close()
  124. self.source = None