composer.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # SPDX-License-Identifier: MIT
  2. __all__ = ['Composer', 'ComposerError']
  3. from error import MarkedYAMLError
  4. from events import *
  5. from nodes import *
  6. class ComposerError(MarkedYAMLError):
  7. pass
  8. class Composer(object):
  9. def __init__(self):
  10. self.anchors = {}
  11. def check_node(self):
  12. # Drop the STREAM-START event.
  13. if self.check_event(StreamStartEvent):
  14. self.get_event()
  15. # If there are more documents available?
  16. return not self.check_event(StreamEndEvent)
  17. def get_node(self):
  18. # Get the root node of the next document.
  19. if not self.check_event(StreamEndEvent):
  20. return self.compose_document()
  21. def get_single_node(self):
  22. # Drop the STREAM-START event.
  23. self.get_event()
  24. # Compose a document if the stream is not empty.
  25. document = None
  26. if not self.check_event(StreamEndEvent):
  27. document = self.compose_document()
  28. # Ensure that the stream contains no more documents.
  29. if not self.check_event(StreamEndEvent):
  30. event = self.get_event()
  31. raise ComposerError("expected a single document in the stream",
  32. document.start_mark, "but found another document",
  33. event.start_mark)
  34. # Drop the STREAM-END event.
  35. self.get_event()
  36. return document
  37. def compose_document(self):
  38. # Drop the DOCUMENT-START event.
  39. self.get_event()
  40. # Compose the root node.
  41. node = self.compose_node(None, None)
  42. # Drop the DOCUMENT-END event.
  43. self.get_event()
  44. self.anchors = {}
  45. return node
  46. def compose_node(self, parent, index):
  47. if self.check_event(AliasEvent):
  48. event = self.get_event()
  49. anchor = event.anchor
  50. if anchor not in self.anchors:
  51. raise ComposerError(None, None, "found undefined alias %r"
  52. % anchor.encode('utf-8'), event.start_mark)
  53. return self.anchors[anchor]
  54. event = self.peek_event()
  55. anchor = event.anchor
  56. if anchor is not None:
  57. if anchor in self.anchors:
  58. raise ComposerError("found duplicate anchor %r; first occurence"
  59. % anchor.encode('utf-8'), self.anchors[anchor].start_mark,
  60. "second occurence", event.start_mark)
  61. self.descend_resolver(parent, index)
  62. if self.check_event(ScalarEvent):
  63. node = self.compose_scalar_node(anchor)
  64. elif self.check_event(SequenceStartEvent):
  65. node = self.compose_sequence_node(anchor)
  66. elif self.check_event(MappingStartEvent):
  67. node = self.compose_mapping_node(anchor)
  68. self.ascend_resolver()
  69. return node
  70. def compose_scalar_node(self, anchor):
  71. event = self.get_event()
  72. tag = event.tag
  73. if tag is None or tag == u'!':
  74. tag = self.resolve(ScalarNode, event.value, event.implicit)
  75. node = ScalarNode(tag, event.value,
  76. event.start_mark, event.end_mark, style=event.style)
  77. if anchor is not None:
  78. self.anchors[anchor] = node
  79. return node
  80. def compose_sequence_node(self, anchor):
  81. start_event = self.get_event()
  82. tag = start_event.tag
  83. if tag is None or tag == u'!':
  84. tag = self.resolve(SequenceNode, None, start_event.implicit)
  85. node = SequenceNode(tag, [],
  86. start_event.start_mark, None,
  87. flow_style=start_event.flow_style)
  88. if anchor is not None:
  89. self.anchors[anchor] = node
  90. index = 0
  91. while not self.check_event(SequenceEndEvent):
  92. node.value.append(self.compose_node(node, index))
  93. index += 1
  94. end_event = self.get_event()
  95. node.end_mark = end_event.end_mark
  96. return node
  97. def compose_mapping_node(self, anchor):
  98. start_event = self.get_event()
  99. tag = start_event.tag
  100. if tag is None or tag == u'!':
  101. tag = self.resolve(MappingNode, None, start_event.implicit)
  102. node = MappingNode(tag, [],
  103. start_event.start_mark, None,
  104. flow_style=start_event.flow_style)
  105. if anchor is not None:
  106. self.anchors[anchor] = node
  107. while not self.check_event(MappingEndEvent):
  108. #key_event = self.peek_event()
  109. item_key = self.compose_node(node, None)
  110. #if item_key in node.value:
  111. # raise ComposerError("while composing a mapping", start_event.start_mark,
  112. # "found duplicate key", key_event.start_mark)
  113. item_value = self.compose_node(node, item_key)
  114. #node.value[item_key] = item_value
  115. node.value.append((item_key, item_value))
  116. end_event = self.get_event()
  117. node.end_mark = end_event.end_mark
  118. return node