composer.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. # coding: utf-8
  2. from __future__ import absolute_import, print_function
  3. import warnings
  4. from ruamel.yaml.error import MarkedYAMLError, ReusedAnchorWarning
  5. from ruamel.yaml.compat import utf8, nprint, nprintf # NOQA
  6. from ruamel.yaml.events import (
  7. StreamStartEvent,
  8. StreamEndEvent,
  9. MappingStartEvent,
  10. MappingEndEvent,
  11. SequenceStartEvent,
  12. SequenceEndEvent,
  13. AliasEvent,
  14. ScalarEvent,
  15. )
  16. from ruamel.yaml.nodes import MappingNode, ScalarNode, SequenceNode
  17. if False: # MYPY
  18. from typing import Any, Dict, Optional, List # NOQA
  19. __all__ = ['Composer', 'ComposerError']
  20. class ComposerError(MarkedYAMLError):
  21. pass
  22. class Composer(object):
  23. def __init__(self, loader=None):
  24. # type: (Any) -> None
  25. self.loader = loader
  26. if self.loader is not None and getattr(self.loader, '_composer', None) is None:
  27. self.loader._composer = self
  28. self.anchors = {} # type: Dict[Any, Any]
  29. @property
  30. def parser(self):
  31. # type: () -> Any
  32. if hasattr(self.loader, 'typ'):
  33. self.loader.parser
  34. return self.loader._parser
  35. @property
  36. def resolver(self):
  37. # type: () -> Any
  38. # assert self.loader._resolver is not None
  39. if hasattr(self.loader, 'typ'):
  40. self.loader.resolver
  41. return self.loader._resolver
  42. def check_node(self):
  43. # type: () -> Any
  44. # Drop the STREAM-START event.
  45. if self.parser.check_event(StreamStartEvent):
  46. self.parser.get_event()
  47. # If there are more documents available?
  48. return not self.parser.check_event(StreamEndEvent)
  49. def get_node(self):
  50. # type: () -> Any
  51. # Get the root node of the next document.
  52. if not self.parser.check_event(StreamEndEvent):
  53. return self.compose_document()
  54. def get_single_node(self):
  55. # type: () -> Any
  56. # Drop the STREAM-START event.
  57. self.parser.get_event()
  58. # Compose a document if the stream is not empty.
  59. document = None # type: Any
  60. if not self.parser.check_event(StreamEndEvent):
  61. document = self.compose_document()
  62. # Ensure that the stream contains no more documents.
  63. if not self.parser.check_event(StreamEndEvent):
  64. event = self.parser.get_event()
  65. raise ComposerError(
  66. 'expected a single document in the stream',
  67. document.start_mark,
  68. 'but found another document',
  69. event.start_mark,
  70. )
  71. # Drop the STREAM-END event.
  72. self.parser.get_event()
  73. return document
  74. def compose_document(self):
  75. # type: (Any) -> Any
  76. # Drop the DOCUMENT-START event.
  77. self.parser.get_event()
  78. # Compose the root node.
  79. node = self.compose_node(None, None)
  80. # Drop the DOCUMENT-END event.
  81. self.parser.get_event()
  82. self.anchors = {}
  83. return node
  84. def compose_node(self, parent, index):
  85. # type: (Any, Any) -> Any
  86. if self.parser.check_event(AliasEvent):
  87. event = self.parser.get_event()
  88. alias = event.anchor
  89. if alias not in self.anchors:
  90. raise ComposerError(
  91. None, None, 'found undefined alias %r' % utf8(alias), event.start_mark
  92. )
  93. return self.anchors[alias]
  94. event = self.parser.peek_event()
  95. anchor = event.anchor
  96. if anchor is not None: # have an anchor
  97. if anchor in self.anchors:
  98. # raise ComposerError(
  99. # "found duplicate anchor %r; first occurrence"
  100. # % utf8(anchor), self.anchors[anchor].start_mark,
  101. # "second occurrence", event.start_mark)
  102. ws = (
  103. '\nfound duplicate anchor {!r}\nfirst occurrence {}\nsecond occurrence '
  104. '{}'.format((anchor), self.anchors[anchor].start_mark, event.start_mark)
  105. )
  106. warnings.warn(ws, ReusedAnchorWarning)
  107. self.resolver.descend_resolver(parent, index)
  108. if self.parser.check_event(ScalarEvent):
  109. node = self.compose_scalar_node(anchor)
  110. elif self.parser.check_event(SequenceStartEvent):
  111. node = self.compose_sequence_node(anchor)
  112. elif self.parser.check_event(MappingStartEvent):
  113. node = self.compose_mapping_node(anchor)
  114. self.resolver.ascend_resolver()
  115. return node
  116. def compose_scalar_node(self, anchor):
  117. # type: (Any) -> Any
  118. event = self.parser.get_event()
  119. tag = event.tag
  120. if tag is None or tag == u'!':
  121. tag = self.resolver.resolve(ScalarNode, event.value, event.implicit)
  122. node = ScalarNode(
  123. tag,
  124. event.value,
  125. event.start_mark,
  126. event.end_mark,
  127. style=event.style,
  128. comment=event.comment,
  129. anchor=anchor,
  130. )
  131. if anchor is not None:
  132. self.anchors[anchor] = node
  133. return node
  134. def compose_sequence_node(self, anchor):
  135. # type: (Any) -> Any
  136. start_event = self.parser.get_event()
  137. tag = start_event.tag
  138. if tag is None or tag == u'!':
  139. tag = self.resolver.resolve(SequenceNode, None, start_event.implicit)
  140. node = SequenceNode(
  141. tag,
  142. [],
  143. start_event.start_mark,
  144. None,
  145. flow_style=start_event.flow_style,
  146. comment=start_event.comment,
  147. anchor=anchor,
  148. )
  149. if anchor is not None:
  150. self.anchors[anchor] = node
  151. index = 0
  152. while not self.parser.check_event(SequenceEndEvent):
  153. node.value.append(self.compose_node(node, index))
  154. index += 1
  155. end_event = self.parser.get_event()
  156. if node.flow_style is True and end_event.comment is not None:
  157. if node.comment is not None:
  158. nprint(
  159. 'Warning: unexpected end_event commment in sequence '
  160. 'node {}'.format(node.flow_style)
  161. )
  162. node.comment = end_event.comment
  163. node.end_mark = end_event.end_mark
  164. self.check_end_doc_comment(end_event, node)
  165. return node
  166. def compose_mapping_node(self, anchor):
  167. # type: (Any) -> Any
  168. start_event = self.parser.get_event()
  169. tag = start_event.tag
  170. if tag is None or tag == u'!':
  171. tag = self.resolver.resolve(MappingNode, None, start_event.implicit)
  172. node = MappingNode(
  173. tag,
  174. [],
  175. start_event.start_mark,
  176. None,
  177. flow_style=start_event.flow_style,
  178. comment=start_event.comment,
  179. anchor=anchor,
  180. )
  181. if anchor is not None:
  182. self.anchors[anchor] = node
  183. while not self.parser.check_event(MappingEndEvent):
  184. # key_event = self.parser.peek_event()
  185. item_key = self.compose_node(node, None)
  186. # if item_key in node.value:
  187. # raise ComposerError("while composing a mapping",
  188. # start_event.start_mark,
  189. # "found duplicate key", key_event.start_mark)
  190. item_value = self.compose_node(node, item_key)
  191. # node.value[item_key] = item_value
  192. node.value.append((item_key, item_value))
  193. end_event = self.parser.get_event()
  194. if node.flow_style is True and end_event.comment is not None:
  195. node.comment = end_event.comment
  196. node.end_mark = end_event.end_mark
  197. self.check_end_doc_comment(end_event, node)
  198. return node
  199. def check_end_doc_comment(self, end_event, node):
  200. # type: (Any, Any) -> None
  201. if end_event.comment and end_event.comment[1]:
  202. # pre comments on an end_event, no following to move to
  203. if node.comment is None:
  204. node.comment = [None, None]
  205. assert not isinstance(node, ScalarEvent)
  206. # this is a post comment on a mapping node, add as third element
  207. # in the list
  208. node.comment.append(end_event.comment[1])
  209. end_event.comment[1] = None