123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- import os
- import sys
- import pytest
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import contextlib
- import io
- import platform
- import random
- import ssl
- import urllib.error
- import warnings
- from yt_dlp.cookies import YoutubeDLCookieJar
- from yt_dlp.dependencies import certifi
- from yt_dlp.networking import Response
- from yt_dlp.networking._helper import (
- InstanceStoreMixin,
- add_accept_encoding_header,
- get_redirect_method,
- make_socks_proxy_opts,
- select_proxy,
- ssl_load_certs,
- )
- from yt_dlp.networking.exceptions import (
- HTTPError,
- IncompleteRead,
- _CompatHTTPError,
- )
- from yt_dlp.socks import ProxyType
- from yt_dlp.utils.networking import HTTPHeaderDict
- TEST_DIR = os.path.dirname(os.path.abspath(__file__))
- class TestNetworkingUtils:
- def test_select_proxy(self):
- proxies = {
- 'all': 'socks5://example.com',
- 'http': 'http://example.com:1080',
- 'no': 'bypass.example.com,yt-dl.org'
- }
- assert select_proxy('https://example.com', proxies) == proxies['all']
- assert select_proxy('http://example.com', proxies) == proxies['http']
- assert select_proxy('http://bypass.example.com', proxies) is None
- assert select_proxy('https://yt-dl.org', proxies) is None
- @pytest.mark.parametrize('socks_proxy,expected', [
- ('socks5h://example.com', {
- 'proxytype': ProxyType.SOCKS5,
- 'addr': 'example.com',
- 'port': 1080,
- 'rdns': True,
- 'username': None,
- 'password': None
- }),
- ('socks5://user:@example.com:5555', {
- 'proxytype': ProxyType.SOCKS5,
- 'addr': 'example.com',
- 'port': 5555,
- 'rdns': False,
- 'username': 'user',
- 'password': ''
- }),
- ('socks4://u%40ser:pa%20ss@', {
- 'proxytype': ProxyType.SOCKS4,
- 'addr': '',
- 'port': 1080,
- 'rdns': False,
- 'username': 'u@ser',
- 'password': 'pa ss'
- }),
- ('socks4a://:pa%20ss@', {
- 'proxytype': ProxyType.SOCKS4A,
- 'addr': '',
- 'port': 1080,
- 'rdns': True,
- 'username': '',
- 'password': 'pa ss'
- })
- ])
- def test_make_socks_proxy_opts(self, socks_proxy, expected):
- assert make_socks_proxy_opts(socks_proxy) == expected
- def test_make_socks_proxy_unknown(self):
- with pytest.raises(ValueError, match='Unknown SOCKS proxy version: socks'):
- make_socks_proxy_opts('socks://')
- @pytest.mark.skipif(not certifi, reason='certifi is not installed')
- def test_load_certifi(self):
- context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- context2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- ssl_load_certs(context, use_certifi=True)
- context2.load_verify_locations(cafile=certifi.where())
- assert context.get_ca_certs() == context2.get_ca_certs()
- context3 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- ssl_load_certs(context3, use_certifi=False)
- assert context3.get_ca_certs() != context.get_ca_certs()
- @pytest.mark.parametrize('method,status,expected', [
- ('GET', 303, 'GET'),
- ('HEAD', 303, 'HEAD'),
- ('PUT', 303, 'GET'),
- ('POST', 301, 'GET'),
- ('HEAD', 301, 'HEAD'),
- ('POST', 302, 'GET'),
- ('HEAD', 302, 'HEAD'),
- ('PUT', 302, 'PUT'),
- ('POST', 308, 'POST'),
- ('POST', 307, 'POST'),
- ('HEAD', 308, 'HEAD'),
- ('HEAD', 307, 'HEAD'),
- ])
- def test_get_redirect_method(self, method, status, expected):
- assert get_redirect_method(method, status) == expected
- @pytest.mark.parametrize('headers,supported_encodings,expected', [
- ({'Accept-Encoding': 'br'}, ['gzip', 'br'], {'Accept-Encoding': 'br'}),
- ({}, ['gzip', 'br'], {'Accept-Encoding': 'gzip, br'}),
- ({'Content-type': 'application/json'}, [], {'Content-type': 'application/json', 'Accept-Encoding': 'identity'}),
- ])
- def test_add_accept_encoding_header(self, headers, supported_encodings, expected):
- headers = HTTPHeaderDict(headers)
- add_accept_encoding_header(headers, supported_encodings)
- assert headers == HTTPHeaderDict(expected)
- class TestInstanceStoreMixin:
- class FakeInstanceStoreMixin(InstanceStoreMixin):
- def _create_instance(self, **kwargs):
- return random.randint(0, 1000000)
- def _close_instance(self, instance):
- pass
- def test_mixin(self):
- mixin = self.FakeInstanceStoreMixin()
- assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) == mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
- assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'e', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
- assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}} != mixin._get_instance(d={'a': 1, 'b': 2, 'g': {'d', 4}}))
- assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) == mixin._get_instance(d={'a': 1}, e=[1, 2, 3])
- assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) != mixin._get_instance(d={'a': 1}, e=[1, 2, 3, 4])
- cookiejar = YoutubeDLCookieJar()
- assert mixin._get_instance(b=[1, 2], c=cookiejar) == mixin._get_instance(b=[1, 2], c=cookiejar)
- assert mixin._get_instance(b=[1, 2], c=cookiejar) != mixin._get_instance(b=[1, 2], c=YoutubeDLCookieJar())
- assert mixin._get_instance(c=cookiejar, b=[1, 2]) == mixin._get_instance(b=[1, 2], c=cookiejar)
- m = mixin._get_instance(t=1234)
- assert mixin._get_instance(t=1234) == m
- mixin._clear_instances()
- assert mixin._get_instance(t=1234) != m
- class TestNetworkingExceptions:
- @staticmethod
- def create_response(status):
- return Response(fp=io.BytesIO(b'test'), url='http://example.com', headers={'tesT': 'test'}, status=status)
- @pytest.mark.parametrize('http_error_class', [HTTPError, lambda r: _CompatHTTPError(HTTPError(r))])
- def test_http_error(self, http_error_class):
- response = self.create_response(403)
- error = http_error_class(response)
- assert error.status == 403
- assert str(error) == error.msg == 'HTTP Error 403: Forbidden'
- assert error.reason == response.reason
- assert error.response is response
- data = error.response.read()
- assert data == b'test'
- assert repr(error) == '<HTTPError 403: Forbidden>'
- @pytest.mark.parametrize('http_error_class', [HTTPError, lambda *args, **kwargs: _CompatHTTPError(HTTPError(*args, **kwargs))])
- def test_redirect_http_error(self, http_error_class):
- response = self.create_response(301)
- error = http_error_class(response, redirect_loop=True)
- assert str(error) == error.msg == 'HTTP Error 301: Moved Permanently (redirect loop detected)'
- assert error.reason == 'Moved Permanently'
- def test_compat_http_error(self):
- response = self.create_response(403)
- error = _CompatHTTPError(HTTPError(response))
- assert isinstance(error, HTTPError)
- assert isinstance(error, urllib.error.HTTPError)
- @contextlib.contextmanager
- def raises_deprecation_warning():
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter('always')
- yield
- if len(w) == 0:
- pytest.fail('Did not raise DeprecationWarning')
- if len(w) > 1:
- pytest.fail(f'Raised multiple warnings: {w}')
- if not issubclass(w[-1].category, DeprecationWarning):
- pytest.fail(f'Expected DeprecationWarning, got {w[-1].category}')
- w.clear()
- with raises_deprecation_warning():
- assert error.code == 403
- with raises_deprecation_warning():
- assert error.getcode() == 403
- with raises_deprecation_warning():
- assert error.hdrs is error.response.headers
- with raises_deprecation_warning():
- assert error.info() is error.response.headers
- with raises_deprecation_warning():
- assert error.headers is error.response.headers
- with raises_deprecation_warning():
- assert error.filename == error.response.url
- with raises_deprecation_warning():
- assert error.url == error.response.url
- with raises_deprecation_warning():
- assert error.geturl() == error.response.url
- with raises_deprecation_warning():
- assert error.read() == b'test'
- with raises_deprecation_warning():
- assert not error.closed
- with raises_deprecation_warning():
- assert error.get_header('test') == 'test'
- error.close()
- @pytest.mark.skipif(
- platform.python_implementation() == 'PyPy', reason='garbage collector works differently in pypy')
- def test_compat_http_error_autoclose(self):
- response = self.create_response(403)
- _CompatHTTPError(HTTPError(response))
- assert not response.closed
- def test_incomplete_read_error(self):
- error = IncompleteRead(b'test', 3, cause='test')
- assert isinstance(error, IncompleteRead)
- assert repr(error) == '<IncompleteRead: 4 bytes read, 3 more expected>'
- assert str(error) == error.msg == '4 bytes read, 3 more expected'
- assert error.partial == b'test'
- assert error.expected == 3
- assert error.cause == 'test'
- error = IncompleteRead(b'aaa')
- assert repr(error) == '<IncompleteRead: 3 bytes read>'
- assert str(error) == '3 bytes read'