emitter.py 67 KB


  1. # coding: utf-8
  2. # Emitter expects events obeying the following grammar:
  3. # stream ::= STREAM-START document* STREAM-END
  4. # document ::= DOCUMENT-START node DOCUMENT-END
  5. # node ::= SCALAR | sequence | mapping
  6. # sequence ::= SEQUENCE-START node* SEQUENCE-END
  7. # mapping ::= MAPPING-START (node node)* MAPPING-END
  8. import sys
  9. from ruamel.yaml.error import YAMLError, YAMLStreamError
  10. from ruamel.yaml.events import * # NOQA
  11. # fmt: off
  12. from ruamel.yaml.compat import nprint, dbg, DBG_EVENT, \
  13. check_anchorname_char, nprintf # NOQA
  14. # fmt: on
  15. from typing import Any, Dict, List, Union, Text, Tuple, Optional # NOQA
  16. from ruamel.yaml.compat import StreamType # NOQA
  17. __all__ = ['Emitter', 'EmitterError']
  18. class EmitterError(YAMLError):
  19. pass
  20. class ScalarAnalysis:
  21. def __init__(
  22. self,
  23. scalar: Any,
  24. empty: Any,
  25. multiline: Any,
  26. allow_flow_plain: bool,
  27. allow_block_plain: bool,
  28. allow_single_quoted: bool,
  29. allow_double_quoted: bool,
  30. allow_block: bool,
  31. ) -> None:
  32. self.scalar = scalar
  33. self.empty = empty
  34. self.multiline = multiline
  35. self.allow_flow_plain = allow_flow_plain
  36. self.allow_block_plain = allow_block_plain
  37. self.allow_single_quoted = allow_single_quoted
  38. self.allow_double_quoted = allow_double_quoted
  39. self.allow_block = allow_block
  40. def __repr__(self) -> str:
  41. return f'scalar={self.scalar!r}, empty={self.empty}, multiline={self.multiline}, allow_flow_plain={self.allow_flow_plain}, allow_block_plain={self.allow_block_plain}, allow_single_quoted={self.allow_single_quoted}, allow_double_quoted={self.allow_double_quoted}, allow_block={self.allow_block}' # NOQA
  42. class Indents:
  43. # replacement for the list based stack of None/int
  44. def __init__(self) -> None:
  45. self.values: List[Tuple[Any, bool]] = []
  46. def append(self, val: Any, seq: Any) -> None:
  47. self.values.append((val, seq))
  48. def pop(self) -> Any:
  49. return self.values.pop()[0]
  50. def last_seq(self) -> bool:
  51. # return the seq(uence) value for the element added before the last one
  52. # in increase_indent()
  53. try:
  54. return self.values[-2][1]
  55. except IndexError:
  56. return False
  57. def seq_flow_align(
  58. self, seq_indent: int, column: int, pre_comment: Optional[bool] = False,
  59. ) -> int:
  60. # extra spaces because of dash
  61. # nprint('seq_flow_align', self.values, pre_comment)
  62. if len(self.values) < 2 or not self.values[-1][1]:
  63. if len(self.values) == 0 or not pre_comment:
  64. return 0
  65. base = self.values[-1][0] if self.values[-1][0] is not None else 0
  66. if pre_comment:
  67. return base + seq_indent # type: ignore
  68. # return (len(self.values)) * seq_indent
  69. # -1 for the dash
  70. return base + seq_indent - column - 1 # type: ignore
  71. def __len__(self) -> int:
  72. return len(self.values)
  73. class Emitter:
  74. # fmt: off
  75. DEFAULT_TAG_PREFIXES = {
  76. '!': '!',
  77. 'tag:yaml.org,2002:': '!!',
  78. '!!': '!!',
  79. }
  80. # fmt: on
  81. MAX_SIMPLE_KEY_LENGTH = 128
  82. flow_seq_start = '['
  83. flow_seq_end = ']'
  84. flow_seq_separator = ','
  85. flow_map_start = '{'
  86. flow_map_end = '}'
  87. flow_map_separator = ','
  88. def __init__(
  89. self,
  90. stream: StreamType,
  91. canonical: Any = None,
  92. indent: Optional[int] = None,
  93. width: Optional[int] = None,
  94. allow_unicode: Optional[bool] = None,
  95. line_break: Any = None,
  96. block_seq_indent: Optional[int] = None,
  97. top_level_colon_align: Optional[bool] = None,
  98. prefix_colon: Any = None,
  99. brace_single_entry_mapping_in_flow_sequence: Optional[bool] = None,
  100. dumper: Any = None,
  101. ) -> None:
  102. # NOQA
  103. self.dumper = dumper
  104. if self.dumper is not None and getattr(self.dumper, '_emitter', None) is None:
  105. self.dumper._emitter = self
  106. self.stream = stream
  107. # Encoding can be overriden by STREAM-START.
  108. self.encoding: Optional[Text] = None
  109. self.allow_space_break = None
  110. # Emitter is a state machine with a stack of states to handle nested
  111. # structures.
  112. self.states: List[Any] = []
  113. self.state: Any = self.expect_stream_start
  114. # Current event and the event queue.
  115. self.events: List[Any] = []
  116. self.event: Any = None
  117. # The current indentation level and the stack of previous indents.
  118. self.indents = Indents()
  119. self.indent: Optional[int] = None
  120. # flow_context is an expanding/shrinking list consisting of '{' and '['
  121. # for each unclosed flow context. If empty list that means block context
  122. self.flow_context: List[Text] = []
  123. # Contexts.
  124. self.root_context = False
  125. self.sequence_context = False
  126. self.mapping_context = False
  127. self.simple_key_context = False
  128. # Characteristics of the last emitted character:
  129. # - current position.
  130. # - is it a whitespace?
  131. # - is it an indention character
  132. # (indentation space, '-', '?', or ':')?
  133. self.line = 0
  134. self.column = 0
  135. self.whitespace = True
  136. self.indention = True
  137. self.compact_seq_seq = True # dash after dash
  138. self.compact_seq_map = True # key after dash
  139. # self.compact_ms = False # dash after key, only when excplicit key with ?
  140. self.no_newline: Optional[bool] = None # set if directly after `- `
  141. # Whether the document requires an explicit document end indicator
  142. self.open_ended = False
  143. # colon handling
  144. self.colon = ':'
  145. self.prefixed_colon = self.colon if prefix_colon is None else prefix_colon + self.colon
  146. # single entry mappings in flow sequence
  147. self.brace_single_entry_mapping_in_flow_sequence = (
  148. brace_single_entry_mapping_in_flow_sequence # NOQA
  149. )
  150. # Formatting details.
  151. self.canonical = canonical
  152. self.allow_unicode = allow_unicode
  153. # set to False to get "\Uxxxxxxxx" for non-basic unicode like emojis
  154. self.unicode_supplementary = sys.maxunicode > 0xFFFF
  155. self.sequence_dash_offset = block_seq_indent if block_seq_indent else 0
  156. self.top_level_colon_align = top_level_colon_align
  157. self.best_sequence_indent = 2
  158. self.requested_indent = indent # specific for literal zero indent
  159. if indent and 1 < indent < 10:
  160. self.best_sequence_indent = indent
  161. self.best_map_indent = self.best_sequence_indent
  162. # if self.best_sequence_indent < self.sequence_dash_offset + 1:
  163. # self.best_sequence_indent = self.sequence_dash_offset + 1
  164. self.best_width = 80
  165. if width and width > self.best_sequence_indent * 2:
  166. self.best_width = width
  167. self.best_line_break: Any = '\n'
  168. if line_break in ['\r', '\n', '\r\n']:
  169. self.best_line_break = line_break
  170. # Tag prefixes.
  171. self.tag_prefixes: Any = None
  172. # Prepared anchor and tag.
  173. self.prepared_anchor: Any = None
  174. self.prepared_tag: Any = None
  175. # Scalar analysis and style.
  176. self.analysis: Any = None
  177. self.style: Any = None
  178. self.scalar_after_indicator = True # write a scalar on the same line as `---`
  179. self.alt_null = 'null'
  180. @property
  181. def stream(self) -> Any:
  182. try:
  183. return self._stream
  184. except AttributeError:
  185. raise YAMLStreamError('output stream needs to be specified')
  186. @stream.setter
  187. def stream(self, val: Any) -> None:
  188. if val is None:
  189. return
  190. if not hasattr(val, 'write'):
  191. raise YAMLStreamError('stream argument needs to have a write() method')
  192. self._stream = val
  193. @property
  194. def serializer(self) -> Any:
  195. try:
  196. if hasattr(self.dumper, 'typ'):
  197. return self.dumper.serializer
  198. return self.dumper._serializer
  199. except AttributeError:
  200. return self # cyaml
  201. @property
  202. def flow_level(self) -> int:
  203. return len(self.flow_context)
  204. def dispose(self) -> None:
  205. # Reset the state attributes (to clear self-references)
  206. self.states = []
  207. self.state = None
  208. def emit(self, event: Any) -> None:
  209. if dbg(DBG_EVENT):
  210. nprint(event)
  211. self.events.append(event)
  212. while not self.need_more_events():
  213. self.event = self.events.pop(0)
  214. self.state()
  215. self.event = None
  216. # In some cases, we wait for a few next events before emitting.
  217. def need_more_events(self) -> bool:
  218. if not self.events:
  219. return True
  220. event = self.events[0]
  221. if isinstance(event, DocumentStartEvent):
  222. return self.need_events(1)
  223. elif isinstance(event, SequenceStartEvent):
  224. return self.need_events(2)
  225. elif isinstance(event, MappingStartEvent):
  226. return self.need_events(3)
  227. else:
  228. return False
  229. def need_events(self, count: int) -> bool:
  230. level = 0
  231. for event in self.events[1:]:
  232. if isinstance(event, (DocumentStartEvent, CollectionStartEvent)):
  233. level += 1
  234. elif isinstance(event, (DocumentEndEvent, CollectionEndEvent)):
  235. level -= 1
  236. elif isinstance(event, StreamEndEvent):
  237. level = -1
  238. if level < 0:
  239. return False
  240. return len(self.events) < count + 1
  241. def increase_indent(
  242. self, flow: bool = False, sequence: Optional[bool] = None, indentless: bool = False,
  243. ) -> None:
  244. self.indents.append(self.indent, sequence)
  245. if self.indent is None: # top level
  246. if flow:
  247. # self.indent = self.best_sequence_indent if self.indents.last_seq() else \
  248. # self.best_map_indent
  249. # self.indent = self.best_sequence_indent
  250. self.indent = self.requested_indent
  251. else:
  252. self.indent = 0
  253. elif not indentless:
  254. self.indent += (
  255. self.best_sequence_indent if self.indents.last_seq() else self.best_map_indent
  256. )
  257. # if self.indents.last_seq():
  258. # if self.indent == 0: # top level block sequence
  259. # self.indent = self.best_sequence_indent - self.sequence_dash_offset
  260. # else:
  261. # self.indent += self.best_sequence_indent
  262. # else:
  263. # self.indent += self.best_map_indent
  264. # States.
  265. # Stream handlers.
  266. def expect_stream_start(self) -> None:
  267. if isinstance(self.event, StreamStartEvent):
  268. if self.event.encoding and not hasattr(self.stream, 'encoding'):
  269. self.encoding = self.event.encoding
  270. self.write_stream_start()
  271. self.state = self.expect_first_document_start
  272. else:
  273. raise EmitterError(f'expected StreamStartEvent, but got {self.event!s}')
  274. def expect_nothing(self) -> None:
  275. raise EmitterError(f'expected nothing, but got {self.event!s}')
  276. # Document handlers.
  277. def expect_first_document_start(self) -> Any:
  278. return self.expect_document_start(first=True)
  279. def expect_document_start(self, first: bool = False) -> None:
  280. if isinstance(self.event, DocumentStartEvent):
  281. if (self.event.version or self.event.tags) and self.open_ended:
  282. self.write_indicator('...', True)
  283. self.write_indent()
  284. if self.event.version:
  285. version_text = self.prepare_version(self.event.version)
  286. self.write_version_directive(version_text)
  287. self.tag_prefixes = self.DEFAULT_TAG_PREFIXES.copy()
  288. if self.event.tags:
  289. handles = sorted(self.event.tags.keys())
  290. for handle in handles:
  291. prefix = self.event.tags[handle]
  292. self.tag_prefixes[prefix] = handle
  293. handle_text = self.prepare_tag_handle(handle)
  294. prefix_text = self.prepare_tag_prefix(prefix)
  295. self.write_tag_directive(handle_text, prefix_text)
  296. implicit = (
  297. first
  298. and not self.event.explicit
  299. and not self.canonical
  300. and not self.event.version
  301. and not self.event.tags
  302. and not self.check_empty_document()
  303. )
  304. if not implicit:
  305. self.write_indent()
  306. self.write_indicator('---', True)
  307. if self.canonical:
  308. self.write_indent()
  309. self.state = self.expect_document_root
  310. elif isinstance(self.event, StreamEndEvent):
  311. if self.open_ended:
  312. self.write_indicator('...', True)
  313. self.write_indent()
  314. self.write_stream_end()
  315. self.state = self.expect_nothing
  316. else:
  317. raise EmitterError(f'expected DocumentStartEvent, but got {self.event!s}')
  318. def expect_document_end(self) -> None:
  319. if isinstance(self.event, DocumentEndEvent):
  320. self.write_indent()
  321. if self.event.explicit:
  322. self.write_indicator('...', True)
  323. self.write_indent()
  324. self.flush_stream()
  325. self.state = self.expect_document_start
  326. else:
  327. raise EmitterError(f'expected DocumentEndEvent, but got {self.event!s}')
  328. def expect_document_root(self) -> None:
  329. self.states.append(self.expect_document_end)
  330. self.expect_node(root=True)
  331. # Node handlers.
  332. def expect_node(
  333. self,
  334. root: bool = False,
  335. sequence: bool = False,
  336. mapping: bool = False,
  337. simple_key: bool = False,
  338. ) -> None:
  339. self.root_context = root
  340. self.sequence_context = sequence # not used in PyYAML
  341. force_flow_indent = False
  342. self.mapping_context = mapping
  343. self.simple_key_context = simple_key
  344. if isinstance(self.event, AliasEvent):
  345. self.expect_alias()
  346. elif isinstance(self.event, (ScalarEvent, CollectionStartEvent)):
  347. if (
  348. self.process_anchor('&')
  349. and isinstance(self.event, ScalarEvent)
  350. and self.sequence_context
  351. ):
  352. self.sequence_context = False
  353. if (
  354. root
  355. and isinstance(self.event, ScalarEvent)
  356. and not self.scalar_after_indicator
  357. ):
  358. self.write_indent()
  359. self.process_tag()
  360. if isinstance(self.event, ScalarEvent):
  361. # nprint('@', self.indention, self.no_newline, self.column)
  362. self.expect_scalar()
  363. elif isinstance(self.event, SequenceStartEvent):
  364. # nprint('@', self.indention, self.no_newline, self.column)
  365. i2, n2 = self.indention, self.no_newline # NOQA
  366. if self.event.comment:
  367. if self.event.flow_style is False:
  368. if self.write_post_comment(self.event):
  369. self.indention = False
  370. self.no_newline = True
  371. if self.event.flow_style:
  372. column = self.column
  373. if self.write_pre_comment(self.event):
  374. if self.event.flow_style:
  375. # force_flow_indent = True
  376. force_flow_indent = not self.indents.values[-1][1]
  377. self.indention = i2
  378. self.no_newline = not self.indention
  379. if self.event.flow_style:
  380. self.column = column
  381. if (
  382. self.flow_level
  383. or self.canonical
  384. or self.event.flow_style
  385. or self.check_empty_sequence()
  386. ):
  387. self.expect_flow_sequence(force_flow_indent)
  388. else:
  389. self.expect_block_sequence()
  390. elif isinstance(self.event, MappingStartEvent):
  391. if self.event.flow_style is False and self.event.comment:
  392. self.write_post_comment(self.event)
  393. if self.event.comment and self.event.comment[1]:
  394. self.write_pre_comment(self.event)
  395. if self.event.flow_style and self.indents.values:
  396. force_flow_indent = not self.indents.values[-1][1]
  397. if (
  398. self.flow_level
  399. or self.canonical
  400. or self.event.flow_style
  401. or self.check_empty_mapping()
  402. ):
  403. self.expect_flow_mapping(
  404. single=self.event.nr_items == 1, force_flow_indent=force_flow_indent,
  405. )
  406. else:
  407. self.expect_block_mapping()
  408. else:
  409. raise EmitterError('expected NodeEvent, but got {self.event!s}')
  410. def expect_alias(self) -> None:
  411. if self.event.anchor is None:
  412. raise EmitterError('anchor is not specified for alias')
  413. self.process_anchor('*')
  414. self.state = self.states.pop()
  415. def expect_scalar(self) -> None:
  416. self.increase_indent(flow=True)
  417. self.process_scalar()
  418. self.indent = self.indents.pop()
  419. self.state = self.states.pop()
  420. # Flow sequence handlers.
  421. def expect_flow_sequence(self, force_flow_indent: Optional[bool] = False) -> None:
  422. if force_flow_indent:
  423. self.increase_indent(flow=True, sequence=True)
  424. ind = self.indents.seq_flow_align(
  425. self.best_sequence_indent, self.column, force_flow_indent,
  426. )
  427. self.write_indicator(' ' * ind + self.flow_seq_start, True, whitespace=True)
  428. if not force_flow_indent:
  429. self.increase_indent(flow=True, sequence=True)
  430. self.flow_context.append('[')
  431. self.state = self.expect_first_flow_sequence_item
  432. def expect_first_flow_sequence_item(self) -> None:
  433. if isinstance(self.event, SequenceEndEvent):
  434. self.indent = self.indents.pop()
  435. popped = self.flow_context.pop()
  436. assert popped == '['
  437. self.write_indicator(self.flow_seq_end, False)
  438. if self.event.comment and self.event.comment[0]:
  439. # eol comment on empty flow sequence
  440. self.write_post_comment(self.event)
  441. elif self.flow_level == 0:
  442. self.write_line_break()
  443. self.state = self.states.pop()
  444. else:
  445. if self.canonical or self.column > self.best_width:
  446. self.write_indent()
  447. self.states.append(self.expect_flow_sequence_item)
  448. self.expect_node(sequence=True)
  449. def expect_flow_sequence_item(self) -> None:
  450. if isinstance(self.event, SequenceEndEvent):
  451. self.indent = self.indents.pop()
  452. popped = self.flow_context.pop()
  453. assert popped == '['
  454. if self.canonical:
  455. # ToDo: so-39595807, maybe add a space to the flow_seq_separator
  456. # and strip the last space, if space then indent, else do not
  457. # not sure that [1,2,3] is a valid YAML seq
  458. self.write_indicator(self.flow_seq_separator, False)
  459. self.write_indent()
  460. self.write_indicator(self.flow_seq_end, False)
  461. if self.event.comment and self.event.comment[0]:
  462. # eol comment on flow sequence
  463. self.write_post_comment(self.event)
  464. else:
  465. self.no_newline = False
  466. self.state = self.states.pop()
  467. else:
  468. self.write_indicator(self.flow_seq_separator, False)
  469. if self.canonical or self.column > self.best_width:
  470. self.write_indent()
  471. self.states.append(self.expect_flow_sequence_item)
  472. self.expect_node(sequence=True)
  473. # Flow mapping handlers.
  474. def expect_flow_mapping(
  475. self, single: Optional[bool] = False, force_flow_indent: Optional[bool] = False,
  476. ) -> None:
  477. if force_flow_indent:
  478. self.increase_indent(flow=True, sequence=False)
  479. ind = self.indents.seq_flow_align(
  480. self.best_sequence_indent, self.column, force_flow_indent,
  481. )
  482. map_init = self.flow_map_start
  483. if (
  484. single
  485. and self.flow_level
  486. and self.flow_context[-1] == '['
  487. and not self.canonical
  488. and not self.brace_single_entry_mapping_in_flow_sequence
  489. ):
  490. # single map item with flow context, no curly braces necessary
  491. map_init = ''
  492. self.write_indicator(' ' * ind + map_init, True, whitespace=True)
  493. self.flow_context.append(map_init)
  494. if not force_flow_indent:
  495. self.increase_indent(flow=True, sequence=False)
  496. self.state = self.expect_first_flow_mapping_key
  497. def expect_first_flow_mapping_key(self) -> None:
  498. if isinstance(self.event, MappingEndEvent):
  499. self.indent = self.indents.pop()
  500. popped = self.flow_context.pop()
  501. assert popped == '{' # empty flow mapping
  502. self.write_indicator(self.flow_map_end, False)
  503. if self.event.comment and self.event.comment[0]:
  504. # eol comment on empty mapping
  505. self.write_post_comment(self.event)
  506. elif self.flow_level == 0:
  507. self.write_line_break()
  508. self.state = self.states.pop()
  509. else:
  510. if self.canonical or self.column > self.best_width:
  511. self.write_indent()
  512. if not self.canonical and self.check_simple_key():
  513. self.states.append(self.expect_flow_mapping_simple_value)
  514. self.expect_node(mapping=True, simple_key=True)
  515. else:
  516. self.write_indicator('?', True)
  517. self.states.append(self.expect_flow_mapping_value)
  518. self.expect_node(mapping=True)
  519. def expect_flow_mapping_key(self) -> None:
  520. if isinstance(self.event, MappingEndEvent):
  521. # if self.event.comment and self.event.comment[1]:
  522. # self.write_pre_comment(self.event)
  523. self.indent = self.indents.pop()
  524. popped = self.flow_context.pop()
  525. assert popped in ['{', '']
  526. if self.canonical:
  527. self.write_indicator(self.flow_map_separator, False)
  528. self.write_indent()
  529. if popped != '':
  530. self.write_indicator(self.flow_map_end, False)
  531. if self.event.comment and self.event.comment[0]:
  532. # eol comment on flow mapping, never reached on empty mappings
  533. self.write_post_comment(self.event)
  534. else:
  535. self.no_newline = False
  536. self.state = self.states.pop()
  537. else:
  538. self.write_indicator(self.flow_map_separator, False)
  539. if self.canonical or self.column > self.best_width:
  540. self.write_indent()
  541. if not self.canonical and self.check_simple_key():
  542. self.states.append(self.expect_flow_mapping_simple_value)
  543. self.expect_node(mapping=True, simple_key=True)
  544. else:
  545. self.write_indicator('?', True)
  546. self.states.append(self.expect_flow_mapping_value)
  547. self.expect_node(mapping=True)
  548. def expect_flow_mapping_simple_value(self) -> None:
  549. if getattr(self.event, 'style', '?') != '-': # suppress for flow style sets
  550. self.write_indicator(self.prefixed_colon, False)
  551. self.states.append(self.expect_flow_mapping_key)
  552. self.expect_node(mapping=True)
  553. def expect_flow_mapping_value(self) -> None:
  554. if self.canonical or self.column > self.best_width:
  555. self.write_indent()
  556. self.write_indicator(self.prefixed_colon, True)
  557. self.states.append(self.expect_flow_mapping_key)
  558. self.expect_node(mapping=True)
  559. # Block sequence handlers.
  560. def expect_block_sequence(self) -> None:
  561. if self.mapping_context:
  562. indentless = not self.indention
  563. else:
  564. indentless = False
  565. if not self.compact_seq_seq and self.column != 0:
  566. self.write_line_break()
  567. self.increase_indent(flow=False, sequence=True, indentless=indentless)
  568. self.state = self.expect_first_block_sequence_item
  569. def expect_first_block_sequence_item(self) -> Any:
  570. return self.expect_block_sequence_item(first=True)
  571. def expect_block_sequence_item(self, first: bool = False) -> None:
  572. if not first and isinstance(self.event, SequenceEndEvent):
  573. if self.event.comment and self.event.comment[1]:
  574. # final comments on a block list e.g. empty line
  575. self.write_pre_comment(self.event)
  576. self.indent = self.indents.pop()
  577. self.state = self.states.pop()
  578. self.no_newline = False
  579. else:
  580. if self.event.comment and self.event.comment[1]:
  581. self.write_pre_comment(self.event)
  582. nonl = self.no_newline if self.column == 0 else False
  583. self.write_indent()
  584. ind = self.sequence_dash_offset # if len(self.indents) > 1 else 0
  585. self.write_indicator(' ' * ind + '-', True, indention=True)
  586. if nonl or self.sequence_dash_offset + 2 > self.best_sequence_indent:
  587. self.no_newline = True
  588. self.states.append(self.expect_block_sequence_item)
  589. self.expect_node(sequence=True)
  590. # Block mapping handlers.
  591. def expect_block_mapping(self) -> None:
  592. if not self.mapping_context and not (self.compact_seq_map or self.column == 0):
  593. self.write_line_break()
  594. self.increase_indent(flow=False, sequence=False)
  595. self.state = self.expect_first_block_mapping_key
  596. def expect_first_block_mapping_key(self) -> None:
  597. return self.expect_block_mapping_key(first=True)
  598. def expect_block_mapping_key(self, first: Any = False) -> None:
  599. if not first and isinstance(self.event, MappingEndEvent):
  600. if self.event.comment and self.event.comment[1]:
  601. # final comments from a doc
  602. self.write_pre_comment(self.event)
  603. self.indent = self.indents.pop()
  604. self.state = self.states.pop()
  605. else:
  606. if self.event.comment and self.event.comment[1]:
  607. # final comments from a doc
  608. self.write_pre_comment(self.event)
  609. self.write_indent()
  610. if self.check_simple_key():
  611. if not isinstance(
  612. self.event, (SequenceStartEvent, MappingStartEvent),
  613. ): # sequence keys
  614. try:
  615. if self.event.style == '?':
  616. self.write_indicator('?', True, indention=True)
  617. except AttributeError: # aliases have no style
  618. pass
  619. self.states.append(self.expect_block_mapping_simple_value)
  620. self.expect_node(mapping=True, simple_key=True)
  621. # test on style for alias in !!set
  622. if isinstance(self.event, AliasEvent) and not self.event.style == '?':
  623. self.stream.write(' ')
  624. else:
  625. self.write_indicator('?', True, indention=True)
  626. self.states.append(self.expect_block_mapping_value)
  627. self.expect_node(mapping=True)
  628. def expect_block_mapping_simple_value(self) -> None:
  629. if getattr(self.event, 'style', None) != '?':
  630. # prefix = ''
  631. if self.indent == 0 and self.top_level_colon_align is not None:
  632. # write non-prefixed colon
  633. c = ' ' * (self.top_level_colon_align - self.column) + self.colon
  634. else:
  635. c = self.prefixed_colon
  636. self.write_indicator(c, False)
  637. self.states.append(self.expect_block_mapping_key)
  638. self.expect_node(mapping=True)
  639. def expect_block_mapping_value(self) -> None:
  640. self.write_indent()
  641. self.write_indicator(self.prefixed_colon, True, indention=True)
  642. self.states.append(self.expect_block_mapping_key)
  643. self.expect_node(mapping=True)
  644. # Checkers.
  645. def check_empty_sequence(self) -> bool:
  646. return (
  647. isinstance(self.event, SequenceStartEvent)
  648. and bool(self.events)
  649. and isinstance(self.events[0], SequenceEndEvent)
  650. )
  651. def check_empty_mapping(self) -> bool:
  652. return (
  653. isinstance(self.event, MappingStartEvent)
  654. and bool(self.events)
  655. and isinstance(self.events[0], MappingEndEvent)
  656. )
  657. def check_empty_document(self) -> bool:
  658. if not isinstance(self.event, DocumentStartEvent) or not self.events:
  659. return False
  660. event = self.events[0]
  661. return (
  662. isinstance(event, ScalarEvent)
  663. and event.anchor is None
  664. and event.tag is None
  665. and event.implicit
  666. and event.value == ""
  667. )
  668. def check_simple_key(self) -> bool:
  669. length = 0
  670. if isinstance(self.event, NodeEvent) and self.event.anchor is not None:
  671. if self.prepared_anchor is None:
  672. self.prepared_anchor = self.prepare_anchor(self.event.anchor)
  673. length += len(self.prepared_anchor)
  674. if (
  675. isinstance(self.event, (ScalarEvent, CollectionStartEvent))
  676. and self.event.tag is not None
  677. ):
  678. if self.prepared_tag is None:
  679. self.prepared_tag = self.prepare_tag(self.event.ctag)
  680. length += len(self.prepared_tag)
  681. if isinstance(self.event, ScalarEvent):
  682. if self.analysis is None:
  683. self.analysis = self.analyze_scalar(self.event.value)
  684. length += len(self.analysis.scalar)
  685. return length < self.MAX_SIMPLE_KEY_LENGTH and (
  686. isinstance(self.event, AliasEvent)
  687. or (isinstance(self.event, SequenceStartEvent) and self.event.flow_style is True)
  688. or (isinstance(self.event, MappingStartEvent) and self.event.flow_style is True)
  689. or (
  690. isinstance(self.event, ScalarEvent)
  691. # if there is an explicit style for an empty string, it is a simple key
  692. and not (self.analysis.empty and self.style and self.style not in '\'"')
  693. and not self.analysis.multiline
  694. )
  695. or self.check_empty_sequence()
  696. or self.check_empty_mapping()
  697. )
  698. # Anchor, Tag, and Scalar processors.
  699. def process_anchor(self, indicator: Any) -> bool:
  700. if self.event.anchor is None:
  701. self.prepared_anchor = None
  702. return False
  703. if self.prepared_anchor is None:
  704. self.prepared_anchor = self.prepare_anchor(self.event.anchor)
  705. if self.prepared_anchor:
  706. self.write_indicator(indicator + self.prepared_anchor, True)
  707. # issue 288
  708. self.no_newline = False
  709. self.prepared_anchor = None
  710. return True
  711. def process_tag(self) -> None:
  712. tag = self.event.tag
  713. if isinstance(self.event, ScalarEvent):
  714. if self.style is None:
  715. self.style = self.choose_scalar_style()
  716. if (
  717. self.event.value == ''
  718. and self.style == "'"
  719. and tag == 'tag:yaml.org,2002:null'
  720. and self.alt_null is not None
  721. ):
  722. self.event.value = self.alt_null
  723. self.analysis = None
  724. self.style = self.choose_scalar_style()
  725. if (not self.canonical or tag is None) and (
  726. (self.style == "" and self.event.implicit[0])
  727. or (self.style != "" and self.event.implicit[1])
  728. ):
  729. self.prepared_tag = None
  730. return
  731. if self.event.implicit[0] and tag is None:
  732. tag = '!'
  733. self.prepared_tag = None
  734. else:
  735. if (not self.canonical or tag is None) and self.event.implicit:
  736. self.prepared_tag = None
  737. return
  738. if tag is None:
  739. raise EmitterError('tag is not specified')
  740. if self.prepared_tag is None:
  741. self.prepared_tag = self.prepare_tag(self.event.ctag)
  742. if self.prepared_tag:
  743. self.write_indicator(self.prepared_tag, True)
  744. if (
  745. self.sequence_context
  746. and not self.flow_level
  747. and isinstance(self.event, ScalarEvent)
  748. ):
  749. self.no_newline = True
  750. self.prepared_tag = None
  751. def choose_scalar_style(self) -> Any:
  752. # issue 449 needs this otherwise emits single quoted empty string
  753. if self.event.value == '' and self.event.ctag.handle == '!!':
  754. return None
  755. if self.analysis is None:
  756. self.analysis = self.analyze_scalar(self.event.value)
  757. if self.event.style == '"' or self.canonical:
  758. return '"'
  759. if (not self.event.style or self.event.style == '?' or self.event.style == '-') and (
  760. self.event.implicit[0] or not self.event.implicit[2]
  761. ):
  762. if not (
  763. self.simple_key_context and (self.analysis.empty or self.analysis.multiline)
  764. ) and (
  765. self.flow_level
  766. and self.analysis.allow_flow_plain
  767. or (not self.flow_level and self.analysis.allow_block_plain)
  768. ):
  769. return ""
  770. if self.event.style == '-':
  771. return ""
  772. self.analysis.allow_block = True
  773. if self.event.style and self.event.style in '|>':
  774. if (
  775. not self.flow_level
  776. and not self.simple_key_context
  777. and self.analysis.allow_block
  778. ):
  779. return self.event.style
  780. if not self.event.style and self.analysis.allow_double_quoted:
  781. if "'" in self.event.value or '\n' in self.event.value:
  782. return '"'
  783. if not self.event.style or self.event.style == "'":
  784. if self.analysis.allow_single_quoted and not (
  785. self.simple_key_context and self.analysis.multiline
  786. ):
  787. return "'"
  788. return '"'
  789. def process_scalar(self) -> None:
  790. if self.analysis is None:
  791. self.analysis = self.analyze_scalar(self.event.value)
  792. if self.style is None:
  793. self.style = self.choose_scalar_style()
  794. split = not self.simple_key_context
  795. # if self.analysis.multiline and split \
  796. # and (not self.style or self.style in '\'\"'):
  797. # self.write_indent()
  798. # nprint('xx', self.sequence_context, self.flow_level)
  799. if self.sequence_context and not self.flow_level:
  800. self.write_indent()
  801. if self.style == '"':
  802. self.write_double_quoted(self.analysis.scalar, split)
  803. elif self.style == "'":
  804. self.write_single_quoted(self.analysis.scalar, split)
  805. elif self.style == '>':
  806. try:
  807. cmx = self.event.comment[1][0]
  808. except (IndexError, TypeError):
  809. cmx = ""
  810. self.write_folded(self.analysis.scalar, cmx)
  811. if (
  812. self.event.comment
  813. and self.event.comment[0]
  814. and self.event.comment[0].column >= self.indent
  815. ):
  816. # comment following a folded scalar must dedent (issue 376)
  817. self.event.comment[0].column = self.indent - 1 # type: ignore
  818. elif self.style == '|':
  819. # self.write_literal(self.analysis.scalar, self.event.comment)
  820. try:
  821. cmx = self.event.comment[1][0]
  822. except (IndexError, TypeError):
  823. cmx = ""
  824. self.write_literal(self.analysis.scalar, cmx)
  825. if (
  826. self.event.comment
  827. and self.event.comment[0]
  828. and self.event.comment[0].column >= self.indent
  829. ):
  830. # comment following a literal scalar must dedent (issue 376)
  831. self.event.comment[0].column = self.indent - 1 # type: ignore
  832. else:
  833. self.write_plain(self.analysis.scalar, split)
  834. self.analysis = None
  835. self.style = None
  836. if self.event.comment:
  837. self.write_post_comment(self.event)
  838. # Analyzers.
  839. def prepare_version(self, version: Any) -> Any:
  840. major, minor = version
  841. if major != 1:
  842. raise EmitterError(f'unsupported YAML version: {major:d}.{minor:d}')
  843. return f'{major:d}.{minor:d}'
  844. def prepare_tag_handle(self, handle: Any) -> Any:
  845. if not handle:
  846. raise EmitterError('tag handle must not be empty')
  847. if handle[0] != '!' or handle[-1] != '!':
  848. raise EmitterError(f"tag handle must start and end with '!': {handle!r}")
  849. for ch in handle[1:-1]:
  850. if not ('0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' or ch in '-_'):
  851. raise EmitterError(f'invalid character {ch!r} in the tag handle: {handle!r}')
  852. return handle
  853. def prepare_tag_prefix(self, prefix: Any) -> Any:
  854. if not prefix:
  855. raise EmitterError('tag prefix must not be empty')
  856. chunks: List[Any] = []
  857. start = end = 0
  858. if prefix[0] == '!':
  859. end = 1
  860. ch_set = "-;/?:@&=+$,_.~*'()[]"
  861. if self.dumper:
  862. version = getattr(self.dumper, 'version', (1, 2))
  863. if version is None or version >= (1, 2):
  864. ch_set += '#'
  865. while end < len(prefix):
  866. ch = prefix[end]
  867. if '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' or ch in ch_set:
  868. end += 1
  869. else:
  870. if start < end:
  871. chunks.append(prefix[start:end])
  872. start = end = end + 1
  873. data = ch
  874. for ch in data:
  875. chunks.append(f'%{ord(ch):02X}')
  876. if start < end:
  877. chunks.append(prefix[start:end])
  878. return "".join(chunks)
  879. def prepare_tag(self, tag: Any) -> Any:
  880. if not tag:
  881. raise EmitterError('tag must not be empty')
  882. tag = str(tag)
  883. if tag == '!' or tag == '!!':
  884. return tag
  885. handle = None
  886. suffix = tag
  887. prefixes = sorted(self.tag_prefixes.keys())
  888. for prefix in prefixes:
  889. if tag.startswith(prefix) and (prefix == '!' or len(prefix) < len(tag)):
  890. handle = self.tag_prefixes[prefix]
  891. suffix = tag[len(prefix) :]
  892. chunks: List[Any] = []
  893. start = end = 0
  894. ch_set = "-;/?:@&=+$,_.~*'()[]"
  895. if self.dumper:
  896. version = getattr(self.dumper, 'version', (1, 2))
  897. if version is None or version >= (1, 2):
  898. ch_set += '#'
  899. while end < len(suffix):
  900. ch = suffix[end]
  901. if (
  902. '0' <= ch <= '9'
  903. or 'A' <= ch <= 'Z'
  904. or 'a' <= ch <= 'z'
  905. or ch in ch_set
  906. or (ch == '!' and handle != '!')
  907. ):
  908. end += 1
  909. else:
  910. if start < end:
  911. chunks.append(suffix[start:end])
  912. start = end = end + 1
  913. data = ch
  914. for ch in data:
  915. chunks.append(f'%{ord(ch):02X}')
  916. if start < end:
  917. chunks.append(suffix[start:end])
  918. suffix_text = "".join(chunks)
  919. if handle:
  920. return f'{handle!s}{suffix_text!s}'
  921. else:
  922. return f'!<{suffix_text!s}>'
  923. def prepare_anchor(self, anchor: Any) -> Any:
  924. if not anchor:
  925. raise EmitterError('anchor must not be empty')
  926. for ch in anchor:
  927. if not check_anchorname_char(ch):
  928. raise EmitterError(f'invalid character {ch!r} in the anchor: {anchor!r}')
  929. return anchor
    def analyze_scalar(self, scalar: Any) -> Any:
        """Inspect *scalar* and report which presentation styles may be used.

        The text is scanned once, character by character, recording indicator
        characters, line breaks, characters that need escaping and significant
        whitespace combinations.  The findings are condensed into a
        ScalarAnalysis holding the scalar itself, ``empty``, ``multiline`` and
        the ``allow_*`` flag for each style.
        """
        # Empty scalar is a special case.
        if not scalar:
            return ScalarAnalysis(
                scalar=scalar,
                empty=True,
                multiline=False,
                allow_flow_plain=False,
                allow_block_plain=True,
                allow_single_quoted=True,
                allow_double_quoted=True,
                allow_block=False,
            )

        # Indicators and special characters.
        block_indicators = False
        flow_indicators = False
        line_breaks = False
        special_characters = False

        # Important whitespace combinations.
        leading_space = False
        leading_break = False
        trailing_space = False
        trailing_break = False
        break_space = False
        space_break = False

        # Check document indicators.
        if scalar.startswith('---') or scalar.startswith('...'):
            block_indicators = True
            flow_indicators = True

        # First character or preceded by a whitespace.
        preceeded_by_whitespace = True

        # Last character or followed by a whitespace.
        followed_by_whitespace = len(scalar) == 1 or scalar[1] in '\0 \t\r\n\x85\u2028\u2029'

        # The previous character is a space.
        previous_space = False

        # The previous character is a break.
        previous_break = False

        index = 0
        while index < len(scalar):
            ch = scalar[index]

            # Check for indicators.
            if index == 0:
                # Leading indicators are special characters.
                if ch in '#,[]{}&*!|>\'"%@`':
                    flow_indicators = True
                    block_indicators = True
                if ch in '?:':  # ToDo
                    if self.serializer.use_version == (1, 1):
                        flow_indicators = True
                    elif len(scalar) == 1:  # single character
                        flow_indicators = True
                    if followed_by_whitespace:
                        block_indicators = True
                if ch == '-' and followed_by_whitespace:
                    flow_indicators = True
                    block_indicators = True
            else:
                # Some indicators cannot appear within a scalar as well.
                if ch in ',[]{}':  # http://yaml.org/spec/1.2/spec.html#id2788859
                    flow_indicators = True
                if ch == '?' and self.serializer.use_version == (1, 1):
                    flow_indicators = True
                if ch == ':':
                    if followed_by_whitespace:
                        flow_indicators = True
                        block_indicators = True
                if ch == '#' and preceeded_by_whitespace:
                    flow_indicators = True
                    block_indicators = True

            # Check for line breaks, special, and unicode characters.
            if ch in '\n\x85\u2028\u2029':
                line_breaks = True
            if not (ch == '\n' or '\x20' <= ch <= '\x7E'):
                # printable non-ASCII only counts as special when unicode
                # output is disabled; everything else here always does
                if (
                    ch == '\x85'
                    or '\xA0' <= ch <= '\uD7FF'
                    or '\uE000' <= ch <= '\uFFFD'
                    or (self.unicode_supplementary and ('\U00010000' <= ch <= '\U0010FFFF'))
                ) and ch != '\uFEFF':
                    # unicode_characters = True
                    if not self.allow_unicode:
                        special_characters = True
                else:
                    special_characters = True

            # Detect important whitespace combinations.
            if ch == ' ':
                if index == 0:
                    leading_space = True
                if index == len(scalar) - 1:
                    trailing_space = True
                if previous_break:
                    break_space = True
                previous_space = True
                previous_break = False
            elif ch in '\n\x85\u2028\u2029':
                if index == 0:
                    leading_break = True
                if index == len(scalar) - 1:
                    trailing_break = True
                if previous_space:
                    space_break = True
                previous_space = False
                previous_break = True
            else:
                previous_space = False
                previous_break = False

            # Prepare for the next character.
            index += 1
            preceeded_by_whitespace = ch in '\0 \t\r\n\x85\u2028\u2029'
            followed_by_whitespace = (
                index + 1 >= len(scalar) or scalar[index + 1] in '\0 \t\r\n\x85\u2028\u2029'
            )

        # Let's decide what styles are allowed.
        allow_flow_plain = True
        allow_block_plain = True
        allow_single_quoted = True
        allow_double_quoted = True
        allow_block = True

        # Leading and trailing whitespaces are bad for plain scalars.
        if leading_space or leading_break or trailing_space or trailing_break:
            allow_flow_plain = allow_block_plain = False

        # We do not permit trailing spaces for block scalars.
        if trailing_space:
            allow_block = False

        # Spaces at the beginning of a new line are only acceptable for block
        # scalars.
        if break_space:
            allow_flow_plain = allow_block_plain = allow_single_quoted = False

        # Spaces followed by breaks, as well as special character are only
        # allowed for double quoted scalars.
        if special_characters:
            allow_flow_plain = allow_block_plain = allow_single_quoted = allow_block = False
        elif space_break:
            allow_flow_plain = allow_block_plain = allow_single_quoted = False
            if not self.allow_space_break:
                allow_block = False

        # Although the plain scalar writer supports breaks, we never emit
        # multiline plain scalars.
        if line_breaks:
            allow_flow_plain = allow_block_plain = False

        # Flow indicators are forbidden for flow plain scalars.
        if flow_indicators:
            allow_flow_plain = False

        # Block indicators are forbidden for block plain scalars.
        if block_indicators:
            allow_block_plain = False

        return ScalarAnalysis(
            scalar=scalar,
            empty=False,
            multiline=line_breaks,
            allow_flow_plain=allow_flow_plain,
            allow_block_plain=allow_block_plain,
            allow_single_quoted=allow_single_quoted,
            allow_double_quoted=allow_double_quoted,
            allow_block=allow_block,
        )
  1086. # Writers.
  1087. def flush_stream(self) -> None:
  1088. if hasattr(self.stream, 'flush'):
  1089. self.stream.flush()
  1090. def write_stream_start(self) -> None:
  1091. # Write BOM if needed.
  1092. if self.encoding and self.encoding.startswith('utf-16'):
  1093. self.stream.write('\uFEFF'.encode(self.encoding))
  1094. def write_stream_end(self) -> None:
  1095. self.flush_stream()
  1096. def write_indicator(
  1097. self,
  1098. indicator: Any,
  1099. need_whitespace: Any,
  1100. whitespace: bool = False,
  1101. indention: bool = False,
  1102. ) -> None:
  1103. if self.whitespace or not need_whitespace:
  1104. data = indicator
  1105. else:
  1106. data = ' ' + indicator
  1107. self.whitespace = whitespace
  1108. self.indention = self.indention and indention
  1109. self.column += len(data)
  1110. self.open_ended = False
  1111. if bool(self.encoding):
  1112. data = data.encode(self.encoding)
  1113. self.stream.write(data)
  1114. def write_indent(self) -> None:
  1115. indent = self.indent or 0
  1116. if (
  1117. not self.indention
  1118. or self.column > indent
  1119. or (self.column == indent and not self.whitespace)
  1120. ):
  1121. if bool(self.no_newline):
  1122. self.no_newline = False
  1123. else:
  1124. self.write_line_break()
  1125. if self.column < indent:
  1126. self.whitespace = True
  1127. data = ' ' * (indent - self.column)
  1128. self.column = indent
  1129. if self.encoding:
  1130. data = data.encode(self.encoding) # type: ignore
  1131. self.stream.write(data)
  1132. def write_line_break(self, data: Any = None) -> None:
  1133. if data is None:
  1134. data = self.best_line_break
  1135. self.whitespace = True
  1136. self.indention = True
  1137. self.line += 1
  1138. self.column = 0
  1139. if bool(self.encoding):
  1140. data = data.encode(self.encoding)
  1141. self.stream.write(data)
  1142. def write_version_directive(self, version_text: Any) -> None:
  1143. data: Any = f'%YAML {version_text!s}'
  1144. if self.encoding:
  1145. data = data.encode(self.encoding)
  1146. self.stream.write(data)
  1147. self.write_line_break()
  1148. def write_tag_directive(self, handle_text: Any, prefix_text: Any) -> None:
  1149. data: Any = f'%TAG {handle_text!s} {prefix_text!s}'
  1150. if self.encoding:
  1151. data = data.encode(self.encoding)
  1152. self.stream.write(data)
  1153. self.write_line_break()
  1154. # Scalar streams.
    def write_single_quoted(self, text: Any, split: Any = True) -> None:
        """Write *text* as a single-quoted scalar.

        The text is scanned as runs of spaces, runs of line breaks and runs of
        other characters.  A single space may be replaced by a fold (line break
        plus indent) when the column is past ``self.best_width`` and *split* is
        true; line breaks are written out followed by an indent; every ``'`` in
        the text is written doubled.
        """
        if self.root_context:
            if self.requested_indent is not None:
                self.write_line_break()
                if self.requested_indent != 0:
                    self.write_indent()
        self.write_indicator("'", True)
        spaces = False
        breaks = False
        start = end = 0
        # runs one step past the end (ch is None) so the last run gets flushed
        while end <= len(text):
            ch = None
            if end < len(text):
                ch = text[end]
            if spaces:
                if ch is None or ch != ' ':
                    # only a lone space that is neither first nor last in the
                    # text may be turned into a fold
                    if (
                        start + 1 == end
                        and self.column > self.best_width
                        and split
                        and start != 0
                        and end != len(text)
                    ):
                        self.write_indent()
                    else:
                        data = text[start:end]
                        self.column += len(data)
                        if bool(self.encoding):
                            data = data.encode(self.encoding)
                        self.stream.write(data)
                    start = end
            elif breaks:
                if ch is None or ch not in '\n\x85\u2028\u2029':
                    # an extra break is written when the run starts with '\n'
                    if text[start] == '\n':
                        self.write_line_break()
                    for br in text[start:end]:
                        if br == '\n':
                            self.write_line_break()
                        else:
                            self.write_line_break(br)
                    self.write_indent()
                    start = end
            else:
                if ch is None or ch in ' \n\x85\u2028\u2029' or ch == "'":
                    if start < end:
                        data = text[start:end]
                        self.column += len(data)
                        if bool(self.encoding):
                            data = data.encode(self.encoding)
                        self.stream.write(data)
                        start = end
            if ch == "'":
                # a quote inside the scalar is escaped by doubling it
                data = "''"
                self.column += 2
                if bool(self.encoding):
                    data = data.encode(self.encoding)
                self.stream.write(data)
                start = end + 1
            if ch is not None:
                spaces = ch == ' '
                breaks = ch in '\n\x85\u2028\u2029'
            end += 1
        self.write_indicator("'", False)
    # Characters that have a short escape in double-quoted scalars, mapped to
    # the character that follows the backslash in that escape sequence.
    ESCAPE_REPLACEMENTS = {
        '\0': '0',
        '\x07': 'a',
        '\x08': 'b',
        '\x09': 't',
        '\x0A': 'n',
        '\x0B': 'v',
        '\x0C': 'f',
        '\x0D': 'r',
        '\x1B': 'e',
        '"': '"',
        '\\': '\\',
        '\x85': 'N',
        '\xA0': '_',
        '\u2028': 'L',
        '\u2029': 'P',
    }
  1235. def write_double_quoted(self, text: Any, split: Any = True) -> None:
  1236. if self.root_context:
  1237. if self.requested_indent is not None:
  1238. self.write_line_break()
  1239. if self.requested_indent != 0:
  1240. self.write_indent()
  1241. self.write_indicator('"', True)
  1242. start = end = 0
  1243. while end <= len(text):
  1244. ch = None
  1245. if end < len(text):
  1246. ch = text[end]
  1247. if (
  1248. ch is None
  1249. or ch in '"\\\x85\u2028\u2029\uFEFF'
  1250. or not (
  1251. '\x20' <= ch <= '\x7E'
  1252. or (
  1253. self.allow_unicode
  1254. and (
  1255. ('\xA0' <= ch <= '\uD7FF')
  1256. or ('\uE000' <= ch <= '\uFFFD')
  1257. or ('\U00010000' <= ch <= '\U0010FFFF')
  1258. )
  1259. )
  1260. )
  1261. ):
  1262. if start < end:
  1263. data = text[start:end]
  1264. self.column += len(data)
  1265. if bool(self.encoding):
  1266. data = data.encode(self.encoding)
  1267. self.stream.write(data)
  1268. start = end
  1269. if ch is not None:
  1270. if ch in self.ESCAPE_REPLACEMENTS:
  1271. data = '\\' + self.ESCAPE_REPLACEMENTS[ch]
  1272. elif ch <= '\xFF':
  1273. data = '\\x%02X' % ord(ch)
  1274. elif ch <= '\uFFFF':
  1275. data = '\\u%04X' % ord(ch)
  1276. else:
  1277. data = '\\U%08X' % ord(ch)
  1278. self.column += len(data)
  1279. if bool(self.encoding):
  1280. data = data.encode(self.encoding)
  1281. self.stream.write(data)
  1282. start = end + 1
  1283. if (
  1284. 0 < end < len(text) - 1
  1285. and (ch == ' ' or start >= end)
  1286. and self.column + (end - start) > self.best_width
  1287. and split
  1288. ):
  1289. # SO https://stackoverflow.com/a/75634614/1307905
  1290. # data = text[start:end] + u'\\' # <<< replaced with following six lines
  1291. need_backquote = True
  1292. if len(text) > end:
  1293. try:
  1294. space_pos = text.index(' ', end)
  1295. if (
  1296. '"' not in text[end:space_pos]
  1297. and "'" not in text[end:space_pos]
  1298. and text[space_pos + 1] != ' '
  1299. and text[end - 1 : end + 1] != ' '
  1300. ):
  1301. need_backquote = False
  1302. except (ValueError, IndexError):
  1303. pass
  1304. data = text[start:end] + ('\\' if need_backquote else '')
  1305. if start < end:
  1306. start = end
  1307. self.column += len(data)
  1308. if bool(self.encoding):
  1309. data = data.encode(self.encoding)
  1310. self.stream.write(data)
  1311. self.write_indent()
  1312. self.whitespace = False
  1313. self.indention = False
  1314. if text[start] == ' ':
  1315. if not need_backquote:
  1316. # remove leading space it will load from the newline
  1317. start += 1
  1318. # data = u'\\' # <<< replaced with following line
  1319. data = '\\' if need_backquote else ''
  1320. self.column += len(data)
  1321. if bool(self.encoding):
  1322. data = data.encode(self.encoding)
  1323. self.stream.write(data)
  1324. end += 1
  1325. self.write_indicator('"', False)
    def determine_block_hints(self, text: Any) -> Any:
        """Compute the header hints for writing *text* as a block scalar.

        Returns a tuple ``(hints, indent, indicator)``:

        * ``indicator`` is ``'-'`` when the text does not end in a line break,
          ``'+'`` when it consists of a single break or ends in two breaks,
          and ``''`` otherwise;
        * ``indent`` is 2 when the text starts with a space or line break, or
          when, in the root context, a ``'\\n---'`` / ``'\\n...'`` sequence
          followed by a space, CR or LF is found; otherwise it is 0;
        * ``hints`` is the text to place after the block indicator: the indent
          digit (only in the leading space/break case) followed by
          ``indicator``.
        """
        indent = 0
        indicator = ''
        hints = ''
        if text:
            if text[0] in ' \n\x85\u2028\u2029':
                indent = 2
                hints += str(indent)
            elif self.root_context:
                # look for something that reads as a document marker
                for end in ['\n---', '\n...']:
                    pos = 0
                    while True:
                        pos = text.find(end, pos)
                        if pos == -1:
                            break
                        try:
                            if text[pos + 4] in ' \r\n':
                                break
                        except IndexError:
                            # marker at the very end of the text: keep looking
                            pass
                        pos += 1
                    if pos > -1:
                        break
                if pos > 0:
                    indent = 2
            if text[-1] not in '\n\x85\u2028\u2029':
                indicator = '-'
            elif len(text) == 1 or text[-2] in '\n\x85\u2028\u2029':
                indicator = '+'
        hints += indicator
        return hints, indent, indicator
  1357. def write_folded(self, text: Any, comment: Any) -> None:
  1358. hints, _indent, _indicator = self.determine_block_hints(text)
  1359. if not isinstance(comment, str):
  1360. comment = ''
  1361. self.write_indicator('>' + hints + comment, True)
  1362. if _indicator == '+':
  1363. self.open_ended = True
  1364. self.write_line_break()
  1365. leading_space = True
  1366. spaces = False
  1367. breaks = True
  1368. start = end = 0
  1369. while end <= len(text):
  1370. ch = None
  1371. if end < len(text):
  1372. ch = text[end]
  1373. if breaks:
  1374. if ch is None or ch not in '\n\x85\u2028\u2029\a':
  1375. if (
  1376. not leading_space
  1377. and ch is not None
  1378. and ch != ' '
  1379. and text[start] == '\n'
  1380. ):
  1381. self.write_line_break()
  1382. leading_space = ch == ' '
  1383. for br in text[start:end]:
  1384. if br == '\n':
  1385. self.write_line_break()
  1386. else:
  1387. self.write_line_break(br)
  1388. if ch is not None:
  1389. self.write_indent()
  1390. start = end
  1391. elif spaces:
  1392. if ch != ' ':
  1393. if start + 1 == end and self.column > self.best_width:
  1394. self.write_indent()
  1395. else:
  1396. data = text[start:end]
  1397. self.column += len(data)
  1398. if bool(self.encoding):
  1399. data = data.encode(self.encoding)
  1400. self.stream.write(data)
  1401. start = end
  1402. else:
  1403. if ch is None or ch in ' \n\x85\u2028\u2029\a':
  1404. data = text[start:end]
  1405. self.column += len(data)
  1406. if bool(self.encoding):
  1407. data = data.encode(self.encoding)
  1408. self.stream.write(data)
  1409. if ch == '\a':
  1410. if end < (len(text) - 1) and not text[end + 2].isspace():
  1411. self.write_line_break()
  1412. self.write_indent()
  1413. end += 2 # \a and the space that is inserted on the fold
  1414. else:
  1415. raise EmitterError('unexcpected fold indicator \\a before space')
  1416. if ch is None:
  1417. self.write_line_break()
  1418. start = end
  1419. if ch is not None:
  1420. breaks = ch in '\n\x85\u2028\u2029'
  1421. spaces = ch == ' '
  1422. end += 1
  1423. def write_literal(self, text: Any, comment: Any = None) -> None:
  1424. hints, _indent, _indicator = self.determine_block_hints(text)
  1425. # if comment is not None:
  1426. # try:
  1427. # hints += comment[1][0]
  1428. # except (TypeError, IndexError) as e:
  1429. # pass
  1430. if not isinstance(comment, str):
  1431. comment = ''
  1432. self.write_indicator('|' + hints + comment, True)
  1433. # try:
  1434. # nprintf('selfev', comment)
  1435. # cmx = comment[1][0]
  1436. # if cmx:
  1437. # self.stream.write(cmx)
  1438. # except (TypeError, IndexError) as e:
  1439. # pass
  1440. if _indicator == '+':
  1441. self.open_ended = True
  1442. self.write_line_break()
  1443. breaks = True
  1444. start = end = 0
  1445. while end <= len(text):
  1446. ch = None
  1447. if end < len(text):
  1448. ch = text[end]
  1449. if breaks:
  1450. if ch is None or ch not in '\n\x85\u2028\u2029':
  1451. for br in text[start:end]:
  1452. if br == '\n':
  1453. self.write_line_break()
  1454. else:
  1455. self.write_line_break(br)
  1456. if ch is not None:
  1457. if self.root_context:
  1458. idnx = self.indent if self.indent is not None else 0
  1459. self.stream.write(' ' * (_indent + idnx))
  1460. else:
  1461. self.write_indent()
  1462. start = end
  1463. else:
  1464. if ch is None or ch in '\n\x85\u2028\u2029':
  1465. data = text[start:end]
  1466. if bool(self.encoding):
  1467. data = data.encode(self.encoding)
  1468. self.stream.write(data)
  1469. if ch is None:
  1470. self.write_line_break()
  1471. start = end
  1472. if ch is not None:
  1473. breaks = ch in '\n\x85\u2028\u2029'
  1474. end += 1
    def write_plain(self, text: Any, split: Any = True) -> None:
        """Write *text* as a plain (unquoted) scalar.

        In the root context either a line break (plus indent when
        ``self.requested_indent`` is non-zero) is written first, or, when no
        indent was requested, the document is marked open ended.  Empty text
        writes nothing more.  A separating space is written when the previous
        output did not end in whitespace.  The text is then scanned as runs of
        spaces, runs of line breaks and runs of other characters; a single
        space may be replaced by a fold when the column is past
        ``self.best_width`` and *split* is true.
        """
        if self.root_context:
            if self.requested_indent is not None:
                self.write_line_break()
                if self.requested_indent != 0:
                    self.write_indent()
            else:
                self.open_ended = True
        if not text:
            return
        if not self.whitespace:
            data = ' '
            self.column += len(data)
            if self.encoding:
                data = data.encode(self.encoding)  # type: ignore
            self.stream.write(data)
        self.whitespace = False
        self.indention = False
        spaces = False
        breaks = False
        start = end = 0
        # runs one step past the end (ch is None) so the last run gets flushed
        while end <= len(text):
            ch = None
            if end < len(text):
                ch = text[end]
            if spaces:
                if ch != ' ':
                    if start + 1 == end and self.column > self.best_width and split:
                        # a lone space past the preferred width becomes a fold
                        self.write_indent()
                        self.whitespace = False
                        self.indention = False
                    else:
                        data = text[start:end]
                        self.column += len(data)
                        if self.encoding:
                            data = data.encode(self.encoding)  # type: ignore
                        self.stream.write(data)
                    start = end
            elif breaks:
                # NOTE(review): ch is None here when the text ends in a line
                # break, and `None not in str` raises TypeError - confirm that
                # callers never pass such text
                if ch not in '\n\x85\u2028\u2029':  # type: ignore
                    if text[start] == '\n':
                        self.write_line_break()
                    for br in text[start:end]:
                        if br == '\n':
                            self.write_line_break()
                        else:
                            self.write_line_break(br)
                    self.write_indent()
                    self.whitespace = False
                    self.indention = False
                    start = end
            else:
                if ch is None or ch in ' \n\x85\u2028\u2029':
                    data = text[start:end]
                    if (
                        len(data) > self.best_width
                        and self.indent is not None
                        and self.column > self.indent
                    ):
                        # words longer than line length get a line of their own
                        self.write_indent()
                    self.column += len(data)
                    if self.encoding:
                        data = data.encode(self.encoding)  # type: ignore
                    try:
                        self.stream.write(data)
                    except:  # NOQA
                        # show the offending data on stdout, then re-raise
                        sys.stdout.write(repr(data) + '\n')
                        raise
                    start = end
            if ch is not None:
                spaces = ch == ' '
                breaks = ch in '\n\x85\u2028\u2029'
            end += 1
  1549. def write_comment(self, comment: Any, pre: bool = False) -> None:
  1550. value = comment.value
  1551. # nprintf(f'{self.column:02d} {comment.start_mark.column:02d} {value!r}')
  1552. if not pre and value[-1] == '\n':
  1553. value = value[:-1]
  1554. try:
  1555. # get original column position
  1556. col = comment.start_mark.column
  1557. if comment.value and comment.value.startswith('\n'):
  1558. # never inject extra spaces if the comment starts with a newline
  1559. # and not a real comment (e.g. if you have an empty line following a key-value
  1560. col = self.column
  1561. elif col < self.column + 1:
  1562. ValueError
  1563. except ValueError:
  1564. col = self.column + 1
  1565. # nprint('post_comment', self.line, self.column, value)
  1566. try:
  1567. # at least one space if the current column >= the start column of the comment
  1568. # but not at the start of a line
  1569. nr_spaces = col - self.column
  1570. if self.column and value.strip() and nr_spaces < 1 and value[0] != '\n':
  1571. nr_spaces = 1
  1572. value = ' ' * nr_spaces + value
  1573. try:
  1574. if bool(self.encoding):
  1575. value = value.encode(self.encoding)
  1576. except UnicodeDecodeError:
  1577. pass
  1578. self.stream.write(value)
  1579. except TypeError:
  1580. raise
  1581. if not pre:
  1582. self.write_line_break()
  1583. def write_pre_comment(self, event: Any) -> bool:
  1584. comments = event.comment[1]
  1585. if comments is None:
  1586. return False
  1587. try:
  1588. start_events = (MappingStartEvent, SequenceStartEvent)
  1589. for comment in comments:
  1590. if isinstance(event, start_events) and getattr(comment, 'pre_done', None):
  1591. continue
  1592. if self.column != 0:
  1593. self.write_line_break()
  1594. self.write_comment(comment, pre=True)
  1595. if isinstance(event, start_events):
  1596. comment.pre_done = True
  1597. except TypeError:
  1598. sys.stdout.write(f'eventtt {type(event)} {event}')
  1599. raise
  1600. return True
  1601. def write_post_comment(self, event: Any) -> bool:
  1602. if self.event.comment[0] is None:
  1603. return False
  1604. comment = event.comment[0]
  1605. self.write_comment(comment)
  1606. return True
  1607. class RoundTripEmitter(Emitter):
  1608. def prepare_tag(self, ctag: Any) -> Any:
  1609. if not ctag:
  1610. raise EmitterError('tag must not be empty')
  1611. tag = str(ctag)
  1612. if tag == '!' or tag == '!!':
  1613. return tag
  1614. handle = ctag.handle
  1615. suffix = ctag.suffix
  1616. prefixes = sorted(self.tag_prefixes.keys())
  1617. # print('handling', repr(tag), repr(suffix), repr(handle))
  1618. if handle is None:
  1619. for prefix in prefixes:
  1620. if tag.startswith(prefix) and (prefix == '!' or len(prefix) < len(tag)):
  1621. handle = self.tag_prefixes[prefix]
  1622. suffix = suffix[len(prefix) :]
  1623. if handle:
  1624. return f'{handle!s}{suffix!s}'
  1625. else:
  1626. return f'!<{suffix!s}>'