1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723 |
- # coding: utf-8
- import datetime
- import base64
- import binascii
- import sys
- import types
- import warnings
- from collections.abc import Hashable, MutableSequence, MutableMapping
- # fmt: off
- from ruamel.yaml.error import (MarkedYAMLError, MarkedYAMLFutureWarning,
- MantissaNoDotYAML1_1Warning)
- from ruamel.yaml.nodes import * # NOQA
- from ruamel.yaml.nodes import (SequenceNode, MappingNode, ScalarNode)
- from ruamel.yaml.compat import (builtins_module, # NOQA
- nprint, nprintf, version_tnf)
- from ruamel.yaml.compat import ordereddict
- from ruamel.yaml.tag import Tag
- from ruamel.yaml.comments import * # NOQA
- from ruamel.yaml.comments import (CommentedMap, CommentedOrderedMap, CommentedSet,
- CommentedKeySeq, CommentedSeq, TaggedScalar,
- CommentedKeyMap,
- C_KEY_PRE, C_KEY_EOL, C_KEY_POST,
- C_VALUE_PRE, C_VALUE_EOL, C_VALUE_POST,
- )
- from ruamel.yaml.scalarstring import (SingleQuotedScalarString, DoubleQuotedScalarString,
- LiteralScalarString, FoldedScalarString,
- PlainScalarString, ScalarString)
- from ruamel.yaml.scalarint import ScalarInt, BinaryInt, OctalInt, HexInt, HexCapsInt
- from ruamel.yaml.scalarfloat import ScalarFloat
- from ruamel.yaml.scalarbool import ScalarBoolean
- from ruamel.yaml.timestamp import TimeStamp
- from ruamel.yaml.util import timestamp_regexp, create_timestamp
- from typing import Any, Dict, List, Set, Iterator, Union, Optional # NOQA
- __all__ = ['BaseConstructor', 'SafeConstructor', 'Constructor',
- 'ConstructorError', 'RoundTripConstructor']
- # fmt: on
- class ConstructorError(MarkedYAMLError):
- pass
- class DuplicateKeyFutureWarning(MarkedYAMLFutureWarning):
- pass
- class DuplicateKeyError(MarkedYAMLError):
- pass
- class BaseConstructor:
- yaml_constructors = {} # type: Dict[Any, Any]
- yaml_multi_constructors = {} # type: Dict[Any, Any]
- def __init__(self, preserve_quotes: Optional[bool] = None, loader: Any = None) -> None:
- self.loader = loader
- if self.loader is not None and getattr(self.loader, '_constructor', None) is None:
- self.loader._constructor = self
- self.loader = loader
- self.yaml_base_dict_type = dict
- self.yaml_base_list_type = list
- self.constructed_objects: Dict[Any, Any] = {}
- self.recursive_objects: Dict[Any, Any] = {}
- self.state_generators: List[Any] = []
- self.deep_construct = False
- self._preserve_quotes = preserve_quotes
- self.allow_duplicate_keys = version_tnf((0, 15, 1), (0, 16))
- @property
- def composer(self) -> Any:
- if hasattr(self.loader, 'typ'):
- return self.loader.composer
- try:
- return self.loader._composer
- except AttributeError:
- sys.stdout.write(f'slt {type(self)}\n')
- sys.stdout.write(f'slc {self.loader._composer}\n')
- sys.stdout.write(f'{dir(self)}\n')
- raise
- @property
- def resolver(self) -> Any:
- if hasattr(self.loader, 'typ'):
- return self.loader.resolver
- return self.loader._resolver
- @property
- def scanner(self) -> Any:
- # needed to get to the expanded comments
- if hasattr(self.loader, 'typ'):
- return self.loader.scanner
- return self.loader._scanner
- def check_data(self) -> Any:
- # If there are more documents available?
- return self.composer.check_node()
- def get_data(self) -> Any:
- # Construct and return the next document.
- if self.composer.check_node():
- return self.construct_document(self.composer.get_node())
- def get_single_data(self) -> Any:
- # Ensure that the stream contains a single document and construct it.
- node = self.composer.get_single_node()
- if node is not None:
- return self.construct_document(node)
- return None
- def construct_document(self, node: Any) -> Any:
- data = self.construct_object(node)
- while bool(self.state_generators):
- state_generators = self.state_generators
- self.state_generators = []
- for generator in state_generators:
- for _dummy in generator:
- pass
- self.constructed_objects = {}
- self.recursive_objects = {}
- self.deep_construct = False
- return data
- def construct_object(self, node: Any, deep: bool = False) -> Any:
- """deep is True when creating an object/mapping recursively,
- in that case want the underlying elements available during construction
- """
- if node in self.constructed_objects:
- return self.constructed_objects[node]
- if deep:
- old_deep = self.deep_construct
- self.deep_construct = True
- if node in self.recursive_objects:
- return self.recursive_objects[node]
- # raise ConstructorError(
- # None, None, 'found unconstructable recursive node', node.start_mark
- # )
- self.recursive_objects[node] = None
- data = self.construct_non_recursive_object(node)
- self.constructed_objects[node] = data
- del self.recursive_objects[node]
- if deep:
- self.deep_construct = old_deep
- return data
- def construct_non_recursive_object(self, node: Any, tag: Optional[str] = None) -> Any:
- constructor: Any = None
- tag_suffix = None
- if tag is None:
- tag = node.tag
- if tag in self.yaml_constructors:
- constructor = self.yaml_constructors[tag]
- else:
- for tag_prefix in self.yaml_multi_constructors:
- if tag.startswith(tag_prefix):
- tag_suffix = tag[len(tag_prefix) :]
- constructor = self.yaml_multi_constructors[tag_prefix]
- break
- else:
- if None in self.yaml_multi_constructors:
- tag_suffix = tag
- constructor = self.yaml_multi_constructors[None]
- elif None in self.yaml_constructors:
- constructor = self.yaml_constructors[None]
- elif isinstance(node, ScalarNode):
- constructor = self.__class__.construct_scalar
- elif isinstance(node, SequenceNode):
- constructor = self.__class__.construct_sequence
- elif isinstance(node, MappingNode):
- constructor = self.__class__.construct_mapping
- if tag_suffix is None:
- data = constructor(self, node)
- else:
- data = constructor(self, tag_suffix, node)
- if isinstance(data, types.GeneratorType):
- generator = data
- data = next(generator)
- if self.deep_construct:
- for _dummy in generator:
- pass
- else:
- self.state_generators.append(generator)
- return data
- def construct_scalar(self, node: Any) -> Any:
- if not isinstance(node, ScalarNode):
- raise ConstructorError(
- None, None, f'expected a scalar node, but found {node.id!s}', node.start_mark,
- )
- return node.value
- def construct_sequence(self, node: Any, deep: bool = False) -> Any:
- """deep is True when creating an object/mapping recursively,
- in that case want the underlying elements available during construction
- """
- if not isinstance(node, SequenceNode):
- raise ConstructorError(
- None,
- None,
- f'expected a sequence node, but found {node.id!s}',
- node.start_mark,
- )
- return [self.construct_object(child, deep=deep) for child in node.value]
- def construct_mapping(self, node: Any, deep: bool = False) -> Any:
- """deep is True when creating an object/mapping recursively,
- in that case want the underlying elements available during construction
- """
- if not isinstance(node, MappingNode):
- raise ConstructorError(
- None, None, f'expected a mapping node, but found {node.id!s}', node.start_mark,
- )
- total_mapping = self.yaml_base_dict_type()
- if getattr(node, 'merge', None) is not None:
- todo = [(node.merge, False), (node.value, False)]
- else:
- todo = [(node.value, True)]
- for values, check in todo:
- mapping: Dict[Any, Any] = self.yaml_base_dict_type()
- for key_node, value_node in values:
- # keys can be list -> deep
- key = self.construct_object(key_node, deep=True)
- # lists are not hashable, but tuples are
- if not isinstance(key, Hashable):
- if isinstance(key, list):
- key = tuple(key)
- if not isinstance(key, Hashable):
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'found unhashable key',
- key_node.start_mark,
- )
- value = self.construct_object(value_node, deep=deep)
- if check:
- if self.check_mapping_key(node, key_node, mapping, key, value):
- mapping[key] = value
- else:
- mapping[key] = value
- total_mapping.update(mapping)
- return total_mapping
- def check_mapping_key(
- self, node: Any, key_node: Any, mapping: Any, key: Any, value: Any,
- ) -> bool:
- """return True if key is unique"""
- if key in mapping:
- if not self.allow_duplicate_keys:
- mk = mapping.get(key)
- args = [
- 'while constructing a mapping',
- node.start_mark,
- f'found duplicate key "{key}" with value "{value}" '
- f'(original value: "{mk}")',
- key_node.start_mark,
- """
- To suppress this check see:
- http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
- """,
- """\
- Duplicate keys will become an error in future releases, and are errors
- by default when using the new API.
- """,
- ]
- if self.allow_duplicate_keys is None:
- warnings.warn(DuplicateKeyFutureWarning(*args), stacklevel=1)
- else:
- raise DuplicateKeyError(*args)
- return False
- return True
- def check_set_key(self: Any, node: Any, key_node: Any, setting: Any, key: Any) -> None:
- if key in setting:
- if not self.allow_duplicate_keys:
- args = [
- 'while constructing a set',
- node.start_mark,
- f'found duplicate key "{key}"',
- key_node.start_mark,
- """
- To suppress this check see:
- http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
- """,
- """\
- Duplicate keys will become an error in future releases, and are errors
- by default when using the new API.
- """,
- ]
- if self.allow_duplicate_keys is None:
- warnings.warn(DuplicateKeyFutureWarning(*args), stacklevel=1)
- else:
- raise DuplicateKeyError(*args)
- def construct_pairs(self, node: Any, deep: bool = False) -> Any:
- if not isinstance(node, MappingNode):
- raise ConstructorError(
- None, None, f'expected a mapping node, but found {node.id!s}', node.start_mark,
- )
- pairs = []
- for key_node, value_node in node.value:
- key = self.construct_object(key_node, deep=deep)
- value = self.construct_object(value_node, deep=deep)
- pairs.append((key, value))
- return pairs
- # ToDo: putting stuff on the class makes it global, consider making this to work on an
- # instance variable once function load is dropped.
- @classmethod
- def add_constructor(cls, tag: Any, constructor: Any) -> Any:
- if isinstance(tag, Tag):
- tag = str(tag)
- if 'yaml_constructors' not in cls.__dict__:
- cls.yaml_constructors = cls.yaml_constructors.copy()
- ret_val = cls.yaml_constructors.get(tag, None)
- cls.yaml_constructors[tag] = constructor
- return ret_val
- @classmethod
- def add_multi_constructor(cls, tag_prefix: Any, multi_constructor: Any) -> None:
- if 'yaml_multi_constructors' not in cls.__dict__:
- cls.yaml_multi_constructors = cls.yaml_multi_constructors.copy()
- cls.yaml_multi_constructors[tag_prefix] = multi_constructor
- @classmethod
- def add_default_constructor(
- cls, tag: str, method: Any = None, tag_base: str = 'tag:yaml.org,2002:',
- ) -> None:
- if not tag.startswith('tag:'):
- if method is None:
- method = 'construct_yaml_' + tag
- tag = tag_base + tag
- cls.add_constructor(tag, getattr(cls, method))
- class SafeConstructor(BaseConstructor):
- def construct_scalar(self, node: Any) -> Any:
- if isinstance(node, MappingNode):
- for key_node, value_node in node.value:
- if key_node.tag == 'tag:yaml.org,2002:value':
- return self.construct_scalar(value_node)
- return BaseConstructor.construct_scalar(self, node)
- def flatten_mapping(self, node: Any) -> Any:
- """
- This implements the merge key feature http://yaml.org/type/merge.html
- by inserting keys from the merge dict/list of dicts if not yet
- available in this node
- """
- merge: List[Any] = []
- index = 0
- while index < len(node.value):
- key_node, value_node = node.value[index]
- if key_node.tag == 'tag:yaml.org,2002:merge':
- if merge: # double << key
- if self.allow_duplicate_keys:
- del node.value[index]
- index += 1
- continue
- args = [
- 'while constructing a mapping',
- node.start_mark,
- f'found duplicate key "{key_node.value}"',
- key_node.start_mark,
- """
- To suppress this check see:
- http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
- """,
- """\
- Duplicate keys will become an error in future releases, and are errors
- by default when using the new API.
- """,
- ]
- if self.allow_duplicate_keys is None:
- warnings.warn(DuplicateKeyFutureWarning(*args), stacklevel=1)
- else:
- raise DuplicateKeyError(*args)
- del node.value[index]
- if isinstance(value_node, MappingNode):
- self.flatten_mapping(value_node)
- merge.extend(value_node.value)
- elif isinstance(value_node, SequenceNode):
- submerge = []
- for subnode in value_node.value:
- if not isinstance(subnode, MappingNode):
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- f'expected a mapping for merging, but found {subnode.id!s}',
- subnode.start_mark,
- )
- self.flatten_mapping(subnode)
- submerge.append(subnode.value)
- submerge.reverse()
- for value in submerge:
- merge.extend(value)
- else:
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'expected a mapping or list of mappings for merging, '
- f'but found {value_node.id!s}',
- value_node.start_mark,
- )
- elif key_node.tag == 'tag:yaml.org,2002:value':
- key_node.tag = 'tag:yaml.org,2002:str'
- index += 1
- else:
- index += 1
- if bool(merge):
- node.merge = merge # separate merge keys to be able to update without duplicate
- node.value = merge + node.value
- def construct_mapping(self, node: Any, deep: bool = False) -> Any:
- """deep is True when creating an object/mapping recursively,
- in that case want the underlying elements available during construction
- """
- if isinstance(node, MappingNode):
- self.flatten_mapping(node)
- return BaseConstructor.construct_mapping(self, node, deep=deep)
- def construct_yaml_null(self, node: Any) -> Any:
- self.construct_scalar(node)
- return None
- # YAML 1.2 spec doesn't mention yes/no etc any more, 1.1 does
- bool_values = {
- 'yes': True,
- 'no': False,
- 'y': True,
- 'n': False,
- 'true': True,
- 'false': False,
- 'on': True,
- 'off': False,
- }
- def construct_yaml_bool(self, node: Any) -> bool:
- value = self.construct_scalar(node)
- return self.bool_values[value.lower()]
- def construct_yaml_int(self, node: Any) -> int:
- value_s = self.construct_scalar(node)
- value_s = value_s.replace('_', "")
- sign = +1
- if value_s[0] == '-':
- sign = -1
- if value_s[0] in '+-':
- value_s = value_s[1:]
- if value_s == '0':
- return 0
- elif value_s.startswith('0b'):
- return sign * int(value_s[2:], 2)
- elif value_s.startswith('0x'):
- return sign * int(value_s[2:], 16)
- elif value_s.startswith('0o'):
- return sign * int(value_s[2:], 8)
- elif self.resolver.processing_version == (1, 1) and value_s[0] == '0':
- return sign * int(value_s, 8)
- elif self.resolver.processing_version == (1, 1) and ':' in value_s:
- digits = [int(part) for part in value_s.split(':')]
- digits.reverse()
- base = 1
- value = 0
- for digit in digits:
- value += digit * base
- base *= 60
- return sign * value
- else:
- return sign * int(value_s)
- inf_value = 1e300
- while inf_value != inf_value * inf_value:
- inf_value *= inf_value
- nan_value = -inf_value / inf_value # Trying to make a quiet NaN (like C99).
- def construct_yaml_float(self, node: Any) -> float:
- value_so = self.construct_scalar(node)
- value_s = value_so.replace('_', "").lower()
- sign = +1
- if value_s[0] == '-':
- sign = -1
- if value_s[0] in '+-':
- value_s = value_s[1:]
- if value_s == '.inf':
- return sign * self.inf_value
- elif value_s == '.nan':
- return self.nan_value
- elif self.resolver.processing_version != (1, 2) and ':' in value_s:
- digits = [float(part) for part in value_s.split(':')]
- digits.reverse()
- base = 1
- value = 0.0
- for digit in digits:
- value += digit * base
- base *= 60
- return sign * value
- else:
- if self.resolver.processing_version != (1, 2) and 'e' in value_s:
- # value_s is lower case independent of input
- mantissa, exponent = value_s.split('e')
- if '.' not in mantissa:
- warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so), stacklevel=1)
- return sign * float(value_s)
- def construct_yaml_binary(self, node: Any) -> Any:
- try:
- value = self.construct_scalar(node).encode('ascii')
- except UnicodeEncodeError as exc:
- raise ConstructorError(
- None,
- None,
- f'failed to convert base64 data into ascii: {exc!s}',
- node.start_mark,
- )
- try:
- return base64.decodebytes(value)
- except binascii.Error as exc:
- raise ConstructorError(
- None, None, f'failed to decode base64 data: {exc!s}', node.start_mark,
- )
- timestamp_regexp = timestamp_regexp # moved to util 0.17.17
- def construct_yaml_timestamp(self, node: Any, values: Any = None) -> Any:
- if values is None:
- try:
- match = self.timestamp_regexp.match(node.value)
- except TypeError:
- match = None
- if match is None:
- raise ConstructorError(
- None,
- None,
- f'failed to construct timestamp from "{node.value}"',
- node.start_mark,
- )
- values = match.groupdict()
- return create_timestamp(**values)
- def construct_yaml_omap(self, node: Any) -> Any:
- # Note: we do now check for duplicate keys
- omap = ordereddict()
- yield omap
- if not isinstance(node, SequenceNode):
- raise ConstructorError(
- 'while constructing an ordered map',
- node.start_mark,
- f'expected a sequence, but found {node.id!s}',
- node.start_mark,
- )
- for subnode in node.value:
- if not isinstance(subnode, MappingNode):
- raise ConstructorError(
- 'while constructing an ordered map',
- node.start_mark,
- f'expected a mapping of length 1, but found {subnode.id!s}',
- subnode.start_mark,
- )
- if len(subnode.value) != 1:
- raise ConstructorError(
- 'while constructing an ordered map',
- node.start_mark,
- f'expected a single mapping item, but found {len(subnode.value):d} items',
- subnode.start_mark,
- )
- key_node, value_node = subnode.value[0]
- key = self.construct_object(key_node)
- assert key not in omap
- value = self.construct_object(value_node)
- omap[key] = value
- def construct_yaml_pairs(self, node: Any) -> Any:
- # Note: the same code as `construct_yaml_omap`.
- pairs: List[Any] = []
- yield pairs
- if not isinstance(node, SequenceNode):
- raise ConstructorError(
- 'while constructing pairs',
- node.start_mark,
- f'expected a sequence, but found {node.id!s}',
- node.start_mark,
- )
- for subnode in node.value:
- if not isinstance(subnode, MappingNode):
- raise ConstructorError(
- 'while constructing pairs',
- node.start_mark,
- f'expected a mapping of length 1, but found {subnode.id!s}',
- subnode.start_mark,
- )
- if len(subnode.value) != 1:
- raise ConstructorError(
- 'while constructing pairs',
- node.start_mark,
- f'expected a single mapping item, but found {len(subnode.value):d} items',
- subnode.start_mark,
- )
- key_node, value_node = subnode.value[0]
- key = self.construct_object(key_node)
- value = self.construct_object(value_node)
- pairs.append((key, value))
- def construct_yaml_set(self, node: Any) -> Any:
- data: Set[Any] = set()
- yield data
- value = self.construct_mapping(node)
- data.update(value)
- def construct_yaml_str(self, node: Any) -> Any:
- value = self.construct_scalar(node)
- return value
- def construct_yaml_seq(self, node: Any) -> Any:
- data: List[Any] = self.yaml_base_list_type()
- yield data
- data.extend(self.construct_sequence(node))
- def construct_yaml_map(self, node: Any) -> Any:
- data: Dict[Any, Any] = self.yaml_base_dict_type()
- yield data
- value = self.construct_mapping(node)
- data.update(value)
- def construct_yaml_object(self, node: Any, cls: Any) -> Any:
- data = cls.__new__(cls)
- yield data
- if hasattr(data, '__setstate__'):
- state = self.construct_mapping(node, deep=True)
- data.__setstate__(state)
- else:
- state = self.construct_mapping(node)
- data.__dict__.update(state)
- def construct_undefined(self, node: Any) -> None:
- raise ConstructorError(
- None,
- None,
- f'could not determine a constructor for the tag {node.tag!r}',
- node.start_mark,
- )
- for tag in 'null bool int float binary timestamp omap pairs set str seq map'.split():
- SafeConstructor.add_default_constructor(tag)
- SafeConstructor.add_constructor(None, SafeConstructor.construct_undefined)
- class Constructor(SafeConstructor):
- def construct_python_str(self, node: Any) -> Any:
- return self.construct_scalar(node)
- def construct_python_unicode(self, node: Any) -> Any:
- return self.construct_scalar(node)
- def construct_python_bytes(self, node: Any) -> Any:
- try:
- value = self.construct_scalar(node).encode('ascii')
- except UnicodeEncodeError as exc:
- raise ConstructorError(
- None,
- None,
- f'failed to convert base64 data into ascii: {exc!s}',
- node.start_mark,
- )
- try:
- return base64.decodebytes(value)
- except binascii.Error as exc:
- raise ConstructorError(
- None, None, f'failed to decode base64 data: {exc!s}', node.start_mark,
- )
- def construct_python_long(self, node: Any) -> int:
- val = self.construct_yaml_int(node)
- return val
- def construct_python_complex(self, node: Any) -> Any:
- return complex(self.construct_scalar(node))
- def construct_python_tuple(self, node: Any) -> Any:
- return tuple(self.construct_sequence(node))
- def find_python_module(self, name: Any, mark: Any) -> Any:
- if not name:
- raise ConstructorError(
- 'while constructing a Python module',
- mark,
- 'expected non-empty name appended to the tag',
- mark,
- )
- try:
- __import__(name)
- except ImportError as exc:
- raise ConstructorError(
- 'while constructing a Python module',
- mark,
- f'cannot find module {name!r} ({exc!s})',
- mark,
- )
- return sys.modules[name]
- def find_python_name(self, name: Any, mark: Any) -> Any:
- if not name:
- raise ConstructorError(
- 'while constructing a Python object',
- mark,
- 'expected non-empty name appended to the tag',
- mark,
- )
- if '.' in name:
- lname = name.split('.')
- lmodule_name = lname
- lobject_name: List[Any] = []
- while len(lmodule_name) > 1:
- lobject_name.insert(0, lmodule_name.pop())
- module_name = '.'.join(lmodule_name)
- try:
- __import__(module_name)
- # object_name = '.'.join(object_name)
- break
- except ImportError:
- continue
- else:
- module_name = builtins_module
- lobject_name = [name]
- try:
- __import__(module_name)
- except ImportError as exc:
- raise ConstructorError(
- 'while constructing a Python object',
- mark,
- f'cannot find module {module_name!r} ({exc!s})',
- mark,
- )
- module = sys.modules[module_name]
- object_name = '.'.join(lobject_name)
- obj = module
- while lobject_name:
- if not hasattr(obj, lobject_name[0]):
- raise ConstructorError(
- 'while constructing a Python object',
- mark,
- f'cannot find {object_name!r} in the module {module.__name__!r}',
- mark,
- )
- obj = getattr(obj, lobject_name.pop(0))
- return obj
- def construct_python_name(self, suffix: Any, node: Any) -> Any:
- value = self.construct_scalar(node)
- if value:
- raise ConstructorError(
- 'while constructing a Python name',
- node.start_mark,
- f'expected the empty value, but found {value!r}',
- node.start_mark,
- )
- return self.find_python_name(suffix, node.start_mark)
- def construct_python_module(self, suffix: Any, node: Any) -> Any:
- value = self.construct_scalar(node)
- if value:
- raise ConstructorError(
- 'while constructing a Python module',
- node.start_mark,
- f'expected the empty value, but found {value!r}',
- node.start_mark,
- )
- return self.find_python_module(suffix, node.start_mark)
- def make_python_instance(
- self, suffix: Any, node: Any, args: Any = None, kwds: Any = None, newobj: bool = False,
- ) -> Any:
- if not args:
- args = []
- if not kwds:
- kwds = {}
- cls = self.find_python_name(suffix, node.start_mark)
- if newobj and isinstance(cls, type):
- return cls.__new__(cls, *args, **kwds)
- else:
- return cls(*args, **kwds)
- def set_python_instance_state(self, instance: Any, state: Any) -> None:
- if hasattr(instance, '__setstate__'):
- instance.__setstate__(state)
- else:
- slotstate: Dict[Any, Any] = {}
- if isinstance(state, tuple) and len(state) == 2:
- state, slotstate = state
- if hasattr(instance, '__dict__'):
- instance.__dict__.update(state)
- elif state:
- slotstate.update(state)
- for key, value in slotstate.items():
- setattr(instance, key, value)
- def construct_python_object(self, suffix: Any, node: Any) -> Any:
- # Format:
- # !!python/object:module.name { ... state ... }
- instance = self.make_python_instance(suffix, node, newobj=True)
- self.recursive_objects[node] = instance
- yield instance
- deep = hasattr(instance, '__setstate__')
- state = self.construct_mapping(node, deep=deep)
- self.set_python_instance_state(instance, state)
- def construct_python_object_apply(
- self, suffix: Any, node: Any, newobj: bool = False,
- ) -> Any:
- # Format:
- # !!python/object/apply # (or !!python/object/new)
- # args: [ ... arguments ... ]
- # kwds: { ... keywords ... }
- # state: ... state ...
- # listitems: [ ... listitems ... ]
- # dictitems: { ... dictitems ... }
- # or short format:
- # !!python/object/apply [ ... arguments ... ]
- # The difference between !!python/object/apply and !!python/object/new
- # is how an object is created, check make_python_instance for details.
- if isinstance(node, SequenceNode):
- args = self.construct_sequence(node, deep=True)
- kwds: Dict[Any, Any] = {}
- state: Dict[Any, Any] = {}
- listitems: List[Any] = []
- dictitems: Dict[Any, Any] = {}
- else:
- value = self.construct_mapping(node, deep=True)
- args = value.get('args', [])
- kwds = value.get('kwds', {})
- state = value.get('state', {})
- listitems = value.get('listitems', [])
- dictitems = value.get('dictitems', {})
- instance = self.make_python_instance(suffix, node, args, kwds, newobj)
- if bool(state):
- self.set_python_instance_state(instance, state)
- if bool(listitems):
- instance.extend(listitems)
- if bool(dictitems):
- for key in dictitems:
- instance[key] = dictitems[key]
- return instance
- def construct_python_object_new(self, suffix: Any, node: Any) -> Any:
- return self.construct_python_object_apply(suffix, node, newobj=True)
- @classmethod
- def add_default_constructor(
- cls, tag: str, method: Any = None, tag_base: str = 'tag:yaml.org,2002:python/',
- ) -> None:
- if not tag.startswith('tag:'):
- if method is None:
- method = 'construct_yaml_' + tag
- tag = tag_base + tag
- cls.add_constructor(tag, getattr(cls, method))
- Constructor.add_constructor('tag:yaml.org,2002:python/none', Constructor.construct_yaml_null)
- Constructor.add_constructor('tag:yaml.org,2002:python/bool', Constructor.construct_yaml_bool)
- Constructor.add_constructor('tag:yaml.org,2002:python/str', Constructor.construct_python_str)
- Constructor.add_constructor(
- 'tag:yaml.org,2002:python/unicode', Constructor.construct_python_unicode,
- )
- Constructor.add_constructor(
- 'tag:yaml.org,2002:python/bytes', Constructor.construct_python_bytes,
- )
- Constructor.add_constructor('tag:yaml.org,2002:python/int', Constructor.construct_yaml_int)
- Constructor.add_constructor('tag:yaml.org,2002:python/long', Constructor.construct_python_long)
- Constructor.add_constructor('tag:yaml.org,2002:python/float', Constructor.construct_yaml_float)
- Constructor.add_constructor(
- 'tag:yaml.org,2002:python/complex', Constructor.construct_python_complex,
- )
- Constructor.add_constructor('tag:yaml.org,2002:python/list', Constructor.construct_yaml_seq)
- Constructor.add_constructor(
- 'tag:yaml.org,2002:python/tuple', Constructor.construct_python_tuple,
- )
- # for tag in 'bool str unicode bytes int long float complex tuple'.split():
- # Constructor.add_default_constructor(tag)
- Constructor.add_constructor('tag:yaml.org,2002:python/dict', Constructor.construct_yaml_map)
- Constructor.add_multi_constructor(
- 'tag:yaml.org,2002:python/name:', Constructor.construct_python_name,
- )
- Constructor.add_multi_constructor(
- 'tag:yaml.org,2002:python/module:', Constructor.construct_python_module,
- )
- Constructor.add_multi_constructor(
- 'tag:yaml.org,2002:python/object:', Constructor.construct_python_object,
- )
- Constructor.add_multi_constructor(
- 'tag:yaml.org,2002:python/object/apply:', Constructor.construct_python_object_apply,
- )
- Constructor.add_multi_constructor(
- 'tag:yaml.org,2002:python/object/new:', Constructor.construct_python_object_new,
- )
- class RoundTripConstructor(SafeConstructor):
- """need to store the comments on the node itself,
- as well as on the items
- """
- def comment(self, idx: Any) -> Any:
- assert self.loader.comment_handling is not None
- x = self.scanner.comments[idx]
- x.set_assigned()
- return x
- def comments(self, list_of_comments: Any, idx: Optional[Any] = None) -> Any:
- # hand in the comment and optional pre, eol, post segment
- if list_of_comments is None:
- return []
- if idx is not None:
- if list_of_comments[idx] is None:
- return []
- list_of_comments = list_of_comments[idx]
- for x in list_of_comments:
- yield self.comment(x)
- def construct_scalar(self, node: Any) -> Any:
- if not isinstance(node, ScalarNode):
- raise ConstructorError(
- None, None, f'expected a scalar node, but found {node.id!s}', node.start_mark,
- )
- if node.style == '|' and isinstance(node.value, str):
- lss = LiteralScalarString(node.value, anchor=node.anchor)
- if self.loader and self.loader.comment_handling is None:
- if node.comment and node.comment[1]:
- lss.comment = node.comment[1][0] # type: ignore
- else:
- # NEWCMNT
- if node.comment is not None and node.comment[1]:
- # nprintf('>>>>nc1', node.comment)
- # EOL comment after |
- lss.comment = self.comment(node.comment[1][0]) # type: ignore
- return lss
- if node.style == '>' and isinstance(node.value, str):
- fold_positions: List[int] = []
- idx = -1
- while True:
- idx = node.value.find('\a', idx + 1)
- if idx < 0:
- break
- fold_positions.append(idx - len(fold_positions))
- fss = FoldedScalarString(node.value.replace('\a', ''), anchor=node.anchor)
- if self.loader and self.loader.comment_handling is None:
- if node.comment and node.comment[1]:
- fss.comment = node.comment[1][0] # type: ignore
- else:
- # NEWCMNT
- if node.comment is not None and node.comment[1]:
- # nprintf('>>>>nc2', node.comment)
- # EOL comment after >
- fss.comment = self.comment(node.comment[1][0]) # type: ignore
- if fold_positions:
- fss.fold_pos = fold_positions # type: ignore
- return fss
- elif bool(self._preserve_quotes) and isinstance(node.value, str):
- if node.style == "'":
- return SingleQuotedScalarString(node.value, anchor=node.anchor)
- if node.style == '"':
- return DoubleQuotedScalarString(node.value, anchor=node.anchor)
- # if node.ctag:
- # data2 = TaggedScalar()
- # data2.value = node.value
- # data2.style = node.style
- # data2.yaml_set_ctag(node.ctag)
- # if node.anchor:
- # from ruamel.yaml.serializer import templated_id
- # if not templated_id(node.anchor):
- # data2.yaml_set_anchor(node.anchor, always_dump=True)
- # return data2
- if node.anchor:
- return PlainScalarString(node.value, anchor=node.anchor)
- return node.value
- def construct_yaml_int(self, node: Any) -> Any:
- width: Any = None
- value_su = self.construct_scalar(node)
- try:
- sx = value_su.rstrip('_')
- underscore: Any = [len(sx) - sx.rindex('_') - 1, False, False]
- except ValueError:
- underscore = None
- except IndexError:
- underscore = None
- value_s = value_su.replace('_', "")
- sign = +1
- if value_s[0] == '-':
- sign = -1
- if value_s[0] in '+-':
- value_s = value_s[1:]
- if value_s == '0':
- return 0
- elif value_s.startswith('0b'):
- if self.resolver.processing_version > (1, 1) and value_s[2] == '0':
- width = len(value_s[2:])
- if underscore is not None:
- underscore[1] = value_su[2] == '_'
- underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_'
- return BinaryInt(
- sign * int(value_s[2:], 2),
- width=width,
- underscore=underscore,
- anchor=node.anchor,
- )
- elif value_s.startswith('0x'):
- # default to lower-case if no a-fA-F in string
- if self.resolver.processing_version > (1, 1) and value_s[2] == '0':
- width = len(value_s[2:])
- hex_fun: Any = HexInt
- for ch in value_s[2:]:
- if ch in 'ABCDEF': # first non-digit is capital
- hex_fun = HexCapsInt
- break
- if ch in 'abcdef':
- break
- if underscore is not None:
- underscore[1] = value_su[2] == '_'
- underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_'
- return hex_fun(
- sign * int(value_s[2:], 16),
- width=width,
- underscore=underscore,
- anchor=node.anchor,
- )
- elif value_s.startswith('0o'):
- if self.resolver.processing_version > (1, 1) and value_s[2] == '0':
- width = len(value_s[2:])
- if underscore is not None:
- underscore[1] = value_su[2] == '_'
- underscore[2] = len(value_su[2:]) > 1 and value_su[-1] == '_'
- return OctalInt(
- sign * int(value_s[2:], 8),
- width=width,
- underscore=underscore,
- anchor=node.anchor,
- )
- elif self.resolver.processing_version != (1, 2) and value_s[0] == '0':
- return OctalInt(
- sign * int(value_s, 8), width=width, underscore=underscore, anchor=node.anchor,
- )
- elif self.resolver.processing_version != (1, 2) and ':' in value_s:
- digits = [int(part) for part in value_s.split(':')]
- digits.reverse()
- base = 1
- value = 0
- for digit in digits:
- value += digit * base
- base *= 60
- return sign * value
- elif self.resolver.processing_version > (1, 1) and value_s[0] == '0':
- # not an octal, an integer with leading zero(s)
- if underscore is not None:
- # cannot have a leading underscore
- underscore[2] = len(value_su) > 1 and value_su[-1] == '_'
- return ScalarInt(sign * int(value_s), width=len(value_s), underscore=underscore)
- elif underscore:
- # cannot have a leading underscore
- underscore[2] = len(value_su) > 1 and value_su[-1] == '_'
- return ScalarInt(
- sign * int(value_s), width=None, underscore=underscore, anchor=node.anchor,
- )
- elif node.anchor:
- return ScalarInt(sign * int(value_s), width=None, anchor=node.anchor)
- else:
- return sign * int(value_s)
- def construct_yaml_float(self, node: Any) -> Any:
- def leading_zeros(v: Any) -> int:
- lead0 = 0
- idx = 0
- while idx < len(v) and v[idx] in '0.':
- if v[idx] == '0':
- lead0 += 1
- idx += 1
- return lead0
- # underscore = None
- m_sign: Any = False
- value_so = self.construct_scalar(node)
- value_s = value_so.replace('_', "").lower()
- sign = +1
- if value_s[0] == '-':
- sign = -1
- if value_s[0] in '+-':
- m_sign = value_s[0]
- value_s = value_s[1:]
- if value_s == '.inf':
- return sign * self.inf_value
- if value_s == '.nan':
- return self.nan_value
- if self.resolver.processing_version != (1, 2) and ':' in value_s:
- digits = [float(part) for part in value_s.split(':')]
- digits.reverse()
- base = 1
- value = 0.0
- for digit in digits:
- value += digit * base
- base *= 60
- return sign * value
- if 'e' in value_s:
- try:
- mantissa, exponent = value_so.split('e')
- exp = 'e'
- except ValueError:
- mantissa, exponent = value_so.split('E')
- exp = 'E'
- if self.resolver.processing_version != (1, 2):
- # value_s is lower case independent of input
- if '.' not in mantissa:
- warnings.warn(MantissaNoDotYAML1_1Warning(node, value_so), stacklevel=1)
- lead0 = leading_zeros(mantissa)
- width = len(mantissa)
- prec = mantissa.find('.')
- if m_sign:
- width -= 1
- e_width = len(exponent)
- e_sign = exponent[0] in '+-'
- # nprint('sf', width, prec, m_sign, exp, e_width, e_sign)
- return ScalarFloat(
- sign * float(value_s),
- width=width,
- prec=prec,
- m_sign=m_sign,
- m_lead0=lead0,
- exp=exp,
- e_width=e_width,
- e_sign=e_sign,
- anchor=node.anchor,
- )
- width = len(value_so)
- # you can't use index, !!float 42 would be a float without a dot
- prec = value_so.find('.')
- lead0 = leading_zeros(value_so)
- return ScalarFloat(
- sign * float(value_s),
- width=width,
- prec=prec,
- m_sign=m_sign,
- m_lead0=lead0,
- anchor=node.anchor,
- )
- def construct_yaml_str(self, node: Any) -> Any:
- if node.ctag.handle:
- value = self.construct_unknown(node)
- else:
- value = self.construct_scalar(node)
- if isinstance(value, ScalarString):
- return value
- return value
- def construct_rt_sequence(self, node: Any, seqtyp: Any, deep: bool = False) -> Any:
- if not isinstance(node, SequenceNode):
- raise ConstructorError(
- None,
- None,
- f'expected a sequence node, but found {node.id!s}',
- node.start_mark,
- )
- ret_val = []
- if self.loader and self.loader.comment_handling is None:
- if node.comment:
- seqtyp._yaml_add_comment(node.comment[:2])
- if len(node.comment) > 2:
- # this happens e.g. if you have a sequence element that is a flow-style
- # mapping and that has no EOL comment but a following commentline or
- # empty line
- seqtyp.yaml_end_comment_extend(node.comment[2], clear=True)
- else:
- # NEWCMNT
- if node.comment:
- nprintf('nc3', node.comment)
- if node.anchor:
- from ruamel.yaml.serializer import templated_id
- if not templated_id(node.anchor):
- seqtyp.yaml_set_anchor(node.anchor)
- for idx, child in enumerate(node.value):
- if child.comment:
- seqtyp._yaml_add_comment(child.comment, key=idx)
- child.comment = None # if moved to sequence remove from child
- ret_val.append(self.construct_object(child, deep=deep))
- seqtyp._yaml_set_idx_line_col(
- idx, [child.start_mark.line, child.start_mark.column],
- )
- return ret_val
- def flatten_mapping(self, node: Any) -> Any:
- """
- This implements the merge key feature http://yaml.org/type/merge.html
- by inserting keys from the merge dict/list of dicts if not yet
- available in this node
- """
- def constructed(value_node: Any) -> Any:
- # If the contents of a merge are defined within the
- # merge marker, then they won't have been constructed
- # yet. But if they were already constructed, we need to use
- # the existing object.
- if value_node in self.constructed_objects:
- value = self.constructed_objects[value_node]
- else:
- value = self.construct_object(value_node, deep=True)
- return value
- # merge = []
- merge_map_list: List[Any] = []
- index = 0
- while index < len(node.value):
- key_node, value_node = node.value[index]
- if key_node.tag == 'tag:yaml.org,2002:merge':
- if merge_map_list: # double << key
- if self.allow_duplicate_keys:
- del node.value[index]
- index += 1
- continue
- args = [
- 'while constructing a mapping',
- node.start_mark,
- f'found duplicate key "{key_node.value}"',
- key_node.start_mark,
- """
- To suppress this check see:
- http://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
- """,
- """\
- Duplicate keys will become an error in future releases, and are errors
- by default when using the new API.
- """,
- ]
- if self.allow_duplicate_keys is None:
- warnings.warn(DuplicateKeyFutureWarning(*args), stacklevel=1)
- else:
- raise DuplicateKeyError(*args)
- del node.value[index]
- if isinstance(value_node, MappingNode):
- merge_map_list.append((index, constructed(value_node)))
- # self.flatten_mapping(value_node)
- # merge.extend(value_node.value)
- elif isinstance(value_node, SequenceNode):
- # submerge = []
- for subnode in value_node.value:
- if not isinstance(subnode, MappingNode):
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- f'expected a mapping for merging, but found {subnode.id!s}',
- subnode.start_mark,
- )
- merge_map_list.append((index, constructed(subnode)))
- # self.flatten_mapping(subnode)
- # submerge.append(subnode.value)
- # submerge.reverse()
- # for value in submerge:
- # merge.extend(value)
- else:
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'expected a mapping or list of mappings for merging, '
- f'but found {value_node.id!s}',
- value_node.start_mark,
- )
- elif key_node.tag == 'tag:yaml.org,2002:value':
- key_node.tag = 'tag:yaml.org,2002:str'
- index += 1
- else:
- index += 1
- return merge_map_list
- # if merge:
- # node.value = merge + node.value
- def _sentinel(self) -> None:
- pass
- def construct_mapping(self, node: Any, maptyp: Any, deep: bool = False) -> Any: # type: ignore # NOQA
- if not isinstance(node, MappingNode):
- raise ConstructorError(
- None, None, f'expected a mapping node, but found {node.id!s}', node.start_mark,
- )
- merge_map = self.flatten_mapping(node)
- # mapping = {}
- if self.loader and self.loader.comment_handling is None:
- if node.comment:
- maptyp._yaml_add_comment(node.comment[:2])
- if len(node.comment) > 2:
- maptyp.yaml_end_comment_extend(node.comment[2], clear=True)
- else:
- # NEWCMNT
- if node.comment:
- # nprintf('nc4', node.comment, node.start_mark)
- if maptyp.ca.pre is None:
- maptyp.ca.pre = []
- for cmnt in self.comments(node.comment, 0):
- maptyp.ca.pre.append(cmnt)
- if node.anchor:
- from ruamel.yaml.serializer import templated_id
- if not templated_id(node.anchor):
- maptyp.yaml_set_anchor(node.anchor)
- last_key, last_value = None, self._sentinel
- for key_node, value_node in node.value:
- # keys can be list -> deep
- key = self.construct_object(key_node, deep=True)
- # lists are not hashable, but tuples are
- if not isinstance(key, Hashable):
- if isinstance(key, MutableSequence):
- key_s = CommentedKeySeq(key)
- if key_node.flow_style is True:
- key_s.fa.set_flow_style()
- elif key_node.flow_style is False:
- key_s.fa.set_block_style()
- key_s._yaml_set_line_col(key.lc.line, key.lc.col) # type: ignore
- key = key_s
- elif isinstance(key, MutableMapping):
- key_m = CommentedKeyMap(key)
- if key_node.flow_style is True:
- key_m.fa.set_flow_style()
- elif key_node.flow_style is False:
- key_m.fa.set_block_style()
- key_m._yaml_set_line_col(key.lc.line, key.lc.col) # type: ignore
- key = key_m
- if not isinstance(key, Hashable):
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'found unhashable key',
- key_node.start_mark,
- )
- value = self.construct_object(value_node, deep=deep)
- if self.check_mapping_key(node, key_node, maptyp, key, value):
- if self.loader and self.loader.comment_handling is None:
- if key_node.comment and len(key_node.comment) > 4 and key_node.comment[4]:
- if last_value is None:
- key_node.comment[0] = key_node.comment.pop(4)
- maptyp._yaml_add_comment(key_node.comment, value=last_key)
- else:
- key_node.comment[2] = key_node.comment.pop(4)
- maptyp._yaml_add_comment(key_node.comment, key=key)
- key_node.comment = None
- if key_node.comment:
- maptyp._yaml_add_comment(key_node.comment, key=key)
- if value_node.comment:
- maptyp._yaml_add_comment(value_node.comment, value=key)
- else:
- # NEWCMNT
- if key_node.comment:
- nprintf('nc5a', key, key_node.comment)
- if key_node.comment[0]:
- maptyp.ca.set(key, C_KEY_PRE, key_node.comment[0])
- if key_node.comment[1]:
- maptyp.ca.set(key, C_KEY_EOL, key_node.comment[1])
- if key_node.comment[2]:
- maptyp.ca.set(key, C_KEY_POST, key_node.comment[2])
- if value_node.comment:
- nprintf('nc5b', key, value_node.comment)
- if value_node.comment[0]:
- maptyp.ca.set(key, C_VALUE_PRE, value_node.comment[0])
- if value_node.comment[1]:
- maptyp.ca.set(key, C_VALUE_EOL, value_node.comment[1])
- if value_node.comment[2]:
- maptyp.ca.set(key, C_VALUE_POST, value_node.comment[2])
- maptyp._yaml_set_kv_line_col(
- key,
- [
- key_node.start_mark.line,
- key_node.start_mark.column,
- value_node.start_mark.line,
- value_node.start_mark.column,
- ],
- )
- maptyp[key] = value
- last_key, last_value = key, value # could use indexing
- # do this last, or <<: before a key will prevent insertion in instances
- # of collections.OrderedDict (as they have no __contains__
- if merge_map:
- maptyp.add_yaml_merge(merge_map)
- def construct_setting(self, node: Any, typ: Any, deep: bool = False) -> Any:
- if not isinstance(node, MappingNode):
- raise ConstructorError(
- None, None, f'expected a mapping node, but found {node.id!s}', node.start_mark,
- )
- if self.loader and self.loader.comment_handling is None:
- if node.comment:
- typ._yaml_add_comment(node.comment[:2])
- if len(node.comment) > 2:
- typ.yaml_end_comment_extend(node.comment[2], clear=True)
- else:
- # NEWCMNT
- if node.comment:
- nprintf('nc6', node.comment)
- if node.anchor:
- from ruamel.yaml.serializer import templated_id
- if not templated_id(node.anchor):
- typ.yaml_set_anchor(node.anchor)
- for key_node, value_node in node.value:
- # keys can be list -> deep
- key = self.construct_object(key_node, deep=True)
- # lists are not hashable, but tuples are
- if not isinstance(key, Hashable):
- if isinstance(key, list):
- key = tuple(key)
- if not isinstance(key, Hashable):
- raise ConstructorError(
- 'while constructing a mapping',
- node.start_mark,
- 'found unhashable key',
- key_node.start_mark,
- )
- # construct but should be null
- value = self.construct_object(value_node, deep=deep) # NOQA
- self.check_set_key(node, key_node, typ, key)
- if self.loader and self.loader.comment_handling is None:
- if key_node.comment:
- typ._yaml_add_comment(key_node.comment, key=key)
- if value_node.comment:
- typ._yaml_add_comment(value_node.comment, value=key)
- else:
- # NEWCMNT
- if key_node.comment:
- nprintf('nc7a', key_node.comment)
- if value_node.comment:
- nprintf('nc7b', value_node.comment)
- typ.add(key)
- def construct_yaml_seq(self, node: Any) -> Iterator[CommentedSeq]:
- data = CommentedSeq()
- data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
- # if node.comment:
- # data._yaml_add_comment(node.comment)
- yield data
- data.extend(self.construct_rt_sequence(node, data))
- self.set_collection_style(data, node)
- def construct_yaml_map(self, node: Any) -> Iterator[CommentedMap]:
- data = CommentedMap()
- data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
- yield data
- self.construct_mapping(node, data, deep=True)
- self.set_collection_style(data, node)
- def set_collection_style(self, data: Any, node: Any) -> None:
- if len(data) == 0:
- return
- if node.flow_style is True:
- data.fa.set_flow_style()
- elif node.flow_style is False:
- data.fa.set_block_style()
- def construct_yaml_object(self, node: Any, cls: Any) -> Any:
- from dataclasses import is_dataclass, InitVar, MISSING
- data = cls.__new__(cls)
- yield data
- if hasattr(data, '__setstate__'):
- state = SafeConstructor.construct_mapping(self, node, deep=True)
- data.__setstate__(state)
- elif is_dataclass(data):
- mapping = SafeConstructor.construct_mapping(self, node)
- init_var_defaults = {}
- for field in data.__dataclass_fields__.values():
- # nprintf('field', field, field.default is MISSING,
- # isinstance(field.type, InitVar))
- # in 3.7, InitVar is a singleton
- if (
- isinstance(field.type, InitVar) or field.type is InitVar
- ) and field.default is not MISSING:
- init_var_defaults[field.name] = field.default
- for attr, value in mapping.items():
- if attr not in init_var_defaults:
- setattr(data, attr, value)
- post_init = getattr(data, '__post_init__', None)
- if post_init is not None:
- kw = {}
- for name, default in init_var_defaults.items():
- kw[name] = mapping.get(name, default)
- post_init(**kw)
- else:
- state = SafeConstructor.construct_mapping(self, node)
- if hasattr(data, '__attrs_attrs__'): # issue 394
- data.__init__(**state)
- else:
- data.__dict__.update(state)
- if node.anchor:
- from ruamel.yaml.serializer import templated_id
- from ruamel.yaml.anchor import Anchor
- if not templated_id(node.anchor):
- if not hasattr(data, Anchor.attrib):
- a = Anchor()
- setattr(data, Anchor.attrib, a)
- else:
- a = getattr(data, Anchor.attrib)
- a.value = node.anchor
- def construct_yaml_omap(self, node: Any) -> Iterator[CommentedOrderedMap]:
- # Note: we do now check for duplicate keys
- omap = CommentedOrderedMap()
- omap._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
- if node.flow_style is True:
- omap.fa.set_flow_style()
- elif node.flow_style is False:
- omap.fa.set_block_style()
- yield omap
- if self.loader and self.loader.comment_handling is None:
- if node.comment:
- omap._yaml_add_comment(node.comment[:2])
- if len(node.comment) > 2:
- omap.yaml_end_comment_extend(node.comment[2], clear=True)
- else:
- # NEWCMNT
- if node.comment:
- nprintf('nc8', node.comment)
- if not isinstance(node, SequenceNode):
- raise ConstructorError(
- 'while constructing an ordered map',
- node.start_mark,
- f'expected a sequence, but found {node.id!s}',
- node.start_mark,
- )
- for subnode in node.value:
- if not isinstance(subnode, MappingNode):
- raise ConstructorError(
- 'while constructing an ordered map',
- node.start_mark,
- f'expected a mapping of length 1, but found {subnode.id!s}',
- subnode.start_mark,
- )
- if len(subnode.value) != 1:
- raise ConstructorError(
- 'while constructing an ordered map',
- node.start_mark,
- f'expected a single mapping item, but found {len(subnode.value):d} items',
- subnode.start_mark,
- )
- key_node, value_node = subnode.value[0]
- key = self.construct_object(key_node)
- assert key not in omap
- value = self.construct_object(value_node)
- if self.loader and self.loader.comment_handling is None:
- if key_node.comment:
- omap._yaml_add_comment(key_node.comment, key=key)
- if subnode.comment:
- omap._yaml_add_comment(subnode.comment, key=key)
- if value_node.comment:
- omap._yaml_add_comment(value_node.comment, value=key)
- else:
- # NEWCMNT
- if key_node.comment:
- nprintf('nc9a', key_node.comment)
- if subnode.comment:
- nprintf('nc9b', subnode.comment)
- if value_node.comment:
- nprintf('nc9c', value_node.comment)
- omap[key] = value
- def construct_yaml_set(self, node: Any) -> Iterator[CommentedSet]:
- data = CommentedSet()
- data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
- if node.flow_style is True:
- data.fa.set_flow_style()
- elif node.flow_style is False:
- data.fa.set_block_style()
- yield data
- self.construct_setting(node, data)
- def construct_unknown(
- self, node: Any,
- ) -> Iterator[Union[CommentedMap, TaggedScalar, CommentedSeq]]:
- try:
- if isinstance(node, MappingNode):
- data = CommentedMap()
- data._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
- if node.flow_style is True:
- data.fa.set_flow_style()
- elif node.flow_style is False:
- data.fa.set_block_style()
- data.yaml_set_ctag(node.ctag)
- yield data
- if node.anchor:
- from ruamel.yaml.serializer import templated_id
- if not templated_id(node.anchor):
- data.yaml_set_anchor(node.anchor)
- self.construct_mapping(node, data)
- return
- elif isinstance(node, ScalarNode):
- data2 = TaggedScalar()
- data2.value = self.construct_scalar(node)
- data2.style = node.style
- data2.yaml_set_ctag(node.ctag)
- yield data2
- if node.anchor:
- from ruamel.yaml.serializer import templated_id
- if not templated_id(node.anchor):
- data2.yaml_set_anchor(node.anchor, always_dump=True)
- return
- elif isinstance(node, SequenceNode):
- data3 = CommentedSeq()
- data3._yaml_set_line_col(node.start_mark.line, node.start_mark.column)
- if node.flow_style is True:
- data3.fa.set_flow_style()
- elif node.flow_style is False:
- data3.fa.set_block_style()
- data3.yaml_set_ctag(node.ctag)
- yield data3
- if node.anchor:
- from ruamel.yaml.serializer import templated_id
- if not templated_id(node.anchor):
- data3.yaml_set_anchor(node.anchor)
- data3.extend(self.construct_sequence(node))
- return
- except: # NOQA
- pass
- raise ConstructorError(
- None,
- None,
- f'could not determine a constructor for the tag {node.tag!r}',
- node.start_mark,
- )
- def construct_yaml_timestamp(
- self, node: Any, values: Any = None,
- ) -> Union[datetime.date, datetime.datetime, TimeStamp]:
- try:
- match = self.timestamp_regexp.match(node.value)
- except TypeError:
- match = None
- if match is None:
- raise ConstructorError(
- None,
- None,
- f'failed to construct timestamp from "{node.value}"',
- node.start_mark,
- )
- values = match.groupdict()
- if not values['hour']:
- return create_timestamp(**values)
- # return SafeConstructor.construct_yaml_timestamp(self, node, values)
- for part in ['t', 'tz_sign', 'tz_hour', 'tz_minute']:
- if values[part]:
- break
- else:
- return create_timestamp(**values)
- # return SafeConstructor.construct_yaml_timestamp(self, node, values)
- dd = create_timestamp(**values) # this has delta applied
- delta = None
- if values['tz_sign']:
- tz_hour = int(values['tz_hour'])
- minutes = values['tz_minute']
- tz_minute = int(minutes) if minutes else 0
- delta = datetime.timedelta(hours=tz_hour, minutes=tz_minute)
- if values['tz_sign'] == '-':
- delta = -delta
- # should check for None and solve issue 366 should be tzinfo=delta)
- # isinstance(datetime.datetime.now, datetime.date) is true)
- if isinstance(dd, datetime.datetime):
- data = TimeStamp(
- dd.year, dd.month, dd.day, dd.hour, dd.minute, dd.second, dd.microsecond,
- )
- else:
- # ToDo: make this into a DateStamp?
- data = TimeStamp(dd.year, dd.month, dd.day, 0, 0, 0, 0)
- return data
- if delta:
- data._yaml['delta'] = delta
- tz = values['tz_sign'] + values['tz_hour']
- if values['tz_minute']:
- tz += ':' + values['tz_minute']
- data._yaml['tz'] = tz
- else:
- if values['tz']: # no delta
- data._yaml['tz'] = values['tz']
- if values['t']:
- data._yaml['t'] = True
- return data
- def construct_yaml_sbool(self, node: Any) -> Union[bool, ScalarBoolean]:
- b = SafeConstructor.construct_yaml_bool(self, node)
- if node.anchor:
- return ScalarBoolean(b, anchor=node.anchor)
- return b
- RoundTripConstructor.add_default_constructor('bool', method='construct_yaml_sbool')
- for tag in 'null int float binary timestamp omap pairs set str seq map'.split():
- RoundTripConstructor.add_default_constructor(tag)
- RoundTripConstructor.add_constructor(None, RoundTripConstructor.construct_unknown)
|