123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477 |
- from __future__ import annotations
- import collections
- import collections.abc
- import contextlib
- import functools
- import http.cookies
- import inspect
- import itertools
- import re
- import typing
- import xml.etree.ElementTree
- from ._utils import (
- IDENTITY,
- NO_DEFAULT,
- ExtractorError,
- LazyList,
- deprecation_warning,
- get_elements_html_by_class,
- get_elements_html_by_attribute,
- get_elements_by_attribute,
- get_element_by_class,
- get_element_html_by_attribute,
- get_element_by_attribute,
- get_element_html_by_id,
- get_element_by_id,
- get_element_html_by_class,
- get_elements_by_class,
- get_element_text_and_html_by_tag,
- is_iterable_like,
- try_call,
- url_or_none,
- variadic,
- )
def traverse_obj(
        obj, *paths, default=NO_DEFAULT, expected_type=None, get_all=True,
        casesense=True, is_user_input=NO_DEFAULT, traverse_string=False):
    """
    Safely traverse nested `dict`s and `Iterable`s

    >>> obj = [{}, {"key": "value"}]
    >>> traverse_obj(obj, (1, "key"))
    'value'

    Each of the provided `paths` is tested and the first producing a valid result will be returned.
    The next path will also be tested if the path branched but no results could be found.
    Supported values for traversal are `Mapping`, `Iterable`, `re.Match`,
    `xml.etree.ElementTree` (xpath) and `http.cookies.Morsel`.
    Unhelpful values (`{}`, `None`) are treated as the absence of a value and discarded.

    The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.

    The keys in the path can be one of:
        - `None`:           Return the current object.
        - `set`:            Requires the only item in the set to be a type or function,
                            like `{type}`/`{type, type, ...}`/`{func}`. If a `type`, return only
                            values of this type. If a function, returns `func(obj)`.
        - `str`/`int`:      Return `obj[key]`. For `re.Match`, return `obj.group(key)`.
        - `slice`:          Branch out and return all values in `obj[key]`.
        - `Ellipsis`:       Branch out and return a list of all values.
        - `tuple`/`list`:   Branch out and return a list of all matching values.
                            Read as: `[traverse_obj(obj, branch) for branch in branches]`.
        - `function`:       Branch out and return values filtered by the function.
                            Read as: `[value for key, value in obj if function(key, value)]`.
                            For `Iterable`s, `key` is the index of the value.
                            For `re.Match`es, `key` is the group number (0 = full match)
                            as well as additionally any group names, if given.
        - `dict`:           Transform the current object and return a matching dict.
                            Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.
        - `any`-builtin:    Take the first matching object and return it, resetting branching.
        - `all`-builtin:    Take all matching objects and return them as a list, resetting branching.
        - `filter`-builtin: Return the value if it is truthy, `None` otherwise.

        `tuple`, `list`, and `dict` all support nested paths and branches.

    @params paths           Paths by which to traverse.
    @param default          Value to return if the paths do not match.
                            If the last key in the path is a `dict`, it will apply to each value inside
                            the dict instead, depth first. Try to avoid if using nested `dict` keys.
    @param expected_type    If a `type`, only accept final values of this type.
                            If any other callable, try to call the function on each result.
                            If the last key in the path is a `dict`, it will apply to each value inside
                            the dict instead, recursively. This does respect branching paths.
    @param get_all          If `False`, return the first matching result, otherwise all matching ones.
    @param casesense        If `False`, consider string dictionary keys as case insensitive.
    @param is_user_input    Deprecated; passing any value only emits a deprecation warning.

    `traverse_string` is only meant to be used by YoutubeDL.prepare_outtmpl and is not part of the API

    @param traverse_string  Whether to traverse into objects as strings.
                            If `True`, any non-compatible object will first be
                            converted into a string and then traversed into.
                            The return value of that path will be a string instead,
                            not respecting any further branching.

    @returns                The result of the object traversal.
                            If successful, `get_all=True`, and the path branches at least once,
                            then a list of results is returned instead.
                            If no `default` is given and the last path branches, a `list` of results
                            is always returned. If a path ends on a `dict` that result will always be a `dict`.

    @raises ExtractorError  If a `require(...)` check fails while evaluating the last path.
    """
    if is_user_input is not NO_DEFAULT:
        deprecation_warning('The is_user_input parameter is deprecated and no longer works')

    # Casefold string keys only; any other key type is compared as-is
    casefold = lambda k: k.casefold() if isinstance(k, str) else k

    # A `type` acts as an isinstance filter; anything else (or nothing, which
    # falls back to IDENTITY) is applied to each value through `try_call`
    if isinstance(expected_type, type):
        type_test = lambda val: val if isinstance(val, expected_type) else None
    else:
        type_test = lambda val: try_call(expected_type or IDENTITY, args=(val,))

    # Apply a single `key` to a single `obj`.
    # Returns `(branching, results)` where `results` is always iterable:
    # the raw (possibly lazy) result when branching, a 1-tuple otherwise.
    # `is_last` is forwarded as the `test_type` flag to nested paths.
    def apply_key(key, obj, is_last):
        branching = False
        result = None

        # With `traverse_string`, a `None` object produces nothing: an empty
        # branch for the branching key kinds, a plain `None` for all others
        if obj is None and traverse_string:
            if key is ... or callable(key) or isinstance(key, slice):
                branching = True
                result = ()

        elif key is None:
            result = obj

        elif isinstance(key, set):
            # `{type, ...}` keeps `obj` only if it is an instance of one of the types,
            # `{func}` replaces `obj` with the outcome of `try_call(func, args=(obj,))`
            item = next(iter(key))
            if len(key) > 1 or isinstance(item, type):
                assert all(isinstance(item, type) for item in key)
                if isinstance(obj, tuple(key)):
                    result = obj
            else:
                result = try_call(item, args=(obj,))

        elif isinstance(key, (list, tuple)):
            # Every alternative is traversed as its own sub-path; results are chained lazily
            branching = True
            result = itertools.chain.from_iterable(
                apply_path(obj, branch, is_last)[0] for branch in key)

        elif key is ...:
            branching = True
            # Expose a Morsel's `key` and `value` alongside its regular items
            if isinstance(obj, http.cookies.Morsel):
                obj = dict(obj, key=obj.key, value=obj.value)
            if isinstance(obj, collections.abc.Mapping):
                result = obj.values()
            elif is_iterable_like(obj) or isinstance(obj, xml.etree.ElementTree.Element):
                result = obj
            elif isinstance(obj, re.Match):
                result = obj.groups()
            elif traverse_string:
                # String traversal yields one string and does not count as branching
                branching = False
                result = str(obj)
            else:
                result = ()

        elif callable(key):
            # Filter: keep the values for which `key(k, v)` is truthy
            branching = True
            if isinstance(obj, http.cookies.Morsel):
                obj = dict(obj, key=obj.key, value=obj.value)
            if isinstance(obj, collections.abc.Mapping):
                iter_obj = obj.items()
            elif is_iterable_like(obj) or isinstance(obj, xml.etree.ElementTree.Element):
                iter_obj = enumerate(obj)
            elif isinstance(obj, re.Match):
                # Numbered groups first (0 = full match), then the named groups
                iter_obj = itertools.chain(
                    enumerate((obj.group(), *obj.groups())),
                    obj.groupdict().items())
            elif traverse_string:
                branching = False
                iter_obj = enumerate(str(obj))
            else:
                iter_obj = ()

            result = (v for k, v in iter_obj if try_call(key, args=(k, v)))
            if not branching:  # string traversal
                result = ''.join(result)

        elif isinstance(key, dict):
            # Traverse `obj` once per value-path of the dict key. `None` results are
            # dropped unless a `default` was given (it then replaces them);
            # an empty resulting dict collapses to `None`
            iter_obj = ((k, _traverse_obj(obj, v, False, is_last)) for k, v in key.items())
            result = {
                k: v if v is not None else default for k, v in iter_obj
                if v is not None or default is not NO_DEFAULT
            } or None

        elif isinstance(obj, collections.abc.Mapping):
            if isinstance(obj, http.cookies.Morsel):
                obj = dict(obj, key=obj.key, value=obj.value)

            # Direct lookup when case sensitive or when the key is present as given;
            # otherwise compare against the casefolded mapping keys
            # (`key` was already casefolded by `apply_path` in that mode)
            result = (try_call(obj.get, args=(key,)) if casesense or try_call(obj.__contains__, args=(key,)) else
                      next((v for k, v in obj.items() if casefold(k) == key), None))

        elif isinstance(obj, re.Match):
            if isinstance(key, int) or casesense:
                # An unknown group raises IndexError, which leaves `result` as `None`
                with contextlib.suppress(IndexError):
                    result = obj.group(key)

            elif isinstance(key, str):
                result = next((v for k, v in obj.groupdict().items() if casefold(k) == key), None)

        elif isinstance(key, (int, slice)):
            if is_iterable_like(obj, (collections.abc.Sequence, xml.etree.ElementTree.Element)):
                # Only a slice branches; an out-of-range index leaves `result` as `None`
                branching = isinstance(key, slice)
                with contextlib.suppress(IndexError):
                    result = obj[key]
            elif traverse_string:
                with contextlib.suppress(IndexError):
                    result = str(obj)[key]

        elif isinstance(obj, xml.etree.ElementTree.Element) and isinstance(key, str):
            # Split off a trailing `@attr` / `@` / `func()` segment; if the last
            # segment is neither, the whole key is the xpath
            xpath, _, special = key.rpartition('/')
            if not special.startswith('@') and not special.endswith('()'):
                xpath = key
                special = None

            # Allow abbreviations of relative paths, absolute paths error
            if xpath.startswith('/'):
                xpath = f'.{xpath}'
            elif xpath and not xpath.startswith('./'):
                xpath = f'./{xpath}'

            # Map an element to the value selected by the `special` suffix
            def apply_specials(element):
                if special is None:
                    return element
                if special == '@':
                    return element.attrib
                if special.startswith('@'):
                    return try_call(element.attrib.get, args=(special[1:],))
                if special == 'text()':
                    return element.text
                raise SyntaxError(f'apply_specials is missing case for {special!r}')

            if xpath:
                result = list(map(apply_specials, obj.iterfind(xpath)))
            else:
                # Nothing but a special: apply it to the current element
                result = apply_specials(obj)

        # Non-branching results are wrapped so the caller can always iterate
        return branching, result if branching else (result,)

    # Yield `(is_last, item)` pairs for `iterable` without materializing it;
    # yields nothing at all for an empty iterable
    def lazy_last(iterable):
        iterator = iter(iterable)
        prev = next(iterator, NO_DEFAULT)
        if prev is NO_DEFAULT:
            return

        for item in iterator:
            yield False, prev
            prev = item

        yield True, prev

    # Walk `path` starting at `start_obj`.
    # Returns `(objs, has_branched, last_key_is_dict)`, `objs` being a lazy iterable
    def apply_path(start_obj, path, test_type):
        objs = (start_obj,)
        has_branched = False

        # Stays `None` for an empty path so the checks after the loop still work
        key = None
        # NOTE(review): the type tuple presumably tells `variadic` which types
        # count as a single key rather than a sequence of keys - confirm in `variadic`
        for last, key in lazy_last(variadic(path, (str, bytes, dict, set))):
            if not casesense and isinstance(key, str):
                key = key.casefold()

            if key in (any, all):
                # Collapse the current branches into a single object and reset branching
                has_branched = False
                filtered_objs = (obj for obj in objs if obj not in (None, {}))
                if key is any:
                    objs = (next(filtered_objs, None),)
                else:
                    objs = (list(filtered_objs),)
                continue

            if key is filter:
                # Drop falsy objects; the branching state is left untouched
                objs = filter(None, objs)
                continue

            if __debug__ and callable(key):
                # Verify function signature
                inspect.signature(key).bind(None, None)

            new_objs = []
            for obj in objs:
                branching, results = apply_key(key, obj, last)
                has_branched |= branching
                new_objs.append(results)

            objs = itertools.chain.from_iterable(new_objs)

        # For a trailing dict/list/tuple key the type test was already requested
        # for the nested paths (via `is_last` in `apply_key`), so it is skipped here
        if test_type and not isinstance(key, (dict, list, tuple)):
            objs = map(type_test, objs)

        return objs, has_branched, isinstance(key, dict)

    # Evaluate one full path and shape its result: a list when branching with
    # `get_all`, otherwise the first useful value. `allow_empty` permits an
    # empty result (`[]`/`default`, or `{}` for a trailing dict key) instead of `None`
    def _traverse_obj(obj, path, allow_empty, test_type):
        results, has_branched, is_dict = apply_path(obj, path, test_type)
        results = LazyList(item for item in results if item not in (None, {}))
        if get_all and has_branched:
            if results:
                return results.exhaust()
            if allow_empty:
                return [] if default is NO_DEFAULT else default
            return None

        return results[0] if results else {} if allow_empty and is_dict else None

    # Try the paths in order. Only the last one may return an empty result or
    # propagate a failed `require`; for earlier paths the error is swallowed
    # and the next path is tried
    for index, path in enumerate(paths, 1):
        is_last = index == len(paths)
        try:
            result = _traverse_obj(obj, path, is_last, True)
            if result is not None:
                return result
        except _RequiredError as e:
            if is_last:
                # Reraise to get cleaner stack trace
                raise ExtractorError(e.orig_msg, expected=e.expected) from None

    return None if default is NO_DEFAULT else default
def value(value, /):
    """
    Build a one-argument function that ignores its argument and always returns `value`.

    @param value    The constant to hand back on every call.
    @returns        A callable taking exactly one positional argument.
    """
    def constant(_):
        return value

    return constant
def require(name, /, *, expected=False):
    """
    Build a check that insists on a non-`None` value.

    @param name         Description of the value, used in the error message.
    @param expected     Forwarded as `expected` to the raised `_RequiredError`.
    @returns            A one-argument function that returns its argument unchanged,
                        or raises `_RequiredError` if the argument is `None`.
    """
    def func(value):
        if value is not None:
            return value
        raise _RequiredError(f'Unable to extract {name}', expected=expected)

    return func
class _RequiredError(ExtractorError):
    """
    Raised by the function `require` builds when the value is `None`.

    `traverse_obj` catches it and, for the last path only, re-raises it as a plain `ExtractorError`.
    """
@typing.overload
def subs_list_to_dict(
    *, lang: str | None = 'und', ext: str | None = None,
) -> collections.abc.Callable[[list[dict]], dict[str, list[dict]]]: ...


@typing.overload
def subs_list_to_dict(
    subs: list[dict] | None, /, *, lang: str | None = 'und', ext: str | None = None,
) -> dict[str, list[dict]]: ...


def subs_list_to_dict(subs: list[dict] | None = None, /, *, lang='und', ext=None):
    """
    Convert subtitles from a traversal into a subtitle dict.
    The path should have an `all` immediately before this function.

    When called without `subs`, a partial of this function carrying
    `lang` and `ext` is returned instead, ready to be used in a path.

    Arguments:
    `lang`      The id to sort a subtitle into when it has no string `id`;
                if falsy, such subtitles are dropped
    `ext`       The default value for `ext` in the subtitle dict;
                if falsy, a non-string `ext` is removed from the subtitle

    In the dict you can set the following additional items:
    `id`        The subtitle id to sort the dict into
    `quality`   The sort order for each subtitle

    Subtitles with neither a valid `url` nor `data` are skipped.
    Note that `id` and `quality` are popped from the passed subtitle dicts.
    """
    if subs is None:
        return functools.partial(subs_list_to_dict, lang=lang, ext=ext)

    grouped = collections.defaultdict(list)
    for entry in subs:
        # Needs either a usable url or inline data
        if not (url_or_none(entry.get('url')) or entry.get('data')):
            continue

        track_id = entry.pop('id', None)
        if not isinstance(track_id, str):
            if not lang:
                continue
            track_id = lang

        if not isinstance(entry.get('ext'), str):
            if ext:
                entry['ext'] = ext
            else:
                entry.pop('ext', None)

        grouped[track_id].append(entry)

    by_id = dict(grouped)
    for tracks in by_id.values():
        # The sort key also strips `quality` from every subtitle
        tracks.sort(key=lambda track: track.pop('quality', 0) or 0)

    return by_id
@typing.overload
def find_element(*, attr: str, value: str, tag: str | None = None, html=False, regex=False): ...


@typing.overload
def find_element(*, cls: str, html=False): ...


@typing.overload
def find_element(*, id: str, tag: str | None = None, html=False, regex=False): ...


@typing.overload
def find_element(*, tag: str, html=False, regex=False): ...


def find_element(*, tag=None, id=None, cls=None, attr=None, value=None, html=False, regex=False):
    """
    Build a one-argument function that looks up a single element in an HTML string.

    Exactly one kind of selector is used, checked in this order:
    `attr` + `value`, then `cls`, then `id`, then `tag` alone.

    @param tag      Tag name to match; any tag name pattern is used if omitted
                    (cannot be combined with `cls`).
    @param id       Element id to match.
    @param cls      Class name to match (cannot be combined with `id`, `tag` or `regex`).
    @param attr     Attribute name to match, together with `value`.
    @param value    Attribute value to match, together with `attr`.
    @param html     If truthy, select the html variant of the lookup instead of the text one.
    @param regex    If truthy, the value/id is passed on with `escape_value=False`.
    @returns        A callable taking the HTML string.
    """
    # deliberately using `id=` and `cls=` for ease of readability
    by_attr = attr and value
    assert tag or id or cls or by_attr, 'One of tag, id, cls or (attr AND value) is required'
    any_tag = r'[\w:.-]+'

    if by_attr:
        assert not cls, 'Cannot match both attr and cls'
        assert not id, 'Cannot match both attr and id'
        finder = get_element_html_by_attribute if html else get_element_by_attribute
        return functools.partial(finder, attr, value, tag=tag or any_tag, escape_value=not regex)

    if cls:
        assert not id, 'Cannot match both cls and id'
        assert tag is None, 'Cannot match both cls and tag'
        assert not regex, 'Cannot use regex with cls'
        finder = get_element_html_by_class if html else get_element_by_class
        return functools.partial(finder, cls)

    if id:
        finder = get_element_html_by_id if html else get_element_by_id
        return functools.partial(finder, id, tag=tag or any_tag, escape_value=not regex)

    # Tag only: pick the text (0) or the html (1) out of the returned pair.
    # Computed up front because the closure's parameter shadows the `html` flag
    picked = 1 if html else 0

    def by_tag(html):
        return get_element_text_and_html_by_tag(tag, html)[picked]

    return by_tag
@typing.overload
def find_elements(*, cls: str, html=False): ...


@typing.overload
def find_elements(*, attr: str, value: str, tag: str | None = None, html=False, regex=False): ...


def find_elements(*, tag=None, cls=None, attr=None, value=None, html=False, regex=False):
    """
    Build a one-argument function that looks up all matching elements in an HTML string.

    Either `cls` or both `attr` and `value` must be given, but not both kinds.

    @param tag      Tag name to match for attribute lookups; any tag name pattern
                    is used if omitted (cannot be combined with `cls`).
    @param cls      Class name to match (cannot be combined with `tag` or `regex`).
    @param attr     Attribute name to match, together with `value`.
    @param value    Attribute value to match, together with `attr`.
    @param html     If truthy, select the html variant of the lookup instead of the text one.
    @param regex    If truthy, the value is passed on with `escape_value=False`.
    @returns        A callable taking the HTML string.
    """
    # deliberately using `cls=` for ease of readability
    by_attr = attr and value
    assert cls or by_attr, 'One of cls or (attr AND value) is required'

    if not by_attr:
        assert not tag, 'Cannot match both cls and tag'
        assert not regex, 'Cannot use regex with cls'
        finder = get_elements_html_by_class if html else get_elements_by_class
        return functools.partial(finder, cls)

    assert not cls, 'Cannot match both attr and cls'
    finder = get_elements_html_by_attribute if html else get_elements_by_attribute
    return functools.partial(finder, attr, value, tag=tag or r'[\w:.-]+', escape_value=not regex)
def trim_str(*, start=None, end=None):
    """
    Build a function that strips a fixed prefix and/or suffix from a string.

    @param start    Prefix to remove if the string starts with it.
    @param end      Suffix to remove if the string ends with it.
    @returns        A one-argument function; it returns `None` for `None`,
                    otherwise the string without the matching prefix/suffix.
                    Prefix and suffix are both tested against the untrimmed
                    string, so overlapping matches give an empty string.
    """
    def trim(s):
        if s is None:
            return None

        head = len(start) if start and s.startswith(start) else 0
        # `None` as the upper bound means "up to the end of the string"
        tail = -len(end) if end and s.endswith(end) else None
        return s[head:tail]

    return trim
def unpack(func, **kwargs):
    """
    Adapt `func` so that it takes a single iterable of positional arguments.

    @param func     The function to wrap.
    @param kwargs   Keyword arguments passed to `func` on every call.
    @returns        A function `inner(items)` that calls `func(*items, **kwargs)`
                    and carries over the metadata of `func`.
    """
    def inner(items):
        return func(*items, **kwargs)

    return functools.update_wrapper(inner, func)
def get_first(obj, *paths, **kwargs):
    """
    Call `traverse_obj` with every path prefixed by `...` and with `get_all=False`.

    @param obj      The object to traverse.
    @params paths   Paths to look up; each one is wrapped in `variadic` before `...` is prepended.
    @param kwargs   Further keyword arguments for `traverse_obj` (`get_all` must not be passed).
    @returns        Whatever `traverse_obj` returns for these paths.
    """
    branched_paths = [(..., *variadic(keys)) for keys in paths]
    return traverse_obj(obj, *branched_paths, **kwargs, get_all=False)
def dict_get(d, key_or_keys, default=None, skip_false_values=True):
    """
    Return the first usable value found in `d` under any of the given keys.

    @param d                    Object providing `.get(key)`.
    @param key_or_keys          A single key or several keys, tried in order.
    @param default              Returned when no key yields a usable value.
    @param skip_false_values    If truthy, falsy values are skipped as well;
                                `None` values are always skipped.
    @returns                    The first usable value, else `default`.
    """
    lookup = d.get
    for key in variadic(key_or_keys):
        candidate = lookup(key)
        if candidate is None:
            continue
        if not candidate and skip_false_values:
            continue
        return candidate
    return default
|