reader.py 11 KB


  1. # coding: utf-8
  2. from __future__ import absolute_import
  3. # This module contains abstractions for the input stream. You don't have to
  4. # looks further, there are no pretty code.
  5. #
  6. # We define two classes here.
  7. #
  8. # Mark(source, line, column)
  9. # It's just a record and its only use is producing nice error messages.
  10. # Parser does not use it for any other purposes.
  11. #
  12. # Reader(source, data)
  13. # Reader determines the encoding of `data` and converts it to unicode.
  14. # Reader provides the following methods and attributes:
  15. # reader.peek(length=1) - return the next `length` characters
  16. # reader.forward(length=1) - move the current position to `length`
  17. # characters.
  18. # reader.index - the number of the current character.
  19. # reader.line, stream.column - the line and the column of the current
  20. # character.
  21. import codecs
  22. from ruamel.yaml.error import YAMLError, FileMark, StringMark, YAMLStreamError
  23. from ruamel.yaml.compat import text_type, binary_type, PY3, UNICODE_SIZE
  24. from ruamel.yaml.util import RegExp
  25. if False: # MYPY
  26. from typing import Any, Dict, Optional, List, Union, Text, Tuple, Optional # NOQA
  27. # from ruamel.yaml.compat import StreamTextType # NOQA
  28. __all__ = ['Reader', 'ReaderError']
  29. class ReaderError(YAMLError):
  30. def __init__(self, name, position, character, encoding, reason):
  31. # type: (Any, Any, Any, Any, Any) -> None
  32. self.name = name
  33. self.character = character
  34. self.position = position
  35. self.encoding = encoding
  36. self.reason = reason
  37. def __str__(self):
  38. # type: () -> str
  39. if isinstance(self.character, binary_type):
  40. return "'%s' codec can't decode byte #x%02x: %s\n" ' in "%s", position %d' % (
  41. self.encoding,
  42. ord(self.character),
  43. self.reason,
  44. self.name,
  45. self.position,
  46. )
  47. else:
  48. return 'unacceptable character #x%04x: %s\n' ' in "%s", position %d' % (
  49. self.character,
  50. self.reason,
  51. self.name,
  52. self.position,
  53. )
  54. class Reader(object):
  55. # Reader:
  56. # - determines the data encoding and converts it to a unicode string,
  57. # - checks if characters are in allowed range,
  58. # - adds '\0' to the end.
  59. # Reader accepts
  60. # - a `str` object (PY2) / a `bytes` object (PY3),
  61. # - a `unicode` object (PY2) / a `str` object (PY3),
  62. # - a file-like object with its `read` method returning `str`,
  63. # - a file-like object with its `read` method returning `unicode`.
  64. # Yeah, it's ugly and slow.
  65. def __init__(self, stream, loader=None):
  66. # type: (Any, Any) -> None
  67. self.loader = loader
  68. if self.loader is not None and getattr(self.loader, '_reader', None) is None:
  69. self.loader._reader = self
  70. self.reset_reader()
  71. self.stream = stream # type: Any # as .read is called
  72. def reset_reader(self):
  73. # type: () -> None
  74. self.name = None # type: Any
  75. self.stream_pointer = 0
  76. self.eof = True
  77. self.buffer = ""
  78. self.pointer = 0
  79. self.raw_buffer = None # type: Any
  80. self.raw_decode = None
  81. self.encoding = None # type: Optional[Text]
  82. self.index = 0
  83. self.line = 0
  84. self.column = 0
  85. @property
  86. def stream(self):
  87. # type: () -> Any
  88. try:
  89. return self._stream
  90. except AttributeError:
  91. raise YAMLStreamError('input stream needs to specified')
  92. @stream.setter
  93. def stream(self, val):
  94. # type: (Any) -> None
  95. if val is None:
  96. return
  97. self._stream = None
  98. if isinstance(val, text_type):
  99. self.name = '<unicode string>'
  100. self.check_printable(val)
  101. self.buffer = val + u'\0' # type: ignore
  102. elif isinstance(val, binary_type):
  103. self.name = '<byte string>'
  104. self.raw_buffer = val
  105. self.determine_encoding()
  106. else:
  107. if not hasattr(val, 'read'):
  108. raise YAMLStreamError('stream argument needs to have a read() method')
  109. self._stream = val
  110. self.name = getattr(self.stream, 'name', '<file>')
  111. self.eof = False
  112. self.raw_buffer = None
  113. self.determine_encoding()
  114. def peek(self, index=0):
  115. # type: (int) -> Text
  116. try:
  117. return self.buffer[self.pointer + index]
  118. except IndexError:
  119. self.update(index + 1)
  120. return self.buffer[self.pointer + index]
  121. def prefix(self, length=1):
  122. # type: (int) -> Any
  123. if self.pointer + length >= len(self.buffer):
  124. self.update(length)
  125. return self.buffer[self.pointer : self.pointer + length]
  126. def forward_1_1(self, length=1):
  127. # type: (int) -> None
  128. if self.pointer + length + 1 >= len(self.buffer):
  129. self.update(length + 1)
  130. while length != 0:
  131. ch = self.buffer[self.pointer]
  132. self.pointer += 1
  133. self.index += 1
  134. if ch in u'\n\x85\u2028\u2029' or (
  135. ch == u'\r' and self.buffer[self.pointer] != u'\n'
  136. ):
  137. self.line += 1
  138. self.column = 0
  139. elif ch != u'\uFEFF':
  140. self.column += 1
  141. length -= 1
  142. def forward(self, length=1):
  143. # type: (int) -> None
  144. if self.pointer + length + 1 >= len(self.buffer):
  145. self.update(length + 1)
  146. while length != 0:
  147. ch = self.buffer[self.pointer]
  148. self.pointer += 1
  149. self.index += 1
  150. if ch == u'\n' or (ch == u'\r' and self.buffer[self.pointer] != u'\n'):
  151. self.line += 1
  152. self.column = 0
  153. elif ch != u'\uFEFF':
  154. self.column += 1
  155. length -= 1
  156. def get_mark(self):
  157. # type: () -> Any
  158. if self.stream is None:
  159. return StringMark(
  160. self.name, self.index, self.line, self.column, self.buffer, self.pointer
  161. )
  162. else:
  163. return FileMark(self.name, self.index, self.line, self.column)
  164. def determine_encoding(self):
  165. # type: () -> None
  166. while not self.eof and (self.raw_buffer is None or len(self.raw_buffer) < 2):
  167. self.update_raw()
  168. if isinstance(self.raw_buffer, binary_type):
  169. if self.raw_buffer.startswith(codecs.BOM_UTF16_LE):
  170. self.raw_decode = codecs.utf_16_le_decode # type: ignore
  171. self.encoding = 'utf-16-le'
  172. elif self.raw_buffer.startswith(codecs.BOM_UTF16_BE):
  173. self.raw_decode = codecs.utf_16_be_decode # type: ignore
  174. self.encoding = 'utf-16-be'
  175. else:
  176. self.raw_decode = codecs.utf_8_decode # type: ignore
  177. self.encoding = 'utf-8'
  178. self.update(1)
  179. if UNICODE_SIZE == 2:
  180. NON_PRINTABLE = RegExp(
  181. u'[^\x09\x0A\x0D\x20-\x7E\x85' u'\xA0-\uD7FF' u'\uE000-\uFFFD' u']'
  182. )
  183. else:
  184. NON_PRINTABLE = RegExp(
  185. u'[^\x09\x0A\x0D\x20-\x7E\x85'
  186. u'\xA0-\uD7FF'
  187. u'\uE000-\uFFFD'
  188. u'\U00010000-\U0010FFFF'
  189. u']'
  190. )
  191. _printable_ascii = ('\x09\x0A\x0D' + "".join(map(chr, range(0x20, 0x7F)))).encode('ascii')
  192. @classmethod
  193. def _get_non_printable_ascii(cls, data): # type: ignore
  194. # type: (Text, bytes) -> Optional[Tuple[int, Text]]
  195. ascii_bytes = data.encode('ascii')
  196. non_printables = ascii_bytes.translate(None, cls._printable_ascii) # type: ignore
  197. if not non_printables:
  198. return None
  199. non_printable = non_printables[:1]
  200. return ascii_bytes.index(non_printable), non_printable.decode('ascii')
  201. @classmethod
  202. def _get_non_printable_regex(cls, data):
  203. # type: (Text) -> Optional[Tuple[int, Text]]
  204. match = cls.NON_PRINTABLE.search(data)
  205. if not bool(match):
  206. return None
  207. return match.start(), match.group()
  208. @classmethod
  209. def _get_non_printable(cls, data):
  210. # type: (Text) -> Optional[Tuple[int, Text]]
  211. try:
  212. return cls._get_non_printable_ascii(data) # type: ignore
  213. except UnicodeEncodeError:
  214. return cls._get_non_printable_regex(data)
  215. def check_printable(self, data):
  216. # type: (Any) -> None
  217. non_printable_match = self._get_non_printable(data)
  218. if non_printable_match is not None:
  219. start, character = non_printable_match
  220. position = self.index + (len(self.buffer) - self.pointer) + start
  221. raise ReaderError(
  222. self.name,
  223. position,
  224. ord(character),
  225. 'unicode',
  226. 'special characters are not allowed',
  227. )
  228. def update(self, length):
  229. # type: (int) -> None
  230. if self.raw_buffer is None:
  231. return
  232. self.buffer = self.buffer[self.pointer :]
  233. self.pointer = 0
  234. while len(self.buffer) < length:
  235. if not self.eof:
  236. self.update_raw()
  237. if self.raw_decode is not None:
  238. try:
  239. data, converted = self.raw_decode(self.raw_buffer, 'strict', self.eof)
  240. except UnicodeDecodeError as exc:
  241. if PY3:
  242. character = self.raw_buffer[exc.start]
  243. else:
  244. character = exc.object[exc.start]
  245. if self.stream is not None:
  246. position = self.stream_pointer - len(self.raw_buffer) + exc.start
  247. elif self.stream is not None:
  248. position = self.stream_pointer - len(self.raw_buffer) + exc.start
  249. else:
  250. position = exc.start
  251. raise ReaderError(self.name, position, character, exc.encoding, exc.reason)
  252. else:
  253. data = self.raw_buffer
  254. converted = len(data)
  255. self.check_printable(data)
  256. self.buffer += data
  257. self.raw_buffer = self.raw_buffer[converted:]
  258. if self.eof:
  259. self.buffer += '\0'
  260. self.raw_buffer = None
  261. break
  262. def update_raw(self, size=None):
  263. # type: (Optional[int]) -> None
  264. if size is None:
  265. size = 4096 if PY3 else 1024
  266. data = self.stream.read(size)
  267. if self.raw_buffer is None:
  268. self.raw_buffer = data
  269. else:
  270. self.raw_buffer += data
  271. self.stream_pointer += len(data)
  272. if not data:
  273. self.eof = True
  274. # try:
  275. # import psyco
  276. # psyco.bind(Reader)
  277. # except ImportError:
  278. # pass