|
@@ -27,6 +27,7 @@ import zlib
|
|
|
from email.message import Message
|
|
|
from http.cookiejar import CookieJar
|
|
|
|
|
|
+from test.conftest import validate_and_send
|
|
|
from test.helper import FakeYDL, http_server_port, verify_address_availability
|
|
|
from yt_dlp.cookies import YoutubeDLCookieJar
|
|
|
from yt_dlp.dependencies import brotli, requests, urllib3
|
|
@@ -50,11 +51,14 @@ from yt_dlp.networking.exceptions import (
|
|
|
TransportError,
|
|
|
UnsupportedRequest,
|
|
|
)
|
|
|
+from yt_dlp.networking.impersonate import (
|
|
|
+ ImpersonateRequestHandler,
|
|
|
+ ImpersonateTarget,
|
|
|
+)
|
|
|
+from yt_dlp.utils import YoutubeDLError
|
|
|
from yt_dlp.utils._utils import _YDLLogger as FakeLogger
|
|
|
from yt_dlp.utils.networking import HTTPHeaderDict
|
|
|
|
|
|
-from test.conftest import validate_and_send
|
|
|
-
|
|
|
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
|
|
|
@@ -1113,6 +1117,10 @@ class FakeResponse(Response):
|
|
|
|
|
|
class FakeRH(RequestHandler):
|
|
|
|
|
|
+ def __init__(self, *args, **params):
|
|
|
+ self.params = params
|
|
|
+ super().__init__(*args, **params)
|
|
|
+
|
|
|
def _validate(self, request):
|
|
|
return
|
|
|
|
|
@@ -1271,15 +1279,10 @@ class TestYoutubeDLNetworking:
|
|
|
('', {'all': '__noproxy__'}),
|
|
|
(None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}) # env, set https
|
|
|
])
|
|
|
- def test_proxy(self, proxy, expected):
|
|
|
- old_http_proxy = os.environ.get('HTTP_PROXY')
|
|
|
- try:
|
|
|
- os.environ['HTTP_PROXY'] = 'http://127.0.0.1:8081' # ensure that provided proxies override env
|
|
|
- with FakeYDL({'proxy': proxy}) as ydl:
|
|
|
- assert ydl.proxies == expected
|
|
|
- finally:
|
|
|
- if old_http_proxy:
|
|
|
- os.environ['HTTP_PROXY'] = old_http_proxy
|
|
|
+ def test_proxy(self, proxy, expected, monkeypatch):
|
|
|
+ monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
|
|
|
+ with FakeYDL({'proxy': proxy}) as ydl:
|
|
|
+ assert ydl.proxies == expected
|
|
|
|
|
|
def test_compat_request(self):
|
|
|
with FakeRHYDL() as ydl:
|
|
@@ -1331,6 +1334,95 @@ class TestYoutubeDLNetworking:
|
|
|
with pytest.raises(SSLError, match='testerror'):
|
|
|
ydl.urlopen('ssl://testerror')
|
|
|
|
|
|
+ def test_unsupported_impersonate_target(self):
|
|
|
+ class FakeImpersonationRHYDL(FakeYDL):
|
|
|
+ def __init__(self, *args, **kwargs):
|
|
|
+ class HTTPRH(RequestHandler):
|
|
|
+ def _send(self, request: Request):
|
|
|
+ pass
|
|
|
+ _SUPPORTED_URL_SCHEMES = ('http',)
|
|
|
+ _SUPPORTED_PROXY_SCHEMES = None
|
|
|
+
|
|
|
+ super().__init__(*args, **kwargs)
|
|
|
+ self._request_director = self.build_request_director([HTTPRH])
|
|
|
+
|
|
|
+ with FakeImpersonationRHYDL() as ydl:
|
|
|
+ with pytest.raises(
|
|
|
+ RequestError,
|
|
|
+ match=r'Impersonate target "test" is not available'
|
|
|
+ ):
|
|
|
+ ydl.urlopen(Request('http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)}))
|
|
|
+
|
|
|
+ def test_unsupported_impersonate_extension(self):
|
|
|
+ class FakeHTTPRHYDL(FakeYDL):
|
|
|
+ def __init__(self, *args, **kwargs):
|
|
|
+ class IRH(ImpersonateRequestHandler):
|
|
|
+ def _send(self, request: Request):
|
|
|
+ pass
|
|
|
+
|
|
|
+ _SUPPORTED_URL_SCHEMES = ('http',)
|
|
|
+ _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc',): 'test'}
|
|
|
+ _SUPPORTED_PROXY_SCHEMES = None
|
|
|
+
|
|
|
+ super().__init__(*args, **kwargs)
|
|
|
+ self._request_director = self.build_request_director([IRH])
|
|
|
+
|
|
|
+ with FakeHTTPRHYDL() as ydl:
|
|
|
+ with pytest.raises(
|
|
|
+ RequestError,
|
|
|
+ match=r'Impersonate target "test" is not available'
|
|
|
+ ):
|
|
|
+ ydl.urlopen(Request('http://', extensions={'impersonate': ImpersonateTarget('test', None, None, None)}))
|
|
|
+
|
|
|
+ def test_raise_impersonate_error(self):
|
|
|
+ with pytest.raises(
|
|
|
+ YoutubeDLError,
|
|
|
+ match=r'Impersonate target "test" is not available'
|
|
|
+ ):
|
|
|
+ FakeYDL({'impersonate': ImpersonateTarget('test', None, None, None)})
|
|
|
+
|
|
|
+ def test_pass_impersonate_param(self, monkeypatch):
|
|
|
+
|
|
|
+ class IRH(ImpersonateRequestHandler):
|
|
|
+ def _send(self, request: Request):
|
|
|
+ pass
|
|
|
+
|
|
|
+ _SUPPORTED_URL_SCHEMES = ('http',)
|
|
|
+ _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}
|
|
|
+
|
|
|
+ # Bypass the check on initialize
|
|
|
+ brh = FakeYDL.build_request_director
|
|
|
+ monkeypatch.setattr(FakeYDL, 'build_request_director', lambda cls, handlers, preferences=None: brh(cls, handlers=[IRH]))
|
|
|
+
|
|
|
+ with FakeYDL({
|
|
|
+ 'impersonate': ImpersonateTarget('abc', None, None, None)
|
|
|
+ }) as ydl:
|
|
|
+ rh = self.build_handler(ydl, IRH)
|
|
|
+ assert rh.impersonate == ImpersonateTarget('abc', None, None, None)
|
|
|
+
|
|
|
+ def test_get_impersonate_targets(self):
|
|
|
+ handlers = []
|
|
|
+ for target_client in ('abc', 'xyz', 'asd'):
|
|
|
+ class TestRH(ImpersonateRequestHandler):
|
|
|
+ def _send(self, request: Request):
|
|
|
+ pass
|
|
|
+ _SUPPORTED_URL_SCHEMES = ('http',)
|
|
|
+ _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(target_client,): 'test'}
|
|
|
+ RH_KEY = target_client
|
|
|
+ RH_NAME = target_client
|
|
|
+ handlers.append(TestRH)
|
|
|
+
|
|
|
+ with FakeYDL() as ydl:
|
|
|
+ ydl._request_director = ydl.build_request_director(handlers)
|
|
|
+ assert set(ydl._get_available_impersonate_targets()) == {
|
|
|
+ (ImpersonateTarget('xyz'), 'xyz'),
|
|
|
+ (ImpersonateTarget('abc'), 'abc'),
|
|
|
+ (ImpersonateTarget('asd'), 'asd')
|
|
|
+ }
|
|
|
+ assert ydl._impersonate_target_available(ImpersonateTarget('abc'))
|
|
|
+ assert ydl._impersonate_target_available(ImpersonateTarget())
|
|
|
+ assert not ydl._impersonate_target_available(ImpersonateTarget('zxy'))
|
|
|
+
|
|
|
@pytest.mark.parametrize('proxy_key,proxy_url,expected', [
|
|
|
('http', '__noproxy__', None),
|
|
|
('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
|
|
@@ -1341,23 +1433,17 @@ class TestYoutubeDLNetworking:
|
|
|
('http', 'socks4://example.com', 'socks4://example.com'),
|
|
|
('unrelated', '/bad/proxy', '/bad/proxy'), # clean_proxies should ignore bad proxies
|
|
|
])
|
|
|
- def test_clean_proxy(self, proxy_key, proxy_url, expected):
|
|
|
+ def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
|
|
|
# proxies should be cleaned in urlopen()
|
|
|
with FakeRHYDL() as ydl:
|
|
|
req = ydl.urlopen(Request('test://', proxies={proxy_key: proxy_url})).request
|
|
|
assert req.proxies[proxy_key] == expected
|
|
|
|
|
|
# and should also be cleaned when building the handler
|
|
|
- env_key = f'{proxy_key.upper()}_PROXY'
|
|
|
- old_env_proxy = os.environ.get(env_key)
|
|
|
- try:
|
|
|
- os.environ[env_key] = proxy_url # ensure that provided proxies override env
|
|
|
- with FakeYDL() as ydl:
|
|
|
- rh = self.build_handler(ydl)
|
|
|
- assert rh.proxies[proxy_key] == expected
|
|
|
- finally:
|
|
|
- if old_env_proxy:
|
|
|
- os.environ[env_key] = old_env_proxy
|
|
|
+ monkeypatch.setenv(f'{proxy_key.upper()}_PROXY', proxy_url)
|
|
|
+ with FakeYDL() as ydl:
|
|
|
+ rh = self.build_handler(ydl)
|
|
|
+ assert rh.proxies[proxy_key] == expected
|
|
|
|
|
|
def test_clean_proxy_header(self):
|
|
|
with FakeRHYDL() as ydl:
|
|
@@ -1629,3 +1715,71 @@ class TestResponse:
|
|
|
assert res.geturl() == res.url
|
|
|
assert res.info() is res.headers
|
|
|
assert res.getheader('test') == res.get_header('test')
|
|
|
+
|
|
|
+
|
|
|
+class TestImpersonateTarget:
|
|
|
+ @pytest.mark.parametrize('target_str,expected', [
|
|
|
+ ('abc', ImpersonateTarget('abc', None, None, None)),
|
|
|
+ ('abc-120_esr', ImpersonateTarget('abc', '120_esr', None, None)),
|
|
|
+ ('abc-120:xyz', ImpersonateTarget('abc', '120', 'xyz', None)),
|
|
|
+ ('abc-120:xyz-5.6', ImpersonateTarget('abc', '120', 'xyz', '5.6')),
|
|
|
+ ('abc:xyz', ImpersonateTarget('abc', None, 'xyz', None)),
|
|
|
+ ('abc:', ImpersonateTarget('abc', None, None, None)),
|
|
|
+ ('abc-120:', ImpersonateTarget('abc', '120', None, None)),
|
|
|
+ (':xyz', ImpersonateTarget(None, None, 'xyz', None)),
|
|
|
+ (':xyz-6.5', ImpersonateTarget(None, None, 'xyz', '6.5')),
|
|
|
+ (':', ImpersonateTarget(None, None, None, None)),
|
|
|
+ ('', ImpersonateTarget(None, None, None, None)),
|
|
|
+ ])
|
|
|
+ def test_target_from_str(self, target_str, expected):
|
|
|
+ assert ImpersonateTarget.from_str(target_str) == expected
|
|
|
+
|
|
|
+ @pytest.mark.parametrize('target_str', [
|
|
|
+ '-120', ':-12.0', '-12:-12', '-:-',
|
|
|
+ '::', 'a-c-d:', 'a-c-d:e-f-g', 'a:b:'
|
|
|
+ ])
|
|
|
+ def test_target_from_invalid_str(self, target_str):
|
|
|
+ with pytest.raises(ValueError):
|
|
|
+ ImpersonateTarget.from_str(target_str)
|
|
|
+
|
|
|
+ @pytest.mark.parametrize('target,expected', [
|
|
|
+ (ImpersonateTarget('abc', None, None, None), 'abc'),
|
|
|
+ (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
|
|
|
+ (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
|
|
|
+ (ImpersonateTarget('abc', '120', 'xyz', '5'), 'abc-120:xyz-5'),
|
|
|
+ (ImpersonateTarget('abc', None, 'xyz', None), 'abc:xyz'),
|
|
|
+ (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
|
|
|
+ (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
|
|
|
+ (ImpersonateTarget('abc', None, 'xyz'), 'abc:xyz'),
|
|
|
+ (ImpersonateTarget(None, None, 'xyz', '6.5'), ':xyz-6.5'),
|
|
|
+ (ImpersonateTarget('abc', ), 'abc'),
|
|
|
+ (ImpersonateTarget(None, None, None, None), ''),
|
|
|
+ ])
|
|
|
+ def test_str(self, target, expected):
|
|
|
+ assert str(target) == expected
|
|
|
+
|
|
|
+ @pytest.mark.parametrize('args', [
|
|
|
+ ('abc', None, None, '5'),
|
|
|
+ ('abc', '120', None, '5'),
|
|
|
+ (None, '120', None, None),
|
|
|
+ (None, '120', None, '5'),
|
|
|
+ (None, None, None, '5'),
|
|
|
+ (None, '120', 'xyz', '5'),
|
|
|
+ ])
|
|
|
+ def test_invalid_impersonate_target(self, args):
|
|
|
+ with pytest.raises(ValueError):
|
|
|
+ ImpersonateTarget(*args)
|
|
|
+
|
|
|
+ @pytest.mark.parametrize('target1,target2,is_in,is_eq', [
|
|
|
+ (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', None, None, None), True, True),
|
|
|
+ (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', '120', None, None), True, False),
|
|
|
+ (ImpersonateTarget('abc', None, 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', None), True, False),
|
|
|
+ (ImpersonateTarget('abc', '121', 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', 'test'), False, False),
|
|
|
+ (ImpersonateTarget('abc'), ImpersonateTarget('abc', '120', 'xyz', 'test'), True, False),
|
|
|
+ (ImpersonateTarget('abc', '120', 'xyz', 'test'), ImpersonateTarget('abc'), True, False),
|
|
|
+ (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
|
|
|
+ (ImpersonateTarget(), ImpersonateTarget(), True, True),
|
|
|
+ ])
|
|
|
+ def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
|
|
|
+ assert (target1 in target2) is is_in
|
|
|
+ assert (target1 == target2) is is_eq
|