composer.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. # coding: utf-8
  2. import warnings
  3. from ruamel.yaml.error import MarkedYAMLError, ReusedAnchorWarning
  4. from ruamel.yaml.compat import nprint, nprintf # NOQA
  5. from ruamel.yaml.events import (
  6. StreamStartEvent,
  7. StreamEndEvent,
  8. MappingStartEvent,
  9. MappingEndEvent,
  10. SequenceStartEvent,
  11. SequenceEndEvent,
  12. AliasEvent,
  13. ScalarEvent,
  14. )
  15. from ruamel.yaml.nodes import MappingNode, ScalarNode, SequenceNode
  16. from typing import Any, Dict, Optional, List # NOQA
  17. __all__ = ['Composer', 'ComposerError']
  18. class ComposerError(MarkedYAMLError):
  19. pass
  20. class Composer:
  21. def __init__(self, loader: Any = None) -> None:
  22. self.loader = loader
  23. if self.loader is not None and getattr(self.loader, '_composer', None) is None:
  24. self.loader._composer = self
  25. self.anchors: Dict[Any, Any] = {}
  26. self.warn_double_anchors = True
  27. @property
  28. def parser(self) -> Any:
  29. if hasattr(self.loader, 'typ'):
  30. self.loader.parser
  31. return self.loader._parser
  32. @property
  33. def resolver(self) -> Any:
  34. # assert self.loader._resolver is not None
  35. if hasattr(self.loader, 'typ'):
  36. self.loader.resolver
  37. return self.loader._resolver
  38. def check_node(self) -> Any:
  39. # Drop the STREAM-START event.
  40. if self.parser.check_event(StreamStartEvent):
  41. self.parser.get_event()
  42. # If there are more documents available?
  43. return not self.parser.check_event(StreamEndEvent)
  44. def get_node(self) -> Any:
  45. # Get the root node of the next document.
  46. if not self.parser.check_event(StreamEndEvent):
  47. return self.compose_document()
  48. def get_single_node(self) -> Any:
  49. # Drop the STREAM-START event.
  50. self.parser.get_event()
  51. # Compose a document if the stream is not empty.
  52. document: Any = None
  53. if not self.parser.check_event(StreamEndEvent):
  54. document = self.compose_document()
  55. # Ensure that the stream contains no more documents.
  56. if not self.parser.check_event(StreamEndEvent):
  57. event = self.parser.get_event()
  58. raise ComposerError(
  59. 'expected a single document in the stream',
  60. document.start_mark,
  61. 'but found another document',
  62. event.start_mark,
  63. )
  64. # Drop the STREAM-END event.
  65. self.parser.get_event()
  66. return document
  67. def compose_document(self: Any) -> Any:
  68. # Drop the DOCUMENT-START event.
  69. self.parser.get_event()
  70. # Compose the root node.
  71. node = self.compose_node(None, None)
  72. # Drop the DOCUMENT-END event.
  73. self.parser.get_event()
  74. self.anchors = {}
  75. return node
  76. def return_alias(self, a: Any) -> Any:
  77. return a
  78. def compose_node(self, parent: Any, index: Any) -> Any:
  79. if self.parser.check_event(AliasEvent):
  80. event = self.parser.get_event()
  81. alias = event.anchor
  82. if alias not in self.anchors:
  83. raise ComposerError(
  84. None, None, f'found undefined alias {alias!r}', event.start_mark,
  85. )
  86. return self.return_alias(self.anchors[alias])
  87. event = self.parser.peek_event()
  88. anchor = event.anchor
  89. if anchor is not None: # have an anchor
  90. if self.warn_double_anchors and anchor in self.anchors:
  91. ws = (
  92. f'\nfound duplicate anchor {anchor!r}\n'
  93. f'first occurrence {self.anchors[anchor].start_mark}\n'
  94. f'second occurrence {event.start_mark}'
  95. )
  96. warnings.warn(ws, ReusedAnchorWarning, stacklevel=2)
  97. self.resolver.descend_resolver(parent, index)
  98. if self.parser.check_event(ScalarEvent):
  99. node = self.compose_scalar_node(anchor)
  100. elif self.parser.check_event(SequenceStartEvent):
  101. node = self.compose_sequence_node(anchor)
  102. elif self.parser.check_event(MappingStartEvent):
  103. node = self.compose_mapping_node(anchor)
  104. self.resolver.ascend_resolver()
  105. return node
  106. def compose_scalar_node(self, anchor: Any) -> Any:
  107. event = self.parser.get_event()
  108. tag = event.ctag
  109. if tag is None or str(tag) == '!':
  110. tag = self.resolver.resolve(ScalarNode, event.value, event.implicit)
  111. assert not isinstance(tag, str)
  112. # e.g tag.yaml.org,2002:str
  113. node = ScalarNode(
  114. tag,
  115. event.value,
  116. event.start_mark,
  117. event.end_mark,
  118. style=event.style,
  119. comment=event.comment,
  120. anchor=anchor,
  121. )
  122. if anchor is not None:
  123. self.anchors[anchor] = node
  124. return node
  125. def compose_sequence_node(self, anchor: Any) -> Any:
  126. start_event = self.parser.get_event()
  127. tag = start_event.ctag
  128. if tag is None or str(tag) == '!':
  129. tag = self.resolver.resolve(SequenceNode, None, start_event.implicit)
  130. assert not isinstance(tag, str)
  131. node = SequenceNode(
  132. tag,
  133. [],
  134. start_event.start_mark,
  135. None,
  136. flow_style=start_event.flow_style,
  137. comment=start_event.comment,
  138. anchor=anchor,
  139. )
  140. if anchor is not None:
  141. self.anchors[anchor] = node
  142. index = 0
  143. while not self.parser.check_event(SequenceEndEvent):
  144. node.value.append(self.compose_node(node, index))
  145. index += 1
  146. end_event = self.parser.get_event()
  147. if node.flow_style is True and end_event.comment is not None:
  148. if node.comment is not None:
  149. x = node.flow_style
  150. nprint(
  151. f'Warning: unexpected end_event commment in sequence node {x}',
  152. )
  153. node.comment = end_event.comment
  154. node.end_mark = end_event.end_mark
  155. self.check_end_doc_comment(end_event, node)
  156. return node
  157. def compose_mapping_node(self, anchor: Any) -> Any:
  158. start_event = self.parser.get_event()
  159. tag = start_event.ctag
  160. if tag is None or str(tag) == '!':
  161. tag = self.resolver.resolve(MappingNode, None, start_event.implicit)
  162. assert not isinstance(tag, str)
  163. node = MappingNode(
  164. tag,
  165. [],
  166. start_event.start_mark,
  167. None,
  168. flow_style=start_event.flow_style,
  169. comment=start_event.comment,
  170. anchor=anchor,
  171. )
  172. if anchor is not None:
  173. self.anchors[anchor] = node
  174. while not self.parser.check_event(MappingEndEvent):
  175. # key_event = self.parser.peek_event()
  176. item_key = self.compose_node(node, None)
  177. # if item_key in node.value:
  178. # raise ComposerError("while composing a mapping",
  179. # start_event.start_mark,
  180. # "found duplicate key", key_event.start_mark)
  181. item_value = self.compose_node(node, item_key)
  182. # node.value[item_key] = item_value
  183. node.value.append((item_key, item_value))
  184. end_event = self.parser.get_event()
  185. if node.flow_style is True and end_event.comment is not None:
  186. node.comment = end_event.comment
  187. node.end_mark = end_event.end_mark
  188. self.check_end_doc_comment(end_event, node)
  189. return node
  190. def check_end_doc_comment(self, end_event: Any, node: Any) -> None:
  191. if end_event.comment and end_event.comment[1]:
  192. # pre comments on an end_event, no following to move to
  193. if node.comment is None:
  194. node.comment = [None, None]
  195. assert not isinstance(node, ScalarEvent)
  196. # this is a post comment on a mapping node, add as third element
  197. # in the list
  198. node.comment.append(end_event.comment[1])
  199. end_event.comment[1] = None