123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806 |
- # coding: utf-8
- from __future__ import print_function, absolute_import, division
- import datetime
- import base64
- import binascii
- import re
- import sys
- import types
- import warnings
- # fmt: off
- from ruamel.yaml.error import (MarkedYAMLError, MarkedYAMLFutureWarning,
- MantissaNoDotYAML1_1Warning)
- from ruamel.yaml.nodes import * # NOQA
- from ruamel.yaml.nodes import (SequenceNode, MappingNode, ScalarNode)
- from ruamel.yaml.compat import (utf8, builtins_module, to_str, PY2, PY3, # NOQA
- text_type, nprint, nprintf, version_tnf)
- from ruamel.yaml.compat import ordereddict, Hashable, MutableSequence # type: ignore
- from ruamel.yaml.compat import MutableMapping # type: ignore
- from ruamel.yaml.comments import * # NOQA
- from ruamel.yaml.comments import (CommentedMap, CommentedOrderedMap, CommentedSet,
- CommentedKeySeq, CommentedSeq, TaggedScalar,
- CommentedKeyMap)
- from ruamel.yaml.scalarstring import (SingleQuotedScalarString, DoubleQuotedScalarString,
- LiteralScalarString, FoldedScalarString,
- PlainScalarString, ScalarString,)
- from ruamel.yaml.scalarint import ScalarInt, BinaryInt, OctalInt, HexInt, HexCapsInt
- from ruamel.yaml.scalarfloat import ScalarFloat
- from ruamel.yaml.scalarbool import ScalarBoolean
- from ruamel.yaml.timestamp import TimeStamp
- from ruamel.yaml.util import RegExp
- if False: # MYPY
- from typing import Any, Dict, List, Set, Generator, Union, Optional # NOQA
- __all__ = ['BaseConstructor', 'SafeConstructor', 'Constructor',
- 'ConstructorError', 'RoundTripConstructor']
- # fmt: on
- class ConstructorError(MarkedYAMLError):
- pass
- class DuplicateKeyFutureWarning(MarkedYAMLFutureWarning):
- pass
- class DuplicateKeyError(MarkedYAMLFutureWarning):
- pass
- class BaseConstructor(object):
- yaml_constructors = {} # type: Dict[Any, Any]
- yaml_multi_constructors = {} # type: Dict[Any, Any]
- def __init__(self, preserve_quotes=None, loader=None):
- # type: (Optional[bool], Any) -> None
- self.loader = loader
- if self.loader is not None and getattr(self.loader, '_constructor', None) is None:
- self.loader._constructor = self
- self.loader = loader
- self.yaml_base_dict_type = dict
- self.yaml_base_list_type = list
- self.constructed_objects = {} # type: Dict[Any, Any]
- self.recursive_objects = {} # type: Dict[Any, Any]
- self.state_generators = [] # type: List[Any]
- self.deep_construct = False
- self._preserve_quotes = preserve_quotes
- self.allow_duplicate_keys = version_tnf((0, 15, 1), (0, 16))
- @property
- def composer(self):
- # type: () -> Any
- if hasattr(self.loader, 'typ'):
- return self.loader.composer
- try:
- return self.loader._composer
- except AttributeError:
- sys.stdout.write('slt {}\n'.format(type(self)))
- sys.stdout.write('slc {}\n'.format(self.loader._composer))
- sys.stdout.write('{}\n'.format(dir(self)))
- raise
- @property
- def resolver(self):
- # type: () -> Any
- if hasattr(self.loader, 'typ'):
- return self.loader.resolver
- return self.loader._resolver
- def check_data(self):
- # type: () -> Any
- # If there are more documents available?
- return self.composer.check_node()
- def get_data(self):
- # type: () -> Any
- # Construct and return the next document.
- if self.composer.check_node():
- return self.construct_document(self.composer.get_node())
- def get_single_data(self):
- # type: () -> Any
- # Ensure that the stream contains a single document and construct it.
- node = self.composer.get_single_node()
- if node is not None:
- return self.construct_document(node)
- return None
- def construct_document(self, node):
- # type: (Any) -> Any
- data = self.construct_object(node)
- while bool(self.state_generators):
- state_generators = self.state_generators
- self.state_generators = []
- for generator in state_generators:
- for _dummy in generator:
- pass
- self.constructed_objects = {}
- self.recursive_objects = {}
- self.deep_construct = False
- return data
- def construct_object(self, node, deep=False):
- # type: (Any, bool) -> Any
- """deep is True when creating an object/mapping recursively,
- in that case want the underlying elements available during construction
- """
- if node in self.constructed_objects:
- return self.constructed_objects[node]
- if deep:
- old_deep = self.deep_construct
- self.deep_construct = True
- if node in self.recursive_objects:
- return self.recursive_objects[node]
- # raise ConstructorError(
- # None, None, 'found unconstructable recursive node', node.start_mark
- # )
- self.recursive_objects[node] = None
- data = self.construct_non_recursive_object(node)
- self.constructed_objects[node] = data
- del self.recursive_objects[node]
- if deep:
- self.deep_construct = old_deep
- return data
- def construct_non_recursive_object(self, node, tag=None):
- # type: (Any, Optional[str]) -> Any
- constructor = None # type: Any
- tag_suffix = None
- if tag is None:
- tag = node.tag
- if tag in self.yaml_constructors:
- constructor = self.yaml_constructors[tag]
- else:
- for tag_prefix in self.yaml_multi_constructors:
- if tag.startswith(tag_prefix):
- tag_suffix = tag[len(tag_prefix) :]
- constructor = self.yaml_multi_constructors[tag_prefix]
- break
- else:
- if None in self.yaml_multi_constructors:
- tag_suffix = tag
- constructor = self.yaml_multi_constructors[None]
- elif None in self.yaml_constructors:
- constructor = self.yaml_constructors[None]
- elif isinstance(node, ScalarNode):
- constructor = self.__class__.construct_scalar
- elif isinstance(node, SequenceNode):
- constructor = self.__class__.construct_sequence
- elif isinstance(node, MappingNode):
- constructor = self.__class__.construct_mapping
- if tag_suffix is None:
- data = constructor(self, node)
- else:
- data = constructor(self, tag_suffix, node)
- if isinstance(data, types.GeneratorType):
- generator = data
- data = next(generator)
- if self.deep_construct:
- for _dummy in generator:
- pass
- else:
- self.state_generators.append(generator)
- return data
- def construct_scalar(self, node):
- # type: (Any) -> Any
- if not isinstance(node, ScalarNode):
- raise ConstructorError(
- None, None, 'expected a scalar node, but found %s' % node.id, node.start_mark
- )
- return node.value
- def construct_sequence(self, node, deep=False):
- # type: (Any, bool) -> Any
- """deep is True when creating an object/mapping recursively,
- in that case want the underlying elements available during construction
- """
- if not isinstance(node, SequenceNode):
- raise ConstructorError(
- None, None, 'expected a sequence node, but found %s' % node.id, node.start_mark
- )
- return [self.construct_object(child, deep=deep) for child in node.value]
- def construct_mapping(self, node, deep=False):
- # type: (Any, bool) -> Any
- """deep is True when creating an object/mapping recursively,
- in that case want the underlying elements available during construction
- """
- if not isinstance(node, MappingNode):
- raise ConstructorError(
- None, None, 'expected a mapping node, but found %s' % node.id, node.start_mark
- )
- total_mapping = self.yaml_base_dict_type()
- if getattr(node, 'merge', None) is not None:
- todo = [(node.merge, False), (node.value, False)]
- else:
- todo = [(node.value, True)]
- for values, check in todo:
- mapping = self.yaml_base_dict_type() # type: Dict[Any, Any]
- for key_node, value_node in values:
- # keys can be list -> deep
- key = self.construct_object(key_node, deep=True)
- # lists are not hashable, but tuples are
- if not isinstance(key, Hashable):
- if isinstance(key, list):
- key = tuple(key)
- if PY2:
- try:
- hash(key)
- except TypeError as exc:
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'found unacceptable key (%s)' % exc,
- key_node.start_mark,
- )
- else:
- if not isinstance(key, Hashable):
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'found unhashable key',
- key_node.start_mark,
- )
- value = self.construct_object(value_node, deep=deep)
- if check:
- if self.check_mapping_key(node, key_node, mapping, key, value):
- mapping[key] = value
- else:
- mapping[key] = value
- total_mapping.update(mapping)
- return total_mapping
- def check_mapping_key(self, node, key_node, mapping, key, value):
- # type: (Any, Any, Any, Any, Any) -> bool
- """return True if key is unique"""
- if key in mapping:
- if not self.allow_duplicate_keys:
- mk = mapping.get(key)
- if PY2:
- if isinstance(key, unicode):
- key = key.encode('utf-8')
- if isinstance(value, unicode):
- value = value.encode('utf-8')
- if isinstance(mk, unicode):
- mk = mk.encode('utf-8')
- args = [
- 'while constructing a mapping',
- node.start_mark,
- 'found duplicate key "{}" with value "{}" '
- '(original value: "{}")'.format(key, value, mk),
- key_node.start_mark,
- """
- To suppress this check see:
- http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
- """,
- """\
- Duplicate keys will become an error in future releases, and are errors
- by default when using the new API.
- """,
- ]
- if self.allow_duplicate_keys is None:
- warnings.warn(DuplicateKeyFutureWarning(*args))
- else:
- raise DuplicateKeyError(*args)
- return False
- return True
- def check_set_key(self, node, key_node, setting, key):
- # type: (Any, Any, Any, Any, Any) -> None
- if key in setting:
- if not self.allow_duplicate_keys:
- if PY2:
- if isinstance(key, unicode):
- key = key.encode('utf-8')
- args = [
- 'while constructing a set',
- node.start_mark,
- 'found duplicate key "{}"'.format(key),
- key_node.start_mark,
- """
- To suppress this check see:
- http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
- """,
- """\
- Duplicate keys will become an error in future releases, and are errors
- by default when using the new API.
- """,
- ]
- if self.allow_duplicate_keys is None:
- warnings.warn(DuplicateKeyFutureWarning(*args))
- else:
- raise DuplicateKeyError(*args)
- def construct_pairs(self, node, deep=False):
- # type: (Any, bool) -> Any
- if not isinstance(node, MappingNode):
- raise ConstructorError(
- None, None, 'expected a mapping node, but found %s' % node.id, node.start_mark
- )
- pairs = []
- for key_node, value_node in node.value:
- key = self.construct_object(key_node, deep=deep)
- value = self.construct_object(value_node, deep=deep)
- pairs.append((key, value))
- return pairs
- @classmethod
- def add_constructor(cls, tag, constructor):
- # type: (Any, Any) -> None
- if 'yaml_constructors' not in cls.__dict__:
- cls.yaml_constructors = cls.yaml_constructors.copy()
- cls.yaml_constructors[tag] = constructor
- @classmethod
- def add_multi_constructor(cls, tag_prefix, multi_constructor):
- # type: (Any, Any) -> None
- if 'yaml_multi_constructors' not in cls.__dict__:
- cls.yaml_multi_constructors = cls.yaml_multi_constructors.copy()
- cls.yaml_multi_constructors[tag_prefix] = multi_constructor
- class SafeConstructor(BaseConstructor):
- def construct_scalar(self, node):
- # type: (Any) -> Any
- if isinstance(node, MappingNode):
- for key_node, value_node in node.value:
- if key_node.tag == u'tag:yaml.org,2002:value':
- return self.construct_scalar(value_node)
- return BaseConstructor.construct_scalar(self, node)
- def flatten_mapping(self, node):
- # type: (Any) -> Any
- """
- This implements the merge key feature http://yaml.org/type/merge.html
- by inserting keys from the merge dict/list of dicts if not yet
- available in this node
- """
- merge = [] # type: List[Any]
- index = 0
- while index < len(node.value):
- key_node, value_node = node.value[index]
- if key_node.tag == u'tag:yaml.org,2002:merge':
- if merge: # double << key
- if self.allow_duplicate_keys:
- del node.value[index]
- index += 1
- continue
- args = [
- 'while constructing a mapping',
- node.start_mark,
- 'found duplicate key "{}"'.format(key_node.value),
- key_node.start_mark,
- """
- To suppress this check see:
- http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
- """,
- """\
- Duplicate keys will become an error in future releases, and are errors
- by default when using the new API.
- """,
- ]
- if self.allow_duplicate_keys is None:
- warnings.warn(DuplicateKeyFutureWarning(*args))
- else:
- raise DuplicateKeyError(*args)
- del node.value[index]
- if isinstance(value_node, MappingNode):
- self.flatten_mapping(value_node)
- merge.extend(value_node.value)
- elif isinstance(value_node, SequenceNode):
- submerge = []
- for subnode in value_node.value:
- if not isinstance(subnode, MappingNode):
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'expected a mapping for merging, but found %s' % subnode.id,
- subnode.start_mark,
- )
- self.flatten_mapping(subnode)
- submerge.append(subnode.value)
- submerge.reverse()
- for value in submerge:
- merge.extend(value)
- else:
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'expected a mapping or list of mappings for merging, '
- 'but found %s' % value_node.id,
- value_node.start_mark,
- )
- elif key_node.tag == u'tag:yaml.org,2002:value':
- key_node.tag = u'tag:yaml.org,2002:str'
- index += 1
- else:
- index += 1
- if bool(merge):
- node.merge = merge # separate merge keys to be able to update without duplicate
- node.value = merge + node.value
- def construct_mapping(self, node, deep=False):
- # type: (Any, bool) -> Any
- """deep is True when creating an object/mapping recursively,
- in that case want the underlying elements available during construction
- """
- if isinstance(node, MappingNode):
- self.flatten_mapping(node)
- return BaseConstructor.construct_mapping(self, node, deep=deep)
- def construct_yaml_null(self, node):
- # type: (Any) -> Any
- self.construct_scalar(node)
- return None
- # YAML 1.2 spec doesn't mention yes/no etc any more, 1.1 does
- bool_values = {
- u'yes': True,
- u'no': False,
- u'y': True,
- u'n': False,
- u'true': True,
- u'false': False,
- u'on': True,
- u'off': False,
- }
- def construct_yaml_bool(self, node):
- # type: (Any) -> bool
- value = self.construct_scalar(node)
- return self.bool_values[value.lower()]
- def construct_yaml_int(self, node):
- # type: (Any) -> int
- value_s = to_str(self.construct_scalar(node))
- value_s = value_s.replace('_', "")
- sign = +1
- if value_s[0] == '-':
- sign = -1
- if value_s[0] in '+-':
- value_s = value_s[1:]
- if value_s == '0':
- return 0
- elif value_s.startswith('0b'):
- return sign * int(value_s[2:], 2)
- elif value_s.startswith('0x'):
- return sign * int(value_s[2:], 16)
- elif value_s.startswith('0o'):
- return sign * int(value_s[2:], 8)
- elif self.resolver.processing_version == (1, 1) and value_s[0] == '0':
- return sign * int(value_s, 8)
- elif self.resolver.processing_version == (1, 1) and ':' in value_s:
- digits = [int(part) for part in value_s.split(':')]
- digits.reverse()
- base = 1
- value = 0
- for digit in digits:
- value += digit * base
- base *= 60
- return sign * value
- else:
- return sign * int(value_s)
- inf_value = 1e300
- while inf_value != inf_value * inf_value:
- inf_value *= inf_value
- nan_value = -inf_value / inf_value # Trying to make a quiet NaN (like C99).
- def construct_yaml_float(self, node):
- # type: (Any) -> float
- value_so = to_str(self.construct_scalar(node))
- value_s = value_so.replace('_', "").lower()
- sign = +1
- if value_s[0] == '-':
- sign = -1
- if value_s[0] in '+-':
- value_s = value_s[1:]
- if value_s == '.inf':
- return sign * self.inf_value
- elif value_s == '.nan':
- return self.nan_value
- elif self.resolver.processing_version != (1, 2) and ':' in value_s:
- digits = [float(part) for part in value_s.split(':')]
- digits.reverse()
- base = 1
- value = 0.0
- for digit in digits:
- value += digit * base
- base *= 60
- return sign * value
- else:
- if self.resolver.processing_version != (1, 2) and 'e' in value_s:
- # value_s is lower case independent of input
- mantissa, exponent = value_s.split('e')
- if '.' not in mantissa:
- warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so))
- return sign * float(value_s)
- if PY3:
- def construct_yaml_binary(self, node):
- # type: (Any) -> Any
- try:
- value = self.construct_scalar(node).encode('ascii')
- except UnicodeEncodeError as exc:
- raise ConstructorError(
- None,
- None,
- 'failed to convert base64 data into ascii: %s' % exc,
- node.start_mark,
- )
- try:
- if hasattr(base64, 'decodebytes'):
- return base64.decodebytes(value)
- else:
- return base64.decodestring(value)
- except binascii.Error as exc:
- raise ConstructorError(
- None, None, 'failed to decode base64 data: %s' % exc, node.start_mark
- )
- else:
- def construct_yaml_binary(self, node):
- # type: (Any) -> Any
- value = self.construct_scalar(node)
- try:
- return to_str(value).decode('base64')
- except (binascii.Error, UnicodeEncodeError) as exc:
- raise ConstructorError(
- None, None, 'failed to decode base64 data: %s' % exc, node.start_mark
- )
- timestamp_regexp = RegExp(
- u"""^(?P<year>[0-9][0-9][0-9][0-9])
- -(?P<month>[0-9][0-9]?)
- -(?P<day>[0-9][0-9]?)
- (?:((?P<t>[Tt])|[ \\t]+) # explictly not retaining extra spaces
- (?P<hour>[0-9][0-9]?)
- :(?P<minute>[0-9][0-9])
- :(?P<second>[0-9][0-9])
- (?:\\.(?P<fraction>[0-9]*))?
- (?:[ \\t]*(?P<tz>Z|(?P<tz_sign>[-+])(?P<tz_hour>[0-9][0-9]?)
- (?::(?P<tz_minute>[0-9][0-9]))?))?)?$""",
- re.X,
- )
- def construct_yaml_timestamp(self, node, values=None):
- # type: (Any, Any) -> Any
- if values is None:
- try:
- match = self.timestamp_regexp.match(node.value)
- except TypeError:
- match = None
- if match is None:
- raise ConstructorError(
- None,
- None,
- 'failed to construct timestamp from "{}"'.format(node.value),
- node.start_mark,
- )
- values = match.groupdict()
- year = int(values['year'])
- month = int(values['month'])
- day = int(values['day'])
- if not values['hour']:
- return datetime.date(year, month, day)
- hour = int(values['hour'])
- minute = int(values['minute'])
- second = int(values['second'])
- fraction = 0
- if values['fraction']:
- fraction_s = values['fraction'][:6]
- while len(fraction_s) < 6:
- fraction_s += '0'
- fraction = int(fraction_s)
- if len(values['fraction']) > 6 and int(values['fraction'][6]) > 4:
- fraction += 1
- delta = None
- if values['tz_sign']:
- tz_hour = int(values['tz_hour'])
- minutes = values['tz_minute']
- tz_minute = int(minutes) if minutes else 0
- delta = datetime.timedelta(hours=tz_hour, minutes=tz_minute)
- if values['tz_sign'] == '-':
- delta = -delta
- # should do something else instead (or hook this up to the preceding if statement
- # in reverse
- # if delta is None:
- # return datetime.datetime(year, month, day, hour, minute, second, fraction)
- # return datetime.datetime(year, month, day, hour, minute, second, fraction,
- # datetime.timezone.utc)
- # the above is not good enough though, should provide tzinfo. In Python3 that is easily
- # doable drop that kind of support for Python2 as it has not native tzinfo
- data = datetime.datetime(year, month, day, hour, minute, second, fraction)
- if delta:
- data -= delta
- return data
- def construct_yaml_omap(self, node):
- # type: (Any) -> Any
- # Note: we do now check for duplicate keys
- omap = ordereddict()
- yield omap
- if not isinstance(node, SequenceNode):
- raise ConstructorError(
- 'while constructing an ordered map',
- node.start_mark,
- 'expected a sequence, but found %s' % node.id,
- node.start_mark,
- )
- for subnode in node.value:
- if not isinstance(subnode, MappingNode):
- raise ConstructorError(
- 'while constructing an ordered map',
- node.start_mark,
- 'expected a mapping of length 1, but found %s' % subnode.id,
- subnode.start_mark,
- )
- if len(subnode.value) != 1:
- raise ConstructorError(
- 'while constructing an ordered map',
- node.start_mark,
- 'expected a single mapping item, but found %d items' % len(subnode.value),
- subnode.start_mark,
- )
- key_node, value_node = subnode.value[0]
- key = self.construct_object(key_node)
- assert key not in omap
- value = self.construct_object(value_node)
- omap[key] = value
- def construct_yaml_pairs(self, node):
- # type: (Any) -> Any
- # Note: the same code as `construct_yaml_omap`.
- pairs = [] # type: List[Any]
- yield pairs
- if not isinstance(node, SequenceNode):
- raise ConstructorError(
- 'while constructing pairs',
- node.start_mark,
- 'expected a sequence, but found %s' % node.id,
- node.start_mark,
- )
- for subnode in node.value:
- if not isinstance(subnode, MappingNode):
- raise ConstructorError(
- 'while constructing pairs',
- node.start_mark,
- 'expected a mapping of length 1, but found %s' % subnode.id,
- subnode.start_mark,
- )
- if len(subnode.value) != 1:
- raise ConstructorError(
- 'while constructing pairs',
- node.start_mark,
- 'expected a single mapping item, but found %d items' % len(subnode.value),
- subnode.start_mark,
- )
- key_node, value_node = subnode.value[0]
- key = self.construct_object(key_node)
- value = self.construct_object(value_node)
- pairs.append((key, value))
- def construct_yaml_set(self, node):
- # type: (Any) -> Any
- data = set() # type: Set[Any]
- yield data
- value = self.construct_mapping(node)
- data.update(value)
- def construct_yaml_str(self, node):
- # type: (Any) -> Any
- value = self.construct_scalar(node)
- if PY3:
- return value
- try:
- return value.encode('ascii')
- except UnicodeEncodeError:
- return value
- def construct_yaml_seq(self, node):
- # type: (Any) -> Any
- data = self.yaml_base_list_type() # type: List[Any]
- yield data
- data.extend(self.construct_sequence(node))
- def construct_yaml_map(self, node):
- # type: (Any) -> Any
- data = self.yaml_base_dict_type() # type: Dict[Any, Any]
- yield data
- value = self.construct_mapping(node)
- data.update(value)
- def construct_yaml_object(self, node, cls):
- # type: (Any, Any) -> Any
- data = cls.__new__(cls)
- yield data
- if hasattr(data, '__setstate__'):
- state = self.construct_mapping(node, deep=True)
- data.__setstate__(state)
- else:
- state = self.construct_mapping(node)
- data.__dict__.update(state)
- def construct_undefined(self, node):
- # type: (Any) -> None
- raise ConstructorError(
- None,
- None,
- 'could not determine a constructor for the tag %r' % utf8(node.tag),
- node.start_mark,
- )
- SafeConstructor.add_constructor(u'tag:yaml.org,2002:null', SafeConstructor.construct_yaml_null)
- SafeConstructor.add_constructor(u'tag:yaml.org,2002:bool', SafeConstructor.construct_yaml_bool)
- SafeConstructor.add_constructor(u'tag:yaml.org,2002:int', SafeConstructor.construct_yaml_int)
- SafeConstructor.add_constructor(
- u'tag:yaml.org,2002:float', SafeConstructor.construct_yaml_float
- )
- SafeConstructor.add_constructor(
- u'tag:yaml.org,2002:binary', SafeConstructor.construct_yaml_binary
- )
- SafeConstructor.add_constructor(
- u'tag:yaml.org,2002:timestamp', SafeConstructor.construct_yaml_timestamp
- )
- SafeConstructor.add_constructor(u'tag:yaml.org,2002:omap', SafeConstructor.construct_yaml_omap)
- SafeConstructor.add_constructor(
- u'tag:yaml.org,2002:pairs', SafeConstructor.construct_yaml_pairs
- )
- SafeConstructor.add_constructor(u'tag:yaml.org,2002:set', SafeConstructor.construct_yaml_set)
- SafeConstructor.add_constructor(u'tag:yaml.org,2002:str', SafeConstructor.construct_yaml_str)
- SafeConstructor.add_constructor(u'tag:yaml.org,2002:seq', SafeConstructor.construct_yaml_seq)
- SafeConstructor.add_constructor(u'tag:yaml.org,2002:map', SafeConstructor.construct_yaml_map)
- SafeConstructor.add_constructor(None, SafeConstructor.construct_undefined)
- if PY2:
- class classobj:
- pass
- class Constructor(SafeConstructor):
- def construct_python_str(self, node):
- # type: (Any) -> Any
- return utf8(self.construct_scalar(node))
- def construct_python_unicode(self, node):
- # type: (Any) -> Any
- return self.construct_scalar(node)
- if PY3:
- def construct_python_bytes(self, node):
- # type: (Any) -> Any
- try:
- value = self.construct_scalar(node).encode('ascii')
- except UnicodeEncodeError as exc:
- raise ConstructorError(
- None,
- None,
- 'failed to convert base64 data into ascii: %s' % exc,
- node.start_mark,
- )
- try:
- if hasattr(base64, 'decodebytes'):
- return base64.decodebytes(value)
- else:
- return base64.decodestring(value)
- except binascii.Error as exc:
- raise ConstructorError(
- None, None, 'failed to decode base64 data: %s' % exc, node.start_mark
- )
- def construct_python_long(self, node):
- # type: (Any) -> int
- val = self.construct_yaml_int(node)
- if PY3:
- return val
- return int(val)
- def construct_python_complex(self, node):
- # type: (Any) -> Any
- return complex(self.construct_scalar(node))
- def construct_python_tuple(self, node):
- # type: (Any) -> Any
- return tuple(self.construct_sequence(node))
- def find_python_module(self, name, mark):
- # type: (Any, Any) -> Any
- if not name:
- raise ConstructorError(
- 'while constructing a Python module',
- mark,
- 'expected non-empty name appended to the tag',
- mark,
- )
- try:
- __import__(name)
- except ImportError as exc:
- raise ConstructorError(
- 'while constructing a Python module',
- mark,
- 'cannot find module %r (%s)' % (utf8(name), exc),
- mark,
- )
- return sys.modules[name]
- def find_python_name(self, name, mark):
- # type: (Any, Any) -> Any
- if not name:
- raise ConstructorError(
- 'while constructing a Python object',
- mark,
- 'expected non-empty name appended to the tag',
- mark,
- )
- if u'.' in name:
- lname = name.split('.')
- lmodule_name = lname
- lobject_name = [] # type: List[Any]
- while len(lmodule_name) > 1:
- lobject_name.insert(0, lmodule_name.pop())
- module_name = '.'.join(lmodule_name)
- try:
- __import__(module_name)
- # object_name = '.'.join(object_name)
- break
- except ImportError:
- continue
- else:
- module_name = builtins_module
- lobject_name = [name]
- try:
- __import__(module_name)
- except ImportError as exc:
- raise ConstructorError(
- 'while constructing a Python object',
- mark,
- 'cannot find module %r (%s)' % (utf8(module_name), exc),
- mark,
- )
- module = sys.modules[module_name]
- object_name = '.'.join(lobject_name)
- obj = module
- while lobject_name:
- if not hasattr(obj, lobject_name[0]):
- raise ConstructorError(
- 'while constructing a Python object',
- mark,
- 'cannot find %r in the module %r' % (utf8(object_name), module.__name__),
- mark,
- )
- obj = getattr(obj, lobject_name.pop(0))
- return obj
- def construct_python_name(self, suffix, node):
- # type: (Any, Any) -> Any
- value = self.construct_scalar(node)
- if value:
- raise ConstructorError(
- 'while constructing a Python name',
- node.start_mark,
- 'expected the empty value, but found %r' % utf8(value),
- node.start_mark,
- )
- return self.find_python_name(suffix, node.start_mark)
- def construct_python_module(self, suffix, node):
- # type: (Any, Any) -> Any
- value = self.construct_scalar(node)
- if value:
- raise ConstructorError(
- 'while constructing a Python module',
- node.start_mark,
- 'expected the empty value, but found %r' % utf8(value),
- node.start_mark,
- )
- return self.find_python_module(suffix, node.start_mark)
- def make_python_instance(self, suffix, node, args=None, kwds=None, newobj=False):
- # type: (Any, Any, Any, Any, bool) -> Any
- if not args:
- args = []
- if not kwds:
- kwds = {}
- cls = self.find_python_name(suffix, node.start_mark)
- if PY3:
- if newobj and isinstance(cls, type):
- return cls.__new__(cls, *args, **kwds)
- else:
- return cls(*args, **kwds)
- else:
- if newobj and isinstance(cls, type(classobj)) and not args and not kwds:
- instance = classobj()
- instance.__class__ = cls
- return instance
- elif newobj and isinstance(cls, type):
- return cls.__new__(cls, *args, **kwds)
- else:
- return cls(*args, **kwds)
- def set_python_instance_state(self, instance, state):
- # type: (Any, Any) -> None
- if hasattr(instance, '__setstate__'):
- instance.__setstate__(state)
- else:
- slotstate = {} # type: Dict[Any, Any]
- if isinstance(state, tuple) and len(state) == 2:
- state, slotstate = state
- if hasattr(instance, '__dict__'):
- instance.__dict__.update(state)
- elif state:
- slotstate.update(state)
- for key, value in slotstate.items():
- setattr(instance, key, value)
- def construct_python_object(self, suffix, node):
- # type: (Any, Any) -> Any
- # Format:
- # !!python/object:module.name { ... state ... }
- instance = self.make_python_instance(suffix, node, newobj=True)
- self.recursive_objects[node] = instance
- yield instance
- deep = hasattr(instance, '__setstate__')
- state = self.construct_mapping(node, deep=deep)
- self.set_python_instance_state(instance, state)
- def construct_python_object_apply(self, suffix, node, newobj=False):
- # type: (Any, Any, bool) -> Any
- # Format:
- # !!python/object/apply # (or !!python/object/new)
- # args: [ ... arguments ... ]
- # kwds: { ... keywords ... }
- # state: ... state ...
- # listitems: [ ... listitems ... ]
- # dictitems: { ... dictitems ... }
- # or short format:
- # !!python/object/apply [ ... arguments ... ]
- # The difference between !!python/object/apply and !!python/object/new
- # is how an object is created, check make_python_instance for details.
- if isinstance(node, SequenceNode):
- args = self.construct_sequence(node, deep=True)
- kwds = {} # type: Dict[Any, Any]
- state = {} # type: Dict[Any, Any]
- listitems = [] # type: List[Any]
- dictitems = {} # type: Dict[Any, Any]
- else:
- value = self.construct_mapping(node, deep=True)
- args = value.get('args', [])
- kwds = value.get('kwds', {})
- state = value.get('state', {})
- listitems = value.get('listitems', [])
- dictitems = value.get('dictitems', {})
- instance = self.make_python_instance(suffix, node, args, kwds, newobj)
- if bool(state):
- self.set_python_instance_state(instance, state)
- if bool(listitems):
- instance.extend(listitems)
- if bool(dictitems):
- for key in dictitems:
- instance[key] = dictitems[key]
- return instance
- def construct_python_object_new(self, suffix, node):
- # type: (Any, Any) -> Any
- return self.construct_python_object_apply(suffix, node, newobj=True)
- Constructor.add_constructor(u'tag:yaml.org,2002:python/none', Constructor.construct_yaml_null)
- Constructor.add_constructor(u'tag:yaml.org,2002:python/bool', Constructor.construct_yaml_bool)
- Constructor.add_constructor(u'tag:yaml.org,2002:python/str', Constructor.construct_python_str)
- Constructor.add_constructor(
- u'tag:yaml.org,2002:python/unicode', Constructor.construct_python_unicode
- )
- if PY3:
- Constructor.add_constructor(
- u'tag:yaml.org,2002:python/bytes', Constructor.construct_python_bytes
- )
- Constructor.add_constructor(u'tag:yaml.org,2002:python/int', Constructor.construct_yaml_int)
- Constructor.add_constructor(
- u'tag:yaml.org,2002:python/long', Constructor.construct_python_long
- )
- Constructor.add_constructor(
- u'tag:yaml.org,2002:python/float', Constructor.construct_yaml_float
- )
- Constructor.add_constructor(
- u'tag:yaml.org,2002:python/complex', Constructor.construct_python_complex
- )
- Constructor.add_constructor(u'tag:yaml.org,2002:python/list', Constructor.construct_yaml_seq)
- Constructor.add_constructor(
- u'tag:yaml.org,2002:python/tuple', Constructor.construct_python_tuple
- )
- Constructor.add_constructor(u'tag:yaml.org,2002:python/dict', Constructor.construct_yaml_map)
- Constructor.add_multi_constructor(
- u'tag:yaml.org,2002:python/name:', Constructor.construct_python_name
- )
- Constructor.add_multi_constructor(
- u'tag:yaml.org,2002:python/module:', Constructor.construct_python_module
- )
- Constructor.add_multi_constructor(
- u'tag:yaml.org,2002:python/object:', Constructor.construct_python_object
- )
- Constructor.add_multi_constructor(
- u'tag:yaml.org,2002:python/object/apply:', Constructor.construct_python_object_apply
- )
- Constructor.add_multi_constructor(
- u'tag:yaml.org,2002:python/object/new:', Constructor.construct_python_object_new
- )
- class RoundTripConstructor(SafeConstructor):
- """need to store the comments on the node itself,
- as well as on the items
- """
- def construct_scalar(self, node):
- # type: (Any) -> Any
- if not isinstance(node, ScalarNode):
- raise ConstructorError(
- None, None, 'expected a scalar node, but found %s' % node.id, node.start_mark
- )
- if node.style == '|' and isinstance(node.value, text_type):
- lss = LiteralScalarString(node.value, anchor=node.anchor)
- if node.comment and node.comment[1]:
- lss.comment = node.comment[1][0] # type: ignore
- return lss
- if node.style == '>' and isinstance(node.value, text_type):
- fold_positions = [] # type: List[int]
- idx = -1
- while True:
- idx = node.value.find('\a', idx + 1)
- if idx < 0:
- break
- fold_positions.append(idx - len(fold_positions))
- fss = FoldedScalarString(node.value.replace('\a', ''), anchor=node.anchor)
- if node.comment and node.comment[1]:
- fss.comment = node.comment[1][0] # type: ignore
- if fold_positions:
- fss.fold_pos = fold_positions # type: ignore
- return fss
- elif bool(self._preserve_quotes) and isinstance(node.value, text_type):
- if node.style == "'":
- return SingleQuotedScalarString(node.value, anchor=node.anchor)
- if node.style == '"':
- return DoubleQuotedScalarString(node.value, anchor=node.anchor)
- if node.anchor:
- return PlainScalarString(node.value, anchor=node.anchor)
- return node.value
- def construct_yaml_int(self, node):
- # type: (Any) -> Any
- width = None # type: Any
- value_su = to_str(self.construct_scalar(node))
- try:
- sx = value_su.rstrip('_')
- underscore = [len(sx) - sx.rindex('_') - 1, False, False] # type: Any
- except ValueError:
- underscore = None
- except IndexError:
- underscore = None
- value_s = value_su.replace('_', "")
- sign = +1
- if value_s[0] == '-':
- sign = -1
- if value_s[0] in '+-':
- value_s = value_s[1:]
- if value_s == '0':
- return 0
- elif value_s.startswith('0b'):
- if self.resolver.processing_version > (1, 1) and value_s[2] == '0':
- width = len(value_s[2:])
- if underscore is not None:
- underscore[1] = value_su[2] == '_'
- underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_'
- return BinaryInt(
- sign * int(value_s[2:], 2),
- width=width,
- underscore=underscore,
- anchor=node.anchor,
- )
- elif value_s.startswith('0x'):
- # default to lower-case if no a-fA-F in string
- if self.resolver.processing_version > (1, 1) and value_s[2] == '0':
- width = len(value_s[2:])
- hex_fun = HexInt # type: Any
- for ch in value_s[2:]:
- if ch in 'ABCDEF': # first non-digit is capital
- hex_fun = HexCapsInt
- break
- if ch in 'abcdef':
- break
- if underscore is not None:
- underscore[1] = value_su[2] == '_'
- underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_'
- return hex_fun(
- sign * int(value_s[2:], 16),
- width=width,
- underscore=underscore,
- anchor=node.anchor,
- )
- elif value_s.startswith('0o'):
- if self.resolver.processing_version > (1, 1) and value_s[2] == '0':
- width = len(value_s[2:])
- if underscore is not None:
- underscore[1] = value_su[2] == '_'
- underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_'
- return OctalInt(
- sign * int(value_s[2:], 8),
- width=width,
- underscore=underscore,
- anchor=node.anchor,
- )
- elif self.resolver.processing_version != (1, 2) and value_s[0] == '0':
- return sign * int(value_s, 8)
- elif self.resolver.processing_version != (1, 2) and ':' in value_s:
- digits = [int(part) for part in value_s.split(':')]
- digits.reverse()
- base = 1
- value = 0
- for digit in digits:
- value += digit * base
- base *= 60
- return sign * value
- elif self.resolver.processing_version > (1, 1) and value_s[0] == '0':
- # not an octal, an integer with leading zero(s)
- if underscore is not None:
- # cannot have a leading underscore
- underscore[2] = len(value_su) > 1 and value_su[-1] == '_'
- return ScalarInt(sign * int(value_s), width=len(value_s), underscore=underscore)
- elif underscore:
- # cannot have a leading underscore
- underscore[2] = len(value_su) > 1 and value_su[-1] == '_'
- return ScalarInt(
- sign * int(value_s), width=None, underscore=underscore, anchor=node.anchor
- )
- elif node.anchor:
- return ScalarInt(sign * int(value_s), width=None, anchor=node.anchor)
- else:
- return sign * int(value_s)
- def construct_yaml_float(self, node):
- # type: (Any) -> Any
- def leading_zeros(v):
- # type: (Any) -> int
- lead0 = 0
- idx = 0
- while idx < len(v) and v[idx] in '0.':
- if v[idx] == '0':
- lead0 += 1
- idx += 1
- return lead0
- # underscore = None
- m_sign = False # type: Any
- value_so = to_str(self.construct_scalar(node))
- value_s = value_so.replace('_', "").lower()
- sign = +1
- if value_s[0] == '-':
- sign = -1
- if value_s[0] in '+-':
- m_sign = value_s[0]
- value_s = value_s[1:]
- if value_s == '.inf':
- return sign * self.inf_value
- if value_s == '.nan':
- return self.nan_value
- if self.resolver.processing_version != (1, 2) and ':' in value_s:
- digits = [float(part) for part in value_s.split(':')]
- digits.reverse()
- base = 1
- value = 0.0
- for digit in digits:
- value += digit * base
- base *= 60
- return sign * value
- if 'e' in value_s:
- try:
- mantissa, exponent = value_so.split('e')
- exp = 'e'
- except ValueError:
- mantissa, exponent = value_so.split('E')
- exp = 'E'
- if self.resolver.processing_version != (1, 2):
- # value_s is lower case independent of input
- if '.' not in mantissa:
- warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so))
- lead0 = leading_zeros(mantissa)
- width = len(mantissa)
- prec = mantissa.find('.')
- if m_sign:
- width -= 1
- e_width = len(exponent)
- e_sign = exponent[0] in '+-'
- # nprint('sf', width, prec, m_sign, exp, e_width, e_sign)
- return ScalarFloat(
- sign * float(value_s),
- width=width,
- prec=prec,
- m_sign=m_sign,
- m_lead0=lead0,
- exp=exp,
- e_width=e_width,
- e_sign=e_sign,
- anchor=node.anchor,
- )
- width = len(value_so)
- prec = value_so.index('.') # you can use index, this would not be float without dot
- lead0 = leading_zeros(value_so)
- return ScalarFloat(
- sign * float(value_s),
- width=width,
- prec=prec,
- m_sign=m_sign,
- m_lead0=lead0,
- anchor=node.anchor,
- )
- def construct_yaml_str(self, node):
- # type: (Any) -> Any
- value = self.construct_scalar(node)
- if isinstance(value, ScalarString):
- return value
- if PY3:
- return value
- try:
- return value.encode('ascii')
- except AttributeError:
- # in case you replace the node dynamically e.g. with a dict
- return value
- except UnicodeEncodeError:
- return value
- def construct_rt_sequence(self, node, seqtyp, deep=False):
- # type: (Any, Any, bool) -> Any
- if not isinstance(node, SequenceNode):
- raise ConstructorError(
- None, None, 'expected a sequence node, but found %s' % node.id, node.start_mark
- )
- ret_val = []
- if node.comment:
- seqtyp._yaml_add_comment(node.comment[:2])
- if len(node.comment) > 2:
- seqtyp.yaml_end_comment_extend(node.comment[2], clear=True)
- if node.anchor:
- from ruamel.yaml.serializer import templated_id
- if not templated_id(node.anchor):
- seqtyp.yaml_set_anchor(node.anchor)
- for idx, child in enumerate(node.value):
- if child.comment:
- seqtyp._yaml_add_comment(child.comment, key=idx)
- child.comment = None # if moved to sequence remove from child
- ret_val.append(self.construct_object(child, deep=deep))
- seqtyp._yaml_set_idx_line_col(
- idx, [child.start_mark.line, child.start_mark.column]
- )
- return ret_val
- def flatten_mapping(self, node):
- # type: (Any) -> Any
- """
- This implements the merge key feature http://yaml.org/type/merge.html
- by inserting keys from the merge dict/list of dicts if not yet
- available in this node
- """
- def constructed(value_node):
- # type: (Any) -> Any
- # If the contents of a merge are defined within the
- # merge marker, then they won't have been constructed
- # yet. But if they were already constructed, we need to use
- # the existing object.
- if value_node in self.constructed_objects:
- value = self.constructed_objects[value_node]
- else:
- value = self.construct_object(value_node, deep=False)
- return value
- # merge = []
- merge_map_list = [] # type: List[Any]
- index = 0
- while index < len(node.value):
- key_node, value_node = node.value[index]
- if key_node.tag == u'tag:yaml.org,2002:merge':
- if merge_map_list: # double << key
- if self.allow_duplicate_keys:
- del node.value[index]
- index += 1
- continue
- args = [
- 'while constructing a mapping',
- node.start_mark,
- 'found duplicate key "{}"'.format(key_node.value),
- key_node.start_mark,
- """
- To suppress this check see:
- http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
- """,
- """\
- Duplicate keys will become an error in future releases, and are errors
- by default when using the new API.
- """,
- ]
- if self.allow_duplicate_keys is None:
- warnings.warn(DuplicateKeyFutureWarning(*args))
- else:
- raise DuplicateKeyError(*args)
- del node.value[index]
- if isinstance(value_node, MappingNode):
- merge_map_list.append((index, constructed(value_node)))
- # self.flatten_mapping(value_node)
- # merge.extend(value_node.value)
- elif isinstance(value_node, SequenceNode):
- # submerge = []
- for subnode in value_node.value:
- if not isinstance(subnode, MappingNode):
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'expected a mapping for merging, but found %s' % subnode.id,
- subnode.start_mark,
- )
- merge_map_list.append((index, constructed(subnode)))
- # self.flatten_mapping(subnode)
- # submerge.append(subnode.value)
- # submerge.reverse()
- # for value in submerge:
- # merge.extend(value)
- else:
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'expected a mapping or list of mappings for merging, '
- 'but found %s' % value_node.id,
- value_node.start_mark,
- )
- elif key_node.tag == u'tag:yaml.org,2002:value':
- key_node.tag = u'tag:yaml.org,2002:str'
- index += 1
- else:
- index += 1
- return merge_map_list
- # if merge:
- # node.value = merge + node.value
- def _sentinel(self):
- # type: () -> None
- pass
- def construct_mapping(self, node, maptyp, deep=False): # type: ignore
- # type: (Any, Any, bool) -> Any
- if not isinstance(node, MappingNode):
- raise ConstructorError(
- None, None, 'expected a mapping node, but found %s' % node.id, node.start_mark
- )
- merge_map = self.flatten_mapping(node)
- # mapping = {}
- if node.comment:
- maptyp._yaml_add_comment(node.comment[:2])
- if len(node.comment) > 2:
- maptyp.yaml_end_comment_extend(node.comment[2], clear=True)
- if node.anchor:
- from ruamel.yaml.serializer import templated_id
- if not templated_id(node.anchor):
- maptyp.yaml_set_anchor(node.anchor)
- last_key, last_value = None, self._sentinel
- for key_node, value_node in node.value:
- # keys can be list -> deep
- key = self.construct_object(key_node, deep=True)
- # lists are not hashable, but tuples are
- if not isinstance(key, Hashable):
- if isinstance(key, MutableSequence):
- key_s = CommentedKeySeq(key)
- if key_node.flow_style is True:
- key_s.fa.set_flow_style()
- elif key_node.flow_style is False:
- key_s.fa.set_block_style()
- key = key_s
- elif isinstance(key, MutableMapping):
- key_m = CommentedKeyMap(key)
- if key_node.flow_style is True:
- key_m.fa.set_flow_style()
- elif key_node.flow_style is False:
- key_m.fa.set_block_style()
- key = key_m
- if PY2:
- try:
- hash(key)
- except TypeError as exc:
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'found unacceptable key (%s)' % exc,
- key_node.start_mark,
- )
- else:
- if not isinstance(key, Hashable):
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'found unhashable key',
- key_node.start_mark,
- )
- value = self.construct_object(value_node, deep=deep)
- if self.check_mapping_key(node, key_node, maptyp, key, value):
- if key_node.comment and len(key_node.comment) > 4 and key_node.comment[4]:
- if last_value is None:
- key_node.comment[0] = key_node.comment.pop(4)
- maptyp._yaml_add_comment(key_node.comment, value=last_key)
- else:
- key_node.comment[2] = key_node.comment.pop(4)
- maptyp._yaml_add_comment(key_node.comment, key=key)
- key_node.comment = None
- if key_node.comment:
- maptyp._yaml_add_comment(key_node.comment, key=key)
- if value_node.comment:
- maptyp._yaml_add_comment(value_node.comment, value=key)
- maptyp._yaml_set_kv_line_col(
- key,
- [
- key_node.start_mark.line,
- key_node.start_mark.column,
- value_node.start_mark.line,
- value_node.start_mark.column,
- ],
- )
- maptyp[key] = value
- last_key, last_value = key, value # could use indexing
- # do this last, or <<: before a key will prevent insertion in instances
- # of collections.OrderedDict (as they have no __contains__
- if merge_map:
- maptyp.add_yaml_merge(merge_map)
- def construct_setting(self, node, typ, deep=False):
- # type: (Any, Any, bool) -> Any
- if not isinstance(node, MappingNode):
- raise ConstructorError(
- None, None, 'expected a mapping node, but found %s' % node.id, node.start_mark
- )
- if node.comment:
- typ._yaml_add_comment(node.comment[:2])
- if len(node.comment) > 2:
- typ.yaml_end_comment_extend(node.comment[2], clear=True)
- if node.anchor:
- from ruamel.yaml.serializer import templated_id
- if not templated_id(node.anchor):
- typ.yaml_set_anchor(node.anchor)
- for key_node, value_node in node.value:
- # keys can be list -> deep
- key = self.construct_object(key_node, deep=True)
- # lists are not hashable, but tuples are
- if not isinstance(key, Hashable):
- if isinstance(key, list):
- key = tuple(key)
- if PY2:
- try:
- hash(key)
- except TypeError as exc:
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'found unacceptable key (%s)' % exc,
- key_node.start_mark,
- )
- else:
- if not isinstance(key, Hashable):
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'found unhashable key',
- key_node.start_mark,
- )
- # construct but should be null
- value = self.construct_object(value_node, deep=deep) # NOQA
- self.check_set_key(node, key_node, typ, key)
- if key_node.comment:
- typ._yaml_add_comment(key_node.comment, key=key)
- if value_node.comment:
- typ._yaml_add_comment(value_node.comment, value=key)
- typ.add(key)
- def construct_yaml_seq(self, node):
- # type: (Any) -> Any
- data = CommentedSeq()
- data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
- if node.comment:
- data._yaml_add_comment(node.comment)
- yield data
- data.extend(self.construct_rt_sequence(node, data))
- self.set_collection_style(data, node)
- def construct_yaml_map(self, node):
- # type: (Any) -> Any
- data = CommentedMap()
- data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
- yield data
- self.construct_mapping(node, data, deep=True)
- self.set_collection_style(data, node)
- def set_collection_style(self, data, node):
- # type: (Any, Any) -> None
- if len(data) == 0:
- return
- if node.flow_style is True:
- data.fa.set_flow_style()
- elif node.flow_style is False:
- data.fa.set_block_style()
- def construct_yaml_object(self, node, cls):
- # type: (Any, Any) -> Any
- data = cls.__new__(cls)
- yield data
- if hasattr(data, '__setstate__'):
- state = SafeConstructor.construct_mapping(self, node, deep=True)
- data.__setstate__(state)
- else:
- state = SafeConstructor.construct_mapping(self, node)
- data.__dict__.update(state)
- def construct_yaml_omap(self, node):
- # type: (Any) -> Any
- # Note: we do now check for duplicate keys
- omap = CommentedOrderedMap()
- omap._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
- if node.flow_style is True:
- omap.fa.set_flow_style()
- elif node.flow_style is False:
- omap.fa.set_block_style()
- yield omap
- if node.comment:
- omap._yaml_add_comment(node.comment[:2])
- if len(node.comment) > 2:
- omap.yaml_end_comment_extend(node.comment[2], clear=True)
- if not isinstance(node, SequenceNode):
- raise ConstructorError(
- 'while constructing an ordered map',
- node.start_mark,
- 'expected a sequence, but found %s' % node.id,
- node.start_mark,
- )
- for subnode in node.value:
- if not isinstance(subnode, MappingNode):
- raise ConstructorError(
- 'while constructing an ordered map',
- node.start_mark,
- 'expected a mapping of length 1, but found %s' % subnode.id,
- subnode.start_mark,
- )
- if len(subnode.value) != 1:
- raise ConstructorError(
- 'while constructing an ordered map',
- node.start_mark,
- 'expected a single mapping item, but found %d items' % len(subnode.value),
- subnode.start_mark,
- )
- key_node, value_node = subnode.value[0]
- key = self.construct_object(key_node)
- assert key not in omap
- value = self.construct_object(value_node)
- if key_node.comment:
- omap._yaml_add_comment(key_node.comment, key=key)
- if subnode.comment:
- omap._yaml_add_comment(subnode.comment, key=key)
- if value_node.comment:
- omap._yaml_add_comment(value_node.comment, value=key)
- omap[key] = value
- def construct_yaml_set(self, node):
- # type: (Any) -> Any
- data = CommentedSet()
- data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
- yield data
- self.construct_setting(node, data)
- def construct_undefined(self, node):
- # type: (Any) -> Any
- try:
- if isinstance(node, MappingNode):
- data = CommentedMap()
- data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
- if node.flow_style is True:
- data.fa.set_flow_style()
- elif node.flow_style is False:
- data.fa.set_block_style()
- data.yaml_set_tag(node.tag)
- yield data
- if node.anchor:
- data.yaml_set_anchor(node.anchor)
- self.construct_mapping(node, data)
- return
- elif isinstance(node, ScalarNode):
- data2 = TaggedScalar()
- data2.value = self.construct_scalar(node)
- data2.style = node.style
- data2.yaml_set_tag(node.tag)
- yield data2
- if node.anchor:
- data2.yaml_set_anchor(node.anchor, always_dump=True)
- return
- elif isinstance(node, SequenceNode):
- data3 = CommentedSeq()
- data3._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
- if node.flow_style is True:
- data3.fa.set_flow_style()
- elif node.flow_style is False:
- data3.fa.set_block_style()
- data3.yaml_set_tag(node.tag)
- yield data3
- if node.anchor:
- data3.yaml_set_anchor(node.anchor)
- data3.extend(self.construct_sequence(node))
- return
- except: # NOQA
- pass
- raise ConstructorError(
- None,
- None,
- 'could not determine a constructor for the tag %r' % utf8(node.tag),
- node.start_mark,
- )
- def construct_yaml_timestamp(self, node, values=None):
- # type: (Any, Any) -> Any
- try:
- match = self.timestamp_regexp.match(node.value)
- except TypeError:
- match = None
- if match is None:
- raise ConstructorError(
- None,
- None,
- 'failed to construct timestamp from "{}"'.format(node.value),
- node.start_mark,
- )
- values = match.groupdict()
- if not values['hour']:
- return SafeConstructor.construct_yaml_timestamp(self, node, values)
- for part in ['t', 'tz_sign', 'tz_hour', 'tz_minute']:
- if values[part]:
- break
- else:
- return SafeConstructor.construct_yaml_timestamp(self, node, values)
- year = int(values['year'])
- month = int(values['month'])
- day = int(values['day'])
- hour = int(values['hour'])
- minute = int(values['minute'])
- second = int(values['second'])
- fraction = 0
- if values['fraction']:
- fraction_s = values['fraction'][:6]
- while len(fraction_s) < 6:
- fraction_s += '0'
- fraction = int(fraction_s)
- if len(values['fraction']) > 6 and int(values['fraction'][6]) > 4:
- fraction += 1
- delta = None
- if values['tz_sign']:
- tz_hour = int(values['tz_hour'])
- minutes = values['tz_minute']
- tz_minute = int(minutes) if minutes else 0
- delta = datetime.timedelta(hours=tz_hour, minutes=tz_minute)
- if values['tz_sign'] == '-':
- delta = -delta
- # shold check for NOne and solve issue 366 should be tzinfo=delta)
- if delta:
- dt = datetime.datetime(year, month, day, hour, minute)
- dt -= delta
- data = TimeStamp(dt.year, dt.month, dt.day, dt.hour, dt.minute, second, fraction)
- data._yaml['delta'] = delta
- tz = values['tz_sign'] + values['tz_hour']
- if values['tz_minute']:
- tz += ':' + values['tz_minute']
- data._yaml['tz'] = tz
- else:
- data = TimeStamp(year, month, day, hour, minute, second, fraction)
- if values['tz']: # no delta
- data._yaml['tz'] = values['tz']
- if values['t']:
- data._yaml['t'] = True
- return data
- def construct_yaml_bool(self, node):
- # type: (Any) -> Any
- b = SafeConstructor.construct_yaml_bool(self, node)
- if node.anchor:
- return ScalarBoolean(b, anchor=node.anchor)
- return b
- RoundTripConstructor.add_constructor(
- u'tag:yaml.org,2002:null', RoundTripConstructor.construct_yaml_null
- )
- RoundTripConstructor.add_constructor(
- u'tag:yaml.org,2002:bool', RoundTripConstructor.construct_yaml_bool
- )
- RoundTripConstructor.add_constructor(
- u'tag:yaml.org,2002:int', RoundTripConstructor.construct_yaml_int
- )
- RoundTripConstructor.add_constructor(
- u'tag:yaml.org,2002:float', RoundTripConstructor.construct_yaml_float
- )
- RoundTripConstructor.add_constructor(
- u'tag:yaml.org,2002:binary', RoundTripConstructor.construct_yaml_binary
- )
- RoundTripConstructor.add_constructor(
- u'tag:yaml.org,2002:timestamp', RoundTripConstructor.construct_yaml_timestamp
- )
- RoundTripConstructor.add_constructor(
- u'tag:yaml.org,2002:omap', RoundTripConstructor.construct_yaml_omap
- )
- RoundTripConstructor.add_constructor(
- u'tag:yaml.org,2002:pairs', RoundTripConstructor.construct_yaml_pairs
- )
- RoundTripConstructor.add_constructor(
- u'tag:yaml.org,2002:set', RoundTripConstructor.construct_yaml_set
- )
- RoundTripConstructor.add_constructor(
- u'tag:yaml.org,2002:str', RoundTripConstructor.construct_yaml_str
- )
- RoundTripConstructor.add_constructor(
- u'tag:yaml.org,2002:seq', RoundTripConstructor.construct_yaml_seq
- )
- RoundTripConstructor.add_constructor(
- u'tag:yaml.org,2002:map', RoundTripConstructor.construct_yaml_map
- )
- RoundTripConstructor.add_constructor(None, RoundTripConstructor.construct_undefined)
|