from __future__ import annotations

import contextlib
import functools
import os
import socket
import ssl
import sys
import typing
import urllib.parse
import urllib.request

from .exceptions import RequestError
from ..dependencies import certifi
from ..socks import ProxyType, sockssocket
from ..utils import format_field, traverse_obj

if typing.TYPE_CHECKING:
    from collections.abc import Iterable

    from ..utils.networking import HTTPHeaderDict


def ssl_load_certs(context: ssl.SSLContext, use_certifi=True):
    if certifi and use_certifi:
        context.load_verify_locations(cafile=certifi.where())
    else:
        try:
            context.load_default_certs()
        # Work around the issue in load_default_certs when there are bad certificates. See:
        # https://github.com/yt-dlp/yt-dlp/issues/1060,
        # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
        except ssl.SSLError:
            # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
            if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
                for storename in ('CA', 'ROOT'):
                    ssl_load_windows_store_certs(context, storename)
            context.set_default_verify_paths()


def ssl_load_windows_store_certs(ssl_context, storename):
    # Code adapted from _load_windows_store_certs in https://github.com/python/cpython/blob/main/Lib/ssl.py
    try:
        certs = [cert for cert, encoding, trust in ssl.enum_certificates(storename)
                 if encoding == 'x509_asn' and (
                     trust is True or ssl.Purpose.SERVER_AUTH.oid in trust)]
    except PermissionError:
        return
    for cert in certs:
        with contextlib.suppress(ssl.SSLError):
            ssl_context.load_verify_locations(cadata=cert)
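

# Note: the two helpers above only add trust anchors to an existing SSLContext; within this module
# they are normally reached via make_ssl_context() below rather than being called directly.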


def make_socks_proxy_opts(socks_proxy):
    url_components = urllib.parse.urlparse(socks_proxy)
    if url_components.scheme.lower() == 'socks5':
        socks_type = ProxyType.SOCKS5
        rdns = False
    elif url_components.scheme.lower() == 'socks5h':
        socks_type = ProxyType.SOCKS5
        rdns = True
    elif url_components.scheme.lower() == 'socks4':
        socks_type = ProxyType.SOCKS4
        rdns = False
    elif url_components.scheme.lower() == 'socks4a':
        socks_type = ProxyType.SOCKS4A
        rdns = True
    else:
        raise ValueError(f'Unknown SOCKS proxy version: {url_components.scheme.lower()}')

    def unquote_if_non_empty(s):
        if not s:
            return s
        return urllib.parse.unquote_plus(s)

    return {
        'proxytype': socks_type,
        'addr': url_components.hostname,
        'port': url_components.port or 1080,
        'rdns': rdns,
        'username': unquote_if_non_empty(url_components.username),
        'password': unquote_if_non_empty(url_components.password),
    }
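

# Illustrative example (comment only, not part of the module): make_socks_proxy_opts() turns a
# proxy URL into the keyword arguments passed to sockssocket.setproxy(). A socks5h URL enables
# remote DNS resolution (rdns=True), e.g.:
#   make_socks_proxy_opts('socks5h://user:pass@127.0.0.1:9050') == {
#       'proxytype': ProxyType.SOCKS5, 'addr': '127.0.0.1', 'port': 9050,
#       'rdns': True, 'username': 'user', 'password': 'pass'}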


def select_proxy(url, proxies):
    """Unified proxy selector for all backends"""
    url_components = urllib.parse.urlparse(url)
    if 'no' in proxies:
        hostport = url_components.hostname + format_field(url_components.port, None, ':%s')
        if urllib.request.proxy_bypass_environment(hostport, {'no': proxies['no']}):
            return
        elif urllib.request.proxy_bypass(hostport):  # check system settings
            return

    return traverse_obj(proxies, url_components.scheme or 'http', 'all')
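

# Illustrative example (comment only): with proxies={'http': 'http://127.0.0.1:3128', 'no': 'localhost'},
# select_proxy('http://localhost/page', proxies) returns None because the host matches the 'no'
# entry, while select_proxy('http://example.com/page', proxies) returns 'http://127.0.0.1:3128' on
# a typical setup. An 'all' key, when present, serves as a fallback for schemes without their own
# entry.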


def get_redirect_method(method, status):
    """Unified redirect method handling"""

    # A 303 must either use GET or HEAD for the subsequent request
    # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.4
    if status == 303 and method != 'HEAD':
        method = 'GET'
    # 301 and 302 redirects are commonly turned into a GET from a POST
    # for subsequent requests by browsers, so we'll do the same.
    # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.2
    # https://datatracker.ietf.org/doc/html/rfc7231#section-6.4.3
    if status in (301, 302) and method == 'POST':
        method = 'GET'
    return method
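

# Illustrative examples (comment only):
#   get_redirect_method('POST', 303) == 'GET'   # 303 switches to GET unless the request was HEAD
#   get_redirect_method('POST', 302) == 'GET'   # mirror common browser behaviour for 301/302
#   get_redirect_method('POST', 307) == 'POST'  # 307/308 keep the original method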


def make_ssl_context(
    verify=True,
    client_certificate=None,
    client_certificate_key=None,
    client_certificate_password=None,
    legacy_support=False,
    use_certifi=True,
):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = verify
    context.verify_mode = ssl.CERT_REQUIRED if verify else ssl.CERT_NONE

    # OpenSSL 1.1.1+ Python 3.8+ keylog file
    if hasattr(context, 'keylog_filename'):
        context.keylog_filename = os.environ.get('SSLKEYLOGFILE') or None

    # Some servers may reject requests if ALPN extension is not sent. See:
    # https://github.com/python/cpython/issues/85140
    # https://github.com/yt-dlp/yt-dlp/issues/3878
    with contextlib.suppress(NotImplementedError):
        context.set_alpn_protocols(['http/1.1'])
    if verify:
        ssl_load_certs(context, use_certifi)

    if legacy_support:
        context.options |= 4  # SSL_OP_LEGACY_SERVER_CONNECT
        context.set_ciphers('DEFAULT')  # compat

    elif ssl.OPENSSL_VERSION_INFO >= (1, 1, 1) and not ssl.OPENSSL_VERSION.startswith('LibreSSL'):
        # Use the default SSL ciphers and minimum TLS version settings from Python 3.10 [1].
        # This is to ensure consistent behavior across Python versions and libraries, and help avoid fingerprinting
        # in some situations [2][3].
        # Python 3.10 only supports OpenSSL 1.1.1+ [4]. Because this change is likely
        # untested on older versions, we only apply this to OpenSSL 1.1.1+ to be safe.
        # LibreSSL is excluded until further investigation due to cipher support issues [5][6].
        # 1. https://github.com/python/cpython/commit/e983252b516edb15d4338b0a47631b59ef1e2536
        # 2. https://github.com/yt-dlp/yt-dlp/issues/4627
        # 3. https://github.com/yt-dlp/yt-dlp/pull/5294
        # 4. https://peps.python.org/pep-0644/
        # 5. https://peps.python.org/pep-0644/#libressl-support
        # 6. https://github.com/yt-dlp/yt-dlp/commit/5b9f253fa0aee996cf1ed30185d4b502e00609c4#commitcomment-89054368
        context.set_ciphers(
            '@SECLEVEL=2:ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES:DHE+AES:!aNULL:!eNULL:!aDSS:!SHA1:!AESCCM')
        context.minimum_version = ssl.TLSVersion.TLSv1_2

    if client_certificate:
        try:
            context.load_cert_chain(
                client_certificate, keyfile=client_certificate_key,
                password=client_certificate_password)
        except ssl.SSLError:
            raise RequestError('Unable to load client certificate')

        if getattr(context, 'post_handshake_auth', None) is not None:
            context.post_handshake_auth = True
    return context
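

# Illustrative sketch (comment only): a caller would typically build one verified context and
# reuse it for every TLS connection it makes, roughly:
#   context = make_ssl_context(verify=True, use_certifi=True)
#   tls_sock = context.wrap_socket(raw_sock, server_hostname='example.com')
# where raw_sock is assumed to be an already-connected TCP socket; the names above are purely
# illustrative and not part of this module.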


class InstanceStoreMixin:
    def __init__(self, **kwargs):
        self.__instances = []
        super().__init__(**kwargs)  # so that cooperative multiple inheritance (MRO) works

    @staticmethod
    def _create_instance(**kwargs):
        raise NotImplementedError

    def _get_instance(self, **kwargs):
        for key, instance in self.__instances:
            if key == kwargs:
                return instance

        instance = self._create_instance(**kwargs)
        self.__instances.append((kwargs, instance))
        return instance

    def _close_instance(self, instance):
        if callable(getattr(instance, 'close', None)):
            instance.close()

    def _clear_instances(self):
        for _, instance in self.__instances:
            self._close_instance(instance)
        self.__instances.clear()
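

# Illustrative sketch (comment only): a hypothetical subclass only needs to provide
# _create_instance(); equal keyword arguments then map to the same cached object:
#   class PooledThing(InstanceStoreMixin):
#       def _create_instance(self, **kwargs):
#           return ExpensiveSession(**kwargs)   # ExpensiveSession is a made-up stand-in
#   pool = PooledThing()
#   assert pool._get_instance(timeout=20) is pool._get_instance(timeout=20)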


def add_accept_encoding_header(headers: HTTPHeaderDict, supported_encodings: Iterable[str]):
    if 'Accept-Encoding' not in headers:
        headers['Accept-Encoding'] = ', '.join(supported_encodings) or 'identity'
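

# Illustrative example (comment only): an existing Accept-Encoding header is left untouched;
# otherwise the supported encodings are joined, falling back to 'identity' when none are given:
#   headers = HTTPHeaderDict()   # from ..utils.networking (imported here only for type checking)
#   add_accept_encoding_header(headers, ['gzip', 'br'])  # sets 'Accept-Encoding: gzip, br'
#   add_accept_encoding_header(headers, [])              # no-op, the header is already present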


def wrap_request_errors(func):
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except RequestError as e:
            if e.handler is None:
                e.handler = self
            raise
    return wrapper
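

# Illustrative sketch (comment only): a request handler can decorate the methods that may raise,
# so that any RequestError bubbling up is attributed to the handler instance that produced it:
#   class SomeHandler:                      # hypothetical class, not part of this module
#       @wrap_request_errors
#       def send(self, request):
#           ...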


def _socket_connect(ip_addr, timeout, source_address):
    af, socktype, proto, canonname, sa = ip_addr
    sock = socket.socket(af, socktype, proto)
    try:
        if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
            sock.settimeout(timeout)
        if source_address:
            sock.bind(source_address)
        sock.connect(sa)
        return sock
    except OSError:
        sock.close()
        raise


def create_socks_proxy_socket(dest_addr, proxy_args, proxy_ip_addr, timeout, source_address):
    af, socktype, proto, canonname, sa = proxy_ip_addr
    sock = sockssocket(af, socktype, proto)
    try:
        connect_proxy_args = proxy_args.copy()
        connect_proxy_args.update({'addr': sa[0], 'port': sa[1]})
        sock.setproxy(**connect_proxy_args)
        if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
            sock.settimeout(timeout)
        if source_address:
            sock.bind(source_address)
        sock.connect(dest_addr)
        return sock
    except OSError:
        sock.close()
        raise


def create_connection(
    address,
    timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
    source_address=None,
    *,
    _create_socket_func=_socket_connect,
):
    # Work around socket.create_connection() which tries all addresses from getaddrinfo() including IPv6.
    # This filters the addresses based on the given source_address.
    # Based on: https://github.com/python/cpython/blob/main/Lib/socket.py#L810
    host, port = address
    ip_addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
    if not ip_addrs:
        raise OSError('getaddrinfo returns an empty list')
    if source_address is not None:
        af = socket.AF_INET if ':' not in source_address[0] else socket.AF_INET6
        ip_addrs = [addr for addr in ip_addrs if addr[0] == af]
        if not ip_addrs:
            raise OSError(
                f'No remote IPv{4 if af == socket.AF_INET else 6} addresses available for connect. '
                f'Can\'t use "{source_address[0]}" as source address')

    err = None
    for ip_addr in ip_addrs:
        try:
            sock = _create_socket_func(ip_addr, timeout, source_address)
            # Explicitly break __traceback__ reference cycle
            # https://bugs.python.org/issue36820
            err = None
            return sock
        except OSError as e:
            err = e

    try:
        raise err
    finally:
        # Explicitly break __traceback__ reference cycle
        # https://bugs.python.org/issue36820
        err = None
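

# Illustrative sketch (comment only): create_connection() accepts an alternative socket factory,
# which is how SOCKS tunnelling can be wired in. Assuming a SOCKS5 proxy on 127.0.0.1:9050 and a
# destination of example.com:443, the call would look roughly like:
#   proxy_args = make_socks_proxy_opts('socks5h://127.0.0.1:9050')
#   sock = create_connection(
#       ('127.0.0.1', 9050), timeout=20.0,
#       _create_socket_func=functools.partial(
#           create_socks_proxy_socket, ('example.com', 443), proxy_args))
# Note that the address given to create_connection() is the proxy itself (that is what gets
# resolved with getaddrinfo()), while the real destination is passed to create_socks_proxy_socket().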