123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- from __future__ import annotations
- import io
- import math
- import re
- import urllib.parse
- from ._helper import InstanceStoreMixin, select_proxy
- from .common import (
- Features,
- Request,
- Response,
- register_preference,
- register_rh,
- )
- from .exceptions import (
- CertificateVerifyError,
- HTTPError,
- IncompleteRead,
- ProxyError,
- SSLError,
- TransportError,
- )
- from .impersonate import ImpersonateRequestHandler, ImpersonateTarget
- from ..dependencies import curl_cffi, certifi
- from ..utils import int_or_none
if curl_cffi is None:
    raise ImportError('curl_cffi is not installed')

# Parse up to the first three numeric components of the installed version.
# re.findall is used instead of re.split: split produces an empty string when the
# version starts or ends with a non-digit (e.g. 'v0.7.1' or '0.8.dev'), and int('')
# would raise ValueError here instead of the ImportError raised below.
curl_cffi_version = tuple(map(int, re.findall(r'\d+', curl_cffi.__version__)[:3]))

# Only 0.5.10 and the 0.7.0-0.7.1 range are accepted; anything else is tagged as
# unsupported on the module object and rejected.
if curl_cffi_version != (0, 5, 10) and not ((0, 7, 0) <= curl_cffi_version < (0, 7, 2)):
    curl_cffi._yt_dlp__version = f'{curl_cffi.__version__} (unsupported)'
    raise ImportError('Only curl_cffi versions 0.5.10, 0.7.0 and 0.7.1 are supported')
- import curl_cffi.requests
- from curl_cffi.const import CurlECode, CurlOpt
class CurlCFFIResponseReader(io.IOBase):
    """Read-only file-like wrapper over a streamed curl_cffi response.

    Chunks are pulled lazily from ``response.iter_content()`` and kept in an
    internal buffer so callers can read arbitrary sizes. ``bytes_read`` counts
    every byte pulled from the iterator, whether or not it has been returned yet.
    """

    def __init__(self, response: curl_cffi.requests.Response):
        self._response = response
        self._iterator = response.iter_content()
        self._buffer = b''
        self.bytes_read = 0

    def readable(self):
        return True

    def read(self, size=None):
        """Return up to ``size`` bytes; ``None`` or a negative size reads to the end.

        The underlying response is closed once the stream is exhausted and the
        buffer is drained, and also whenever reading raises.
        """
        # Follow the io convention that a negative size means "read everything".
        # Without this, read(-1) would skip the fill loop and slice buffer[:-1].
        if size is not None and size < 0:
            size = None

        exception_raised = True
        try:
            while self._iterator and (size is None or len(self._buffer) < size):
                chunk = next(self._iterator, None)
                if chunk is None:
                    self._iterator = None
                    break
                self._buffer += chunk
                self.bytes_read += len(chunk)

            if size is None:
                size = len(self._buffer)
            data = self._buffer[:size]
            self._buffer = self._buffer[size:]

            # "free" the curl instance if the response is fully read.
            # curl_cffi doesn't do this automatically and only allows one open response per thread
            if not self._iterator and not self._buffer:
                self.close()
            exception_raised = False
            return data
        finally:
            # Any error while pulling chunks leaves the response unusable; release it.
            if exception_raised:
                self.close()

    def close(self):
        """Close the wrapped response (once) and drop any buffered data."""
        if not self.closed:
            self._response.close()
            self._buffer = b''
        super().close()
class CurlCFFIResponseAdapter(Response):
    """Response built from a curl_cffi response, read through a CurlCFFIResponseReader."""

    fp: CurlCFFIResponseReader

    def __init__(self, response: curl_cffi.requests.Response):
        reader = CurlCFFIResponseReader(response)
        super().__init__(
            fp=reader,
            headers=response.headers,
            url=response.url,
            status=response.status_code,
        )

    def read(self, amt=None):
        """Read from the underlying reader, translating curl_cffi request errors.

        Raises:
            IncompleteRead: curl reported a partial file; ``expected`` is the
                Content-Length minus the bytes read so far, or None when the
                header is missing or not an integer.
            TransportError: any other curl_cffi ``RequestsError``.
        """
        try:
            return self.fp.read(amt)
        except curl_cffi.requests.errors.RequestsError as err:
            if err.code != CurlECode.PARTIAL_FILE:
                raise TransportError(cause=err) from err

            total_length = int_or_none(err.response.headers.get('Content-Length'))
            received = self.fp.bytes_read
            if total_length is None:
                remaining = None
            else:
                remaining = total_length - self.fp.bytes_read
            raise IncompleteRead(partial=received, expected=remaining, cause=err) from err
@register_rh
class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
    """Request handler that sends HTTP(S) requests through a curl_cffi session.

    Supports browser impersonation for the targets listed in
    ``_SUPPORTED_IMPERSONATE_TARGET_MAP``.
    """

    RH_NAME = 'curl_cffi'
    _SUPPORTED_URL_SCHEMES = ('http', 'https')
    _SUPPORTED_FEATURES = (Features.NO_PROXY, Features.ALL_PROXY)
    _SUPPORTED_PROXY_SCHEMES = ('http', 'https', 'socks4', 'socks4a', 'socks5', 'socks5h')
    # Maps impersonation targets to curl_cffi browser types.
    # Entries inside the conditional sub-dicts are only included for curl_cffi >= 0.7.0.
    _SUPPORTED_IMPERSONATE_TARGET_MAP = {
        **({
            ImpersonateTarget('chrome', '124', 'macos', '14'): curl_cffi.requests.BrowserType.chrome124,
            ImpersonateTarget('chrome', '123', 'macos', '14'): curl_cffi.requests.BrowserType.chrome123,
            ImpersonateTarget('chrome', '120', 'macos', '14'): curl_cffi.requests.BrowserType.chrome120,
            ImpersonateTarget('chrome', '119', 'macos', '14'): curl_cffi.requests.BrowserType.chrome119,
            ImpersonateTarget('chrome', '116', 'windows', '10'): curl_cffi.requests.BrowserType.chrome116,
        } if curl_cffi_version >= (0, 7, 0) else {}),
        ImpersonateTarget('chrome', '110', 'windows', '10'): curl_cffi.requests.BrowserType.chrome110,
        ImpersonateTarget('chrome', '107', 'windows', '10'): curl_cffi.requests.BrowserType.chrome107,
        ImpersonateTarget('chrome', '104', 'windows', '10'): curl_cffi.requests.BrowserType.chrome104,
        ImpersonateTarget('chrome', '101', 'windows', '10'): curl_cffi.requests.BrowserType.chrome101,
        ImpersonateTarget('chrome', '100', 'windows', '10'): curl_cffi.requests.BrowserType.chrome100,
        ImpersonateTarget('chrome', '99', 'windows', '10'): curl_cffi.requests.BrowserType.chrome99,
        ImpersonateTarget('edge', '101', 'windows', '10'): curl_cffi.requests.BrowserType.edge101,
        ImpersonateTarget('edge', '99', 'windows', '10'): curl_cffi.requests.BrowserType.edge99,
        **({
            ImpersonateTarget('safari', '17.0', 'macos', '14'): curl_cffi.requests.BrowserType.safari17_0,
        } if curl_cffi_version >= (0, 7, 0) else {}),
        ImpersonateTarget('safari', '15.5', 'macos', '12'): curl_cffi.requests.BrowserType.safari15_5,
        ImpersonateTarget('safari', '15.3', 'macos', '11'): curl_cffi.requests.BrowserType.safari15_3,
        ImpersonateTarget('chrome', '99', 'android', '12'): curl_cffi.requests.BrowserType.chrome99_android,
        **({
            ImpersonateTarget('safari', '17.2', 'ios', '17.2'): curl_cffi.requests.BrowserType.safari17_2_ios,
        } if curl_cffi_version >= (0, 7, 0) else {}),
    }

    def _create_instance(self, cookiejar=None):
        """Create a new curl_cffi session bound to the given cookiejar."""
        return curl_cffi.requests.Session(cookies=cookiejar)

    def _check_extensions(self, extensions):
        """Run the parent check, then remove the extension keys this handler accepts."""
        super()._check_extensions(extensions)
        # 'legacy_ssl' is accepted but currently ignored by this handler:
        # impersonation generally uses a looser SSL configuration than urllib/requests.
        for accepted in ('impersonate', 'cookiejar', 'timeout', 'legacy_ssl'):
            extensions.pop(accepted, None)

    def send(self, request: Request) -> Response:
        """Send the request and record the impersonation target on the response.

        The target is stored under ``extensions['impersonate']`` on the returned
        response, and on the response attached to an HTTPError before re-raising.
        """
        target = self._get_request_target(request)
        try:
            response = super().send(request)
        except HTTPError as error:
            error.response.extensions['impersonate'] = target
            raise
        else:
            response.extensions['impersonate'] = target
            return response

    def _send(self, request: Request):
        """Configure the curl session for ``request``, perform it and wrap the result.

        Raises:
            CertificateVerifyError, SSLError, ProxyError, TransportError:
                translated from curl_cffi ``RequestsError`` codes.
            HTTPError: the final status is outside the 2xx range, including the
                case where curl gave up after too many redirects.
        """
        redirect_loop = False

        # A cookiejar is only attached when the request carries no explicit cookie header.
        cookiejar = None if 'cookie' in request.headers else self._get_cookiejar(request)
        session: curl_cffi.requests.Session = self._get_instance(cookiejar=cookiejar)

        if self.verbose:
            session.curl.setopt(CurlOpt.VERBOSE, 1)

        proxies = self._get_proxies(request)
        if 'no' in proxies:
            session.curl.setopt(CurlOpt.NOPROXY, proxies['no'])
            del proxies['no']

        # curl has no per-protocol proxy setting, so pick the proxy matching the request URL.
        proxy = select_proxy(request.url, proxies=proxies)
        if proxy:
            session.curl.setopt(CurlOpt.PROXY, proxy)
            url_scheme = urllib.parse.urlparse(request.url).scheme.lower()
            if url_scheme != 'http':
                # Tunnel through the proxy with HTTP CONNECT for HTTPS urls only;
                # plain http is not tunnelled, for compatibility with urllib behaviour.
                # See: https://curl.se/libcurl/c/CURLOPT_HTTPPROXYTUNNEL.html
                session.curl.setopt(CurlOpt.HTTPPROXYTUNNEL, 1)

            # curl_cffi does not currently set these for proxies
            session.curl.setopt(CurlOpt.PROXY_CAINFO, certifi.where())
            if not self.verify:
                for verify_option in (CurlOpt.PROXY_SSL_VERIFYPEER, CurlOpt.PROXY_SSL_VERIFYHOST):
                    session.curl.setopt(verify_option, 0)

        headers = self._get_impersonate_headers(request)

        client_cert = self._client_cert
        if client_cert:
            session.curl.setopt(CurlOpt.SSLCERT, client_cert['client_certificate'])
            # The key and its password are optional; only set them when provided.
            optional_cert_options = (
                (CurlOpt.SSLKEY, 'client_certificate_key'),
                (CurlOpt.KEYPASSWD, 'client_certificate_password'),
            )
            for curl_option, cert_field in optional_cert_options:
                cert_value = client_cert.get(cert_field)
                if cert_value:
                    session.curl.setopt(curl_option, cert_value)

        timeout = self._calculate_timeout(request)

        # CURLOPT_LOW_SPEED_LIMIT together with CURLOPT_LOW_SPEED_TIME acts as a read timeout. [1]
        # This is required only for 0.5.10 [2]
        # CURLOPT_LOW_SPEED_TIME is in whole seconds, hence the round up. [3]
        # [1] https://unix.stackexchange.com/a/305311
        # [2] https://github.com/yifeikong/curl_cffi/issues/156
        # [3] https://curl.se/libcurl/c/CURLOPT_LOW_SPEED_TIME.html
        session.curl.setopt(CurlOpt.LOW_SPEED_LIMIT, 1)  # 1 byte per second
        session.curl.setopt(CurlOpt.LOW_SPEED_TIME, math.ceil(timeout))

        try:
            curl_response = session.request(
                method=request.method,
                url=request.url,
                headers=headers,
                data=request.data,
                verify=self.verify,
                max_redirects=5,
                timeout=(timeout, timeout),
                impersonate=self._SUPPORTED_IMPERSONATE_TARGET_MAP.get(
                    self._get_request_target(request)),
                interface=self.source_address,
                stream=True,
            )
        except curl_cffi.requests.errors.RequestsError as err:
            if err.code == CurlECode.PEER_FAILED_VERIFICATION:
                raise CertificateVerifyError(cause=err) from err
            if err.code == CurlECode.SSL_CONNECT_ERROR:
                raise SSLError(cause=err) from err
            if err.code == CurlECode.TOO_MANY_REDIRECTS:
                # Not raised here: the response carried by the error is used for the status check below.
                redirect_loop = True
                curl_response = err.response
            elif err.code == CurlECode.PROXY or (
                err.code == CurlECode.RECV_ERROR and 'CONNECT' in str(err)
            ):
                raise ProxyError(cause=err) from err
            else:
                raise TransportError(cause=err) from err

        response = CurlCFFIResponseAdapter(curl_response)

        status = response.status
        if status < 200 or status >= 300:
            raise HTTPError(response, redirect_loop=redirect_loop)

        return response
@register_preference(CurlCFFIRH)
def curl_cffi_preference(rh, request):
    """Return the constant preference value registered for CurlCFFIRH.

    Neither ``rh`` nor ``request`` is inspected; the result is always -100.
    NOTE(review): presumably a negative value ranks this handler below
    others - confirm against register_preference.
    """
    return -100
|