# coding: utf-8

from ruamel.yaml.compat import nprintf  # NOQA
from typing import Text, Any, Dict, Optional, List  # NOQA
from .error import StreamMark  # NOQA

SHOW_LINES = True


class Token:
    __slots__ = 'start_mark', 'end_mark', '_comment'

    def __init__(self, start_mark: StreamMark, end_mark: StreamMark) -> None:
        self.start_mark = start_mark
        self.end_mark = end_mark

    def __repr__(self) -> Any:
        # attributes = [key for key in self.__slots__ if not key.endswith('_mark') and
        #               hasattr('self', key)]
        attributes = [key for key in self.__slots__ if not key.endswith('_mark')]
        attributes.sort()
        # arguments = ', '.join(
        #     [f'{key!s}={getattr(self, key)!r})' for key in attributes]
        # )
        arguments = [f'{key!s}={getattr(self, key)!r}' for key in attributes]
        if SHOW_LINES:
            try:
                arguments.append('line: ' + str(self.start_mark.line))
            except:  # NOQA
                pass
        try:
            arguments.append('comment: ' + str(self._comment))
        except:  # NOQA
            pass
        return f'{self.__class__.__name__}({", ".join(arguments)})'

    @property
    def column(self) -> int:
        return self.start_mark.column

    @column.setter
    def column(self, pos: Any) -> None:
        self.start_mark.column = pos

    # old style ( <= 0.17) is a TWO element list, the first element being the EOL
    # comment concatenated with following FLC/BLNK, and the second a list of FLC/BLNK
    # preceding the token
    # new style ( >= 0.17 ) is a THREE element list, the first element being a list of
    # preceding FLC/BLNK, the second the EOL comment and the third the following FLC/BLNK
    # note that new style has a differing order, and does not consist of CommentToken(s)
    # but of CommentInfo instances
    # any non-assigned values in new style are None, but first and last can be an empty list
    # new style routines add one comment at a time
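    # illustrative sketch (interpretation of the above, not executed code): for a token
    # carrying one preceding full-line comment and one EOL comment the layouts are roughly
    #   old style: _comment == [<eol comment>, [<preceding comment>]]
    #   new style: _comment == [[<preceding comment>], [<eol comment>], None]
    # (in new style the EOL slot is itself a list, indexed by comment_type in
    # add_comment_eol below)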
    # going to be deprecated in favour of add_comment_eol/post
    def add_post_comment(self, comment: Any) -> None:
        if not hasattr(self, '_comment'):
            self._comment = [None, None]
        else:
            assert len(self._comment) in [2, 5]  # make sure it is version 0
        # if isinstance(comment, CommentToken):
        #     if comment.value.startswith('# C09'):
        #         raise
        self._comment[0] = comment

    # going to be deprecated in favour of add_comment_pre
    def add_pre_comments(self, comments: Any) -> None:
        if not hasattr(self, '_comment'):
            self._comment = [None, None]
        else:
            assert len(self._comment) == 2  # make sure it is version 0
        assert self._comment[1] is None
        self._comment[1] = comments
        return

    # new style
    def add_comment_pre(self, comment: Any) -> None:
        if not hasattr(self, '_comment'):
            self._comment = [[], None, None]  # type: ignore
        else:
            assert len(self._comment) == 3
            if self._comment[0] is None:
                self._comment[0] = []  # type: ignore
        self._comment[0].append(comment)  # type: ignore

    def add_comment_eol(self, comment: Any, comment_type: Any) -> None:
        if not hasattr(self, '_comment'):
            self._comment = [None, None, None]
        else:
            assert len(self._comment) == 3
            assert self._comment[1] is None
        if self.comment[1] is None:
            self._comment[1] = []  # type: ignore
        self._comment[1].extend([None] * (comment_type + 1 - len(self.comment[1])))  # type: ignore # NOQA
        # nprintf('commy', self.comment, comment_type)
        self._comment[1][comment_type] = comment  # type: ignore

    def add_comment_post(self, comment: Any) -> None:
        if not hasattr(self, '_comment'):
            self._comment = [None, None, []]  # type: ignore
        else:
            assert len(self._comment) == 3
            if self._comment[2] is None:
                self._comment[2] = []  # type: ignore
        self._comment[2].append(comment)  # type: ignore

    # def get_comment(self) -> Any:
    #     return getattr(self, '_comment', None)

    @property
    def comment(self) -> Any:
        return getattr(self, '_comment', None)

    def move_old_comment(self, target: Any, empty: bool = False) -> Any:
        """move a comment from this token to target (normally the next token)

        used to combine e.g. comments before a BlockEntryToken with the
        ScalarToken that follows it
        empty is a special case for empty values -> comment after key
        """
        c = self.comment
        if c is None:
            return
        # don't push beyond last element
        if isinstance(target, (StreamEndToken, DocumentStartToken)):
            return
        delattr(self, '_comment')
        tc = target.comment
        if not tc:  # no target comment, just insert
            # special for empty value in key: value issue 25
            if empty:
                c = [c[0], c[1], None, None, c[0]]
            target._comment = c
            # nprint('mco2:', self, target, target.comment, empty)
            return self
        if c[0] and tc[0] or c[1] and tc[1]:
            raise NotImplementedError(f'overlap in comment {c!r} {tc!r}')
        if c[0]:
            tc[0] = c[0]
        if c[1]:
            tc[1] = c[1]
        return self

    def split_old_comment(self) -> Any:
        """split the post part of a comment, and return it
        as a comment to be added. Delete the second part if it is [None, None]
         abc:  # this goes to sequence
           # this goes to first element
         - first element
        """
        comment = self.comment
        if comment is None or comment[0] is None:
            return None  # nothing to do
        ret_val = [comment[0], None]
        if comment[1] is None:
            delattr(self, '_comment')
        return ret_val

    def move_new_comment(self, target: Any, empty: bool = False) -> Any:
        """move a comment from this token to target (normally the next token)

        used to combine e.g. comments before a BlockEntryToken with the
        ScalarToken that follows it
        empty is a special case for empty values -> comment after key
        """
        c = self.comment
        if c is None:
            return
        # don't push beyond last element
        if isinstance(target, (StreamEndToken, DocumentStartToken)):
            return
        delattr(self, '_comment')
        tc = target.comment
        if not tc:  # no target comment, just insert
            # special for empty value in key: value issue 25
            if empty:
                c = [c[0], c[1], c[2]]
            target._comment = c
            # nprint('mco2:', self, target, target.comment, empty)
            return self
        # if self and target both have pre, eol or post comments, something seems wrong
        for idx in range(3):
            if c[idx] is not None and tc[idx] is not None:
                raise NotImplementedError(f'overlap in comment {c!r} {tc!r}')
        # move the comment parts
        for idx in range(3):
            if c[idx]:
                tc[idx] = c[idx]
        return self
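

# Minimal usage sketch (illustration only, not part of this module): attaching an
# end-of-line comment to a token via the new-style helpers above.
#     from ruamel.yaml.tokens import CommentToken, ScalarToken
#     tok = ScalarToken('abc', True, None, None)
#     tok.add_comment_eol(CommentToken('# trailing\n', column=8), 0)
#     assert tok.comment[1][0].value == '# trailing\n'
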

# class BOMToken(Token):
#     id = '<byte order mark>'


class DirectiveToken(Token):
    __slots__ = 'name', 'value'
    id = '<directive>'

    def __init__(self, name: Any, value: Any, start_mark: Any, end_mark: Any) -> None:
        Token.__init__(self, start_mark, end_mark)
        self.name = name
        self.value = value


class DocumentStartToken(Token):
    __slots__ = ()
    id = '<document start>'


class DocumentEndToken(Token):
    __slots__ = ()
    id = '<document end>'


class StreamStartToken(Token):
    __slots__ = ('encoding',)
    id = '<stream start>'

    def __init__(
        self, start_mark: Any = None, end_mark: Any = None, encoding: Any = None,
    ) -> None:
        Token.__init__(self, start_mark, end_mark)
        self.encoding = encoding


class StreamEndToken(Token):
    __slots__ = ()
    id = '<stream end>'


class BlockSequenceStartToken(Token):
    __slots__ = ()
    id = '<block sequence start>'


class BlockMappingStartToken(Token):
    __slots__ = ()
    id = '<block mapping start>'


class BlockEndToken(Token):
    __slots__ = ()
    id = '<block end>'


class FlowSequenceStartToken(Token):
    __slots__ = ()
    id = '['


class FlowMappingStartToken(Token):
    __slots__ = ()
    id = '{'


class FlowSequenceEndToken(Token):
    __slots__ = ()
    id = ']'


class FlowMappingEndToken(Token):
    __slots__ = ()
    id = '}'


class KeyToken(Token):
    __slots__ = ()
    id = '?'

    # def x__repr__(self):
    #     return f'KeyToken({self.start_mark.buffer[self.start_mark.index:].split(None, 1)[0]})'


class ValueToken(Token):
    __slots__ = ()
    id = ':'


class BlockEntryToken(Token):
    __slots__ = ()
    id = '-'


class FlowEntryToken(Token):
    __slots__ = ()
    id = ','


class AliasToken(Token):
    __slots__ = ('value',)
    id = '<alias>'

    def __init__(self, value: Any, start_mark: Any, end_mark: Any) -> None:
        Token.__init__(self, start_mark, end_mark)
        self.value = value


class AnchorToken(Token):
    __slots__ = ('value',)
    id = '<anchor>'

    def __init__(self, value: Any, start_mark: Any, end_mark: Any) -> None:
        Token.__init__(self, start_mark, end_mark)
        self.value = value


class TagToken(Token):
    __slots__ = ('value',)
    id = '<tag>'

    def __init__(self, value: Any, start_mark: Any, end_mark: Any) -> None:
        Token.__init__(self, start_mark, end_mark)
        self.value = value


class ScalarToken(Token):
    __slots__ = 'value', 'plain', 'style'
    id = '<scalar>'

    def __init__(
        self, value: Any, plain: Any, start_mark: Any, end_mark: Any, style: Any = None,
    ) -> None:
        Token.__init__(self, start_mark, end_mark)
        self.value = value
        self.plain = plain
        self.style = style


class CommentToken(Token):
    __slots__ = '_value', '_column', 'pre_done'
    id = '<comment>'

    def __init__(
        self, value: Any, start_mark: Any = None, end_mark: Any = None, column: Any = None,
    ) -> None:
        if start_mark is None:
            assert column is not None
            self._column = column
        Token.__init__(self, start_mark, None)  # type: ignore
        self._value = value

    @property
    def value(self) -> str:
        if isinstance(self._value, str):
            return self._value
        return "".join(self._value)

    @value.setter
    def value(self, val: Any) -> None:
        self._value = val

    def reset(self) -> None:
        if hasattr(self, 'pre_done'):
            delattr(self, 'pre_done')

    def __repr__(self) -> Any:
        v = f'{self.value!r}'
        if SHOW_LINES:
            try:
                v += ', line: ' + str(self.start_mark.line)
            except:  # NOQA
                pass
            try:
                v += ', col: ' + str(self.start_mark.column)
            except:  # NOQA
                pass
        return f'CommentToken({v})'

    def __eq__(self, other: Any) -> bool:
        if self.start_mark != other.start_mark:
            return False
        if self.end_mark != other.end_mark:
            return False
        if self.value != other.value:
            return False
        return True

    def __ne__(self, other: Any) -> bool:
        return not self.__eq__(other)
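

# Sketch of how these token classes typically surface (assumes the YAML.scan() entry
# point of ruamel.yaml's main module; illustration only):
#     from ruamel.yaml import YAML
#     for tok in YAML().scan('a: 1  # note\n'):
#         print(tok)  # StreamStartToken, BlockMappingStartToken, KeyToken, ScalarToken, ...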