test_websockets.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. #!/usr/bin/env python3
  2. # Allow direct execution
  3. import os
  4. import sys
  5. import pytest
  6. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  7. import http.client
  8. import http.cookiejar
  9. import http.server
  10. import json
  11. import random
  12. import ssl
  13. import threading
  14. from yt_dlp import socks
  15. from yt_dlp.cookies import YoutubeDLCookieJar
  16. from yt_dlp.dependencies import websockets
  17. from yt_dlp.networking import Request
  18. from yt_dlp.networking.exceptions import (
  19. CertificateVerifyError,
  20. HTTPError,
  21. ProxyError,
  22. RequestError,
  23. SSLError,
  24. TransportError,
  25. )
  26. from yt_dlp.utils.networking import HTTPHeaderDict
  27. from test.conftest import validate_and_send
  28. TEST_DIR = os.path.dirname(os.path.abspath(__file__))
  29. def websocket_handler(websocket):
  30. for message in websocket:
  31. if isinstance(message, bytes):
  32. if message == b'bytes':
  33. return websocket.send('2')
  34. elif isinstance(message, str):
  35. if message == 'headers':
  36. return websocket.send(json.dumps(dict(websocket.request.headers)))
  37. elif message == 'path':
  38. return websocket.send(websocket.request.path)
  39. elif message == 'source_address':
  40. return websocket.send(websocket.remote_address[0])
  41. elif message == 'str':
  42. return websocket.send('1')
  43. return websocket.send(message)
  44. def process_request(self, request):
  45. if request.path.startswith('/gen_'):
  46. status = http.HTTPStatus(int(request.path[5:]))
  47. if 300 <= status.value <= 300:
  48. return websockets.http11.Response(
  49. status.value, status.phrase, websockets.datastructures.Headers([('Location', '/')]), b'')
  50. return self.protocol.reject(status.value, status.phrase)
  51. return self.protocol.accept(request)
  52. def create_websocket_server(**ws_kwargs):
  53. import websockets.sync.server
  54. wsd = websockets.sync.server.serve(websocket_handler, '127.0.0.1', 0, process_request=process_request, **ws_kwargs)
  55. ws_port = wsd.socket.getsockname()[1]
  56. ws_server_thread = threading.Thread(target=wsd.serve_forever)
  57. ws_server_thread.daemon = True
  58. ws_server_thread.start()
  59. return ws_server_thread, ws_port
  60. def create_ws_websocket_server():
  61. return create_websocket_server()
  62. def create_wss_websocket_server():
  63. certfn = os.path.join(TEST_DIR, 'testcert.pem')
  64. sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  65. sslctx.load_cert_chain(certfn, None)
  66. return create_websocket_server(ssl_context=sslctx)
  67. MTLS_CERT_DIR = os.path.join(TEST_DIR, 'testdata', 'certificate')
  68. def create_mtls_wss_websocket_server():
  69. certfn = os.path.join(TEST_DIR, 'testcert.pem')
  70. cacertfn = os.path.join(MTLS_CERT_DIR, 'ca.crt')
  71. sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  72. sslctx.verify_mode = ssl.CERT_REQUIRED
  73. sslctx.load_verify_locations(cafile=cacertfn)
  74. sslctx.load_cert_chain(certfn, None)
  75. return create_websocket_server(ssl_context=sslctx)
  76. @pytest.mark.skipif(not websockets, reason='websockets must be installed to test websocket request handlers')
  77. class TestWebsSocketRequestHandlerConformance:
  78. @classmethod
  79. def setup_class(cls):
  80. cls.ws_thread, cls.ws_port = create_ws_websocket_server()
  81. cls.ws_base_url = f'ws://127.0.0.1:{cls.ws_port}'
  82. cls.wss_thread, cls.wss_port = create_wss_websocket_server()
  83. cls.wss_base_url = f'wss://127.0.0.1:{cls.wss_port}'
  84. cls.bad_wss_thread, cls.bad_wss_port = create_websocket_server(ssl_context=ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER))
  85. cls.bad_wss_host = f'wss://127.0.0.1:{cls.bad_wss_port}'
  86. cls.mtls_wss_thread, cls.mtls_wss_port = create_mtls_wss_websocket_server()
  87. cls.mtls_wss_base_url = f'wss://127.0.0.1:{cls.mtls_wss_port}'
  88. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  89. def test_basic_websockets(self, handler):
  90. with handler() as rh:
  91. ws = validate_and_send(rh, Request(self.ws_base_url))
  92. assert 'upgrade' in ws.headers
  93. assert ws.status == 101
  94. ws.send('foo')
  95. assert ws.recv() == 'foo'
  96. ws.close()
  97. # https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6
  98. @pytest.mark.parametrize('msg,opcode', [('str', 1), (b'bytes', 2)])
  99. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  100. def test_send_types(self, handler, msg, opcode):
  101. with handler() as rh:
  102. ws = validate_and_send(rh, Request(self.ws_base_url))
  103. ws.send(msg)
  104. assert int(ws.recv()) == opcode
  105. ws.close()
  106. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  107. def test_verify_cert(self, handler):
  108. with handler() as rh:
  109. with pytest.raises(CertificateVerifyError):
  110. validate_and_send(rh, Request(self.wss_base_url))
  111. with handler(verify=False) as rh:
  112. ws = validate_and_send(rh, Request(self.wss_base_url))
  113. assert ws.status == 101
  114. ws.close()
  115. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  116. def test_ssl_error(self, handler):
  117. with handler(verify=False) as rh:
  118. with pytest.raises(SSLError, match='sslv3 alert handshake failure') as exc_info:
  119. validate_and_send(rh, Request(self.bad_wss_host))
  120. assert not issubclass(exc_info.type, CertificateVerifyError)
  121. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  122. @pytest.mark.parametrize('path,expected', [
  123. # Unicode characters should be encoded with uppercase percent-encoding
  124. ('/中文', '/%E4%B8%AD%E6%96%87'),
  125. # don't normalize existing percent encodings
  126. ('/%c7%9f', '/%c7%9f'),
  127. ])
  128. def test_percent_encode(self, handler, path, expected):
  129. with handler() as rh:
  130. ws = validate_and_send(rh, Request(f'{self.ws_base_url}{path}'))
  131. ws.send('path')
  132. assert ws.recv() == expected
  133. assert ws.status == 101
  134. ws.close()
  135. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  136. def test_remove_dot_segments(self, handler):
  137. with handler() as rh:
  138. # This isn't a comprehensive test,
  139. # but it should be enough to check whether the handler is removing dot segments
  140. ws = validate_and_send(rh, Request(f'{self.ws_base_url}/a/b/./../../test'))
  141. assert ws.status == 101
  142. ws.send('path')
  143. assert ws.recv() == '/test'
  144. ws.close()
  145. # We are restricted to known HTTP status codes in http.HTTPStatus
  146. # Redirects are not supported for websockets
  147. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  148. @pytest.mark.parametrize('status', (200, 204, 301, 302, 303, 400, 500, 511))
  149. def test_raise_http_error(self, handler, status):
  150. with handler() as rh:
  151. with pytest.raises(HTTPError) as exc_info:
  152. validate_and_send(rh, Request(f'{self.ws_base_url}/gen_{status}'))
  153. assert exc_info.value.status == status
  154. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  155. @pytest.mark.parametrize('params,extensions', [
  156. ({'timeout': 0.00001}, {}),
  157. ({}, {'timeout': 0.00001}),
  158. ])
  159. def test_timeout(self, handler, params, extensions):
  160. with handler(**params) as rh:
  161. with pytest.raises(TransportError):
  162. validate_and_send(rh, Request(self.ws_base_url, extensions=extensions))
  163. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  164. def test_cookies(self, handler):
  165. cookiejar = YoutubeDLCookieJar()
  166. cookiejar.set_cookie(http.cookiejar.Cookie(
  167. version=0, name='test', value='ytdlp', port=None, port_specified=False,
  168. domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
  169. path_specified=True, secure=False, expires=None, discard=False, comment=None,
  170. comment_url=None, rest={}))
  171. with handler(cookiejar=cookiejar) as rh:
  172. ws = validate_and_send(rh, Request(self.ws_base_url))
  173. ws.send('headers')
  174. assert json.loads(ws.recv())['cookie'] == 'test=ytdlp'
  175. ws.close()
  176. with handler() as rh:
  177. ws = validate_and_send(rh, Request(self.ws_base_url))
  178. ws.send('headers')
  179. assert 'cookie' not in json.loads(ws.recv())
  180. ws.close()
  181. ws = validate_and_send(rh, Request(self.ws_base_url, extensions={'cookiejar': cookiejar}))
  182. ws.send('headers')
  183. assert json.loads(ws.recv())['cookie'] == 'test=ytdlp'
  184. ws.close()
  185. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  186. def test_source_address(self, handler):
  187. source_address = f'127.0.0.{random.randint(5, 255)}'
  188. with handler(source_address=source_address) as rh:
  189. ws = validate_and_send(rh, Request(self.ws_base_url))
  190. ws.send('source_address')
  191. assert source_address == ws.recv()
  192. ws.close()
  193. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  194. def test_response_url(self, handler):
  195. with handler() as rh:
  196. url = f'{self.ws_base_url}/something'
  197. ws = validate_and_send(rh, Request(url))
  198. assert ws.url == url
  199. ws.close()
  200. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  201. def test_request_headers(self, handler):
  202. with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
  203. # Global Headers
  204. ws = validate_and_send(rh, Request(self.ws_base_url))
  205. ws.send('headers')
  206. headers = HTTPHeaderDict(json.loads(ws.recv()))
  207. assert headers['test1'] == 'test'
  208. ws.close()
  209. # Per request headers, merged with global
  210. ws = validate_and_send(rh, Request(
  211. self.ws_base_url, headers={'test2': 'changed', 'test3': 'test3'}))
  212. ws.send('headers')
  213. headers = HTTPHeaderDict(json.loads(ws.recv()))
  214. assert headers['test1'] == 'test'
  215. assert headers['test2'] == 'changed'
  216. assert headers['test3'] == 'test3'
  217. ws.close()
  218. @pytest.mark.parametrize('client_cert', (
  219. {'client_certificate': os.path.join(MTLS_CERT_DIR, 'clientwithkey.crt')},
  220. {
  221. 'client_certificate': os.path.join(MTLS_CERT_DIR, 'client.crt'),
  222. 'client_certificate_key': os.path.join(MTLS_CERT_DIR, 'client.key'),
  223. },
  224. {
  225. 'client_certificate': os.path.join(MTLS_CERT_DIR, 'clientwithencryptedkey.crt'),
  226. 'client_certificate_password': 'foobar',
  227. },
  228. {
  229. 'client_certificate': os.path.join(MTLS_CERT_DIR, 'client.crt'),
  230. 'client_certificate_key': os.path.join(MTLS_CERT_DIR, 'clientencrypted.key'),
  231. 'client_certificate_password': 'foobar',
  232. }
  233. ))
  234. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  235. def test_mtls(self, handler, client_cert):
  236. with handler(
  237. # Disable client-side validation of unacceptable self-signed testcert.pem
  238. # The test is of a check on the server side, so unaffected
  239. verify=False,
  240. client_cert=client_cert
  241. ) as rh:
  242. validate_and_send(rh, Request(self.mtls_wss_base_url)).close()
  243. def create_fake_ws_connection(raised):
  244. import websockets.sync.client
  245. class FakeWsConnection(websockets.sync.client.ClientConnection):
  246. def __init__(self, *args, **kwargs):
  247. class FakeResponse:
  248. body = b''
  249. headers = {}
  250. status_code = 101
  251. reason_phrase = 'test'
  252. self.response = FakeResponse()
  253. def send(self, *args, **kwargs):
  254. raise raised()
  255. def recv(self, *args, **kwargs):
  256. raise raised()
  257. def close(self, *args, **kwargs):
  258. return
  259. return FakeWsConnection()
  260. @pytest.mark.parametrize('handler', ['Websockets'], indirect=True)
  261. class TestWebsocketsRequestHandler:
  262. @pytest.mark.parametrize('raised,expected', [
  263. # https://websockets.readthedocs.io/en/stable/reference/exceptions.html
  264. (lambda: websockets.exceptions.InvalidURI(msg='test', uri='test://'), RequestError),
  265. # Requires a response object. Should be covered by HTTP error tests.
  266. # (lambda: websockets.exceptions.InvalidStatus(), TransportError),
  267. (lambda: websockets.exceptions.InvalidHandshake(), TransportError),
  268. # These are subclasses of InvalidHandshake
  269. (lambda: websockets.exceptions.InvalidHeader(name='test'), TransportError),
  270. (lambda: websockets.exceptions.NegotiationError(), TransportError),
  271. # Catch-all
  272. (lambda: websockets.exceptions.WebSocketException(), TransportError),
  273. (lambda: TimeoutError(), TransportError),
  274. # These may be raised by our create_connection implementation, which should also be caught
  275. (lambda: OSError(), TransportError),
  276. (lambda: ssl.SSLError(), SSLError),
  277. (lambda: ssl.SSLCertVerificationError(), CertificateVerifyError),
  278. (lambda: socks.ProxyError(), ProxyError),
  279. ])
  280. def test_request_error_mapping(self, handler, monkeypatch, raised, expected):
  281. import websockets.sync.client
  282. import yt_dlp.networking._websockets
  283. with handler() as rh:
  284. def fake_connect(*args, **kwargs):
  285. raise raised()
  286. monkeypatch.setattr(yt_dlp.networking._websockets, 'create_connection', lambda *args, **kwargs: None)
  287. monkeypatch.setattr(websockets.sync.client, 'connect', fake_connect)
  288. with pytest.raises(expected) as exc_info:
  289. rh.send(Request('ws://fake-url'))
  290. assert exc_info.type is expected
  291. @pytest.mark.parametrize('raised,expected,match', [
  292. # https://websockets.readthedocs.io/en/stable/reference/sync/client.html#websockets.sync.client.ClientConnection.send
  293. (lambda: websockets.exceptions.ConnectionClosed(None, None), TransportError, None),
  294. (lambda: RuntimeError(), TransportError, None),
  295. (lambda: TimeoutError(), TransportError, None),
  296. (lambda: TypeError(), RequestError, None),
  297. (lambda: socks.ProxyError(), ProxyError, None),
  298. # Catch-all
  299. (lambda: websockets.exceptions.WebSocketException(), TransportError, None),
  300. ])
  301. def test_ws_send_error_mapping(self, handler, monkeypatch, raised, expected, match):
  302. from yt_dlp.networking._websockets import WebsocketsResponseAdapter
  303. ws = WebsocketsResponseAdapter(create_fake_ws_connection(raised), url='ws://fake-url')
  304. with pytest.raises(expected, match=match) as exc_info:
  305. ws.send('test')
  306. assert exc_info.type is expected
  307. @pytest.mark.parametrize('raised,expected,match', [
  308. # https://websockets.readthedocs.io/en/stable/reference/sync/client.html#websockets.sync.client.ClientConnection.recv
  309. (lambda: websockets.exceptions.ConnectionClosed(None, None), TransportError, None),
  310. (lambda: RuntimeError(), TransportError, None),
  311. (lambda: TimeoutError(), TransportError, None),
  312. (lambda: socks.ProxyError(), ProxyError, None),
  313. # Catch-all
  314. (lambda: websockets.exceptions.WebSocketException(), TransportError, None),
  315. ])
  316. def test_ws_recv_error_mapping(self, handler, monkeypatch, raised, expected, match):
  317. from yt_dlp.networking._websockets import WebsocketsResponseAdapter
  318. ws = WebsocketsResponseAdapter(create_fake_ws_connection(raised), url='ws://fake-url')
  319. with pytest.raises(expected, match=match) as exc_info:
  320. ws.recv()
  321. assert exc_info.type is expected