# coding: utf-8

from __future__ import absolute_import

# The following YAML grammar is LL(1) and is parsed by a recursive descent
# parser.
#
# stream            ::= STREAM-START implicit_document? explicit_document*
#                       STREAM-END
# implicit_document ::= block_node DOCUMENT-END*
# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
# block_node_or_indentless_sequence ::=
#                       ALIAS
#                       | properties (block_content | indentless_block_sequence)?
#                       | block_content
#                       | indentless_block_sequence
# block_node        ::= ALIAS
#                       | properties block_content?
#                       | block_content
# flow_node         ::= ALIAS
#                       | properties flow_content?
#                       | flow_content
# properties        ::= TAG ANCHOR? | ANCHOR TAG?
# block_content     ::= block_collection | flow_collection | SCALAR
# flow_content      ::= flow_collection | SCALAR
# block_collection  ::= block_sequence | block_mapping
# flow_collection   ::= flow_sequence | flow_mapping
# block_sequence    ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)*
#                       BLOCK-END
# indentless_sequence ::= (BLOCK-ENTRY block_node?)+
# block_mapping     ::= BLOCK-MAPPING-START
#                       ((KEY block_node_or_indentless_sequence?)?
#                       (VALUE block_node_or_indentless_sequence?)?)*
#                       BLOCK-END
# flow_sequence     ::= FLOW-SEQUENCE-START
#                       (flow_sequence_entry FLOW-ENTRY)*
#                       flow_sequence_entry?
#                       FLOW-SEQUENCE-END
# flow_sequence_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
# flow_mapping      ::= FLOW-MAPPING-START
#                       (flow_mapping_entry FLOW-ENTRY)*
#                       flow_mapping_entry?
#                       FLOW-MAPPING-END
# flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
#
# FIRST sets:
#
# stream: { STREAM-START }
# explicit_document: { DIRECTIVE DOCUMENT-START }
# implicit_document: FIRST(block_node)
# block_node: { ALIAS TAG ANCHOR SCALAR BLOCK-SEQUENCE-START
#               BLOCK-MAPPING-START FLOW-SEQUENCE-START FLOW-MAPPING-START }
# flow_node: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START FLOW-MAPPING-START }
# block_content: { BLOCK-SEQUENCE-START BLOCK-MAPPING-START
#                  FLOW-SEQUENCE-START FLOW-MAPPING-START SCALAR }
# flow_content: { FLOW-SEQUENCE-START FLOW-MAPPING-START SCALAR }
# block_collection: { BLOCK-SEQUENCE-START BLOCK-MAPPING-START }
# flow_collection: { FLOW-SEQUENCE-START FLOW-MAPPING-START }
# block_sequence: { BLOCK-SEQUENCE-START }
# block_mapping: { BLOCK-MAPPING-START }
# block_node_or_indentless_sequence: { ALIAS ANCHOR TAG SCALAR
#               BLOCK-SEQUENCE-START BLOCK-MAPPING-START FLOW-SEQUENCE-START
#               FLOW-MAPPING-START BLOCK-ENTRY }
# indentless_sequence: { BLOCK-ENTRY }
# flow_sequence: { FLOW-SEQUENCE-START }
# flow_mapping: { FLOW-MAPPING-START }
# flow_sequence_entry: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START
#                        FLOW-MAPPING-START KEY }
# flow_mapping_entry: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START
#                       FLOW-MAPPING-START KEY }
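#
# As an illustration (not part of the grammar above), a document such as
#
#   - one
#   - two: three
#
# is parsed into the event stream
#
#   StreamStartEvent, DocumentStartEvent, SequenceStartEvent,
#   ScalarEvent('one'), MappingStartEvent, ScalarEvent('two'),
#   ScalarEvent('three'), MappingEndEvent, SequenceEndEvent,
#   DocumentEndEvent, StreamEndEvent
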
# need to have the full path in the imports, as pkg_resources tries to load
# parser.py in __init__.py, only to do nothing with the package afterwards;
# the full path is needed for Jython as well

from ruamel.yaml.error import MarkedYAMLError
from ruamel.yaml.tokens import *  # NOQA
from ruamel.yaml.events import *  # NOQA
from ruamel.yaml.scanner import Scanner, RoundTripScanner, ScannerError  # NOQA
from ruamel.yaml.compat import utf8, nprint, nprintf  # NOQA

if False:  # MYPY
    from typing import Any, Dict, Optional, List  # NOQA

__all__ = ['Parser', 'RoundTripParser', 'ParserError']


class ParserError(MarkedYAMLError):
    pass


class Parser(object):
    # Since writing a recursive descent parser is a straightforward task, we
    # do not give many comments here.

    DEFAULT_TAGS = {u'!': u'!', u'!!': u'tag:yaml.org,2002:'}

    def __init__(self, loader):
        # type: (Any) -> None
        self.loader = loader
        if self.loader is not None and getattr(self.loader, '_parser', None) is None:
            self.loader._parser = self
        self.reset_parser()

    def reset_parser(self):
        # type: () -> None
        # Reset the state attributes (to clear self-references)
        self.current_event = None
        self.tag_handles = {}  # type: Dict[Any, Any]
        self.states = []  # type: List[Any]
        self.marks = []  # type: List[Any]
        self.state = self.parse_stream_start  # type: Any

    def dispose(self):
        # type: () -> None
        self.reset_parser()

    @property
    def scanner(self):
        # type: () -> Any
        if hasattr(self.loader, 'typ'):
            return self.loader.scanner
        return self.loader._scanner

    @property
    def resolver(self):
        # type: () -> Any
        if hasattr(self.loader, 'typ'):
            return self.loader.resolver
        return self.loader._resolver

    def check_event(self, *choices):
        # type: (Any) -> bool
        # Check the type of the next event.
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        if self.current_event is not None:
            if not choices:
                return True
            for choice in choices:
                if isinstance(self.current_event, choice):
                    return True
        return False

    def peek_event(self):
        # type: () -> Any
        # Get the next event.
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        return self.current_event

    def get_event(self):
        # type: () -> Any
        # Get the next event and proceed further.
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        value = self.current_event
        self.current_event = None
        return value
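
    # Illustrative sketch (not part of this module): consumers such as the
    # composer drive the parser through the three methods above only, e.g.
    #
    #   parser = Parser(loader)
    #   while not parser.check_event(StreamEndEvent):
    #       event = parser.get_event()  # advances; peek_event() would not
    #
    # check_event()/peek_event() compute the next event lazily by calling the
    # current state function and cache it until get_event() consumes it.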

    # stream    ::= STREAM-START implicit_document? explicit_document*
    #               STREAM-END
    # implicit_document ::= block_node DOCUMENT-END*
    # explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*

    def parse_stream_start(self):
        # type: () -> Any
        # Parse the stream start.
        token = self.scanner.get_token()
        token.move_comment(self.scanner.peek_token())
        event = StreamStartEvent(token.start_mark, token.end_mark, encoding=token.encoding)
        # Prepare the next state.
        self.state = self.parse_implicit_document_start
        return event

    def parse_implicit_document_start(self):
        # type: () -> Any
        # Parse an implicit document.
        if not self.scanner.check_token(DirectiveToken, DocumentStartToken, StreamEndToken):
            self.tag_handles = self.DEFAULT_TAGS
            token = self.scanner.peek_token()
            start_mark = end_mark = token.start_mark
            event = DocumentStartEvent(start_mark, end_mark, explicit=False)
            # Prepare the next state.
            self.states.append(self.parse_document_end)
            self.state = self.parse_block_node
            return event
        else:
            return self.parse_document_start()

    def parse_document_start(self):
        # type: () -> Any
        # Parse any extra document end indicators.
        while self.scanner.check_token(DocumentEndToken):
            self.scanner.get_token()
        # Parse an explicit document.
        if not self.scanner.check_token(StreamEndToken):
            token = self.scanner.peek_token()
            start_mark = token.start_mark
            version, tags = self.process_directives()
            if not self.scanner.check_token(DocumentStartToken):
                raise ParserError(
                    None,
                    None,
                    "expected '<document start>', but found %r" % self.scanner.peek_token().id,
                    self.scanner.peek_token().start_mark,
                )
            token = self.scanner.get_token()
            end_mark = token.end_mark
            # if self.loader is not None and \
            #    end_mark.line != self.scanner.peek_token().start_mark.line:
            #     self.loader.scalar_after_indicator = False
            event = DocumentStartEvent(
                start_mark, end_mark, explicit=True, version=version, tags=tags
            )  # type: Any
            self.states.append(self.parse_document_end)
            self.state = self.parse_document_content
        else:
            # Parse the end of the stream.
            token = self.scanner.get_token()
            event = StreamEndEvent(token.start_mark, token.end_mark, comment=token.comment)
            assert not self.states
            assert not self.marks
            self.state = None
        return event

    def parse_document_end(self):
        # type: () -> Any
        # Parse the document end.
        token = self.scanner.peek_token()
        start_mark = end_mark = token.start_mark
        explicit = False
        if self.scanner.check_token(DocumentEndToken):
            token = self.scanner.get_token()
            end_mark = token.end_mark
            explicit = True
        event = DocumentEndEvent(start_mark, end_mark, explicit=explicit)
        # Prepare the next state.
        if self.resolver.processing_version == (1, 1):
            self.state = self.parse_document_start
        else:
            self.state = self.parse_implicit_document_start
        return event

    def parse_document_content(self):
        # type: () -> Any
        if self.scanner.check_token(
            DirectiveToken, DocumentStartToken, DocumentEndToken, StreamEndToken
        ):
            event = self.process_empty_scalar(self.scanner.peek_token().start_mark)
            self.state = self.states.pop()
            return event
        else:
            return self.parse_block_node()

    def process_directives(self):
        # type: () -> Any
        yaml_version = None
        self.tag_handles = {}
        while self.scanner.check_token(DirectiveToken):
            token = self.scanner.get_token()
            if token.name == u'YAML':
                if yaml_version is not None:
                    raise ParserError(
                        None, None, 'found duplicate YAML directive', token.start_mark
                    )
                major, minor = token.value
                if major != 1:
                    raise ParserError(
                        None,
                        None,
                        'found incompatible YAML document (version 1.* is required)',
                        token.start_mark,
                    )
                yaml_version = token.value
            elif token.name == u'TAG':
                handle, prefix = token.value
                if handle in self.tag_handles:
                    raise ParserError(
                        None, None, 'duplicate tag handle %r' % utf8(handle), token.start_mark
                    )
                self.tag_handles[handle] = prefix
        if bool(self.tag_handles):
            value = yaml_version, self.tag_handles.copy()  # type: Any
        else:
            value = yaml_version, None
        if self.loader is not None and hasattr(self.loader, 'tags'):
            self.loader.version = yaml_version
            if self.loader.tags is None:
                self.loader.tags = {}
            for k in self.tag_handles:
                self.loader.tags[k] = self.tag_handles[k]
        for key in self.DEFAULT_TAGS:
            if key not in self.tag_handles:
                self.tag_handles[key] = self.DEFAULT_TAGS[key]
        return value
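
    # Illustrative sketch: given the directives
    #
    #   %YAML 1.2
    #   %TAG !e! tag:example.com,2000:app/
    #   ---
    #
    # process_directives() returns ((1, 2), {u'!e!': u'tag:example.com,2000:app/'}),
    # and afterwards self.tag_handles additionally contains the DEFAULT_TAGS
    # entries for u'!' and u'!!' (the returned copy does not).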

    # block_node_or_indentless_sequence ::= ALIAS
    #               | properties (block_content | indentless_block_sequence)?
    #               | block_content
    #               | indentless_block_sequence
    # block_node    ::= ALIAS
    #                   | properties block_content?
    #                   | block_content
    # flow_node     ::= ALIAS
    #                   | properties flow_content?
    #                   | flow_content
    # properties    ::= TAG ANCHOR? | ANCHOR TAG?
    # block_content ::= block_collection | flow_collection | SCALAR
    # flow_content  ::= flow_collection | SCALAR
    # block_collection ::= block_sequence | block_mapping
    # flow_collection  ::= flow_sequence | flow_mapping

    def parse_block_node(self):
        # type: () -> Any
        return self.parse_node(block=True)

    def parse_flow_node(self):
        # type: () -> Any
        return self.parse_node()

    def parse_block_node_or_indentless_sequence(self):
        # type: () -> Any
        return self.parse_node(block=True, indentless_sequence=True)

    def transform_tag(self, handle, suffix):
        # type: (Any, Any) -> Any
        return self.tag_handles[handle] + suffix

    def parse_node(self, block=False, indentless_sequence=False):
        # type: (bool, bool) -> Any
        if self.scanner.check_token(AliasToken):
            token = self.scanner.get_token()
            event = AliasEvent(token.value, token.start_mark, token.end_mark)  # type: Any
            self.state = self.states.pop()
            return event
        anchor = None
        tag = None
        start_mark = end_mark = tag_mark = None
        if self.scanner.check_token(AnchorToken):
            token = self.scanner.get_token()
            start_mark = token.start_mark
            end_mark = token.end_mark
            anchor = token.value
            if self.scanner.check_token(TagToken):
                token = self.scanner.get_token()
                tag_mark = token.start_mark
                end_mark = token.end_mark
                tag = token.value
        elif self.scanner.check_token(TagToken):
            token = self.scanner.get_token()
            start_mark = tag_mark = token.start_mark
            end_mark = token.end_mark
            tag = token.value
            if self.scanner.check_token(AnchorToken):
                token = self.scanner.get_token()
                start_mark = tag_mark = token.start_mark
                end_mark = token.end_mark
                anchor = token.value
        if tag is not None:
            handle, suffix = tag
            if handle is not None:
                if handle not in self.tag_handles:
                    raise ParserError(
                        'while parsing a node',
                        start_mark,
                        'found undefined tag handle %r' % utf8(handle),
                        tag_mark,
                    )
                tag = self.transform_tag(handle, suffix)
            else:
                tag = suffix
        # if tag == u'!':
        #     raise ParserError("while parsing a node", start_mark,
        #             "found non-specific tag '!'", tag_mark,
        #             "Please check 'http://pyyaml.org/wiki/YAMLNonSpecificTag'
        #             and share your opinion.")
        if start_mark is None:
            start_mark = end_mark = self.scanner.peek_token().start_mark
        event = None
        implicit = tag is None or tag == u'!'
        if indentless_sequence and self.scanner.check_token(BlockEntryToken):
            comment = None
            pt = self.scanner.peek_token()
            if pt.comment and pt.comment[0]:
                comment = [pt.comment[0], []]
                pt.comment[0] = None
            end_mark = self.scanner.peek_token().end_mark
            event = SequenceStartEvent(
                anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment
            )
            self.state = self.parse_indentless_sequence_entry
            return event
        if self.scanner.check_token(ScalarToken):
            token = self.scanner.get_token()
            # self.scanner.peek_token_same_line_comment(token)
            end_mark = token.end_mark
            if (token.plain and tag is None) or tag == u'!':
                implicit = (True, False)
            elif tag is None:
                implicit = (False, True)
            else:
                implicit = (False, False)
            # nprint('se', token.value, token.comment)
            event = ScalarEvent(
                anchor,
                tag,
                implicit,
                token.value,
                start_mark,
                end_mark,
                style=token.style,
                comment=token.comment,
            )
            self.state = self.states.pop()
        elif self.scanner.check_token(FlowSequenceStartToken):
            pt = self.scanner.peek_token()
            end_mark = pt.end_mark
            event = SequenceStartEvent(
                anchor,
                tag,
                implicit,
                start_mark,
                end_mark,
                flow_style=True,
                comment=pt.comment,
            )
            self.state = self.parse_flow_sequence_first_entry
        elif self.scanner.check_token(FlowMappingStartToken):
            pt = self.scanner.peek_token()
            end_mark = pt.end_mark
            event = MappingStartEvent(
                anchor,
                tag,
                implicit,
                start_mark,
                end_mark,
                flow_style=True,
                comment=pt.comment,
            )
            self.state = self.parse_flow_mapping_first_key
        elif block and self.scanner.check_token(BlockSequenceStartToken):
            end_mark = self.scanner.peek_token().start_mark
            # should inserting the comment be dependent on the
            # indentation?
            pt = self.scanner.peek_token()
            comment = pt.comment
            # nprint('pt0', type(pt))
            if comment is None or comment[1] is None:
                comment = pt.split_comment()
            # nprint('pt1', comment)
            event = SequenceStartEvent(
                anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment
            )
            self.state = self.parse_block_sequence_first_entry
        elif block and self.scanner.check_token(BlockMappingStartToken):
            end_mark = self.scanner.peek_token().start_mark
            comment = self.scanner.peek_token().comment
            event = MappingStartEvent(
                anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment
            )
            self.state = self.parse_block_mapping_first_key
        elif anchor is not None or tag is not None:
            # Empty scalars are allowed even if a tag or an anchor is
            # specified.
            event = ScalarEvent(anchor, tag, (implicit, False), "", start_mark, end_mark)
            self.state = self.states.pop()
        else:
            if block:
                node = 'block'
            else:
                node = 'flow'
            token = self.scanner.peek_token()
            raise ParserError(
                'while parsing a %s node' % node,
                start_mark,
                'expected the node content, but found %r' % token.id,
                token.start_mark,
            )
        return event
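
    # Illustrative sketch: for a node with properties such as
    #
    #   !!str &anchor1 some text
    #
    # parse_node() resolves the handle u'!!' via transform_tag(), yielding
    # ScalarEvent(anchor=u'anchor1', tag=u'tag:yaml.org,2002:str', ...);
    # a bare non-specific '!' tag instead keeps the scalar implicit as
    # (True, False), per the branches above.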

    # block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)*
    #                    BLOCK-END

    def parse_block_sequence_first_entry(self):
        # type: () -> Any
        token = self.scanner.get_token()
        # move any comment from start token
        # token.move_comment(self.scanner.peek_token())
        self.marks.append(token.start_mark)
        return self.parse_block_sequence_entry()

    def parse_block_sequence_entry(self):
        # type: () -> Any
        if self.scanner.check_token(BlockEntryToken):
            token = self.scanner.get_token()
            token.move_comment(self.scanner.peek_token())
            if not self.scanner.check_token(BlockEntryToken, BlockEndToken):
                self.states.append(self.parse_block_sequence_entry)
                return self.parse_block_node()
            else:
                self.state = self.parse_block_sequence_entry
                return self.process_empty_scalar(token.end_mark)
        if not self.scanner.check_token(BlockEndToken):
            token = self.scanner.peek_token()
            raise ParserError(
                'while parsing a block collection',
                self.marks[-1],
                'expected <block end>, but found %r' % token.id,
                token.start_mark,
            )
        token = self.scanner.get_token()  # BlockEndToken
        event = SequenceEndEvent(token.start_mark, token.end_mark, comment=token.comment)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    # indentless_sequence ::= (BLOCK-ENTRY block_node?)+
    # indentless_sequence?
    # sequence:
    # - entry
    # - nested

    def parse_indentless_sequence_entry(self):
        # type: () -> Any
        if self.scanner.check_token(BlockEntryToken):
            token = self.scanner.get_token()
            token.move_comment(self.scanner.peek_token())
            if not self.scanner.check_token(
                BlockEntryToken, KeyToken, ValueToken, BlockEndToken
            ):
                self.states.append(self.parse_indentless_sequence_entry)
                return self.parse_block_node()
            else:
                self.state = self.parse_indentless_sequence_entry
                return self.process_empty_scalar(token.end_mark)
        token = self.scanner.peek_token()
        event = SequenceEndEvent(token.start_mark, token.start_mark, comment=token.comment)
        self.state = self.states.pop()
        return event
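
    # Illustrative sketch: an indentless sequence is a block sequence whose
    # entries are not indented relative to their mapping key, e.g.
    #
    #   sequence:
    #   - entry1
    #   - entry2
    #
    # The scanner emits no BLOCK-SEQUENCE-START (and no BLOCK-END) here, so
    # the SequenceEndEvent above reuses the position of the next token.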

    # block_mapping ::= BLOCK-MAPPING-START
    #                   ((KEY block_node_or_indentless_sequence?)?
    #                   (VALUE block_node_or_indentless_sequence?)?)*
    #                   BLOCK-END

    def parse_block_mapping_first_key(self):
        # type: () -> Any
        token = self.scanner.get_token()
        self.marks.append(token.start_mark)
        return self.parse_block_mapping_key()

    def parse_block_mapping_key(self):
        # type: () -> Any
        if self.scanner.check_token(KeyToken):
            token = self.scanner.get_token()
            token.move_comment(self.scanner.peek_token())
            if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_block_mapping_value)
                return self.parse_block_node_or_indentless_sequence()
            else:
                self.state = self.parse_block_mapping_value
                return self.process_empty_scalar(token.end_mark)
        if self.resolver.processing_version > (1, 1) and self.scanner.check_token(ValueToken):
            self.state = self.parse_block_mapping_value
            return self.process_empty_scalar(self.scanner.peek_token().start_mark)
        if not self.scanner.check_token(BlockEndToken):
            token = self.scanner.peek_token()
            raise ParserError(
                'while parsing a block mapping',
                self.marks[-1],
                'expected <block end>, but found %r' % token.id,
                token.start_mark,
            )
        token = self.scanner.get_token()
        token.move_comment(self.scanner.peek_token())
        event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    def parse_block_mapping_value(self):
        # type: () -> Any
        if self.scanner.check_token(ValueToken):
            token = self.scanner.get_token()
            # the value token might have a post-value comment; move it to
            # e.g. the following block
            if self.scanner.check_token(ValueToken):
                token.move_comment(self.scanner.peek_token())
            else:
                if not self.scanner.check_token(KeyToken):
                    token.move_comment(self.scanner.peek_token(), empty=True)
                # else: empty value for this key, cannot move token.comment
            if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_block_mapping_key)
                return self.parse_block_node_or_indentless_sequence()
            else:
                self.state = self.parse_block_mapping_key
                comment = token.comment
                if comment is None:
                    token = self.scanner.peek_token()
                    comment = token.comment
                    if comment:
                        token._comment = [None, comment[1]]
                        comment = [comment[0], None]
                return self.process_empty_scalar(token.end_mark, comment=comment)
        else:
            self.state = self.parse_block_mapping_key
            token = self.scanner.peek_token()
            return self.process_empty_scalar(token.start_mark)
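
    # Illustrative sketch: for
    #
    #   key:
    #   other: value
    #
    # no node follows the ':' of 'key', so parse_block_mapping_value() emits
    # an empty ScalarEvent via process_empty_scalar() and hands control back
    # to parse_block_mapping_key for 'other'.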

    # flow_sequence ::= FLOW-SEQUENCE-START
    #                   (flow_sequence_entry FLOW-ENTRY)*
    #                   flow_sequence_entry?
    #                   FLOW-SEQUENCE-END
    # flow_sequence_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
    #
    # Note that while the production rules for flow_sequence_entry and
    # flow_mapping_entry are equal, their interpretations are different.
    # For `flow_sequence_entry`, the part `KEY flow_node? (VALUE flow_node?)?`
    # generates an inline mapping (set syntax).

    def parse_flow_sequence_first_entry(self):
        # type: () -> Any
        token = self.scanner.get_token()
        self.marks.append(token.start_mark)
        return self.parse_flow_sequence_entry(first=True)

    def parse_flow_sequence_entry(self, first=False):
        # type: (bool) -> Any
        if not self.scanner.check_token(FlowSequenceEndToken):
            if not first:
                if self.scanner.check_token(FlowEntryToken):
                    self.scanner.get_token()
                else:
                    token = self.scanner.peek_token()
                    raise ParserError(
                        'while parsing a flow sequence',
                        self.marks[-1],
                        "expected ',' or ']', but got %r" % token.id,
                        token.start_mark,
                    )
            if self.scanner.check_token(KeyToken):
                token = self.scanner.peek_token()
                event = MappingStartEvent(
                    None, None, True, token.start_mark, token.end_mark, flow_style=True
                )  # type: Any
                self.state = self.parse_flow_sequence_entry_mapping_key
                return event
            elif not self.scanner.check_token(FlowSequenceEndToken):
                self.states.append(self.parse_flow_sequence_entry)
                return self.parse_flow_node()
        token = self.scanner.get_token()
        event = SequenceEndEvent(token.start_mark, token.end_mark, comment=token.comment)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    def parse_flow_sequence_entry_mapping_key(self):
        # type: () -> Any
        token = self.scanner.get_token()
        if not self.scanner.check_token(ValueToken, FlowEntryToken, FlowSequenceEndToken):
            self.states.append(self.parse_flow_sequence_entry_mapping_value)
            return self.parse_flow_node()
        else:
            self.state = self.parse_flow_sequence_entry_mapping_value
            return self.process_empty_scalar(token.end_mark)

    def parse_flow_sequence_entry_mapping_value(self):
        # type: () -> Any
        if self.scanner.check_token(ValueToken):
            token = self.scanner.get_token()
            if not self.scanner.check_token(FlowEntryToken, FlowSequenceEndToken):
                self.states.append(self.parse_flow_sequence_entry_mapping_end)
                return self.parse_flow_node()
            else:
                self.state = self.parse_flow_sequence_entry_mapping_end
                return self.process_empty_scalar(token.end_mark)
        else:
            self.state = self.parse_flow_sequence_entry_mapping_end
            token = self.scanner.peek_token()
            return self.process_empty_scalar(token.start_mark)

    def parse_flow_sequence_entry_mapping_end(self):
        # type: () -> Any
        self.state = self.parse_flow_sequence_entry
        token = self.scanner.peek_token()
        return MappingEndEvent(token.start_mark, token.start_mark)
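
    # Illustrative sketch: a single-pair mapping inside a flow sequence, e.g.
    #
    #   [a: b, c]
    #
    # produces MappingStartEvent, ScalarEvent('a'), ScalarEvent('b'),
    # MappingEndEvent for the first entry, before the plain ScalarEvent('c');
    # the MappingEndEvent corresponds to no real token, so it reuses the
    # position of the next token (the ',').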

    # flow_mapping ::= FLOW-MAPPING-START
    #                  (flow_mapping_entry FLOW-ENTRY)*
    #                  flow_mapping_entry?
    #                  FLOW-MAPPING-END
    # flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?

    def parse_flow_mapping_first_key(self):
        # type: () -> Any
        token = self.scanner.get_token()
        self.marks.append(token.start_mark)
        return self.parse_flow_mapping_key(first=True)

    def parse_flow_mapping_key(self, first=False):
        # type: (Any) -> Any
        if not self.scanner.check_token(FlowMappingEndToken):
            if not first:
                if self.scanner.check_token(FlowEntryToken):
                    self.scanner.get_token()
                else:
                    token = self.scanner.peek_token()
                    raise ParserError(
                        'while parsing a flow mapping',
                        self.marks[-1],
                        "expected ',' or '}', but got %r" % token.id,
                        token.start_mark,
                    )
            if self.scanner.check_token(KeyToken):
                token = self.scanner.get_token()
                if not self.scanner.check_token(
                    ValueToken, FlowEntryToken, FlowMappingEndToken
                ):
                    self.states.append(self.parse_flow_mapping_value)
                    return self.parse_flow_node()
                else:
                    self.state = self.parse_flow_mapping_value
                    return self.process_empty_scalar(token.end_mark)
            elif self.resolver.processing_version > (1, 1) and self.scanner.check_token(
                ValueToken
            ):
                self.state = self.parse_flow_mapping_value
                return self.process_empty_scalar(self.scanner.peek_token().end_mark)
            elif not self.scanner.check_token(FlowMappingEndToken):
                self.states.append(self.parse_flow_mapping_empty_value)
                return self.parse_flow_node()
        token = self.scanner.get_token()
        event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    def parse_flow_mapping_value(self):
        # type: () -> Any
        if self.scanner.check_token(ValueToken):
            token = self.scanner.get_token()
            if not self.scanner.check_token(FlowEntryToken, FlowMappingEndToken):
                self.states.append(self.parse_flow_mapping_key)
                return self.parse_flow_node()
            else:
                self.state = self.parse_flow_mapping_key
                return self.process_empty_scalar(token.end_mark)
        else:
            self.state = self.parse_flow_mapping_key
            token = self.scanner.peek_token()
            return self.process_empty_scalar(token.start_mark)

    def parse_flow_mapping_empty_value(self):
        # type: () -> Any
        self.state = self.parse_flow_mapping_key
        return self.process_empty_scalar(self.scanner.peek_token().start_mark)

    def process_empty_scalar(self, mark, comment=None):
        # type: (Any, Any) -> Any
        return ScalarEvent(None, None, (True, False), "", mark, mark, comment=comment)


class RoundTripParser(Parser):
    """The roundtrip parser is a safe loader that wants to see the unmangled tag."""

    def transform_tag(self, handle, suffix):
        # type: (Any, Any) -> Any
        # return self.tag_handles[handle] + suffix
        if handle == '!!' and suffix in (
            u'null',
            u'bool',
            u'int',
            u'float',
            u'binary',
            u'timestamp',
            u'omap',
            u'pairs',
            u'set',
            u'str',
            u'seq',
            u'map',
        ):
            return Parser.transform_tag(self, handle, suffix)
        return handle + suffix
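
    # Illustrative sketch: the known tag:yaml.org,2002 suffixes listed above
    # are still resolved, so u'!!str' becomes u'tag:yaml.org,2002:str', while
    # an application tag such as u'!!python/tuple' is left as the unmangled
    # u'!!python/tuple' so it can be written back out exactly as it was read.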