_websockets.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. from __future__ import annotations
  2. import contextlib
  3. import functools
  4. import io
  5. import logging
  6. import ssl
  7. import sys
  8. from ._helper import (
  9. create_connection,
  10. create_socks_proxy_socket,
  11. make_socks_proxy_opts,
  12. select_proxy,
  13. )
  14. from .common import Features, Response, register_rh
  15. from .exceptions import (
  16. CertificateVerifyError,
  17. HTTPError,
  18. ProxyError,
  19. RequestError,
  20. SSLError,
  21. TransportError,
  22. )
  23. from .websocket import WebSocketRequestHandler, WebSocketResponse
  24. from ..dependencies import websockets
  25. from ..socks import ProxyError as SocksProxyError
  26. from ..utils import int_or_none
  27. if not websockets:
  28. raise ImportError('websockets is not installed')
  29. import websockets.version
  30. websockets_version = tuple(map(int_or_none, websockets.version.version.split('.')))
  31. if websockets_version < (13, 0):
  32. raise ImportError('Only websockets>=13.0 is supported')
  33. import websockets.sync.client
  34. from websockets.uri import parse_uri
  35. # In websockets Connection, recv_exc and recv_events_exc are defined
  36. # after the recv events handler thread is started [1].
  37. # On our CI using PyPy, in some cases a race condition may occur
  38. # where the recv events handler thread tries to use these attributes before they are defined [2].
  39. # 1: https://github.com/python-websockets/websockets/blame/de768cf65e7e2b1a3b67854fb9e08816a5ff7050/src/websockets/sync/connection.py#L93
  40. # 2: "AttributeError: 'ClientConnection' object has no attribute 'recv_events_exc'. Did you mean: 'recv_events'?"
  41. import websockets.sync.connection # isort: split
  42. with contextlib.suppress(Exception):
  43. websockets.sync.connection.Connection.recv_exc = None
  44. class WebsocketsResponseAdapter(WebSocketResponse):
  45. def __init__(self, ws: websockets.sync.client.ClientConnection, url):
  46. super().__init__(
  47. fp=io.BytesIO(ws.response.body or b''),
  48. url=url,
  49. headers=ws.response.headers,
  50. status=ws.response.status_code,
  51. reason=ws.response.reason_phrase,
  52. )
  53. self._ws = ws
  54. def close(self):
  55. self._ws.close()
  56. super().close()
  57. def send(self, message):
  58. # https://websockets.readthedocs.io/en/stable/reference/sync/client.html#websockets.sync.client.ClientConnection.send
  59. try:
  60. return self._ws.send(message)
  61. except (websockets.exceptions.WebSocketException, RuntimeError, TimeoutError) as e:
  62. raise TransportError(cause=e) from e
  63. except SocksProxyError as e:
  64. raise ProxyError(cause=e) from e
  65. except TypeError as e:
  66. raise RequestError(cause=e) from e
  67. def recv(self):
  68. # https://websockets.readthedocs.io/en/stable/reference/sync/client.html#websockets.sync.client.ClientConnection.recv
  69. try:
  70. return self._ws.recv()
  71. except SocksProxyError as e:
  72. raise ProxyError(cause=e) from e
  73. except (websockets.exceptions.WebSocketException, RuntimeError, TimeoutError) as e:
  74. raise TransportError(cause=e) from e
  75. @register_rh
  76. class WebsocketsRH(WebSocketRequestHandler):
  77. """
  78. Websockets request handler
  79. https://websockets.readthedocs.io
  80. https://github.com/python-websockets/websockets
  81. """
  82. _SUPPORTED_URL_SCHEMES = ('wss', 'ws')
  83. _SUPPORTED_PROXY_SCHEMES = ('socks4', 'socks4a', 'socks5', 'socks5h')
  84. _SUPPORTED_FEATURES = (Features.ALL_PROXY, Features.NO_PROXY)
  85. RH_NAME = 'websockets'
  86. def __init__(self, *args, **kwargs):
  87. super().__init__(*args, **kwargs)
  88. self.__logging_handlers = {}
  89. for name in ('websockets.client', 'websockets.server'):
  90. logger = logging.getLogger(name)
  91. handler = logging.StreamHandler(stream=sys.stdout)
  92. handler.setFormatter(logging.Formatter(f'{self.RH_NAME}: %(message)s'))
  93. self.__logging_handlers[name] = handler
  94. logger.addHandler(handler)
  95. if self.verbose:
  96. logger.setLevel(logging.DEBUG)
  97. def _check_extensions(self, extensions):
  98. super()._check_extensions(extensions)
  99. extensions.pop('timeout', None)
  100. extensions.pop('cookiejar', None)
  101. extensions.pop('legacy_ssl', None)
  102. def close(self):
  103. # Remove the logging handler that contains a reference to our logger
  104. # See: https://github.com/yt-dlp/yt-dlp/issues/8922
  105. for name, handler in self.__logging_handlers.items():
  106. logging.getLogger(name).removeHandler(handler)
  107. def _send(self, request):
  108. timeout = self._calculate_timeout(request)
  109. headers = self._merge_headers(request.headers)
  110. if 'cookie' not in headers:
  111. cookiejar = self._get_cookiejar(request)
  112. cookie_header = cookiejar.get_cookie_header(request.url)
  113. if cookie_header:
  114. headers['cookie'] = cookie_header
  115. wsuri = parse_uri(request.url)
  116. create_conn_kwargs = {
  117. 'source_address': (self.source_address, 0) if self.source_address else None,
  118. 'timeout': timeout,
  119. }
  120. proxy = select_proxy(request.url, self._get_proxies(request))
  121. try:
  122. if proxy:
  123. socks_proxy_options = make_socks_proxy_opts(proxy)
  124. sock = create_connection(
  125. address=(socks_proxy_options['addr'], socks_proxy_options['port']),
  126. _create_socket_func=functools.partial(
  127. create_socks_proxy_socket, (wsuri.host, wsuri.port), socks_proxy_options),
  128. **create_conn_kwargs,
  129. )
  130. else:
  131. sock = create_connection(
  132. address=(wsuri.host, wsuri.port),
  133. **create_conn_kwargs,
  134. )
  135. ssl_ctx = self._make_sslcontext(legacy_ssl_support=request.extensions.get('legacy_ssl'))
  136. conn = websockets.sync.client.connect(
  137. sock=sock,
  138. uri=request.url,
  139. additional_headers=headers,
  140. open_timeout=timeout,
  141. user_agent_header=None,
  142. ssl=ssl_ctx if wsuri.secure else None,
  143. close_timeout=0, # not ideal, but prevents yt-dlp hanging
  144. )
  145. return WebsocketsResponseAdapter(conn, url=request.url)
  146. # Exceptions as per https://websockets.readthedocs.io/en/stable/reference/sync/client.html
  147. except SocksProxyError as e:
  148. raise ProxyError(cause=e) from e
  149. except websockets.exceptions.InvalidURI as e:
  150. raise RequestError(cause=e) from e
  151. except ssl.SSLCertVerificationError as e:
  152. raise CertificateVerifyError(cause=e) from e
  153. except ssl.SSLError as e:
  154. raise SSLError(cause=e) from e
  155. except websockets.exceptions.InvalidStatus as e:
  156. raise HTTPError(
  157. Response(
  158. fp=io.BytesIO(e.response.body),
  159. url=request.url,
  160. headers=e.response.headers,
  161. status=e.response.status_code,
  162. reason=e.response.reason_phrase),
  163. ) from e
  164. except (OSError, TimeoutError, websockets.exceptions.WebSocketException) as e:
  165. raise TransportError(cause=e) from e