serializer.py 8.1 KB


  1. # coding: utf-8
  2. from ruamel.yaml.error import YAMLError
  3. from ruamel.yaml.compat import nprint, DBG_NODE, dbg, nprintf # NOQA
  4. from ruamel.yaml.util import RegExp
  5. from ruamel.yaml.events import (
  6. StreamStartEvent,
  7. StreamEndEvent,
  8. MappingStartEvent,
  9. MappingEndEvent,
  10. SequenceStartEvent,
  11. SequenceEndEvent,
  12. AliasEvent,
  13. ScalarEvent,
  14. DocumentStartEvent,
  15. DocumentEndEvent,
  16. )
  17. from ruamel.yaml.nodes import MappingNode, ScalarNode, SequenceNode
  18. from typing import Any, Dict, Union, Text, Optional # NOQA
  19. from ruamel.yaml.compat import VersionType # NOQA
  20. __all__ = ['Serializer', 'SerializerError']
  class SerializerError(YAMLError):
      """Error raised by the serializer, e.g. when it is used in the wrong state."""
  class Serializer:
      """Walk a node graph and hand the corresponding events to the emitter.

      Life cycle: ``open()`` emits the stream start, every ``serialize(node)``
      call emits one document, and ``close()`` emits the stream end.

      ``closed`` tracks that life cycle: ``None`` before ``open()``, ``False``
      while the stream is open, ``True`` once ``close()`` has run.
      """

      # 'id' and 3+ numbers, but not 000
      ANCHOR_TEMPLATE = 'id{:03d}'
      ANCHOR_RE = RegExp('id(?!000$)\\d{3,}')

      def __init__(
          self,
          encoding: Any = None,
          explicit_start: Optional[bool] = None,
          explicit_end: Optional[bool] = None,
          version: Optional[VersionType] = None,
          tags: Any = None,
          dumper: Any = None,
      ) -> None:
          """Store the dump options and register this serializer on *dumper*.

          Args:
              encoding: passed to the ``StreamStartEvent`` emitted by ``open()``.
              explicit_start: passed as ``explicit`` to each ``DocumentStartEvent``.
              explicit_end: passed as ``explicit`` to each ``DocumentEndEvent``.
              version: passed to each ``DocumentStartEvent``; a string such as
                  ``'1.2'`` is split on ``'.'`` and converted to a tuple of ints,
                  any other value is stored unchanged.
              tags: passed to each ``DocumentStartEvent``.
              dumper: object that provides the emitter and resolver; when it is
                  not ``None`` its ``_serializer`` attribute is set to ``self``.
          """
          self.dumper = dumper
          if self.dumper is not None:
              self.dumper._serializer = self
          self.use_encoding = encoding
          self.use_explicit_start = explicit_start
          self.use_explicit_end = explicit_end
          if isinstance(version, str):
              self.use_version = tuple(map(int, version.split('.')))
          else:
              self.use_version = version  # type: ignore
          self.use_tags = tags
          # per-document state, reset at the end of every serialize() call
          self.serialized_nodes: Dict[Any, Any] = {}
          self.anchors: Dict[Any, Any] = {}
          self.last_anchor_id = 0
          self.closed: Optional[bool] = None
          self._templated_id = None

      @property
      def emitter(self) -> Any:
          """Return the dumper's emitter.

          A dumper that has a ``typ`` attribute exposes it as ``emitter``;
          otherwise the ``_emitter`` attribute is used.
          """
          if hasattr(self.dumper, 'typ'):
              return self.dumper.emitter
          return self.dumper._emitter

      @property
      def resolver(self) -> Any:
          """Return the dumper's ``_resolver`` attribute."""
          if hasattr(self.dumper, 'typ'):
              # NOTE(review): the result is deliberately not returned here;
              # presumably this access makes the dumper create `_resolver`
              # lazily -- confirm against the dumper implementation.
              self.dumper.resolver
          return self.dumper._resolver

      def open(self) -> None:
          """Emit the stream start event and mark the serializer as open.

          Raises:
              SerializerError: if the serializer was already opened or closed.
          """
          if self.closed is None:
              self.emitter.emit(StreamStartEvent(encoding=self.use_encoding))
              self.closed = False
          elif self.closed:
              raise SerializerError('serializer is closed')
          else:
              raise SerializerError('serializer is already opened')

      def close(self) -> None:
          """Emit the stream end event; a second call is a no-op.

          Raises:
              SerializerError: if the serializer was never opened.
          """
          if self.closed is None:
              raise SerializerError('serializer is not opened')
          elif not self.closed:
              self.emitter.emit(StreamEndEvent())
              self.closed = True

      def serialize(self, node: Any) -> None:
          """Emit one complete document for the graph rooted at *node*.

          Raises:
              SerializerError: if the serializer is not open.
          """
          if dbg(DBG_NODE):
              nprint('Serializing nodes')
              node.dump()
          if self.closed is None:
              raise SerializerError('serializer is not opened')
          elif self.closed:
              raise SerializerError('serializer is closed')
          self.emitter.emit(
              DocumentStartEvent(
                  explicit=self.use_explicit_start,
                  version=self.use_version,
                  tags=self.use_tags,
              ),
          )
          # first pass decides which nodes need an anchor, second pass emits
          self.anchor_node(node)
          self.serialize_node(node, None, None)
          self.emitter.emit(DocumentEndEvent(explicit=self.use_explicit_end))
          # anchors and aliases do not carry over to the next document
          self.serialized_nodes = {}
          self.anchors = {}
          self.last_anchor_id = 0

      def anchor_node(self, node: Any) -> None:
          """Record *node* (and, recursively, its children) in ``self.anchors``.

          On the first visit the node is stored with its own anchor value when
          ``node.anchor.always_dump`` is true, otherwise with ``None``, and its
          children are visited.  When a node stored with ``None`` is reached
          again, an anchor is generated for it.  Children are only walked on
          the first visit, so shared or cyclic structures terminate.
          """
          if node in self.anchors:
              if self.anchors[node] is None:
                  self.anchors[node] = self.generate_anchor(node)
              return

          anchor = None
          try:
              if node.anchor.always_dump:
                  anchor = node.anchor.value
          except Exception:
              # best effort: a node without a usable `anchor` simply has no
              # explicit anchor.  `Exception` rather than a bare except so that
              # KeyboardInterrupt / SystemExit are not swallowed.
              pass
          self.anchors[node] = anchor

          if isinstance(node, SequenceNode):
              for item in node.value:
                  self.anchor_node(item)
          elif isinstance(node, MappingNode):
              for key, value in node.value:
                  self.anchor_node(key)
                  self.anchor_node(value)

      def generate_anchor(self, node: Any) -> Any:
          """Return the anchor to use for *node*.

          ``node.anchor.value`` is used when it can be read and is not ``None``;
          otherwise ``last_anchor_id`` is incremented and formatted with
          ``ANCHOR_TEMPLATE``.
          """
          try:
              anchor = node.anchor.value
          except Exception:
              # same best-effort lookup as in anchor_node()
              anchor = None
          if anchor is None:
              self.last_anchor_id += 1
              return self.ANCHOR_TEMPLATE.format(self.last_anchor_id)
          return anchor

      def serialize_node(self, node: Any, parent: Any, index: Any) -> None:
          """Emit the events for *node* and, recursively, for its children.

          A node that was already emitted in this document is written as an
          ``AliasEvent``.  Otherwise a scalar event, or a start/end event pair
          around the children, is emitted, bracketed by the resolver's
          ``descend_resolver(parent, index)`` / ``ascend_resolver()`` calls.

          Args:
              node: the node to emit; must have been seen by ``anchor_node()``.
              parent: the containing node, or ``None`` for the document root.
              index: position in a sequence, the key node for a mapping value,
                  or ``None`` for a mapping key and for the document root.
          """
          alias = self.anchors[node]
          if node in self.serialized_nodes:
              # only the '?' style is carried over to the alias event
              node_style = getattr(node, 'style', None)
              if node_style != '?':
                  node_style = None
              self.emitter.emit(AliasEvent(alias, style=node_style))
              return

          self.serialized_nodes[node] = True
          self.resolver.descend_resolver(parent, index)
          if isinstance(node, ScalarNode):
              # here check if the node.tag equals the one that would result from parsing
              # if not equal quoting is necessary for strings
              detected_tag = self.resolver.resolve(ScalarNode, node.value, (True, False))
              default_tag = self.resolver.resolve(ScalarNode, node.value, (False, True))
              implicit = (
                  (node.ctag == detected_tag),
                  (node.ctag == default_tag),
                  node.tag.startswith('tag:yaml.org,2002:'),  # type: ignore
              )
              self.emitter.emit(
                  ScalarEvent(
                      alias,
                      node.ctag,
                      implicit,
                      node.value,
                      style=node.style,
                      comment=node.comment,
                  ),
              )
          elif isinstance(node, SequenceNode):
              implicit = node.ctag == self.resolver.resolve(SequenceNode, node.value, True)
              comment = node.comment
              end_comment = None
              seq_comment = None
              if node.flow_style is True:
                  if comment:  # eol comment on flow style sequence
                      seq_comment = comment[0]
              # NOTE(review): this lookup is applied regardless of flow style;
              # the pasted original had lost its indentation -- confirm nesting.
              if comment and len(comment) > 2:
                  end_comment = comment[2]
              self.emitter.emit(
                  SequenceStartEvent(
                      alias,
                      node.ctag,
                      implicit,
                      flow_style=node.flow_style,
                      comment=node.comment,
                  ),
              )
              for position, item in enumerate(node.value):
                  self.serialize_node(item, node, position)
              self.emitter.emit(SequenceEndEvent(comment=[seq_comment, end_comment]))
          elif isinstance(node, MappingNode):
              implicit = node.ctag == self.resolver.resolve(MappingNode, node.value, True)
              comment = node.comment
              end_comment = None
              map_comment = None
              if node.flow_style is True:
                  if comment:  # eol comment on flow style mapping
                      map_comment = comment[0]
              # NOTE(review): as for sequences, applied regardless of flow
              # style -- confirm nesting against the unflattened source.
              if comment and len(comment) > 2:
                  end_comment = comment[2]
              self.emitter.emit(
                  MappingStartEvent(
                      alias,
                      node.ctag,
                      implicit,
                      flow_style=node.flow_style,
                      comment=node.comment,
                      nr_items=len(node.value),
                  ),
              )
              for key, value in node.value:
                  # a key has no index; its value is indexed by the key node
                  self.serialize_node(key, node, None)
                  self.serialize_node(value, node, key)
              self.emitter.emit(MappingEndEvent(comment=[map_comment, end_comment]))
          self.resolver.ascend_resolver()
  def templated_id(s: Text) -> Any:
      """Match *s* against ``Serializer.ANCHOR_RE`` and return the result.

      The pattern is ``'id'`` followed by three or more digits, excluding
      ``id000``.  The return value is whatever ``RegExp.match`` yields
      (presumably a match object or ``None`` -- confirm against ``RegExp``).
      """
      anchor_pattern = Serializer.ANCHOR_RE
      return anchor_pattern.match(s)