util.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
# util.py
import contextlib
import inspect
import itertools
import types
import warnings
from functools import lru_cache, wraps
from typing import Callable, Iterable, TypeVar, Union, cast
# Backslash as a named constant (used when building escaped regex text below,
# avoiding doubled-escape confusion in string literals)
_bslash = chr(92)

# TypeVar bound to Callable; used by replaced_by_pep8 to preserve the wrapped
# callable's type for callers
C = TypeVar("C", bound=Callable)
  11. class __config_flags:
  12. """Internal class for defining compatibility and debugging flags"""
  13. _all_names: list[str] = []
  14. _fixed_names: list[str] = []
  15. _type_desc = "configuration"
  16. @classmethod
  17. def _set(cls, dname, value):
  18. if dname in cls._fixed_names:
  19. warnings.warn(
  20. f"{cls.__name__}.{dname} {cls._type_desc} is {str(getattr(cls, dname)).upper()}"
  21. f" and cannot be overridden",
  22. stacklevel=3,
  23. )
  24. return
  25. if dname in cls._all_names:
  26. setattr(cls, dname, value)
  27. else:
  28. raise ValueError(f"no such {cls._type_desc} {dname!r}")
  29. enable = classmethod(lambda cls, name: cls._set(name, True))
  30. disable = classmethod(lambda cls, name: cls._set(name, False))
  31. @lru_cache(maxsize=128)
  32. def col(loc: int, strg: str) -> int:
  33. """
  34. Returns current column within a string, counting newlines as line separators.
  35. The first column is number 1.
  36. Note: the default parsing behavior is to expand tabs in the input string
  37. before starting the parsing process. See
  38. :class:`ParserElement.parse_string` for more
  39. information on parsing strings containing ``<TAB>`` s, and suggested
  40. methods to maintain a consistent view of the parsed string, the parse
  41. location, and line and column positions within the parsed string.
  42. """
  43. s = strg
  44. return 1 if 0 < loc < len(s) and s[loc - 1] == "\n" else loc - s.rfind("\n", 0, loc)
  45. @lru_cache(maxsize=128)
  46. def lineno(loc: int, strg: str) -> int:
  47. """Returns current line number within a string, counting newlines as line separators.
  48. The first line is number 1.
  49. Note - the default parsing behavior is to expand tabs in the input string
  50. before starting the parsing process. See :class:`ParserElement.parse_string`
  51. for more information on parsing strings containing ``<TAB>`` s, and
  52. suggested methods to maintain a consistent view of the parsed string, the
  53. parse location, and line and column positions within the parsed string.
  54. """
  55. return strg.count("\n", 0, loc) + 1
  56. @lru_cache(maxsize=128)
  57. def line(loc: int, strg: str) -> str:
  58. """
  59. Returns the line of text containing loc within a string, counting newlines as line separators.
  60. """
  61. last_cr = strg.rfind("\n", 0, loc)
  62. next_cr = strg.find("\n", loc)
  63. return strg[last_cr + 1 : next_cr] if next_cr >= 0 else strg[last_cr + 1 :]
  64. class _UnboundedCache:
  65. def __init__(self):
  66. cache = {}
  67. cache_get = cache.get
  68. self.not_in_cache = not_in_cache = object()
  69. def get(_, key):
  70. return cache_get(key, not_in_cache)
  71. def set_(_, key, value):
  72. cache[key] = value
  73. def clear(_):
  74. cache.clear()
  75. self.size = None
  76. self.get = types.MethodType(get, self)
  77. self.set = types.MethodType(set_, self)
  78. self.clear = types.MethodType(clear, self)
  79. class _FifoCache:
  80. def __init__(self, size):
  81. cache = {}
  82. self.size = size
  83. self.not_in_cache = not_in_cache = object()
  84. cache_get = cache.get
  85. cache_pop = cache.pop
  86. def get(_, key):
  87. return cache_get(key, not_in_cache)
  88. def set_(_, key, value):
  89. cache[key] = value
  90. while len(cache) > size:
  91. # pop oldest element in cache by getting the first key
  92. cache_pop(next(iter(cache)))
  93. def clear(_):
  94. cache.clear()
  95. self.get = types.MethodType(get, self)
  96. self.set = types.MethodType(set_, self)
  97. self.clear = types.MethodType(clear, self)
  98. class LRUMemo:
  99. """
  100. A memoizing mapping that retains `capacity` deleted items
  101. The memo tracks retained items by their access order; once `capacity` items
  102. are retained, the least recently used item is discarded.
  103. """
  104. def __init__(self, capacity):
  105. self._capacity = capacity
  106. self._active = {}
  107. self._memory = {}
  108. def __getitem__(self, key):
  109. try:
  110. return self._active[key]
  111. except KeyError:
  112. self._memory[key] = self._memory.pop(key)
  113. return self._memory[key]
  114. def __setitem__(self, key, value):
  115. self._memory.pop(key, None)
  116. self._active[key] = value
  117. def __delitem__(self, key):
  118. try:
  119. value = self._active.pop(key)
  120. except KeyError:
  121. pass
  122. else:
  123. oldest_keys = list(self._memory)[: -(self._capacity + 1)]
  124. for key_to_delete in oldest_keys:
  125. self._memory.pop(key_to_delete)
  126. self._memory[key] = value
  127. def clear(self):
  128. self._active.clear()
  129. self._memory.clear()
  130. class UnboundedMemo(dict):
  131. """
  132. A memoizing mapping that retains all deleted items
  133. """
  134. def __delitem__(self, key):
  135. pass
  136. def _escape_regex_range_chars(s: str) -> str:
  137. # escape these chars: ^-[]
  138. for c in r"\^-[]":
  139. s = s.replace(c, _bslash + c)
  140. s = s.replace("\n", r"\n")
  141. s = s.replace("\t", r"\t")
  142. return str(s)
  143. class _GroupConsecutive:
  144. """
  145. Used as a callable `key` for itertools.groupby to group
  146. characters that are consecutive:
  147. itertools.groupby("abcdejkmpqrs", key=IsConsecutive())
  148. yields:
  149. (0, iter(['a', 'b', 'c', 'd', 'e']))
  150. (1, iter(['j', 'k']))
  151. (2, iter(['m']))
  152. (3, iter(['p', 'q', 'r', 's']))
  153. """
  154. def __init__(self):
  155. self.prev = 0
  156. self.counter = itertools.count()
  157. self.value = -1
  158. def __call__(self, char: str) -> int:
  159. c_int = ord(char)
  160. self.prev, prev = c_int, self.prev
  161. if c_int - prev > 1:
  162. self.value = next(self.counter)
  163. return self.value
  164. def _collapse_string_to_ranges(
  165. s: Union[str, Iterable[str]], re_escape: bool = True
  166. ) -> str:
  167. r"""
  168. Take a string or list of single-character strings, and return
  169. a string of the consecutive characters in that string collapsed
  170. into groups, as might be used in a regular expression '[a-z]'
  171. character set:
  172. 'a' -> 'a' -> '[a]'
  173. 'bc' -> 'bc' -> '[bc]'
  174. 'defgh' -> 'd-h' -> '[d-h]'
  175. 'fdgeh' -> 'd-h' -> '[d-h]'
  176. 'jklnpqrtu' -> 'j-lnp-rtu' -> '[j-lnp-rtu]'
  177. Duplicates get collapsed out:
  178. 'aaa' -> 'a' -> '[a]'
  179. 'bcbccb' -> 'bc' -> '[bc]'
  180. 'defghhgf' -> 'd-h' -> '[d-h]'
  181. 'jklnpqrjjjtu' -> 'j-lnp-rtu' -> '[j-lnp-rtu]'
  182. Spaces are preserved:
  183. 'ab c' -> ' a-c' -> '[ a-c]'
  184. Characters that are significant when defining regex ranges
  185. get escaped:
  186. 'acde[]-' -> r'\-\[\]ac-e' -> r'[\-\[\]ac-e]'
  187. """
  188. # Developer notes:
  189. # - Do not optimize this code assuming that the given input string
  190. # or internal lists will be short (such as in loading generators into
  191. # lists to make it easier to find the last element); this method is also
  192. # used to generate regex ranges for character sets in the pyparsing.unicode
  193. # classes, and these can be _very_ long lists of strings
  194. def escape_re_range_char(c: str) -> str:
  195. return "\\" + c if c in r"\^-][" else c
  196. def no_escape_re_range_char(c: str) -> str:
  197. return c
  198. if not re_escape:
  199. escape_re_range_char = no_escape_re_range_char
  200. ret = []
  201. # reduce input string to remove duplicates, and put in sorted order
  202. s_chars: list[str] = sorted(set(s))
  203. if len(s_chars) > 2:
  204. # find groups of characters that are consecutive (can be collapsed
  205. # down to "<first>-<last>")
  206. for _, chars in itertools.groupby(s_chars, key=_GroupConsecutive()):
  207. # _ is unimportant, is just used to identify groups
  208. # chars is an iterator of one or more consecutive characters
  209. # that comprise the current group
  210. first = last = next(chars)
  211. with contextlib.suppress(ValueError):
  212. *_, last = chars
  213. if first == last:
  214. # there was only a single char in this group
  215. ret.append(escape_re_range_char(first))
  216. elif last == chr(ord(first) + 1):
  217. # there were only 2 characters in this group
  218. # 'a','b' -> 'ab'
  219. ret.append(f"{escape_re_range_char(first)}{escape_re_range_char(last)}")
  220. else:
  221. # there were > 2 characters in this group, make into a range
  222. # 'c','d','e' -> 'c-e'
  223. ret.append(
  224. f"{escape_re_range_char(first)}-{escape_re_range_char(last)}"
  225. )
  226. else:
  227. # only 1 or 2 chars were given to form into groups
  228. # 'a' -> ['a']
  229. # 'bc' -> ['b', 'c']
  230. # 'dg' -> ['d', 'g']
  231. # no need to list them with "-", just return as a list
  232. # (after escaping)
  233. ret = [escape_re_range_char(c) for c in s_chars]
  234. return "".join(ret)
  235. def _flatten(ll: Iterable) -> list:
  236. ret = []
  237. to_visit = [*ll]
  238. while to_visit:
  239. i = to_visit.pop(0)
  240. if isinstance(i, Iterable) and not isinstance(i, str):
  241. to_visit[:0] = i
  242. else:
  243. ret.append(i)
  244. return ret
  245. def make_compressed_re(
  246. word_list: Iterable[str], max_level: int = 2, _level: int = 1
  247. ) -> str:
  248. """
  249. Create a regular expression string from a list of words, collapsing by common
  250. prefixes and optional suffixes.
  251. Calls itself recursively to build nested sublists for each group of suffixes
  252. that have a shared prefix.
  253. """
  254. def get_suffixes_from_common_prefixes(namelist: list[str]):
  255. if len(namelist) > 1:
  256. for prefix, suffixes in itertools.groupby(namelist, key=lambda s: s[:1]):
  257. yield prefix, sorted([s[1:] for s in suffixes], key=len, reverse=True)
  258. else:
  259. yield namelist[0][0], [namelist[0][1:]]
  260. if max_level == 0:
  261. return "|".join(sorted(word_list, key=len, reverse=True))
  262. ret = []
  263. sep = ""
  264. for initial, suffixes in get_suffixes_from_common_prefixes(sorted(word_list)):
  265. ret.append(sep)
  266. sep = "|"
  267. trailing = ""
  268. if "" in suffixes:
  269. trailing = "?"
  270. suffixes.remove("")
  271. if len(suffixes) > 1:
  272. if all(len(s) == 1 for s in suffixes):
  273. ret.append(f"{initial}[{''.join(suffixes)}]{trailing}")
  274. else:
  275. if _level < max_level:
  276. suffix_re = make_compressed_re(
  277. sorted(suffixes), max_level, _level + 1
  278. )
  279. ret.append(f"{initial}({suffix_re}){trailing}")
  280. else:
  281. suffixes.sort(key=len, reverse=True)
  282. ret.append(f"{initial}({'|'.join(suffixes)}){trailing}")
  283. else:
  284. if suffixes:
  285. suffix = suffixes[0]
  286. if len(suffix) > 1 and trailing:
  287. ret.append(f"{initial}({suffix}){trailing}")
  288. else:
  289. ret.append(f"{initial}{suffix}{trailing}")
  290. else:
  291. ret.append(initial)
  292. return "".join(ret)
def replaced_by_pep8(compat_name: str, fn: C) -> C:
    """
    Build a deprecated compatibility alias named ``compat_name`` that forwards
    all calls to ``fn`` (the PEP 8-named replacement), copying ``fn``'s
    metadata so introspection and help() point callers at the new name.
    """
    # In a future version, uncomment the code in the internal _inner() functions
    # to begin emitting DeprecationWarnings.

    # Unwrap staticmethod/classmethod
    fn = getattr(fn, "__func__", fn)

    # (Presence of 'self' arg in signature is used by explain_exception() methods, so we take
    # some extra steps to add it if present in decorated function.)
    if ["self"] == list(inspect.signature(fn).parameters)[:1]:

        @wraps(fn)
        def _inner(self, *args, **kwargs):
            # warnings.warn(
            #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=2
            # )
            return fn(self, *args, **kwargs)

    else:

        @wraps(fn)
        def _inner(*args, **kwargs):
            # warnings.warn(
            #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=2
            # )
            return fn(*args, **kwargs)

    # Re-label the wrapper with the legacy name while the docstring points at
    # the preferred replacement (wraps() above copied fn's own metadata).
    _inner.__doc__ = f"""Deprecated - use :class:`{fn.__name__}`"""
    _inner.__name__ = compat_name
    _inner.__annotations__ = fn.__annotations__
    # Copy keyword-only defaults so inspect.signature of the alias matches fn;
    # classes take them from __init__, other non-function callables get None.
    if isinstance(fn, types.FunctionType):
        _inner.__kwdefaults__ = fn.__kwdefaults__  # type: ignore [attr-defined]
    elif isinstance(fn, type) and hasattr(fn, "__init__"):
        _inner.__kwdefaults__ = fn.__init__.__kwdefaults__  # type: ignore [misc,attr-defined]
    else:
        _inner.__kwdefaults__ = None  # type: ignore [attr-defined]
    _inner.__qualname__ = fn.__qualname__
    return cast(C, _inner)