traversal.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. import collections.abc
  2. import contextlib
  3. import http.cookies
  4. import inspect
  5. import itertools
  6. import re
  7. import xml.etree.ElementTree
  8. from ._utils import (
  9. IDENTITY,
  10. NO_DEFAULT,
  11. LazyList,
  12. deprecation_warning,
  13. is_iterable_like,
  14. try_call,
  15. variadic,
  16. )
def traverse_obj(
        obj, *paths, default=NO_DEFAULT, expected_type=None, get_all=True,
        casesense=True, is_user_input=NO_DEFAULT, traverse_string=False):
    """
    Safely traverse nested `dict`s and `Iterable`s

    >>> obj = [{}, {"key": "value"}]
    >>> traverse_obj(obj, (1, "key"))
    'value'

    Each of the provided `paths` is tested and the first producing a valid result will be returned.
    The next path will also be tested if the path branched but no results could be found.
    Supported values for traversal are `Mapping`, `Iterable`, `re.Match`,
    `xml.etree.ElementTree` (xpath) and `http.cookies.Morsel`.
    Unhelpful values (`{}`, `None`) are treated as the absence of a value and discarded.

    The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.

    The keys in the path can be one of:
        - `None`:           Return the current object.
        - `set`:            Requires the only item in the set to be a type or function,
                            like `{type}`/`{type, type, ...}`/`{func}`. If a `type`, return only
                            values of this type. If a function, returns `func(obj)`.
        - `str`/`int`:      Return `obj[key]`. For `re.Match`, return `obj.group(key)`.
        - `slice`:          Branch out and return all values in `obj[key]`.
        - `Ellipsis`:       Branch out and return a list of all values.
        - `tuple`/`list`:   Branch out and return a list of all matching values.
                            Read as: `[traverse_obj(obj, branch) for branch in branches]`.
        - `function`:       Branch out and return values filtered by the function.
                            Read as: `[value for key, value in obj if function(key, value)]`.
                            For `Iterable`s, `key` is the index of the value.
                            For `re.Match`es, `key` is the group number (0 = full match)
                            as well as additionally any group names, if given.
        - `dict`:           Transform the current object and return a matching dict.
                            Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.
        - `any`-builtin:    Take the first matching object and return it, resetting branching.
        - `all`-builtin:    Take all matching objects and return them as a list, resetting branching.

        `tuple`, `list`, and `dict` all support nested paths and branches.

    @param paths            Paths which to traverse by.
    @param default          Value to return if the paths do not match.
                            If the last key in the path is a `dict`, it will apply to each value inside
                            the dict instead, depth first. Try to avoid if using nested `dict` keys.
    @param expected_type    If a `type`, only accept final values of this type.
                            If any other callable, try to call the function on each result.
                            If the last key in the path is a `dict`, it will apply to each value inside
                            the dict instead, recursively. This does respect branching paths.
    @param get_all          If `False`, return the first matching result, otherwise all matching ones.
    @param casesense        If `False`, consider string dictionary keys as case insensitive.
    @param is_user_input    Deprecated and ignored; passing any value only emits a deprecation warning.

    `traverse_string` is only meant to be used by YoutubeDL.prepare_outtmpl and is not part of the API

    @param traverse_string  Whether to traverse into objects as strings.
                            If `True`, any non-compatible object will first be
                            converted into a string and then traversed into.
                            The return value of that path will be a string instead,
                            not respecting any further branching.

    @returns                The result of the object traversal.
                            If successful, `get_all=True`, and the path branches at least once,
                            then a list of results is returned instead.
                            If no `default` is given and the last path branches, a `list` of results
                            is always returned. If a path ends on a `dict` that result will always be a `dict`.
    """
    if is_user_input is not NO_DEFAULT:
        deprecation_warning('The is_user_input parameter is deprecated and no longer works')

    # Normalise only string keys; non-string keys are compared unchanged.
    casefold = lambda k: k.casefold() if isinstance(k, str) else k

    # `type_test` is the filter/transform applied to final values:
    # a plain `isinstance` gate for types, otherwise a call through `try_call`
    # (falling back to `IDENTITY` when no `expected_type` was given).
    if isinstance(expected_type, type):
        type_test = lambda val: val if isinstance(val, expected_type) else None
    else:
        type_test = lambda val: try_call(expected_type or IDENTITY, args=(val,))

    def apply_key(key, obj, is_last):
        # Apply a single path `key` to `obj`.
        # Returns `(branching, results)` where `results` is always an iterable:
        # the branch results themselves when branching, else a 1-tuple of the value.
        branching = False
        result = None

        # With `traverse_string`, a `None` object short-circuits every key type:
        # branching keys yield nothing, all other keys yield `None`.
        if obj is None and traverse_string:
            if key is ... or callable(key) or isinstance(key, slice):
                branching = True
                result = ()

        elif key is None:
            result = obj

        elif isinstance(key, set):
            # NOTE(review): an empty set raises `StopIteration` here - confirm callers never pass one
            item = next(iter(key))
            if len(key) > 1 or isinstance(item, type):
                # Type filter: every member of the set has to be a type
                assert all(isinstance(item, type) for item in key)
                if isinstance(obj, tuple(key)):
                    result = obj
            else:
                # Single non-type item: treated as a function applied to `obj`
                result = try_call(item, args=(obj,))

        elif isinstance(key, (list, tuple)):
            # Each element is a full sub-path; results are chained lazily
            branching = True
            result = itertools.chain.from_iterable(
                apply_path(obj, branch, is_last)[0] for branch in key)

        elif key is ...:
            branching = True
            # Expose the morsel's `key` and `value` alongside its own mapping entries
            if isinstance(obj, http.cookies.Morsel):
                obj = dict(obj, key=obj.key, value=obj.value)
            if isinstance(obj, collections.abc.Mapping):
                result = obj.values()
            elif is_iterable_like(obj) or isinstance(obj, xml.etree.ElementTree.Element):
                result = obj
            elif isinstance(obj, re.Match):
                result = obj.groups()
            elif traverse_string:
                # String traversal yields a single string, not a branch
                branching = False
                result = str(obj)
            else:
                result = ()

        elif callable(key):
            branching = True
            if isinstance(obj, http.cookies.Morsel):
                obj = dict(obj, key=obj.key, value=obj.value)
            if isinstance(obj, collections.abc.Mapping):
                iter_obj = obj.items()
            elif is_iterable_like(obj) or isinstance(obj, xml.etree.ElementTree.Element):
                iter_obj = enumerate(obj)
            elif isinstance(obj, re.Match):
                # Numbered groups first (0 = full match), then the named groups
                iter_obj = itertools.chain(
                    enumerate((obj.group(), *obj.groups())),
                    obj.groupdict().items())
            elif traverse_string:
                branching = False
                iter_obj = enumerate(str(obj))
            else:
                iter_obj = ()

            # Lazy filter: `key(k, v)` is only invoked once the result is consumed
            result = (v for k, v in iter_obj if try_call(key, args=(k, v)))
            if not branching:  # string traversal
                result = ''.join(result)

        elif isinstance(key, dict):
            # Traverse every sub-path of the dict key against the same `obj`
            iter_obj = ((k, _traverse_obj(obj, v, False, is_last)) for k, v in key.items())
            # Missing values are dropped unless a `default` was given, in which case
            # they are replaced by it; an empty resulting dict collapses to `None`.
            result = {
                k: v if v is not None else default for k, v in iter_obj
                if v is not None or default is not NO_DEFAULT
            } or None

        elif isinstance(obj, collections.abc.Mapping):
            if isinstance(obj, http.cookies.Morsel):
                obj = dict(obj, key=obj.key, value=obj.value)

            # Prefer a direct lookup; only when case-insensitive and the key is not
            # present as-is, scan for a key whose casefolded form matches.
            # (`key` was already casefolded by `apply_path` in that mode.)
            result = (try_call(obj.get, args=(key,)) if casesense or try_call(obj.__contains__, args=(key,)) else
                      next((v for k, v in obj.items() if casefold(k) == key), None))

        elif isinstance(obj, re.Match):
            if isinstance(key, int) or casesense:
                # A missing group raises `IndexError`, leaving `result` as `None`
                with contextlib.suppress(IndexError):
                    result = obj.group(key)

            elif isinstance(key, str):
                # Case-insensitive lookup among the named groups
                result = next((v for k, v in obj.groupdict().items() if casefold(k) == key), None)

        elif isinstance(key, (int, slice)):
            if is_iterable_like(obj, (collections.abc.Sequence, xml.etree.ElementTree.Element)):
                # Slices branch, plain indices do not
                branching = isinstance(key, slice)
                with contextlib.suppress(IndexError):
                    result = obj[key]
            elif traverse_string:
                with contextlib.suppress(IndexError):
                    result = str(obj)[key]

        elif isinstance(obj, xml.etree.ElementTree.Element) and isinstance(key, str):
            # Split off a trailing special (`@`, `@attr` or `name()`) from the xpath;
            # if the last segment is not a special, the whole key is the xpath.
            xpath, _, special = key.rpartition('/')
            if not special.startswith('@') and not special.endswith('()'):
                xpath = key
                special = None

            # Allow abbreviations of relative paths, absolute paths error
            if xpath.startswith('/'):
                xpath = f'.{xpath}'
            elif xpath and not xpath.startswith('./'):
                xpath = f'./{xpath}'

            def apply_specials(element):
                # Resolve the special suffix against a single element
                if special is None:
                    return element
                if special == '@':
                    return element.attrib
                if special.startswith('@'):
                    return try_call(element.attrib.get, args=(special[1:],))
                if special == 'text()':
                    return element.text
                raise SyntaxError(f'apply_specials is missing case for {special!r}')

            if xpath:
                # Not flagged as branching: the list of matches is returned as one value
                result = list(map(apply_specials, obj.iterfind(xpath)))
            else:
                # Only a special was given: apply it to the element itself
                result = apply_specials(obj)

        return branching, result if branching else (result,)

    def lazy_last(iterable):
        # Yield `(is_last, item)` pairs, looking one item ahead to detect the end.
        # Yields nothing for an empty iterable; `NO_DEFAULT` serves as the "empty" sentinel.
        iterator = iter(iterable)
        prev = next(iterator, NO_DEFAULT)
        if prev is NO_DEFAULT:
            return

        for item in iterator:
            yield False, prev
            prev = item

        yield True, prev

    def apply_path(start_obj, path, test_type):
        # Apply every key of `path` in turn, fanning out over all current objects.
        # Returns `(objs, has_branched, last_key_is_dict)`.
        objs = (start_obj,)
        has_branched = False

        key = None
        for last, key in lazy_last(variadic(path, (str, bytes, dict, set))):
            if not casesense and isinstance(key, str):
                key = key.casefold()

            # `any`/`all` collapse the current branches back into a single object
            if key in (any, all):
                has_branched = False
                filtered_objs = (obj for obj in objs if obj not in (None, {}))
                if key is any:
                    objs = (next(filtered_objs, None),)
                else:
                    objs = (list(filtered_objs),)
                continue

            if __debug__ and callable(key):
                # Verify function signature
                inspect.signature(key).bind(None, None)

            new_objs = []
            for obj in objs:
                branching, results = apply_key(key, obj, last)
                has_branched |= branching
                new_objs.append(results)

            objs = itertools.chain.from_iterable(new_objs)

        # `key` is now the last key of the path (or `None` for an empty path).
        # dict/list/tuple keys are skipped here; their sub-paths receive the
        # test flag through the `is_last` argument of `apply_key`.
        if test_type and not isinstance(key, (dict, list, tuple)):
            objs = map(type_test, objs)

        return objs, has_branched, isinstance(key, dict)

    def _traverse_obj(obj, path, allow_empty, test_type):
        # Run one path and reduce its results to the final return shape
        results, has_branched, is_dict = apply_path(obj, path, test_type)
        # Discard "unhelpful" values (`None` and `{}`)
        results = LazyList(item for item in results if item not in (None, {}))
        if get_all and has_branched:
            if results:
                return results.exhaust()
            if allow_empty:
                return [] if default is NO_DEFAULT else default
            return None

        return results[0] if results else {} if allow_empty and is_dict else None

    # Try the paths in order; only the last one is allowed to produce an "empty" result
    for index, path in enumerate(paths, 1):
        result = _traverse_obj(obj, path, index == len(paths), True)
        if result is not None:
            return result

    return None if default is NO_DEFAULT else default
  237. def get_first(obj, *paths, **kwargs):
  238. return traverse_obj(obj, *((..., *variadic(keys)) for keys in paths), **kwargs, get_all=False)
  239. def dict_get(d, key_or_keys, default=None, skip_false_values=True):
  240. for val in map(d.get, variadic(key_or_keys)):
  241. if val is not None and (val or not skip_false_values):
  242. return val
  243. return default