12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154 |
- # coding: utf-8
- from __future__ import absolute_import, print_function
- """
- stuff to deal with comments and formatting on dict/list/ordereddict/set
- these are not really related, formatting could be factored out as
- a separate base
- """
- import sys
- import copy
- from ruamel.yaml.compat import ordereddict # type: ignore
- from ruamel.yaml.compat import PY2, string_types, MutableSliceableSequence
- from ruamel.yaml.scalarstring import ScalarString
- from ruamel.yaml.anchor import Anchor
- if PY2:
- from collections import MutableSet, Sized, Set, Mapping
- else:
- from collections.abc import MutableSet, Sized, Set, Mapping
- if False: # MYPY
- from typing import Any, Dict, Optional, List, Union, Optional, Iterator # NOQA
- # fmt: off
- __all__ = ['CommentedSeq', 'CommentedKeySeq',
- 'CommentedMap', 'CommentedOrderedMap',
- 'CommentedSet', 'comment_attrib', 'merge_attrib']
- # fmt: on
- comment_attrib = '_yaml_comment'
- format_attrib = '_yaml_format'
- line_col_attrib = '_yaml_line_col'
- merge_attrib = '_yaml_merge'
- tag_attrib = '_yaml_tag'
- class Comment(object):
- # sys.getsize tested the Comment objects, __slots__ makes them bigger
- # and adding self.end did not matter
- __slots__ = 'comment', '_items', '_end', '_start'
- attrib = comment_attrib
- def __init__(self):
- # type: () -> None
- self.comment = None # [post, [pre]]
- # map key (mapping/omap/dict) or index (sequence/list) to a list of
- # dict: post_key, pre_key, post_value, pre_value
- # list: pre item, post item
- self._items = {} # type: Dict[Any, Any]
- # self._start = [] # should not put these on first item
- self._end = [] # type: List[Any] # end of document comments
- def __str__(self):
- # type: () -> str
- if bool(self._end):
- end = ',\n end=' + str(self._end)
- else:
- end = ""
- return 'Comment(comment={0},\n items={1}{2})'.format(self.comment, self._items, end)
- @property
- def items(self):
- # type: () -> Any
- return self._items
- @property
- def end(self):
- # type: () -> Any
- return self._end
- @end.setter
- def end(self, value):
- # type: (Any) -> None
- self._end = value
- @property
- def start(self):
- # type: () -> Any
- return self._start
- @start.setter
- def start(self, value):
- # type: (Any) -> None
- self._start = value
- # to distinguish key from None
- def NoComment():
- # type: () -> None
- pass
- class Format(object):
- __slots__ = ('_flow_style',)
- attrib = format_attrib
- def __init__(self):
- # type: () -> None
- self._flow_style = None # type: Any
- def set_flow_style(self):
- # type: () -> None
- self._flow_style = True
- def set_block_style(self):
- # type: () -> None
- self._flow_style = False
- def flow_style(self, default=None):
- # type: (Optional[Any]) -> Any
- """if default (the flow_style) is None, the flow style tacked on to
- the object explicitly will be taken. If that is None as well the
- default flow style rules the format down the line, or the type
- of the constituent values (simple -> flow, map/list -> block)"""
- if self._flow_style is None:
- return default
- return self._flow_style
- class LineCol(object):
- attrib = line_col_attrib
- def __init__(self):
- # type: () -> None
- self.line = None
- self.col = None
- self.data = None # type: Optional[Dict[Any, Any]]
- def add_kv_line_col(self, key, data):
- # type: (Any, Any) -> None
- if self.data is None:
- self.data = {}
- self.data[key] = data
- def key(self, k):
- # type: (Any) -> Any
- return self._kv(k, 0, 1)
- def value(self, k):
- # type: (Any) -> Any
- return self._kv(k, 2, 3)
- def _kv(self, k, x0, x1):
- # type: (Any, Any, Any) -> Any
- if self.data is None:
- return None
- data = self.data[k]
- return data[x0], data[x1]
- def item(self, idx):
- # type: (Any) -> Any
- if self.data is None:
- return None
- return self.data[idx][0], self.data[idx][1]
- def add_idx_line_col(self, key, data):
- # type: (Any, Any) -> None
- if self.data is None:
- self.data = {}
- self.data[key] = data
- class Tag(object):
- """store tag information for roundtripping"""
- __slots__ = ('value',)
- attrib = tag_attrib
- def __init__(self):
- # type: () -> None
- self.value = None
- def __repr__(self):
- # type: () -> Any
- return '{0.__class__.__name__}({0.value!r})'.format(self)
- class CommentedBase(object):
- @property
- def ca(self):
- # type: () -> Any
- if not hasattr(self, Comment.attrib):
- setattr(self, Comment.attrib, Comment())
- return getattr(self, Comment.attrib)
- def yaml_end_comment_extend(self, comment, clear=False):
- # type: (Any, bool) -> None
- if comment is None:
- return
- if clear or self.ca.end is None:
- self.ca.end = []
- self.ca.end.extend(comment)
- def yaml_key_comment_extend(self, key, comment, clear=False):
- # type: (Any, Any, bool) -> None
- r = self.ca._items.setdefault(key, [None, None, None, None])
- if clear or r[1] is None:
- if comment[1] is not None:
- assert isinstance(comment[1], list)
- r[1] = comment[1]
- else:
- r[1].extend(comment[0])
- r[0] = comment[0]
- def yaml_value_comment_extend(self, key, comment, clear=False):
- # type: (Any, Any, bool) -> None
- r = self.ca._items.setdefault(key, [None, None, None, None])
- if clear or r[3] is None:
- if comment[1] is not None:
- assert isinstance(comment[1], list)
- r[3] = comment[1]
- else:
- r[3].extend(comment[0])
- r[2] = comment[0]
- def yaml_set_start_comment(self, comment, indent=0):
- # type: (Any, Any) -> None
- """overwrites any preceding comment lines on an object
- expects comment to be without `#` and possible have multiple lines
- """
- from .error import CommentMark
- from .tokens import CommentToken
- pre_comments = self._yaml_get_pre_comment()
- if comment[-1] == '\n':
- comment = comment[:-1] # strip final newline if there
- start_mark = CommentMark(indent)
- for com in comment.split('\n'):
- c = com.strip()
- if len(c) > 0 and c[0] != '#':
- com = '# ' + com
- pre_comments.append(CommentToken(com + '\n', start_mark, None))
- def yaml_set_comment_before_after_key(
- self, key, before=None, indent=0, after=None, after_indent=None
- ):
- # type: (Any, Any, Any, Any, Any) -> None
- """
- expects comment (before/after) to be without `#` and possible have multiple lines
- """
- from ruamel.yaml.error import CommentMark
- from ruamel.yaml.tokens import CommentToken
- def comment_token(s, mark):
- # type: (Any, Any) -> Any
- # handle empty lines as having no comment
- return CommentToken(('# ' if s else "") + s + '\n', mark, None)
- if after_indent is None:
- after_indent = indent + 2
- if before and (len(before) > 1) and before[-1] == '\n':
- before = before[:-1] # strip final newline if there
- if after and after[-1] == '\n':
- after = after[:-1] # strip final newline if there
- start_mark = CommentMark(indent)
- c = self.ca.items.setdefault(key, [None, [], None, None])
- if before == '\n':
- c[1].append(comment_token("", start_mark))
- elif before:
- for com in before.split('\n'):
- c[1].append(comment_token(com, start_mark))
- if after:
- start_mark = CommentMark(after_indent)
- if c[3] is None:
- c[3] = []
- for com in after.split('\n'):
- c[3].append(comment_token(com, start_mark)) # type: ignore
- @property
- def fa(self):
- # type: () -> Any
- """format attribute
- set_flow_style()/set_block_style()"""
- if not hasattr(self, Format.attrib):
- setattr(self, Format.attrib, Format())
- return getattr(self, Format.attrib)
- def yaml_add_eol_comment(self, comment, key=NoComment, column=None):
- # type: (Any, Optional[Any], Optional[Any]) -> None
- """
- there is a problem as eol comments should start with ' #'
- (but at the beginning of the line the space doesn't have to be before
- the #. The column index is for the # mark
- """
- from .tokens import CommentToken
- from .error import CommentMark
- if column is None:
- try:
- column = self._yaml_get_column(key)
- except AttributeError:
- column = 0
- if comment[0] != '#':
- comment = '# ' + comment
- if column is None:
- if comment[0] == '#':
- comment = ' ' + comment
- column = 0
- start_mark = CommentMark(column)
- ct = [CommentToken(comment, start_mark, None), None]
- self._yaml_add_eol_comment(ct, key=key)
- @property
- def lc(self):
- # type: () -> Any
- if not hasattr(self, LineCol.attrib):
- setattr(self, LineCol.attrib, LineCol())
- return getattr(self, LineCol.attrib)
- def _yaml_set_line_col(self, line, col):
- # type: (Any, Any) -> None
- self.lc.line = line
- self.lc.col = col
- def _yaml_set_kv_line_col(self, key, data):
- # type: (Any, Any) -> None
- self.lc.add_kv_line_col(key, data)
- def _yaml_set_idx_line_col(self, key, data):
- # type: (Any, Any) -> None
- self.lc.add_idx_line_col(key, data)
- @property
- def anchor(self):
- # type: () -> Any
- if not hasattr(self, Anchor.attrib):
- setattr(self, Anchor.attrib, Anchor())
- return getattr(self, Anchor.attrib)
- def yaml_anchor(self):
- # type: () -> Any
- if not hasattr(self, Anchor.attrib):
- return None
- return self.anchor
- def yaml_set_anchor(self, value, always_dump=False):
- # type: (Any, bool) -> None
- self.anchor.value = value
- self.anchor.always_dump = always_dump
- @property
- def tag(self):
- # type: () -> Any
- if not hasattr(self, Tag.attrib):
- setattr(self, Tag.attrib, Tag())
- return getattr(self, Tag.attrib)
- def yaml_set_tag(self, value):
- # type: (Any) -> None
- self.tag.value = value
- def copy_attributes(self, t, memo=None):
- # type: (Any, Any) -> None
- # fmt: off
- for a in [Comment.attrib, Format.attrib, LineCol.attrib, Anchor.attrib,
- Tag.attrib, merge_attrib]:
- if hasattr(self, a):
- if memo is not None:
- setattr(t, a, copy.deepcopy(getattr(self, a, memo)))
- else:
- setattr(t, a, getattr(self, a))
- # fmt: on
- def _yaml_add_eol_comment(self, comment, key):
- # type: (Any, Any) -> None
- raise NotImplementedError
- def _yaml_get_pre_comment(self):
- # type: () -> Any
- raise NotImplementedError
- def _yaml_get_column(self, key):
- # type: (Any) -> Any
- raise NotImplementedError
- class CommentedSeq(MutableSliceableSequence, list, CommentedBase): # type: ignore
- __slots__ = (Comment.attrib, '_lst')
- def __init__(self, *args, **kw):
- # type: (Any, Any) -> None
- list.__init__(self, *args, **kw)
- def __getsingleitem__(self, idx):
- # type: (Any) -> Any
- return list.__getitem__(self, idx)
- def __setsingleitem__(self, idx, value):
- # type: (Any, Any) -> None
- # try to preserve the scalarstring type if setting an existing key to a new value
- if idx < len(self):
- if (
- isinstance(value, string_types)
- and not isinstance(value, ScalarString)
- and isinstance(self[idx], ScalarString)
- ):
- value = type(self[idx])(value)
- list.__setitem__(self, idx, value)
- def __delsingleitem__(self, idx=None):
- # type: (Any) -> Any
- list.__delitem__(self, idx)
- self.ca.items.pop(idx, None) # might not be there -> default value
- for list_index in sorted(self.ca.items):
- if list_index < idx:
- continue
- self.ca.items[list_index - 1] = self.ca.items.pop(list_index)
- def __len__(self):
- # type: () -> int
- return list.__len__(self)
- def insert(self, idx, val):
- # type: (Any, Any) -> None
- """the comments after the insertion have to move forward"""
- list.insert(self, idx, val)
- for list_index in sorted(self.ca.items, reverse=True):
- if list_index < idx:
- break
- self.ca.items[list_index + 1] = self.ca.items.pop(list_index)
- def extend(self, val):
- # type: (Any) -> None
- list.extend(self, val)
- def __eq__(self, other):
- # type: (Any) -> bool
- return list.__eq__(self, other)
- def _yaml_add_comment(self, comment, key=NoComment):
- # type: (Any, Optional[Any]) -> None
- if key is not NoComment:
- self.yaml_key_comment_extend(key, comment)
- else:
- self.ca.comment = comment
- def _yaml_add_eol_comment(self, comment, key):
- # type: (Any, Any) -> None
- self._yaml_add_comment(comment, key=key)
- def _yaml_get_columnX(self, key):
- # type: (Any) -> Any
- return self.ca.items[key][0].start_mark.column
- def _yaml_get_column(self, key):
- # type: (Any) -> Any
- column = None
- sel_idx = None
- pre, post = key - 1, key + 1
- if pre in self.ca.items:
- sel_idx = pre
- elif post in self.ca.items:
- sel_idx = post
- else:
- # self.ca.items is not ordered
- for row_idx, _k1 in enumerate(self):
- if row_idx >= key:
- break
- if row_idx not in self.ca.items:
- continue
- sel_idx = row_idx
- if sel_idx is not None:
- column = self._yaml_get_columnX(sel_idx)
- return column
- def _yaml_get_pre_comment(self):
- # type: () -> Any
- pre_comments = [] # type: List[Any]
- if self.ca.comment is None:
- self.ca.comment = [None, pre_comments]
- else:
- self.ca.comment[1] = pre_comments
- return pre_comments
- def __deepcopy__(self, memo):
- # type: (Any) -> Any
- res = self.__class__()
- memo[id(self)] = res
- for k in self:
- res.append(copy.deepcopy(k, memo))
- self.copy_attributes(res, memo=memo)
- return res
- def __add__(self, other):
- # type: (Any) -> Any
- return list.__add__(self, other)
- def sort(self, key=None, reverse=False): # type: ignore
- # type: (Any, bool) -> None
- if key is None:
- tmp_lst = sorted(zip(self, range(len(self))), reverse=reverse)
- list.__init__(self, [x[0] for x in tmp_lst])
- else:
- tmp_lst = sorted(
- zip(map(key, list.__iter__(self)), range(len(self))), reverse=reverse
- )
- list.__init__(self, [list.__getitem__(self, x[1]) for x in tmp_lst])
- itm = self.ca.items
- self.ca._items = {}
- for idx, x in enumerate(tmp_lst):
- old_index = x[1]
- if old_index in itm:
- self.ca.items[idx] = itm[old_index]
- def __repr__(self):
- # type: () -> Any
- return list.__repr__(self)
- class CommentedKeySeq(tuple, CommentedBase): # type: ignore
- """This primarily exists to be able to roundtrip keys that are sequences"""
- def _yaml_add_comment(self, comment, key=NoComment):
- # type: (Any, Optional[Any]) -> None
- if key is not NoComment:
- self.yaml_key_comment_extend(key, comment)
- else:
- self.ca.comment = comment
- def _yaml_add_eol_comment(self, comment, key):
- # type: (Any, Any) -> None
- self._yaml_add_comment(comment, key=key)
- def _yaml_get_columnX(self, key):
- # type: (Any) -> Any
- return self.ca.items[key][0].start_mark.column
- def _yaml_get_column(self, key):
- # type: (Any) -> Any
- column = None
- sel_idx = None
- pre, post = key - 1, key + 1
- if pre in self.ca.items:
- sel_idx = pre
- elif post in self.ca.items:
- sel_idx = post
- else:
- # self.ca.items is not ordered
- for row_idx, _k1 in enumerate(self):
- if row_idx >= key:
- break
- if row_idx not in self.ca.items:
- continue
- sel_idx = row_idx
- if sel_idx is not None:
- column = self._yaml_get_columnX(sel_idx)
- return column
- def _yaml_get_pre_comment(self):
- # type: () -> Any
- pre_comments = [] # type: List[Any]
- if self.ca.comment is None:
- self.ca.comment = [None, pre_comments]
- else:
- self.ca.comment[1] = pre_comments
- return pre_comments
- class CommentedMapView(Sized):
- __slots__ = ('_mapping',)
- def __init__(self, mapping):
- # type: (Any) -> None
- self._mapping = mapping
- def __len__(self):
- # type: () -> int
- count = len(self._mapping)
- return count
- class CommentedMapKeysView(CommentedMapView, Set): # type: ignore
- __slots__ = ()
- @classmethod
- def _from_iterable(self, it):
- # type: (Any) -> Any
- return set(it)
- def __contains__(self, key):
- # type: (Any) -> Any
- return key in self._mapping
- def __iter__(self):
- # type: () -> Any # yield from self._mapping # not in py27, pypy
- # for x in self._mapping._keys():
- for x in self._mapping:
- yield x
- class CommentedMapItemsView(CommentedMapView, Set): # type: ignore
- __slots__ = ()
- @classmethod
- def _from_iterable(self, it):
- # type: (Any) -> Any
- return set(it)
- def __contains__(self, item):
- # type: (Any) -> Any
- key, value = item
- try:
- v = self._mapping[key]
- except KeyError:
- return False
- else:
- return v == value
- def __iter__(self):
- # type: () -> Any
- for key in self._mapping._keys():
- yield (key, self._mapping[key])
- class CommentedMapValuesView(CommentedMapView):
- __slots__ = ()
- def __contains__(self, value):
- # type: (Any) -> Any
- for key in self._mapping:
- if value == self._mapping[key]:
- return True
- return False
- def __iter__(self):
- # type: () -> Any
- for key in self._mapping._keys():
- yield self._mapping[key]
- class CommentedMap(ordereddict, CommentedBase): # type: ignore
- __slots__ = (Comment.attrib, '_ok', '_ref')
- def __init__(self, *args, **kw):
- # type: (Any, Any) -> None
- self._ok = set() # type: MutableSet[Any] # own keys
- self._ref = [] # type: List[CommentedMap]
- ordereddict.__init__(self, *args, **kw)
- def _yaml_add_comment(self, comment, key=NoComment, value=NoComment):
- # type: (Any, Optional[Any], Optional[Any]) -> None
- """values is set to key to indicate a value attachment of comment"""
- if key is not NoComment:
- self.yaml_key_comment_extend(key, comment)
- return
- if value is not NoComment:
- self.yaml_value_comment_extend(value, comment)
- else:
- self.ca.comment = comment
- def _yaml_add_eol_comment(self, comment, key):
- # type: (Any, Any) -> None
- """add on the value line, with value specified by the key"""
- self._yaml_add_comment(comment, value=key)
- def _yaml_get_columnX(self, key):
- # type: (Any) -> Any
- return self.ca.items[key][2].start_mark.column
- def _yaml_get_column(self, key):
- # type: (Any) -> Any
- column = None
- sel_idx = None
- pre, post, last = None, None, None
- for x in self:
- if pre is not None and x != key:
- post = x
- break
- if x == key:
- pre = last
- last = x
- if pre in self.ca.items:
- sel_idx = pre
- elif post in self.ca.items:
- sel_idx = post
- else:
- # self.ca.items is not ordered
- for k1 in self:
- if k1 >= key:
- break
- if k1 not in self.ca.items:
- continue
- sel_idx = k1
- if sel_idx is not None:
- column = self._yaml_get_columnX(sel_idx)
- return column
- def _yaml_get_pre_comment(self):
- # type: () -> Any
- pre_comments = [] # type: List[Any]
- if self.ca.comment is None:
- self.ca.comment = [None, pre_comments]
- else:
- self.ca.comment[1] = pre_comments
- return pre_comments
- def update(self, *vals, **kw):
- # type: (Any, Any) -> None
- try:
- ordereddict.update(self, *vals, **kw)
- except TypeError:
- # probably a dict that is used
- for x in vals[0]:
- self[x] = vals[0][x]
- try:
- self._ok.update(vals.keys()) # type: ignore
- except AttributeError:
- # assume one argument that is a list/tuple of two element lists/tuples
- for x in vals[0]:
- self._ok.add(x[0])
- if kw:
- self._ok.add(*kw.keys())
- def insert(self, pos, key, value, comment=None):
- # type: (Any, Any, Any, Optional[Any]) -> None
- """insert key value into given position
- attach comment if provided
- """
- ordereddict.insert(self, pos, key, value)
- self._ok.add(key)
- if comment is not None:
- self.yaml_add_eol_comment(comment, key=key)
- def mlget(self, key, default=None, list_ok=False):
- # type: (Any, Any, Any) -> Any
- """multi-level get that expects dicts within dicts"""
- if not isinstance(key, list):
- return self.get(key, default)
- # assume that the key is a list of recursively accessible dicts
- def get_one_level(key_list, level, d):
- # type: (Any, Any, Any) -> Any
- if not list_ok:
- assert isinstance(d, dict)
- if level >= len(key_list):
- if level > len(key_list):
- raise IndexError
- return d[key_list[level - 1]]
- return get_one_level(key_list, level + 1, d[key_list[level - 1]])
- try:
- return get_one_level(key, 1, self)
- except KeyError:
- return default
- except (TypeError, IndexError):
- if not list_ok:
- raise
- return default
- def __getitem__(self, key):
- # type: (Any) -> Any
- try:
- return ordereddict.__getitem__(self, key)
- except KeyError:
- for merged in getattr(self, merge_attrib, []):
- if key in merged[1]:
- return merged[1][key]
- raise
- def __setitem__(self, key, value):
- # type: (Any, Any) -> None
- # try to preserve the scalarstring type if setting an existing key to a new value
- if key in self:
- if (
- isinstance(value, string_types)
- and not isinstance(value, ScalarString)
- and isinstance(self[key], ScalarString)
- ):
- value = type(self[key])(value)
- ordereddict.__setitem__(self, key, value)
- self._ok.add(key)
- def _unmerged_contains(self, key):
- # type: (Any) -> Any
- if key in self._ok:
- return True
- return None
- def __contains__(self, key):
- # type: (Any) -> bool
- return bool(ordereddict.__contains__(self, key))
- def get(self, key, default=None):
- # type: (Any, Any) -> Any
- try:
- return self.__getitem__(key)
- except: # NOQA
- return default
- def __repr__(self):
- # type: () -> Any
- return ordereddict.__repr__(self).replace('CommentedMap', 'ordereddict')
- def non_merged_items(self):
- # type: () -> Any
- for x in ordereddict.__iter__(self):
- if x in self._ok:
- yield x, ordereddict.__getitem__(self, x)
- def __delitem__(self, key):
- # type: (Any) -> None
- # for merged in getattr(self, merge_attrib, []):
- # if key in merged[1]:
- # value = merged[1][key]
- # break
- # else:
- # # not found in merged in stuff
- # ordereddict.__delitem__(self, key)
- # for referer in self._ref:
- # referer.update_key_value(key)
- # return
- #
- # ordereddict.__setitem__(self, key, value) # merge might have different value
- # self._ok.discard(key)
- self._ok.discard(key)
- ordereddict.__delitem__(self, key)
- for referer in self._ref:
- referer.update_key_value(key)
- def __iter__(self):
- # type: () -> Any
- for x in ordereddict.__iter__(self):
- yield x
- def _keys(self):
- # type: () -> Any
- for x in ordereddict.__iter__(self):
- yield x
- def __len__(self):
- # type: () -> int
- return int(ordereddict.__len__(self))
- def __eq__(self, other):
- # type: (Any) -> bool
- return bool(dict(self) == other)
- if PY2:
- def keys(self):
- # type: () -> Any
- return list(self._keys())
- def iterkeys(self):
- # type: () -> Any
- return self._keys()
- def viewkeys(self):
- # type: () -> Any
- return CommentedMapKeysView(self)
- else:
- def keys(self):
- # type: () -> Any
- return CommentedMapKeysView(self)
- if PY2:
- def _values(self):
- # type: () -> Any
- for x in ordereddict.__iter__(self):
- yield ordereddict.__getitem__(self, x)
- def values(self):
- # type: () -> Any
- return list(self._values())
- def itervalues(self):
- # type: () -> Any
- return self._values()
- def viewvalues(self):
- # type: () -> Any
- return CommentedMapValuesView(self)
- else:
- def values(self):
- # type: () -> Any
- return CommentedMapValuesView(self)
- def _items(self):
- # type: () -> Any
- for x in ordereddict.__iter__(self):
- yield x, ordereddict.__getitem__(self, x)
- if PY2:
- def items(self):
- # type: () -> Any
- return list(self._items())
- def iteritems(self):
- # type: () -> Any
- return self._items()
- def viewitems(self):
- # type: () -> Any
- return CommentedMapItemsView(self)
- else:
- def items(self):
- # type: () -> Any
- return CommentedMapItemsView(self)
- @property
- def merge(self):
- # type: () -> Any
- if not hasattr(self, merge_attrib):
- setattr(self, merge_attrib, [])
- return getattr(self, merge_attrib)
- def copy(self):
- # type: () -> Any
- x = type(self)() # update doesn't work
- for k, v in self._items():
- x[k] = v
- self.copy_attributes(x)
- return x
- def add_referent(self, cm):
- # type: (Any) -> None
- if cm not in self._ref:
- self._ref.append(cm)
- def add_yaml_merge(self, value):
- # type: (Any) -> None
- for v in value:
- v[1].add_referent(self)
- for k, v in v[1].items():
- if ordereddict.__contains__(self, k):
- continue
- ordereddict.__setitem__(self, k, v)
- self.merge.extend(value)
- def update_key_value(self, key):
- # type: (Any) -> None
- if key in self._ok:
- return
- for v in self.merge:
- if key in v[1]:
- ordereddict.__setitem__(self, key, v[1][key])
- return
- ordereddict.__delitem__(self, key)
- def __deepcopy__(self, memo):
- # type: (Any) -> Any
- res = self.__class__()
- memo[id(self)] = res
- for k in self:
- res[k] = copy.deepcopy(self[k], memo)
- self.copy_attributes(res, memo=memo)
- return res
- # based on brownie mappings
- @classmethod # type: ignore
- def raise_immutable(cls, *args, **kwargs):
- # type: (Any, *Any, **Any) -> None
- raise TypeError('{} objects are immutable'.format(cls.__name__))
- class CommentedKeyMap(CommentedBase, Mapping): # type: ignore
- __slots__ = Comment.attrib, '_od'
- """This primarily exists to be able to roundtrip keys that are mappings"""
- def __init__(self, *args, **kw):
- # type: (Any, Any) -> None
- if hasattr(self, '_od'):
- raise_immutable(self)
- try:
- self._od = ordereddict(*args, **kw)
- except TypeError:
- if PY2:
- self._od = ordereddict(args[0].items())
- else:
- raise
- __delitem__ = __setitem__ = clear = pop = popitem = setdefault = update = raise_immutable
- # need to implement __getitem__, __iter__ and __len__
- def __getitem__(self, index):
- # type: (Any) -> Any
- return self._od[index]
- def __iter__(self):
- # type: () -> Iterator[Any]
- for x in self._od.__iter__():
- yield x
- def __len__(self):
- # type: () -> int
- return len(self._od)
- def __hash__(self):
- # type: () -> Any
- return hash(tuple(self.items()))
- def __repr__(self):
- # type: () -> Any
- if not hasattr(self, merge_attrib):
- return self._od.__repr__()
- return 'ordereddict(' + repr(list(self._od.items())) + ')'
- @classmethod
- def fromkeys(keys, v=None):
- # type: (Any, Any) -> Any
- return CommentedKeyMap(dict.fromkeys(keys, v))
- def _yaml_add_comment(self, comment, key=NoComment):
- # type: (Any, Optional[Any]) -> None
- if key is not NoComment:
- self.yaml_key_comment_extend(key, comment)
- else:
- self.ca.comment = comment
- def _yaml_add_eol_comment(self, comment, key):
- # type: (Any, Any) -> None
- self._yaml_add_comment(comment, key=key)
- def _yaml_get_columnX(self, key):
- # type: (Any) -> Any
- return self.ca.items[key][0].start_mark.column
- def _yaml_get_column(self, key):
- # type: (Any) -> Any
- column = None
- sel_idx = None
- pre, post = key - 1, key + 1
- if pre in self.ca.items:
- sel_idx = pre
- elif post in self.ca.items:
- sel_idx = post
- else:
- # self.ca.items is not ordered
- for row_idx, _k1 in enumerate(self):
- if row_idx >= key:
- break
- if row_idx not in self.ca.items:
- continue
- sel_idx = row_idx
- if sel_idx is not None:
- column = self._yaml_get_columnX(sel_idx)
- return column
- def _yaml_get_pre_comment(self):
- # type: () -> Any
- pre_comments = [] # type: List[Any]
- if self.ca.comment is None:
- self.ca.comment = [None, pre_comments]
- else:
- self.ca.comment[1] = pre_comments
- return pre_comments
- class CommentedOrderedMap(CommentedMap):
- __slots__ = (Comment.attrib,)
- class CommentedSet(MutableSet, CommentedBase): # type: ignore # NOQA
- __slots__ = Comment.attrib, 'odict'
- def __init__(self, values=None):
- # type: (Any) -> None
- self.odict = ordereddict()
- MutableSet.__init__(self)
- if values is not None:
- self |= values # type: ignore
- def _yaml_add_comment(self, comment, key=NoComment, value=NoComment):
- # type: (Any, Optional[Any], Optional[Any]) -> None
- """values is set to key to indicate a value attachment of comment"""
- if key is not NoComment:
- self.yaml_key_comment_extend(key, comment)
- return
- if value is not NoComment:
- self.yaml_value_comment_extend(value, comment)
- else:
- self.ca.comment = comment
- def _yaml_add_eol_comment(self, comment, key):
- # type: (Any, Any) -> None
- """add on the value line, with value specified by the key"""
- self._yaml_add_comment(comment, value=key)
- def add(self, value):
- # type: (Any) -> None
- """Add an element."""
- self.odict[value] = None
- def discard(self, value):
- # type: (Any) -> None
- """Remove an element. Do not raise an exception if absent."""
- del self.odict[value]
- def __contains__(self, x):
- # type: (Any) -> Any
- return x in self.odict
- def __iter__(self):
- # type: () -> Any
- for x in self.odict:
- yield x
- def __len__(self):
- # type: () -> int
- return len(self.odict)
- def __repr__(self):
- # type: () -> str
- return 'set({0!r})'.format(self.odict.keys())
- class TaggedScalar(CommentedBase):
- # the value and style attributes are set during roundtrip construction
- def __init__(self, value=None, style=None, tag=None):
- # type: (Any, Any, Any) -> None
- self.value = value
- self.style = style
- if tag is not None:
- self.yaml_set_tag(tag)
- def __str__(self):
- # type: () -> Any
- return self.value
- def dump_comments(d, name="", sep='.', out=sys.stdout):
- # type: (Any, str, str, Any) -> None
- """
- recursively dump comments, all but the toplevel preceded by the path
- in dotted form x.0.a
- """
- if isinstance(d, dict) and hasattr(d, 'ca'):
- if name:
- sys.stdout.write('{}\n'.format(name))
- out.write('{}\n'.format(d.ca)) # type: ignore
- for k in d:
- dump_comments(d[k], name=(name + sep + k) if name else k, sep=sep, out=out)
- elif isinstance(d, list) and hasattr(d, 'ca'):
- if name:
- sys.stdout.write('{}\n'.format(name))
- out.write('{}\n'.format(d.ca)) # type: ignore
- for idx, k in enumerate(d):
- dump_comments(
- k, name=(name + sep + str(idx)) if name else str(idx), sep=sep, out=out
- )
|