12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980 |
- # coding: utf-8
- from __future__ import print_function, absolute_import, division, unicode_literals
- # Scanner produces tokens of the following types:
- # STREAM-START
- # STREAM-END
- # DIRECTIVE(name, value)
- # DOCUMENT-START
- # DOCUMENT-END
- # BLOCK-SEQUENCE-START
- # BLOCK-MAPPING-START
- # BLOCK-END
- # FLOW-SEQUENCE-START
- # FLOW-MAPPING-START
- # FLOW-SEQUENCE-END
- # FLOW-MAPPING-END
- # BLOCK-ENTRY
- # FLOW-ENTRY
- # KEY
- # VALUE
- # ALIAS(value)
- # ANCHOR(value)
- # TAG(value)
- # SCALAR(value, plain, style)
- #
- # RoundTripScanner
- # COMMENT(value)
- #
- # Read comments in the Scanner code for more details.
- #
- from ruamel.yaml.error import MarkedYAMLError
- from ruamel.yaml.tokens import * # NOQA
- from ruamel.yaml.compat import utf8, unichr, PY3, check_anchorname_char, nprint # NOQA
- if False: # MYPY
- from typing import Any, Dict, Optional, List, Union, Text # NOQA
- from ruamel.yaml.compat import VersionType # NOQA
__all__ = ['Scanner', 'RoundTripScanner', 'ScannerError']

# Characters that terminate a token: newline, NUL, CR, NEL, LS, PS.
_THE_END = '\n\0\r\x85\u2028\u2029'
# The same terminators plus inline whitespace (used after indicators).
_THE_END_SPACE_TAB = ' \n\0\t\r\x85\u2028\u2029'
# Inline whitespace only.
_SPACE_TAB = ' \t'
class ScannerError(MarkedYAMLError):
    """Raised when the input cannot be tokenized as YAML."""

    pass
class SimpleKey(object):
    """Record of a position where a simple key (a key not introduced by '?')
    may start. See the simple keys treatment in Scanner.reset_scanner."""

    def __init__(self, token_number, required, index, line, column, mark):
        # type: (Any, Any, int, int, int, Any) -> None
        self.token_number = token_number  # position in the emitted token stream
        self.required = required  # True if a key MUST occur at this position
        self.index = index  # absolute stream index of the key start
        self.line = line  # line of the key start
        self.column = column  # column of the key start
        self.mark = mark  # Mark used for error reporting
- class Scanner(object):
    def __init__(self, loader=None):
        # type: (Any) -> None
        """Initialize the scanner."""
        # It is assumed that Scanner and Reader will have a common descendant.
        # Reader do the dirty work of checking for BOM and converting the
        # input data to Unicode. It also adds NUL to the end.
        #
        # Reader supports the following methods
        #   self.peek(i=0)    # peek the next i-th character
        #   self.prefix(l=1)  # peek the next l characters
        #   self.forward(l=1) # read the next l characters and move the pointer
        self.loader = loader
        # Register this scanner on the loader unless one is already attached.
        if self.loader is not None and getattr(self.loader, '_scanner', None) is None:
            self.loader._scanner = self
        self.reset_scanner()
        self.first_time = False
        # YAML version from a %YAML directive, if any (set while scanning).
        self.yaml_version = None  # type: Any
- @property
- def flow_level(self):
- # type: () -> int
- return len(self.flow_context)
    def reset_scanner(self):
        # type: () -> None
        """(Re)initialize all mutable scanner state and emit STREAM-START."""
        # Had we reached the end of the stream?
        self.done = False
        # flow_context is an expanding/shrinking list consisting of '{' and '['
        # for each unclosed flow context. If empty list that means block context
        self.flow_context = []  # type: List[Text]
        # List of processed tokens that are not yet emitted.
        self.tokens = []  # type: List[Any]
        # Add the STREAM-START token.
        self.fetch_stream_start()
        # Number of tokens that were emitted through the `get_token` method.
        self.tokens_taken = 0
        # The current indentation level.
        self.indent = -1
        # Past indentation levels.
        self.indents = []  # type: List[int]
        # Variables related to simple keys treatment.
        # A simple key is a key that is not denoted by the '?' indicator.
        # Example of simple keys:
        #   ---
        #   block simple key: value
        #   ? not a simple key:
        #   : { flow simple key: value }
        # We emit the KEY token before all keys, so when we find a potential
        # simple key, we try to locate the corresponding ':' indicator.
        # Simple keys should be limited to a single line and 1024 characters.
        # Can a simple key start at the current position? A simple key may
        # start:
        # - at the beginning of the line, not counting indentation spaces
        #   (in block context),
        # - after '{', '[', ',' (in the flow context),
        # - after '?', ':', '-' (in the block context).
        # In the block context, this flag also signifies if a block collection
        # may start at the current position.
        self.allow_simple_key = True
        # Keep track of possible simple keys. This is a dictionary. The key
        # is `flow_level`; there can be no more than one possible simple key
        # for each level. The value is a SimpleKey record:
        #   (token_number, required, index, line, column, mark)
        # A simple key may start with ALIAS, ANCHOR, TAG, SCALAR(flow),
        # '[', or '{' tokens.
        self.possible_simple_keys = {}  # type: Dict[Any, Any]
    @property
    def reader(self):
        # type: () -> Any
        # Resolve and cache the Reader instance: on the new API the loader
        # exposes it as `.reader`, on the composited API as `._reader`.
        try:
            return self._scanner_reader  # type: ignore
        except AttributeError:
            if hasattr(self.loader, 'typ'):
                self._scanner_reader = self.loader.reader
            else:
                self._scanner_reader = self.loader._reader
            return self._scanner_reader
- @property
- def scanner_processing_version(self): # prefix until un-composited
- # type: () -> Any
- if hasattr(self.loader, 'typ'):
- return self.loader.resolver.processing_version
- return self.loader.processing_version
- # Public methods.
- def check_token(self, *choices):
- # type: (Any) -> bool
- # Check if the next token is one of the given types.
- while self.need_more_tokens():
- self.fetch_more_tokens()
- if bool(self.tokens):
- if not choices:
- return True
- for choice in choices:
- if isinstance(self.tokens[0], choice):
- return True
- return False
- def peek_token(self):
- # type: () -> Any
- # Return the next token, but do not delete if from the queue.
- while self.need_more_tokens():
- self.fetch_more_tokens()
- if bool(self.tokens):
- return self.tokens[0]
- def get_token(self):
- # type: () -> Any
- # Return the next token.
- while self.need_more_tokens():
- self.fetch_more_tokens()
- if bool(self.tokens):
- self.tokens_taken += 1
- return self.tokens.pop(0)
- # Private methods.
- def need_more_tokens(self):
- # type: () -> bool
- if self.done:
- return False
- if not self.tokens:
- return True
- # The current token may be a potential simple key, so we
- # need to look further.
- self.stale_possible_simple_keys()
- if self.next_possible_simple_key() == self.tokens_taken:
- return True
- return False
    def fetch_comment(self, comment):
        # type: (Any) -> None
        """Hook for comment handling; only implemented by RoundTripScanner."""
        raise NotImplementedError
    def fetch_more_tokens(self):
        # type: () -> Any
        """Scan the next token(s) from the stream into self.tokens.

        Skips whitespace/comments, expires stale simple keys, unwinds the
        indentation stack, then dispatches on the next character to the
        matching fetch_* method. Raises ScannerError if nothing can start.
        """
        # Eat whitespaces and comments until we reach the next token.
        comment = self.scan_to_next_token()
        if comment is not None:  # never happens for base scanner
            return self.fetch_comment(comment)
        # Remove obsolete possible simple keys.
        self.stale_possible_simple_keys()
        # Compare the current indentation and column. It may add some tokens
        # and decrease the current indentation level.
        self.unwind_indent(self.reader.column)
        # Peek the next character.
        ch = self.reader.peek()
        # Is it the end of stream?
        if ch == '\0':
            return self.fetch_stream_end()
        # Is it a directive?
        if ch == '%' and self.check_directive():
            return self.fetch_directive()
        # Is it the document start?
        if ch == '-' and self.check_document_start():
            return self.fetch_document_start()
        # Is it the document end?
        if ch == '.' and self.check_document_end():
            return self.fetch_document_end()
        # TODO: support for BOM within a stream.
        # if ch == u'\uFEFF':
        #     return self.fetch_bom()  <-- issue BOMToken
        # Note: the order of the following checks is NOT significant.
        # Is it the flow sequence start indicator?
        if ch == '[':
            return self.fetch_flow_sequence_start()
        # Is it the flow mapping start indicator?
        if ch == '{':
            return self.fetch_flow_mapping_start()
        # Is it the flow sequence end indicator?
        if ch == ']':
            return self.fetch_flow_sequence_end()
        # Is it the flow mapping end indicator?
        if ch == '}':
            return self.fetch_flow_mapping_end()
        # Is it the flow entry indicator?
        if ch == ',':
            return self.fetch_flow_entry()
        # Is it the block entry indicator?
        if ch == '-' and self.check_block_entry():
            return self.fetch_block_entry()
        # Is it the key indicator?
        if ch == '?' and self.check_key():
            return self.fetch_key()
        # Is it the value indicator?
        if ch == ':' and self.check_value():
            return self.fetch_value()
        # Is it an alias?
        if ch == '*':
            return self.fetch_alias()
        # Is it an anchor?
        if ch == '&':
            return self.fetch_anchor()
        # Is it a tag?
        if ch == '!':
            return self.fetch_tag()
        # Is it a literal scalar?
        if ch == '|' and not self.flow_level:
            return self.fetch_literal()
        # Is it a folded scalar?
        if ch == '>' and not self.flow_level:
            return self.fetch_folded()
        # Is it a single quoted scalar?
        if ch == "'":
            return self.fetch_single()
        # Is it a double quoted scalar?
        if ch == '"':
            return self.fetch_double()
        # It must be a plain scalar then.
        if self.check_plain():
            return self.fetch_plain()
        # No? It's an error. Let's produce a nice error message.
        raise ScannerError(
            'while scanning for the next token',
            None,
            'found character %r that cannot start any token' % utf8(ch),
            self.reader.get_mark(),
        )
- # Simple keys treatment.
- def next_possible_simple_key(self):
- # type: () -> Any
- # Return the number of the nearest possible simple key. Actually we
- # don't need to loop through the whole dictionary. We may replace it
- # with the following code:
- # if not self.possible_simple_keys:
- # return None
- # return self.possible_simple_keys[
- # min(self.possible_simple_keys.keys())].token_number
- min_token_number = None
- for level in self.possible_simple_keys:
- key = self.possible_simple_keys[level]
- if min_token_number is None or key.token_number < min_token_number:
- min_token_number = key.token_number
- return min_token_number
    def stale_possible_simple_keys(self):
        # type: () -> None
        """Discard pending simple keys that can no longer be valid.

        A simple key must stay on one line and within 1024 characters of its
        start; a *required* key that goes stale is a syntax error.
        """
        # Remove entries that are no longer possible simple keys. According to
        # the YAML specification, simple keys
        # - should be limited to a single line,
        # - should be no longer than 1024 characters.
        # Disabling this procedure will allow simple keys of any length and
        # height (may cause problems if indentation is broken though).
        for level in list(self.possible_simple_keys):
            key = self.possible_simple_keys[level]
            if key.line != self.reader.line or self.reader.index - key.index > 1024:
                if key.required:
                    raise ScannerError(
                        'while scanning a simple key',
                        key.mark,
                        "could not find expected ':'",
                        self.reader.get_mark(),
                    )
                del self.possible_simple_keys[level]
    def save_possible_simple_key(self):
        # type: () -> None
        """Record the current position as a potential simple-key start."""
        # The next token may start a simple key. We check if it's possible
        # and save its position. This function is called for
        #   ALIAS, ANCHOR, TAG, SCALAR(flow), '[', and '{'.
        # Check if a simple key is required at the current position.
        required = not self.flow_level and self.indent == self.reader.column
        # The next token might be a simple key. Let's save its number and
        # position.
        if self.allow_simple_key:
            self.remove_possible_simple_key()
            token_number = self.tokens_taken + len(self.tokens)
            key = SimpleKey(
                token_number,
                required,
                self.reader.index,
                self.reader.line,
                self.reader.column,
                self.reader.get_mark(),
            )
            self.possible_simple_keys[self.flow_level] = key
- def remove_possible_simple_key(self):
- # type: () -> None
- # Remove the saved possible key position at the current flow level.
- if self.flow_level in self.possible_simple_keys:
- key = self.possible_simple_keys[self.flow_level]
- if key.required:
- raise ScannerError(
- 'while scanning a simple key',
- key.mark,
- "could not find expected ':'",
- self.reader.get_mark(),
- )
- del self.possible_simple_keys[self.flow_level]
- # Indentation functions.
    def unwind_indent(self, column):
        # type: (Any) -> None
        """Pop indentation levels deeper than *column*, emitting BLOCK-END."""
        # In flow context, tokens should respect indentation.
        # Actually the condition should be `self.indent >= column` according to
        # the spec. But this condition will prohibit intuitively correct
        # constructions such as
        #   key : {
        #   }
        # ####
        # if self.flow_level and self.indent > column:
        #     raise ScannerError(None, None,
        #             "invalid indentation or unclosed '[' or '{'",
        #             self.reader.get_mark())
        # In the flow context, indentation is ignored. We make the scanner less
        # restrictive than specification requires.
        if bool(self.flow_level):
            return
        # In block context, we may need to issue the BLOCK-END tokens.
        while self.indent > column:
            mark = self.reader.get_mark()
            self.indent = self.indents.pop()
            self.tokens.append(BlockEndToken(mark, mark))
- def add_indent(self, column):
- # type: (int) -> bool
- # Check if we need to increase indentation.
- if self.indent < column:
- self.indents.append(self.indent)
- self.indent = column
- return True
- return False
- # Fetchers.
    def fetch_stream_start(self):
        # type: () -> None
        """Emit the STREAM-START token (always the very first token)."""
        # We always add STREAM-START as the first token and STREAM-END as the
        # last token.
        # Read the token.
        mark = self.reader.get_mark()
        # Add STREAM-START.
        self.tokens.append(StreamStartToken(mark, mark, encoding=self.reader.encoding))
    def fetch_stream_end(self):
        # type: () -> None
        """Close all open blocks, emit STREAM-END and mark the scanner done."""
        # Set the current indentation to -1.
        self.unwind_indent(-1)
        # Reset simple keys.
        self.remove_possible_simple_key()
        self.allow_simple_key = False
        self.possible_simple_keys = {}
        # Read the token.
        mark = self.reader.get_mark()
        # Add STREAM-END.
        self.tokens.append(StreamEndToken(mark, mark))
        # The stream is finished.
        self.done = True
    def fetch_directive(self):
        # type: () -> None
        """Emit a DIRECTIVE token for a '%...' line."""
        # Set the current indentation to -1.
        self.unwind_indent(-1)
        # Reset simple keys.
        self.remove_possible_simple_key()
        self.allow_simple_key = False
        # Scan and add DIRECTIVE.
        self.tokens.append(self.scan_directive())
    def fetch_document_start(self):
        # type: () -> None
        """Emit DOCUMENT-START for a '---' marker."""
        self.fetch_document_indicator(DocumentStartToken)
    def fetch_document_end(self):
        # type: () -> None
        """Emit DOCUMENT-END for a '...' marker."""
        self.fetch_document_indicator(DocumentEndToken)
    def fetch_document_indicator(self, TokenClass):
        # type: (Any) -> None
        """Shared implementation for '---'/'...' document markers."""
        # Set the current indentation to -1.
        self.unwind_indent(-1)
        # Reset simple keys. Note that there could not be a block collection
        # after '---'.
        self.remove_possible_simple_key()
        self.allow_simple_key = False
        # Add DOCUMENT-START or DOCUMENT-END.
        start_mark = self.reader.get_mark()
        self.reader.forward(3)
        end_mark = self.reader.get_mark()
        self.tokens.append(TokenClass(start_mark, end_mark))
    def fetch_flow_sequence_start(self):
        # type: () -> None
        """Emit FLOW-SEQUENCE-START for '['."""
        self.fetch_flow_collection_start(FlowSequenceStartToken, to_push='[')
    def fetch_flow_mapping_start(self):
        # type: () -> None
        """Emit FLOW-MAPPING-START for '{'."""
        self.fetch_flow_collection_start(FlowMappingStartToken, to_push='{')
    def fetch_flow_collection_start(self, TokenClass, to_push):
        # type: (Any, Text) -> None
        """Enter a flow context and emit its start token."""
        # '[' and '{' may start a simple key.
        self.save_possible_simple_key()
        # Increase the flow level.
        self.flow_context.append(to_push)
        # Simple keys are allowed after '[' and '{'.
        self.allow_simple_key = True
        # Add FLOW-SEQUENCE-START or FLOW-MAPPING-START.
        start_mark = self.reader.get_mark()
        self.reader.forward()
        end_mark = self.reader.get_mark()
        self.tokens.append(TokenClass(start_mark, end_mark))
    def fetch_flow_sequence_end(self):
        # type: () -> None
        """Emit FLOW-SEQUENCE-END for ']'."""
        self.fetch_flow_collection_end(FlowSequenceEndToken)
    def fetch_flow_mapping_end(self):
        # type: () -> None
        """Emit FLOW-MAPPING-END for '}'."""
        self.fetch_flow_collection_end(FlowMappingEndToken)
    def fetch_flow_collection_end(self, TokenClass):
        # type: (Any) -> None
        """Leave the current flow context and emit its end token."""
        # Reset possible simple key on the current level.
        self.remove_possible_simple_key()
        # Decrease the flow level.
        try:
            popped = self.flow_context.pop()  # NOQA
        except IndexError:
            # We must not be in a list or object.
            # Defer error handling to the parser (deliberate swallow).
            pass
        # No simple keys after ']' or '}'.
        self.allow_simple_key = False
        # Add FLOW-SEQUENCE-END or FLOW-MAPPING-END.
        start_mark = self.reader.get_mark()
        self.reader.forward()
        end_mark = self.reader.get_mark()
        self.tokens.append(TokenClass(start_mark, end_mark))
    def fetch_flow_entry(self):
        # type: () -> None
        """Emit FLOW-ENTRY for ',' inside a flow collection."""
        # Simple keys are allowed after ','.
        self.allow_simple_key = True
        # Reset possible simple key on the current level.
        self.remove_possible_simple_key()
        # Add FLOW-ENTRY.
        start_mark = self.reader.get_mark()
        self.reader.forward()
        end_mark = self.reader.get_mark()
        self.tokens.append(FlowEntryToken(start_mark, end_mark))
    def fetch_block_entry(self):
        # type: () -> None
        """Emit BLOCK-ENTRY for '-', opening a BLOCK-SEQUENCE if needed."""
        # Block context needs additional checks.
        if not self.flow_level:
            # Are we allowed to start a new entry?
            if not self.allow_simple_key:
                raise ScannerError(
                    None, None, 'sequence entries are not allowed here', self.reader.get_mark()
                )
            # We may need to add BLOCK-SEQUENCE-START.
            if self.add_indent(self.reader.column):
                mark = self.reader.get_mark()
                self.tokens.append(BlockSequenceStartToken(mark, mark))
        # It's an error for the block entry to occur in the flow context,
        # but we let the parser detect this.
        else:
            pass
        # Simple keys are allowed after '-'.
        self.allow_simple_key = True
        # Reset possible simple key on the current level.
        self.remove_possible_simple_key()
        # Add BLOCK-ENTRY.
        start_mark = self.reader.get_mark()
        self.reader.forward()
        end_mark = self.reader.get_mark()
        self.tokens.append(BlockEntryToken(start_mark, end_mark))
    def fetch_key(self):
        # type: () -> None
        """Emit KEY for a '?' indicator, opening a BLOCK-MAPPING if needed."""
        # Block context needs additional checks.
        if not self.flow_level:
            # Are we allowed to start a key (not necessarily a simple one)?
            if not self.allow_simple_key:
                raise ScannerError(
                    None, None, 'mapping keys are not allowed here', self.reader.get_mark()
                )
            # We may need to add BLOCK-MAPPING-START.
            if self.add_indent(self.reader.column):
                mark = self.reader.get_mark()
                self.tokens.append(BlockMappingStartToken(mark, mark))
        # Simple keys are allowed after '?' in the block context.
        self.allow_simple_key = not self.flow_level
        # Reset possible simple key on the current level.
        self.remove_possible_simple_key()
        # Add KEY.
        start_mark = self.reader.get_mark()
        self.reader.forward()
        end_mark = self.reader.get_mark()
        self.tokens.append(KeyToken(start_mark, end_mark))
    def fetch_value(self):
        # type: () -> None
        """Handle ':' — retroactively emit KEY for a pending simple key
        (inserting it at the saved token position), then emit VALUE."""
        # Do we determine a simple key?
        if self.flow_level in self.possible_simple_keys:
            # Add KEY.
            key = self.possible_simple_keys[self.flow_level]
            del self.possible_simple_keys[self.flow_level]
            self.tokens.insert(
                key.token_number - self.tokens_taken, KeyToken(key.mark, key.mark)
            )
            # If this key starts a new block mapping, we need to add
            # BLOCK-MAPPING-START.
            if not self.flow_level:
                if self.add_indent(key.column):
                    self.tokens.insert(
                        key.token_number - self.tokens_taken,
                        BlockMappingStartToken(key.mark, key.mark),
                    )
            # There cannot be two simple keys one after another.
            self.allow_simple_key = False
        # It must be a part of a complex key.
        else:
            # Block context needs additional checks.
            # (Do we really need them? They will be caught by the parser
            # anyway.)
            if not self.flow_level:
                # We are allowed to start a complex value if and only if
                # we can start a simple key.
                if not self.allow_simple_key:
                    raise ScannerError(
                        None,
                        None,
                        'mapping values are not allowed here',
                        self.reader.get_mark(),
                    )
            # If this value starts a new block mapping, we need to add
            # BLOCK-MAPPING-START. It will be detected as an error later by
            # the parser.
            if not self.flow_level:
                if self.add_indent(self.reader.column):
                    mark = self.reader.get_mark()
                    self.tokens.append(BlockMappingStartToken(mark, mark))
            # Simple keys are allowed after ':' in the block context.
            self.allow_simple_key = not self.flow_level
            # Reset possible simple key on the current level.
            self.remove_possible_simple_key()
        # Add VALUE.
        start_mark = self.reader.get_mark()
        self.reader.forward()
        end_mark = self.reader.get_mark()
        self.tokens.append(ValueToken(start_mark, end_mark))
    def fetch_alias(self):
        # type: () -> None
        """Scan and emit an ALIAS token ('*name')."""
        # ALIAS could be a simple key.
        self.save_possible_simple_key()
        # No simple keys after ALIAS.
        self.allow_simple_key = False
        # Scan and add ALIAS.
        self.tokens.append(self.scan_anchor(AliasToken))
    def fetch_anchor(self):
        # type: () -> None
        """Scan and emit an ANCHOR token ('&name')."""
        # ANCHOR could start a simple key.
        self.save_possible_simple_key()
        # No simple keys after ANCHOR.
        self.allow_simple_key = False
        # Scan and add ANCHOR.
        self.tokens.append(self.scan_anchor(AnchorToken))
    def fetch_tag(self):
        # type: () -> None
        """Scan and emit a TAG token ('!...')."""
        # TAG could start a simple key.
        self.save_possible_simple_key()
        # No simple keys after TAG.
        self.allow_simple_key = False
        # Scan and add TAG.
        self.tokens.append(self.scan_tag())
    def fetch_literal(self):
        # type: () -> None
        """Handle '|' (literal block scalar); block context only."""
        self.fetch_block_scalar(style='|')
    def fetch_folded(self):
        # type: () -> None
        """Handle '>' (folded block scalar); block context only."""
        self.fetch_block_scalar(style='>')
    def fetch_block_scalar(self, style):
        # type: (Any) -> None
        """Scan and emit a SCALAR token for a block scalar ('|' or '>')."""
        # A simple key may follow a block scalar.
        self.allow_simple_key = True
        # Reset possible simple key on the current level.
        self.remove_possible_simple_key()
        # Scan and add SCALAR.
        self.tokens.append(self.scan_block_scalar(style))
    def fetch_single(self):
        # type: () -> None
        """Handle a single-quoted flow scalar."""
        self.fetch_flow_scalar(style="'")
    def fetch_double(self):
        # type: () -> None
        """Handle a double-quoted flow scalar."""
        self.fetch_flow_scalar(style='"')
    def fetch_flow_scalar(self, style):
        # type: (Any) -> None
        """Scan and emit a SCALAR token for a quoted scalar."""
        # A flow scalar could be a simple key.
        self.save_possible_simple_key()
        # No simple keys after flow scalars.
        self.allow_simple_key = False
        # Scan and add SCALAR.
        self.tokens.append(self.scan_flow_scalar(style))
    def fetch_plain(self):
        # type: () -> None
        """Scan and emit a SCALAR token for a plain (unquoted) scalar."""
        # A plain scalar could be a simple key.
        self.save_possible_simple_key()
        # No simple keys after plain scalars. But note that `scan_plain` will
        # change this flag if the scan is finished at the beginning of the
        # line.
        self.allow_simple_key = False
        # Scan and add SCALAR. May change `allow_simple_key`.
        self.tokens.append(self.scan_plain())
- # Checkers.
- def check_directive(self):
- # type: () -> Any
- # DIRECTIVE: ^ '%' ...
- # The '%' indicator is already checked.
- if self.reader.column == 0:
- return True
- return None
- def check_document_start(self):
- # type: () -> Any
- # DOCUMENT-START: ^ '---' (' '|'\n')
- if self.reader.column == 0:
- if self.reader.prefix(3) == '---' and self.reader.peek(3) in _THE_END_SPACE_TAB:
- return True
- return None
- def check_document_end(self):
- # type: () -> Any
- # DOCUMENT-END: ^ '...' (' '|'\n')
- if self.reader.column == 0:
- if self.reader.prefix(3) == '...' and self.reader.peek(3) in _THE_END_SPACE_TAB:
- return True
- return None
    def check_block_entry(self):
        # type: () -> Any
        """A '-' is a block entry only when followed by space or end-of-line."""
        # BLOCK-ENTRY: '-' (' '|'\n')
        return self.reader.peek(1) in _THE_END_SPACE_TAB
- def check_key(self):
- # type: () -> Any
- # KEY(flow context): '?'
- if bool(self.flow_level):
- return True
- # KEY(block context): '?' (' '|'\n')
- return self.reader.peek(1) in _THE_END_SPACE_TAB
    def check_value(self):
        # type: () -> Any
        """Return True if ':' introduces a value at the current position;
        the rules differ between YAML 1.1 and 1.2 in flow context."""
        # VALUE(flow context): ':'
        if self.scanner_processing_version == (1, 1):
            if bool(self.flow_level):
                return True
        else:
            # YAML 1.2: in flow context ':' needs a following space/terminator
            # in some positions.
            if bool(self.flow_level):
                if self.flow_context[-1] == '[':
                    if self.reader.peek(1) not in _THE_END_SPACE_TAB:
                        return False
                elif self.tokens and isinstance(self.tokens[-1], ValueToken):
                    # mapping flow context scanning a value token
                    if self.reader.peek(1) not in _THE_END_SPACE_TAB:
                        return False
                return True
        # VALUE(block context): ':' (' '|'\n')
        return self.reader.peek(1) in _THE_END_SPACE_TAB
    def check_plain(self):
        # type: () -> Any
        """Return True if a plain scalar may start at the current position."""
        # A plain scalar may start with any non-space character except:
        #   '-', '?', ':', ',', '[', ']', '{', '}',
        #   '#', '&', '*', '!', '|', '>', '\'', '\"',
        #   '%', '@', '`'.
        #
        # It may also start with
        #   '-', '?', ':'
        # if it is followed by a non-space character.
        #
        # Note that we limit the last rule to the block context (except the
        # '-' character) because we want the flow context to be space
        # independent.
        srp = self.reader.peek
        ch = srp()
        if self.scanner_processing_version == (1, 1):
            return ch not in '\0 \t\r\n\x85\u2028\u2029-?:,[]{}#&*!|>\'"%@`' or (
                srp(1) not in _THE_END_SPACE_TAB
                and (ch == '-' or (not self.flow_level and ch in '?:'))
            )
        # YAML 1.2
        if ch not in '\0 \t\r\n\x85\u2028\u2029-?:,[]{}#&*!|>\'"%@`':
            # ################### ^ ???
            return True
        ch1 = srp(1)
        if ch == '-' and ch1 not in _THE_END_SPACE_TAB:
            return True
        # ':' may start a plain scalar in flow context if not followed by
        # inline whitespace (1.2 relaxation).
        if ch == ':' and bool(self.flow_level) and ch1 not in _SPACE_TAB:
            return True
        return srp(1) not in _THE_END_SPACE_TAB and (
            ch == '-' or (not self.flow_level and ch in '?:')
        )
- # Scanners.
    def scan_to_next_token(self):
        # type: () -> Any
        """Skip spaces, comments and line breaks up to the next token.

        Always returns None for the base scanner (RoundTripScanner overrides
        this to return collected comments). A line break in block context
        re-enables simple keys.
        """
        # We ignore spaces, line breaks and comments.
        # If we find a line break in the block context, we set the flag
        # `allow_simple_key` on.
        # The byte order mark is stripped if it's the first character in the
        # stream. We do not yet support BOM inside the stream as the
        # specification requires. Any such mark will be considered as a part
        # of the document.
        #
        # TODO: We need to make tab handling rules more sane. A good rule is
        #   Tabs cannot precede tokens
        #   BLOCK-SEQUENCE-START, BLOCK-MAPPING-START, BLOCK-END,
        #   KEY(block), VALUE(block), BLOCK-ENTRY
        # So the checking code is
        #   if <TAB>:
        #       self.allow_simple_keys = False
        # We also need to add the check for `allow_simple_keys == True` to
        # `unwind_indent` before issuing BLOCK-END.
        # Scanners for block, flow, and plain scalars need to be modified.
        srp = self.reader.peek
        srf = self.reader.forward
        # Strip a leading BOM only at the very start of the stream.
        if self.reader.index == 0 and srp() == '\uFEFF':
            srf()
        found = False
        _the_end = _THE_END
        while not found:
            while srp() == ' ':
                srf()
            if srp() == '#':
                while srp() not in _the_end:
                    srf()
            if self.scan_line_break():
                if not self.flow_level:
                    self.allow_simple_key = True
            else:
                found = True
        return None
def scan_directive(self):
    # type: () -> Any
    """Scan a '%' directive line and return a DirectiveToken."""
    # See the specification for details.
    srp = self.reader.peek
    srf = self.reader.forward
    start_mark = self.reader.get_mark()
    srf()  # skip the '%' indicator
    name = self.scan_directive_name(start_mark)
    value = None
    if name == 'YAML':
        value = self.scan_yaml_directive_value(start_mark)
        end_mark = self.reader.get_mark()
    elif name == 'TAG':
        value = self.scan_tag_directive_value(start_mark)
        end_mark = self.reader.get_mark()
    else:
        # unknown directive: keep the name, skip the rest of the line
        end_mark = self.reader.get_mark()
        while srp() not in _THE_END:
            srf()
    self.scan_directive_ignored_line(start_mark)
    return DirectiveToken(name, value, start_mark, end_mark)
def scan_directive_name(self, start_mark):
    # type: (Any) -> Any
    """Scan the name of a directive (e.g. 'YAML' or 'TAG').

    The name consists of ASCII alphanumerics plus '-', '_', ':' and '.',
    and must be followed by a space or the end of the line/stream.
    """
    peek = self.reader.peek
    count = 0
    ch = peek(count)
    while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' or ch in '-_:.':
        count += 1
        ch = peek(count)
    if count == 0:
        raise ScannerError(
            'while scanning a directive',
            start_mark,
            'expected alphabetic or numeric character, but found %r' % utf8(ch),
            self.reader.get_mark(),
        )
    name = self.reader.prefix(count)
    self.reader.forward(count)
    ch = peek()
    if ch not in '\0 \r\n\x85\u2028\u2029':
        raise ScannerError(
            'while scanning a directive',
            start_mark,
            'expected alphabetic or numeric character, but found %r' % utf8(ch),
            self.reader.get_mark(),
        )
    return name
def scan_yaml_directive_value(self, start_mark):
    # type: (Any) -> Any
    """Scan the 'major.minor' version pair of a %YAML directive."""
    # See the specification for details.
    srp = self.reader.peek
    srf = self.reader.forward
    while srp() == ' ':
        srf()
    major = self.scan_yaml_directive_number(start_mark)
    if srp() != '.':
        raise ScannerError(
            'while scanning a directive',
            start_mark,
            "expected a digit or '.', but found %r" % utf8(srp()),
            self.reader.get_mark(),
        )
    srf()
    minor = self.scan_yaml_directive_number(start_mark)
    if srp() not in '\0 \r\n\x85\u2028\u2029':
        raise ScannerError(
            'while scanning a directive',
            start_mark,
            "expected a digit or ' ', but found %r" % utf8(srp()),
            self.reader.get_mark(),
        )
    # remember the version; it influences version-dependent scanning rules
    self.yaml_version = (major, minor)
    return self.yaml_version
def scan_yaml_directive_number(self, start_mark):
    # type: (Any) -> Any
    """Scan one integer component of a %YAML version number."""
    peek = self.reader.peek
    ch = peek()
    if not ('0' <= ch <= '9'):
        raise ScannerError(
            'while scanning a directive',
            start_mark,
            'expected a digit, but found %r' % utf8(ch),
            self.reader.get_mark(),
        )
    digits = 0
    while '0' <= peek(digits) <= '9':
        digits += 1
    number = int(self.reader.prefix(digits))
    self.reader.forward(digits)
    return number
def scan_tag_directive_value(self, start_mark):
    # type: (Any) -> Any
    """Scan the value of a %TAG directive: a (handle, prefix) pair."""
    peek = self.reader.peek
    forward = self.reader.forward
    while peek() == ' ':
        forward()
    handle = self.scan_tag_directive_handle(start_mark)
    while peek() == ' ':
        forward()
    prefix = self.scan_tag_directive_prefix(start_mark)
    return (handle, prefix)
def scan_tag_directive_handle(self, start_mark):
    # type: (Any) -> Any
    """Scan the handle part of a %TAG directive; it must be followed by a space."""
    handle = self.scan_tag_handle('directive', start_mark)
    ch = self.reader.peek()
    if ch == ' ':
        return handle
    raise ScannerError(
        'while scanning a directive',
        start_mark,
        "expected ' ', but found %r" % utf8(ch),
        self.reader.get_mark(),
    )
def scan_tag_directive_prefix(self, start_mark):
    # type: (Any) -> Any
    """Scan the prefix part of a %TAG directive (a tag URI)."""
    prefix = self.scan_tag_uri('directive', start_mark)
    ch = self.reader.peek()
    if ch in '\0 \r\n\x85\u2028\u2029':
        return prefix
    raise ScannerError(
        'while scanning a directive',
        start_mark,
        "expected ' ', but found %r" % utf8(ch),
        self.reader.get_mark(),
    )
def scan_directive_ignored_line(self, start_mark):
    # type: (Any) -> None
    """Skip trailing spaces and an optional comment, then the line break."""
    peek = self.reader.peek
    forward = self.reader.forward
    while peek() == ' ':
        forward()
    if peek() == '#':
        # consume the comment up to (but not over) the line end
        while peek() not in _THE_END:
            forward()
    ch = peek()
    if ch not in _THE_END:
        raise ScannerError(
            'while scanning a directive',
            start_mark,
            'expected a comment or a line break, but found %r' % utf8(ch),
            self.reader.get_mark(),
        )
    self.scan_line_break()
def scan_anchor(self, TokenClass):
    # type: (Any) -> Any
    """Scan an anchor ('&name') or alias ('*name') into a TokenClass token."""
    # The specification does not restrict characters for anchors and
    # aliases. This may lead to problems, for instance, the document:
    #   [ *alias, value ]
    # can be interpteted in two ways, as
    #   [ "value" ]
    # and
    #   [ *alias , "value" ]
    # Therefore we restrict aliases to numbers and ASCII letters.
    srp = self.reader.peek
    start_mark = self.reader.get_mark()
    indicator = srp()
    if indicator == '*':
        name = 'alias'
    else:
        name = 'anchor'
    self.reader.forward()
    length = 0
    ch = srp(length)
    # while u'0' <= ch <= u'9' or u'A' <= ch <= u'Z' or u'a' <= ch <= u'z' \
    #         or ch in u'-_':
    while check_anchorname_char(ch):
        length += 1
        ch = srp(length)
    if not length:
        raise ScannerError(
            'while scanning an %s' % (name,),
            start_mark,
            'expected alphabetic or numeric character, but found %r' % utf8(ch),
            self.reader.get_mark(),
        )
    value = self.reader.prefix(length)
    self.reader.forward(length)
    # ch1 = ch
    # ch = srp()  # no need to peek, ch is already set
    # assert ch1 == ch
    # the name must be followed by a separator or a flow indicator
    if ch not in '\0 \t\r\n\x85\u2028\u2029?:,[]{}%@`':
        raise ScannerError(
            'while scanning an %s' % (name,),
            start_mark,
            'expected alphabetic or numeric character, but found %r' % utf8(ch),
            self.reader.get_mark(),
        )
    end_mark = self.reader.get_mark()
    return TokenClass(value, start_mark, end_mark)
def scan_tag(self):
    # type: () -> Any
    """Scan a tag token: verbatim '!<uri>', the plain '!', or handle+suffix.

    Fix: the original assigned ``handle = '!'`` just before an ``if/else``
    in which both branches immediately reassigned ``handle``; that dead
    assignment is removed.
    """
    # See the specification for details.
    srp = self.reader.peek
    start_mark = self.reader.get_mark()
    ch = srp(1)
    if ch == '<':
        # verbatim tag, e.g. !<tag:example.com,2000:app/foo>
        handle = None
        self.reader.forward(2)
        suffix = self.scan_tag_uri('tag', start_mark)
        if srp() != '>':
            raise ScannerError(
                'while parsing a tag',
                start_mark,
                "expected '>', but found %r" % utf8(srp()),
                self.reader.get_mark(),
            )
        self.reader.forward()
    elif ch in _THE_END_SPACE_TAB:
        # the non-specific tag '!'
        handle = None
        suffix = '!'
        self.reader.forward()
    else:
        # look ahead to decide between '!suffix' and '!handle!suffix'
        length = 1
        use_handle = False
        while ch not in '\0 \r\n\x85\u2028\u2029':
            if ch == '!':
                use_handle = True
                break
            length += 1
            ch = srp(length)
        if use_handle:
            handle = self.scan_tag_handle('tag', start_mark)
        else:
            handle = '!'
            self.reader.forward()
        suffix = self.scan_tag_uri('tag', start_mark)
    ch = srp()
    # a tag must be followed by a separator or the end of line/stream
    if ch not in '\0 \r\n\x85\u2028\u2029':
        raise ScannerError(
            'while scanning a tag',
            start_mark,
            "expected ' ', but found %r" % utf8(ch),
            self.reader.get_mark(),
        )
    value = (handle, suffix)
    end_mark = self.reader.get_mark()
    return TagToken(value, start_mark, end_mark)
def scan_block_scalar(self, style, rt=False):
    # type: (Any, Optional[bool]) -> Any
    """Scan a literal ('|') or folded ('>') block scalar into a ScalarToken.

    With ``rt`` true, a '\\a' marker is inserted wherever folding occurred so
    a round-trip emitter can restore the original line breaks.
    """
    # See the specification for details.
    srp = self.reader.peek
    if style == '>':
        folded = True
    else:
        folded = False
    chunks = []  # type: List[Any]
    start_mark = self.reader.get_mark()
    # Scan the header.
    self.reader.forward()
    chomping, increment = self.scan_block_scalar_indicators(start_mark)
    # block scalar comment e.g. : |+ # comment text
    block_scalar_comment = self.scan_block_scalar_ignored_line(start_mark)
    # Determine the indentation level and go to the first non-empty line.
    min_indent = self.indent + 1
    if increment is None:
        # no increment and top level, min_indent could be 0
        if min_indent < 1 and (
            style not in '|>'
            or (self.scanner_processing_version == (1, 1))
            and getattr(
                self.loader, 'top_level_block_style_scalar_no_indent_error_1_1', False
            )
        ):
            min_indent = 1
        breaks, max_indent, end_mark = self.scan_block_scalar_indentation()
        indent = max(min_indent, max_indent)
    else:
        if min_indent < 1:
            min_indent = 1
        indent = min_indent + increment - 1
        breaks, end_mark = self.scan_block_scalar_breaks(indent)
    line_break = ""
    # Scan the inner part of the block scalar.
    while self.reader.column == indent and srp() != '\0':
        chunks.extend(breaks)
        leading_non_space = srp() not in ' \t'
        length = 0
        while srp(length) not in _THE_END:
            length += 1
        chunks.append(self.reader.prefix(length))
        self.reader.forward(length)
        line_break = self.scan_line_break()
        breaks, end_mark = self.scan_block_scalar_breaks(indent)
        if style in '|>' and min_indent == 0:
            # at the beginning of a line, if in block style see if
            # end of document/start_new_document
            if self.check_document_start() or self.check_document_end():
                break
        if self.reader.column == indent and srp() != '\0':
            # Unfortunately, folding rules are ambiguous.
            #
            # This is the folding according to the specification:
            if rt and folded and line_break == '\n':
                chunks.append('\a')
            if folded and line_break == '\n' and leading_non_space and srp() not in ' \t':
                if not breaks:
                    chunks.append(' ')
            else:
                chunks.append(line_break)
            # This is Clark Evans's interpretation (also in the spec
            # examples):
            #
            # if folded and line_break == u'\n':
            #     if not breaks:
            #         if srp() not in ' \t':
            #             chunks.append(u' ')
            #         else:
            #             chunks.append(line_break)
            # else:
            #     chunks.append(line_break)
        else:
            break
    # Process trailing line breaks. The 'chomping' setting determines
    # whether they are included in the value.
    trailing = []  # type: List[Any]
    if chomping in [None, True]:
        chunks.append(line_break)
    if chomping is True:
        chunks.extend(breaks)
    elif chomping in [None, False]:
        trailing.extend(breaks)
    # We are done.
    token = ScalarToken("".join(chunks), False, start_mark, end_mark, style)
    if block_scalar_comment is not None:
        token.add_pre_comments([block_scalar_comment])
    if len(trailing) > 0:
        # nprint('trailing 1', trailing)  # XXXXX
        # Eat whitespaces and comments until we reach the next token.
        comment = self.scan_to_next_token()
        while comment:
            trailing.append(' ' * comment[1].column + comment[0])
            comment = self.scan_to_next_token()
        # Keep track of the trailing whitespace and following comments
        # as a comment token, if isn't all included in the actual value.
        comment_end_mark = self.reader.get_mark()
        comment = CommentToken("".join(trailing), end_mark, comment_end_mark)
        token.add_post_comment(comment)
    return token
def scan_block_scalar_indicators(self, start_mark):
    # type: (Any) -> Any
    """Scan the optional chomping ('+'/'-') and indentation (1-9) indicators.

    The two indicators may appear in either order. Returns (chomping,
    increment): chomping is True ('+'), False ('-') or None; increment is an
    int in 1-9 or None.
    """
    # See the specification for details.
    srp = self.reader.peek
    chomping = None
    increment = None
    ch = srp()
    if ch in '+-':
        # chomping indicator first, optionally followed by the indent digit
        if ch == '+':
            chomping = True
        else:
            chomping = False
        self.reader.forward()
        ch = srp()
        if ch in '0123456789':
            increment = int(ch)
            if increment == 0:
                raise ScannerError(
                    'while scanning a block scalar',
                    start_mark,
                    'expected indentation indicator in the range 1-9, ' 'but found 0',
                    self.reader.get_mark(),
                )
            self.reader.forward()
    elif ch in '0123456789':
        # indent digit first, optionally followed by the chomping indicator
        increment = int(ch)
        if increment == 0:
            raise ScannerError(
                'while scanning a block scalar',
                start_mark,
                'expected indentation indicator in the range 1-9, ' 'but found 0',
                self.reader.get_mark(),
            )
        self.reader.forward()
        ch = srp()
        if ch in '+-':
            if ch == '+':
                chomping = True
            else:
                chomping = False
            self.reader.forward()
    ch = srp()
    # only a comment or the line end may follow the indicators
    if ch not in '\0 \r\n\x85\u2028\u2029':
        raise ScannerError(
            'while scanning a block scalar',
            start_mark,
            'expected chomping or indentation indicators, but found %r' % utf8(ch),
            self.reader.get_mark(),
        )
    return chomping, increment
def scan_block_scalar_ignored_line(self, start_mark):
    # type: (Any) -> Any
    """Skip the rest of a block scalar header line; return any comment found.

    The comment (including its leading spaces) is returned as a string, or
    None when the line holds no comment.
    """
    peek = self.reader.peek
    forward = self.reader.forward
    leading = ''
    comment = None
    while peek() == ' ':
        leading += peek()
        forward()
    if peek() == '#':
        # keep the preceding blanks so the comment column is preserved
        comment = leading
        while peek() not in _THE_END:
            comment += peek()
            forward()
    ch = peek()
    if ch not in _THE_END:
        raise ScannerError(
            'while scanning a block scalar',
            start_mark,
            'expected a comment or a line break, but found %r' % utf8(ch),
            self.reader.get_mark(),
        )
    self.scan_line_break()
    return comment
def scan_block_scalar_indentation(self):
    # type: () -> Any
    """Skip leading blanks/breaks; return (breaks, max indentation, end mark)."""
    peek = self.reader.peek
    forward = self.reader.forward
    breaks = []
    max_indent = 0
    end_mark = self.reader.get_mark()
    while True:
        ch = peek()
        if ch not in ' \r\n\x85\u2028\u2029':
            break
        if ch == ' ':
            forward()
            if self.reader.column > max_indent:
                max_indent = self.reader.column
        else:
            breaks.append(self.scan_line_break())
            end_mark = self.reader.get_mark()
    return breaks, max_indent, end_mark
def scan_block_scalar_breaks(self, indent):
    # type: (int) -> Any
    """Consume indentation and blank lines up to `indent`; return (breaks, end mark)."""
    peek = self.reader.peek
    forward = self.reader.forward
    breaks = []
    end_mark = self.reader.get_mark()
    while self.reader.column < indent and peek() == ' ':
        forward()
    while peek() in '\r\n\x85\u2028\u2029':
        breaks.append(self.scan_line_break())
        end_mark = self.reader.get_mark()
        while self.reader.column < indent and peek() == ' ':
            forward()
    return breaks, end_mark
def scan_flow_scalar(self, style):
    # type: (Any) -> Any
    """Scan a single- or double-quoted scalar and return a ScalarToken."""
    # See the specification for details.
    # Note that we loose indentation rules for quoted scalars. Quoted
    # scalars don't need to adhere indentation because " and ' clearly
    # mark the beginning and the end of them. Therefore we are less
    # restrictive then the specification requires. We only need to check
    # that document separators are not included in scalars.
    if style == '"':
        double = True
    else:
        double = False
    srp = self.reader.peek
    chunks = []  # type: List[Any]
    start_mark = self.reader.get_mark()
    quote = srp()
    self.reader.forward()
    # alternate between non-blank runs and folded blank/break runs
    chunks.extend(self.scan_flow_scalar_non_spaces(double, start_mark))
    while srp() != quote:
        chunks.extend(self.scan_flow_scalar_spaces(double, start_mark))
        chunks.extend(self.scan_flow_scalar_non_spaces(double, start_mark))
    self.reader.forward()  # skip the closing quote
    end_mark = self.reader.get_mark()
    return ScalarToken("".join(chunks), False, start_mark, end_mark, style)
# Map from the character following '\' in a double-quoted scalar to the
# character it denotes.
ESCAPE_REPLACEMENTS = {
    '0': '\0',
    'a': '\x07',
    'b': '\x08',
    't': '\x09',
    '\t': '\x09',
    'n': '\x0A',
    'v': '\x0B',
    'f': '\x0C',
    'r': '\x0D',
    'e': '\x1B',
    ' ': '\x20',
    '"': '"',
    '/': '/',  # as per http://www.json.org/
    '\\': '\\',
    'N': '\x85',
    '_': '\xA0',
    'L': '\u2028',
    'P': '\u2029',
}

# Escape introducers that are followed by a fixed number of hex digits:
# \xXX, \uXXXX, \UXXXXXXXX
ESCAPE_CODES = {'x': 2, 'u': 4, 'U': 8}
def scan_flow_scalar_non_spaces(self, double, start_mark):
    # type: (Any, Any) -> Any
    """Scan the non-blank parts of a quoted scalar, handling quotes/escapes."""
    # See the specification for details.
    chunks = []  # type: List[Any]
    srp = self.reader.peek
    srf = self.reader.forward
    while True:
        length = 0
        while srp(length) not in ' \n\'"\\\0\t\r\x85\u2028\u2029':
            length += 1
        if length != 0:
            chunks.append(self.reader.prefix(length))
            srf(length)
        ch = srp()
        if not double and ch == "'" and srp(1) == "'":
            # '' inside a single-quoted scalar is an escaped single quote
            chunks.append("'")
            srf(2)
        elif (double and ch == "'") or (not double and ch in '"\\'):
            # literal character with no special meaning in this quoting style
            chunks.append(ch)
            srf()
        elif double and ch == '\\':
            srf()
            ch = srp()
            if ch in self.ESCAPE_REPLACEMENTS:
                chunks.append(self.ESCAPE_REPLACEMENTS[ch])
                srf()
            elif ch in self.ESCAPE_CODES:
                # numeric escape: \xXX, \uXXXX or \UXXXXXXXX
                length = self.ESCAPE_CODES[ch]
                srf()
                for k in range(length):
                    if srp(k) not in '0123456789ABCDEFabcdef':
                        raise ScannerError(
                            'while scanning a double-quoted scalar',
                            start_mark,
                            'expected escape sequence of %d hexdecimal '
                            'numbers, but found %r' % (length, utf8(srp(k))),
                            self.reader.get_mark(),
                        )
                code = int(self.reader.prefix(length), 16)
                chunks.append(unichr(code))
                srf(length)
            elif ch in '\n\r\x85\u2028\u2029':
                # escaped line break: the break itself is discarded
                self.scan_line_break()
                chunks.extend(self.scan_flow_scalar_breaks(double, start_mark))
            else:
                raise ScannerError(
                    'while scanning a double-quoted scalar',
                    start_mark,
                    'found unknown escape character %r' % utf8(ch),
                    self.reader.get_mark(),
                )
        else:
            # blank, break, NUL or closing quote: let the caller decide
            return chunks
def scan_flow_scalar_spaces(self, double, start_mark):
    # type: (Any, Any) -> Any
    """Scan blanks/line breaks inside a quoted scalar, applying line folding."""
    # See the specification for details.
    srp = self.reader.peek
    chunks = []
    length = 0
    while srp(length) in ' \t':
        length += 1
    whitespaces = self.reader.prefix(length)
    self.reader.forward(length)
    ch = srp()
    if ch == '\0':
        raise ScannerError(
            'while scanning a quoted scalar',
            start_mark,
            'found unexpected end of stream',
            self.reader.get_mark(),
        )
    elif ch in '\r\n\x85\u2028\u2029':
        line_break = self.scan_line_break()
        breaks = self.scan_flow_scalar_breaks(double, start_mark)
        if line_break != '\n':
            chunks.append(line_break)
        elif not breaks:
            # a single '\n' folds into one space
            chunks.append(' ')
        chunks.extend(breaks)
    else:
        chunks.append(whitespaces)
    return chunks
def scan_flow_scalar_breaks(self, double, start_mark):
    # type: (Any, Any) -> Any
    """Consume line breaks inside a quoted scalar; error on document separators."""
    breaks = []  # type: List[Any]
    peek = self.reader.peek
    forward = self.reader.forward
    while True:
        # Instead of checking indentation, we check for document
        # separators.
        head = self.reader.prefix(3)
        if (head == '---' or head == '...') and peek(3) in _THE_END_SPACE_TAB:
            raise ScannerError(
                'while scanning a quoted scalar',
                start_mark,
                'found unexpected document separator',
                self.reader.get_mark(),
            )
        while peek() in ' \t':
            forward()
        if peek() not in '\r\n\x85\u2028\u2029':
            return breaks
        breaks.append(self.scan_line_break())
def scan_plain(self):
    # type: () -> Any
    """Scan a plain (unquoted) scalar and return a ScalarToken."""
    # See the specification for details.
    # We add an additional restriction for the flow context:
    #   plain scalars in the flow context cannot contain ',', ': ' and '?'.
    # We also keep track of the `allow_simple_key` flag here.
    # Indentation rules are loosed for the flow context.
    srp = self.reader.peek
    srf = self.reader.forward
    chunks = []  # type: List[Any]
    start_mark = self.reader.get_mark()
    end_mark = start_mark
    indent = self.indent + 1
    # We allow zero indentation for scalars, but then we need to check for
    # document separators at the beginning of the line.
    # if indent == 0:
    #     indent = 1
    spaces = []  # type: List[Any]
    while True:
        length = 0
        if srp() == '#':
            break
        # find the length of the next non-blank run of the scalar
        while True:
            ch = srp(length)
            if ch == ':' and srp(length + 1) not in _THE_END_SPACE_TAB:
                pass
            elif ch == '?' and self.scanner_processing_version != (1, 1):
                pass
            elif (
                ch in _THE_END_SPACE_TAB
                or (
                    not self.flow_level
                    and ch == ':'
                    and srp(length + 1) in _THE_END_SPACE_TAB
                )
                or (self.flow_level and ch in ',:?[]{}')
            ):
                break
            length += 1
        # It's not clear what we should do with ':' in the flow context.
        if (
            self.flow_level
            and ch == ':'
            and srp(length + 1) not in '\0 \t\r\n\x85\u2028\u2029,[]{}'
        ):
            srf(length)
            raise ScannerError(
                'while scanning a plain scalar',
                start_mark,
                "found unexpected ':'",
                self.reader.get_mark(),
                'Please check '
                'http://pyyaml.org/wiki/YAMLColonInFlowContext '
                'for details.',
            )
        if length == 0:
            break
        self.allow_simple_key = False
        chunks.extend(spaces)
        chunks.append(self.reader.prefix(length))
        srf(length)
        end_mark = self.reader.get_mark()
        spaces = self.scan_plain_spaces(indent, start_mark)
        if (
            not spaces
            or srp() == '#'
            or (not self.flow_level and self.reader.column < indent)
        ):
            break
    token = ScalarToken("".join(chunks), True, start_mark, end_mark)
    if spaces and spaces[0] == '\n':
        # Create a comment token to preserve the trailing line breaks.
        comment = CommentToken("".join(spaces) + '\n', start_mark, end_mark)
        token.add_post_comment(comment)
    return token
def scan_plain_spaces(self, indent, start_mark):
    # type: (Any, Any) -> Any
    """Scan blanks/breaks after a plain-scalar segment and fold them.

    Returns the folded whitespace chunks, or None when a document separator
    ('---'/'...') terminates the scalar.
    """
    # See the specification for details.
    # The specification is really confusing about tabs in plain scalars.
    # We just forbid them completely. Do not use tabs in YAML!
    srp = self.reader.peek
    srf = self.reader.forward
    chunks = []
    length = 0
    while srp(length) in ' ':
        length += 1
    whitespaces = self.reader.prefix(length)
    self.reader.forward(length)
    ch = srp()
    if ch in '\r\n\x85\u2028\u2029':
        line_break = self.scan_line_break()
        # a line break in the block context allows a simple key
        self.allow_simple_key = True
        prefix = self.reader.prefix(3)
        if (prefix == '---' or prefix == '...') and srp(3) in _THE_END_SPACE_TAB:
            return
        breaks = []
        while srp() in ' \r\n\x85\u2028\u2029':
            if srp() == ' ':
                srf()
            else:
                breaks.append(self.scan_line_break())
                prefix = self.reader.prefix(3)
                if (prefix == '---' or prefix == '...') and srp(3) in _THE_END_SPACE_TAB:
                    return
        if line_break != '\n':
            chunks.append(line_break)
        elif not breaks:
            # a single '\n' folds into one space
            chunks.append(' ')
        chunks.extend(breaks)
    elif whitespaces:
        chunks.append(whitespaces)
    return chunks
def scan_tag_handle(self, name, start_mark):
    # type: (Any, Any) -> Any
    """Scan a tag handle: '!', '!!' or a named handle '!word!'."""
    # For some strange reasons, the specification does not allow '_' in
    # tag handles. I have allowed it anyway.
    peek = self.reader.peek
    ch = peek()
    if ch != '!':
        raise ScannerError(
            'while scanning a %s' % (name,),
            start_mark,
            "expected '!', but found %r" % utf8(ch),
            self.reader.get_mark(),
        )
    length = 1
    ch = peek(length)
    if ch != ' ':
        # a named handle: consume word characters up to the closing '!'
        while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' or ch in '-_':
            length += 1
            ch = peek(length)
        if ch != '!':
            self.reader.forward(length)
            raise ScannerError(
                'while scanning a %s' % (name,),
                start_mark,
                "expected '!', but found %r" % utf8(ch),
                self.reader.get_mark(),
            )
        length += 1
    handle = self.reader.prefix(length)
    self.reader.forward(length)
    return handle
def scan_tag_uri(self, name, start_mark):
    # type: (Any, Any) -> Any
    """Scan a tag URI, decoding %xx escapes; return it as a string."""
    # See the specification for details.
    # Note: we do not check if URI is well-formed.
    srp = self.reader.peek
    chunks = []
    length = 0
    ch = srp(length)
    while (
        '0' <= ch <= '9'
        or 'A' <= ch <= 'Z'
        or 'a' <= ch <= 'z'
        or ch in "-;/?:@&=+$,_.!~*'()[]%"
        or ((self.scanner_processing_version > (1, 1)) and ch == '#')
    ):
        if ch == '%':
            # flush what was gathered so far, then decode the %xx escape run
            chunks.append(self.reader.prefix(length))
            self.reader.forward(length)
            length = 0
            chunks.append(self.scan_uri_escapes(name, start_mark))
        else:
            length += 1
        ch = srp(length)
    if length != 0:
        chunks.append(self.reader.prefix(length))
        self.reader.forward(length)
        length = 0
    if not chunks:
        raise ScannerError(
            'while parsing a %s' % (name,),
            start_mark,
            'expected URI, but found %r' % utf8(ch),
            self.reader.get_mark(),
        )
    return "".join(chunks)
def scan_uri_escapes(self, name, start_mark):
    # type: (Any, Any) -> Any
    """Decode a consecutive run of %xx escapes in a tag URI as UTF-8 text."""
    # See the specification for details.
    srp = self.reader.peek
    srf = self.reader.forward
    code_bytes = []  # type: List[Any]
    mark = self.reader.get_mark()
    while srp() == '%':
        srf()
        for k in range(2):
            if srp(k) not in '0123456789ABCDEFabcdef':
                raise ScannerError(
                    'while scanning a %s' % (name,),
                    start_mark,
                    'expected URI escape sequence of 2 hexdecimal numbers,'
                    ' but found %r' % utf8(srp(k)),
                    self.reader.get_mark(),
                )
        # PY3 collects ints for bytes(); PY2 collects one-char byte strings
        if PY3:
            code_bytes.append(int(self.reader.prefix(2), 16))
        else:
            code_bytes.append(chr(int(self.reader.prefix(2), 16)))
        srf(2)
    try:
        if PY3:
            value = bytes(code_bytes).decode('utf-8')
        else:
            value = unicode(b"".join(code_bytes), 'utf-8')
    except UnicodeDecodeError as exc:
        raise ScannerError('while scanning a %s' % (name,), start_mark, str(exc), mark)
    return value
def scan_line_break(self):
    # type: () -> Any
    """Consume one line break and return its normalized form.

    '\\r\\n', '\\r', '\\n' and '\\x85' all normalize to '\\n'; the Unicode
    separators '\\u2028'/'\\u2029' are returned unchanged; any other
    character consumes nothing and yields ''.
    """
    ch = self.reader.peek()
    if ch in '\u2028\u2029':
        self.reader.forward()
        return ch
    if ch in '\r\n\x85':
        step = 2 if self.reader.prefix(2) == '\r\n' else 1
        self.reader.forward(step)
        return '\n'
    return ""
class RoundTripScanner(Scanner):
    """Scanner that preserves comments and blank lines as CommentTokens."""

    def check_token(self, *choices):
        # type: (Any) -> bool
        # Check if the next token is one of the given types.
        while self.need_more_tokens():
            self.fetch_more_tokens()
        self._gather_comments()
        if bool(self.tokens):
            if not choices:
                return True
            for choice in choices:
                if isinstance(self.tokens[0], choice):
                    return True
        return False

    def peek_token(self):
        # type: () -> Any
        # Return the next token, but do not delete if from the queue.
        while self.need_more_tokens():
            self.fetch_more_tokens()
        self._gather_comments()
        if bool(self.tokens):
            return self.tokens[0]
        return None

    def _gather_comments(self):
        # type: () -> Any
        """combine multiple comment lines"""
        comments = []  # type: List[Any]
        if not self.tokens:
            return comments
        if isinstance(self.tokens[0], CommentToken):
            comment = self.tokens.pop(0)
            self.tokens_taken += 1
            comments.append(comment)
        while self.need_more_tokens():
            self.fetch_more_tokens()
            if not self.tokens:
                return comments
            if isinstance(self.tokens[0], CommentToken):
                self.tokens_taken += 1
                comment = self.tokens.pop(0)
                # nprint('dropping2', comment)
                comments.append(comment)
        if len(comments) >= 1:
            # attach the gathered comments to the following real token
            self.tokens[0].add_pre_comments(comments)
        # pull in post comment on e.g. ':'
        if not self.done and len(self.tokens) < 2:
            self.fetch_more_tokens()

    def get_token(self):
        # type: () -> Any
        # Return the next token, attaching same-line/following comments to it.
        while self.need_more_tokens():
            self.fetch_more_tokens()
        self._gather_comments()
        if bool(self.tokens):
            # nprint('tk', self.tokens)
            # only add post comment to single line tokens:
            # scalar, value token. FlowXEndToken, otherwise
            # hidden streamtokens could get them (leave them and they will be
            # pre comments for the next map/seq
            if (
                len(self.tokens) > 1
                and isinstance(
                    self.tokens[0],
                    (ScalarToken, ValueToken, FlowSequenceEndToken, FlowMappingEndToken),
                )
                and isinstance(self.tokens[1], CommentToken)
                and self.tokens[0].end_mark.line == self.tokens[1].start_mark.line
            ):
                self.tokens_taken += 1
                c = self.tokens.pop(1)
                self.fetch_more_tokens()
                # merge any directly following comment tokens into one
                while len(self.tokens) > 1 and isinstance(self.tokens[1], CommentToken):
                    self.tokens_taken += 1
                    c1 = self.tokens.pop(1)
                    c.value = c.value + (' ' * c1.start_mark.column) + c1.value
                    self.fetch_more_tokens()
                self.tokens[0].add_post_comment(c)
            elif (
                len(self.tokens) > 1
                and isinstance(self.tokens[0], ScalarToken)
                and isinstance(self.tokens[1], CommentToken)
                and self.tokens[0].end_mark.line != self.tokens[1].start_mark.line
            ):
                self.tokens_taken += 1
                c = self.tokens.pop(1)
                # re-insert the newlines/indent separating comment from scalar
                c.value = (
                    '\n' * (c.start_mark.line - self.tokens[0].end_mark.line)
                    + (' ' * c.start_mark.column)
                    + c.value
                )
                self.tokens[0].add_post_comment(c)
                self.fetch_more_tokens()
                while len(self.tokens) > 1 and isinstance(self.tokens[1], CommentToken):
                    self.tokens_taken += 1
                    c1 = self.tokens.pop(1)
                    c.value = c.value + (' ' * c1.start_mark.column) + c1.value
                    self.fetch_more_tokens()
            self.tokens_taken += 1
            return self.tokens.pop(0)
        return None

    def fetch_comment(self, comment):
        # type: (Any) -> None
        # Queue a comment (value, start_mark, end_mark) as a CommentToken.
        value, start_mark, end_mark = comment
        while value and value[-1] == ' ':
            # empty line within indented key context
            # no need to update end-mark, that is not used
            value = value[:-1]
        self.tokens.append(CommentToken(value, start_mark, end_mark))

    # scanner

    def scan_to_next_token(self):
        # type: () -> Any
        """Skip blanks; return (comment, start_mark, end_mark) instead of
        discarding comments/blank lines, or None when a real token follows."""
        # We ignore spaces, line breaks and comments.
        # If we find a line break in the block context, we set the flag
        # `allow_simple_key` on.
        # The byte order mark is stripped if it's the first character in the
        # stream. We do not yet support BOM inside the stream as the
        # specification requires. Any such mark will be considered as a part
        # of the document.
        #
        # TODO: We need to make tab handling rules more sane. A good rule is
        #   Tabs cannot precede tokens
        #   BLOCK-SEQUENCE-START, BLOCK-MAPPING-START, BLOCK-END,
        #   KEY(block), VALUE(block), BLOCK-ENTRY
        # So the checking code is
        #   if <TAB>:
        #       self.allow_simple_keys = False
        # We also need to add the check for `allow_simple_keys == True` to
        # `unwind_indent` before issuing BLOCK-END.
        # Scanners for block, flow, and plain scalars need to be modified.
        srp = self.reader.peek
        srf = self.reader.forward
        if self.reader.index == 0 and srp() == '\uFEFF':
            srf()  # strip a leading byte-order mark
        found = False
        while not found:
            while srp() == ' ':
                srf()
            ch = srp()
            if ch == '#':
                start_mark = self.reader.get_mark()
                comment = ch
                srf()
                while ch not in _THE_END:
                    ch = srp()
                    if ch == '\0':  # don't gobble the end-of-stream character
                        # but add an explicit newline as "YAML processors should terminate
                        # the stream with an explicit line break
                        # https://yaml.org/spec/1.2/spec.html#id2780069
                        comment += '\n'
                        break
                    comment += ch
                    srf()
                # gather any blank lines following the comment too
                ch = self.scan_line_break()
                while len(ch) > 0:
                    comment += ch
                    ch = self.scan_line_break()
                end_mark = self.reader.get_mark()
                if not self.flow_level:
                    self.allow_simple_key = True
                return comment, start_mark, end_mark
            if bool(self.scan_line_break()):
                start_mark = self.reader.get_mark()
                if not self.flow_level:
                    self.allow_simple_key = True
                ch = srp()
                if ch == '\n':  # empty toplevel lines
                    start_mark = self.reader.get_mark()
                    comment = ""
                    while ch:
                        ch = self.scan_line_break(empty_line=True)
                        comment += ch
                    if srp() == '#':
                        # empty line followed by indented real comment
                        comment = comment.rsplit('\n', 1)[0] + '\n'
                    end_mark = self.reader.get_mark()
                    return comment, start_mark, end_mark
            else:
                found = True
        return None

    def scan_line_break(self, empty_line=False):
        # type: (bool) -> Text
        # Transforms:
        #   '\r\n'      :   '\n'
        #   '\r'        :   '\n'
        #   '\n'        :   '\n'
        #   '\x85'      :   '\n'
        #   '\u2028'    :   '\u2028'
        #   '\u2029     :   '\u2029'
        #   default     :   ''
        ch = self.reader.peek()  # type: Text
        if ch in '\r\n\x85':
            if self.reader.prefix(2) == '\r\n':
                self.reader.forward(2)
            else:
                self.reader.forward()
            return '\n'
        elif ch in '\u2028\u2029':
            self.reader.forward()
            return ch
        elif empty_line and ch in '\t ':
            # on empty lines also consume (and report) blanks
            self.reader.forward()
            return ch
        return ""

    def scan_block_scalar(self, style, rt=True):
        # type: (Any, Optional[bool]) -> Any
        # round-trip scanning always records fold markers (rt=True)
        return Scanner.scan_block_scalar(self, style, rt=rt)
- # try:
- # import psyco
- # psyco.bind(Scanner)
- # except ImportError:
- # pass
|