compat.py 6.7 KB


  1. # coding: utf-8
  2. # partially from package six by Benjamin Peterson
  3. import sys
  4. import os
  5. import io
  6. import traceback
  7. from abc import abstractmethod
  8. import collections.abc
  9. # fmt: off
  10. from typing import Any, Dict, Optional, List, Union, BinaryIO, IO, Text, Tuple # NOQA
  11. from typing import Optional # NOQA
  12. try:
  13. from typing import SupportsIndex as SupportsIndex # in order to reexport for mypy
  14. except ImportError:
  15. SupportsIndex = int # type: ignore
  16. # fmt: on
  17. _DEFAULT_YAML_VERSION = (1, 2)
  18. try:
  19. from collections import OrderedDict
  20. except ImportError:
  21. from ordereddict import OrderedDict # type: ignore
  22. # to get the right name import ... as ordereddict doesn't do that
  23. class ordereddict(OrderedDict): # type: ignore
  24. if not hasattr(OrderedDict, 'insert'):
  25. def insert(self, pos: int, key: Any, value: Any) -> None:
  26. if pos >= len(self):
  27. self[key] = value
  28. return
  29. od = ordereddict()
  30. od.update(self)
  31. for k in od:
  32. del self[k]
  33. for index, old_key in enumerate(od):
  34. if pos == index:
  35. self[key] = value
  36. self[old_key] = od[old_key]
  37. StringIO = io.StringIO
  38. BytesIO = io.BytesIO
  39. # StreamType = Union[BinaryIO, IO[str], IO[unicode], StringIO]
  40. # StreamType = Union[BinaryIO, IO[str], StringIO] # type: ignore
  41. StreamType = Any
  42. StreamTextType = StreamType # Union[Text, StreamType]
  43. VersionType = Union[List[int], str, Tuple[int, int]]
  44. builtins_module = 'builtins'
  45. def with_metaclass(meta: Any, *bases: Any) -> Any:
  46. """Create a base class with a metaclass."""
  47. return meta('NewBase', bases, {})
  48. DBG_TOKEN = 1
  49. DBG_EVENT = 2
  50. DBG_NODE = 4
  51. _debug: Optional[int] = None
  52. if 'RUAMELDEBUG' in os.environ:
  53. _debugx = os.environ.get('RUAMELDEBUG')
  54. if _debugx is None:
  55. _debug = 0
  56. else:
  57. _debug = int(_debugx)
  58. if bool(_debug):
  59. class ObjectCounter:
  60. def __init__(self) -> None:
  61. self.map: Dict[Any, Any] = {}
  62. def __call__(self, k: Any) -> None:
  63. self.map[k] = self.map.get(k, 0) + 1
  64. def dump(self) -> None:
  65. for k in sorted(self.map):
  66. sys.stdout.write(f'{k} -> {self.map[k]}')
  67. object_counter = ObjectCounter()
  68. # used from yaml util when testing
  69. def dbg(val: Any = None) -> Any:
  70. global _debug
  71. if _debug is None:
  72. # set to true or false
  73. _debugx = os.environ.get('YAMLDEBUG')
  74. if _debugx is None:
  75. _debug = 0
  76. else:
  77. _debug = int(_debugx)
  78. if val is None:
  79. return _debug
  80. return _debug & val
  81. class Nprint:
  82. def __init__(self, file_name: Any = None) -> None:
  83. self._max_print: Any = None
  84. self._count: Any = None
  85. self._file_name = file_name
  86. def __call__(self, *args: Any, **kw: Any) -> None:
  87. if not bool(_debug):
  88. return
  89. out = sys.stdout if self._file_name is None else open(self._file_name, 'a')
  90. dbgprint = print # to fool checking for print statements by dv utility
  91. kw1 = kw.copy()
  92. kw1['file'] = out
  93. dbgprint(*args, **kw1)
  94. out.flush()
  95. if self._max_print is not None:
  96. if self._count is None:
  97. self._count = self._max_print
  98. self._count -= 1
  99. if self._count == 0:
  100. dbgprint('forced exit\n')
  101. traceback.print_stack()
  102. out.flush()
  103. sys.exit(0)
  104. if self._file_name:
  105. out.close()
  106. def set_max_print(self, i: int) -> None:
  107. self._max_print = i
  108. self._count = None
  109. def fp(self, mode: str = 'a') -> Any:
  110. out = sys.stdout if self._file_name is None else open(self._file_name, mode)
  111. return out
  112. nprint = Nprint()
  113. nprintf = Nprint('/var/tmp/ruamel.yaml.log')
  114. # char checkers following production rules
  115. def check_namespace_char(ch: Any) -> bool:
  116. if '\x21' <= ch <= '\x7E': # ! to ~
  117. return True
  118. if '\xA0' <= ch <= '\uD7FF':
  119. return True
  120. if ('\uE000' <= ch <= '\uFFFD') and ch != '\uFEFF': # excl. byte order mark
  121. return True
  122. if '\U00010000' <= ch <= '\U0010FFFF':
  123. return True
  124. return False
  125. def check_anchorname_char(ch: Any) -> bool:
  126. if ch in ',[]{}':
  127. return False
  128. return check_namespace_char(ch)
  129. def version_tnf(t1: Any, t2: Any = None) -> Any:
  130. """
  131. return True if ruamel.yaml version_info < t1, None if t2 is specified and bigger else False
  132. """
  133. from ruamel.yaml import version_info # NOQA
  134. if version_info < t1:
  135. return True
  136. if t2 is not None and version_info < t2:
  137. return None
  138. return False
  139. class MutableSliceableSequence(collections.abc.MutableSequence): # type: ignore
  140. __slots__ = ()
  141. def __getitem__(self, index: Any) -> Any:
  142. if not isinstance(index, slice):
  143. return self.__getsingleitem__(index)
  144. return type(self)([self[i] for i in range(*index.indices(len(self)))]) # type: ignore
  145. def __setitem__(self, index: Any, value: Any) -> None:
  146. if not isinstance(index, slice):
  147. return self.__setsingleitem__(index, value)
  148. assert iter(value)
  149. # nprint(index.start, index.stop, index.step, index.indices(len(self)))
  150. if index.step is None:
  151. del self[index.start : index.stop]
  152. for elem in reversed(value):
  153. self.insert(0 if index.start is None else index.start, elem)
  154. else:
  155. range_parms = index.indices(len(self))
  156. nr_assigned_items = (range_parms[1] - range_parms[0] - 1) // range_parms[2] + 1
  157. # need to test before changing, in case TypeError is caught
  158. if nr_assigned_items < len(value):
  159. raise TypeError(
  160. f'too many elements in value {nr_assigned_items} < {len(value)}',
  161. )
  162. elif nr_assigned_items > len(value):
  163. raise TypeError(
  164. f'not enough elements in value {nr_assigned_items} > {len(value)}',
  165. )
  166. for idx, i in enumerate(range(*range_parms)):
  167. self[i] = value[idx]
  168. def __delitem__(self, index: Any) -> None:
  169. if not isinstance(index, slice):
  170. return self.__delsingleitem__(index)
  171. # nprint(index.start, index.stop, index.step, index.indices(len(self)))
  172. for i in reversed(range(*index.indices(len(self)))):
  173. del self[i]
  174. @abstractmethod
  175. def __getsingleitem__(self, index: Any) -> Any:
  176. raise IndexError
  177. @abstractmethod
  178. def __setsingleitem__(self, index: Any, value: Any) -> None:
  179. raise IndexError
  180. @abstractmethod
  181. def __delsingleitem__(self, index: Any) -> None:
  182. raise IndexError