test_http.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. #!/usr/bin/env python3
  2. # Allow direct execution
  3. import os
  4. import sys
  5. import unittest
  6. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  7. import gzip
  8. import http.cookiejar
  9. import http.server
  10. import io
  11. import pathlib
  12. import ssl
  13. import tempfile
  14. import threading
  15. import urllib.error
  16. import urllib.request
  17. import zlib
  18. from test.helper import http_server_port
  19. from yt_dlp import YoutubeDL
  20. from yt_dlp.dependencies import brotli
  21. from yt_dlp.utils import sanitized_Request, urlencode_postdata
  22. from .helper import FakeYDL
  23. TEST_DIR = os.path.dirname(os.path.abspath(__file__))
  24. class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
  25. protocol_version = 'HTTP/1.1'
  26. def log_message(self, format, *args):
  27. pass
  28. def _headers(self):
  29. payload = str(self.headers).encode('utf-8')
  30. self.send_response(200)
  31. self.send_header('Content-Type', 'application/json')
  32. self.send_header('Content-Length', str(len(payload)))
  33. self.end_headers()
  34. self.wfile.write(payload)
  35. def _redirect(self):
  36. self.send_response(int(self.path[len('/redirect_'):]))
  37. self.send_header('Location', '/method')
  38. self.send_header('Content-Length', '0')
  39. self.end_headers()
  40. def _method(self, method, payload=None):
  41. self.send_response(200)
  42. self.send_header('Content-Length', str(len(payload or '')))
  43. self.send_header('Method', method)
  44. self.end_headers()
  45. if payload:
  46. self.wfile.write(payload)
  47. def _status(self, status):
  48. payload = f'<html>{status} NOT FOUND</html>'.encode()
  49. self.send_response(int(status))
  50. self.send_header('Content-Type', 'text/html; charset=utf-8')
  51. self.send_header('Content-Length', str(len(payload)))
  52. self.end_headers()
  53. self.wfile.write(payload)
  54. def _read_data(self):
  55. if 'Content-Length' in self.headers:
  56. return self.rfile.read(int(self.headers['Content-Length']))
  57. def do_POST(self):
  58. data = self._read_data()
  59. if self.path.startswith('/redirect_'):
  60. self._redirect()
  61. elif self.path.startswith('/method'):
  62. self._method('POST', data)
  63. elif self.path.startswith('/headers'):
  64. self._headers()
  65. else:
  66. self._status(404)
  67. def do_HEAD(self):
  68. if self.path.startswith('/redirect_'):
  69. self._redirect()
  70. elif self.path.startswith('/method'):
  71. self._method('HEAD')
  72. else:
  73. self._status(404)
  74. def do_PUT(self):
  75. data = self._read_data()
  76. if self.path.startswith('/redirect_'):
  77. self._redirect()
  78. elif self.path.startswith('/method'):
  79. self._method('PUT', data)
  80. else:
  81. self._status(404)
  82. def do_GET(self):
  83. if self.path == '/video.html':
  84. payload = b'<html><video src="/vid.mp4" /></html>'
  85. self.send_response(200)
  86. self.send_header('Content-Type', 'text/html; charset=utf-8')
  87. self.send_header('Content-Length', str(len(payload))) # required for persistent connections
  88. self.end_headers()
  89. self.wfile.write(payload)
  90. elif self.path == '/vid.mp4':
  91. payload = b'\x00\x00\x00\x00\x20\x66\x74[video]'
  92. self.send_response(200)
  93. self.send_header('Content-Type', 'video/mp4')
  94. self.send_header('Content-Length', str(len(payload)))
  95. self.end_headers()
  96. self.wfile.write(payload)
  97. elif self.path == '/%E4%B8%AD%E6%96%87.html':
  98. payload = b'<html><video src="/vid.mp4" /></html>'
  99. self.send_response(200)
  100. self.send_header('Content-Type', 'text/html; charset=utf-8')
  101. self.send_header('Content-Length', str(len(payload)))
  102. self.end_headers()
  103. self.wfile.write(payload)
  104. elif self.path == '/%c7%9f':
  105. payload = b'<html><video src="/vid.mp4" /></html>'
  106. self.send_response(200)
  107. self.send_header('Content-Type', 'text/html; charset=utf-8')
  108. self.send_header('Content-Length', str(len(payload)))
  109. self.end_headers()
  110. self.wfile.write(payload)
  111. elif self.path.startswith('/redirect_'):
  112. self._redirect()
  113. elif self.path.startswith('/method'):
  114. self._method('GET')
  115. elif self.path.startswith('/headers'):
  116. self._headers()
  117. elif self.path == '/trailing_garbage':
  118. payload = b'<html><video src="/vid.mp4" /></html>'
  119. self.send_response(200)
  120. self.send_header('Content-Type', 'text/html; charset=utf-8')
  121. self.send_header('Content-Encoding', 'gzip')
  122. buf = io.BytesIO()
  123. with gzip.GzipFile(fileobj=buf, mode='wb') as f:
  124. f.write(payload)
  125. compressed = buf.getvalue() + b'trailing garbage'
  126. self.send_header('Content-Length', str(len(compressed)))
  127. self.end_headers()
  128. self.wfile.write(compressed)
  129. elif self.path == '/302-non-ascii-redirect':
  130. new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
  131. self.send_response(301)
  132. self.send_header('Location', new_url)
  133. self.send_header('Content-Length', '0')
  134. self.end_headers()
  135. elif self.path == '/content-encoding':
  136. encodings = self.headers.get('ytdl-encoding', '')
  137. payload = b'<html><video src="/vid.mp4" /></html>'
  138. for encoding in filter(None, (e.strip() for e in encodings.split(','))):
  139. if encoding == 'br' and brotli:
  140. payload = brotli.compress(payload)
  141. elif encoding == 'gzip':
  142. buf = io.BytesIO()
  143. with gzip.GzipFile(fileobj=buf, mode='wb') as f:
  144. f.write(payload)
  145. payload = buf.getvalue()
  146. elif encoding == 'deflate':
  147. payload = zlib.compress(payload)
  148. elif encoding == 'unsupported':
  149. payload = b'raw'
  150. break
  151. else:
  152. self._status(415)
  153. return
  154. self.send_response(200)
  155. self.send_header('Content-Encoding', encodings)
  156. self.send_header('Content-Length', str(len(payload)))
  157. self.end_headers()
  158. self.wfile.write(payload)
  159. else:
  160. self._status(404)
  161. def send_header(self, keyword, value):
  162. """
  163. Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
  164. This is against what is defined in RFC 3986, however we need to test we support this
  165. since some sites incorrectly do this.
  166. """
  167. if keyword.lower() == 'connection':
  168. return super().send_header(keyword, value)
  169. if not hasattr(self, '_headers_buffer'):
  170. self._headers_buffer = []
  171. self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
  172. class FakeLogger:
  173. def debug(self, msg):
  174. pass
  175. def warning(self, msg):
  176. pass
  177. def error(self, msg):
  178. pass
  179. class TestHTTP(unittest.TestCase):
  180. def setUp(self):
  181. # HTTP server
  182. self.http_httpd = http.server.ThreadingHTTPServer(
  183. ('127.0.0.1', 0), HTTPTestRequestHandler)
  184. self.http_port = http_server_port(self.http_httpd)
  185. self.http_server_thread = threading.Thread(target=self.http_httpd.serve_forever)
  186. # FIXME: we should probably stop the http server thread after each test
  187. # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
  188. self.http_server_thread.daemon = True
  189. self.http_server_thread.start()
  190. # HTTPS server
  191. certfn = os.path.join(TEST_DIR, 'testcert.pem')
  192. self.https_httpd = http.server.ThreadingHTTPServer(
  193. ('127.0.0.1', 0), HTTPTestRequestHandler)
  194. sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  195. sslctx.load_cert_chain(certfn, None)
  196. self.https_httpd.socket = sslctx.wrap_socket(self.https_httpd.socket, server_side=True)
  197. self.https_port = http_server_port(self.https_httpd)
  198. self.https_server_thread = threading.Thread(target=self.https_httpd.serve_forever)
  199. self.https_server_thread.daemon = True
  200. self.https_server_thread.start()
  201. def test_nocheckcertificate(self):
  202. with FakeYDL({'logger': FakeLogger()}) as ydl:
  203. with self.assertRaises(urllib.error.URLError):
  204. ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
  205. with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl:
  206. r = ydl.urlopen(sanitized_Request(f'https://127.0.0.1:{self.https_port}/headers'))
  207. self.assertEqual(r.status, 200)
  208. r.close()
  209. def test_percent_encode(self):
  210. with FakeYDL() as ydl:
  211. # Unicode characters should be encoded with uppercase percent-encoding
  212. res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
  213. self.assertEqual(res.status, 200)
  214. res.close()
  215. # don't normalize existing percent encodings
  216. res = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
  217. self.assertEqual(res.status, 200)
  218. res.close()
  219. def test_unicode_path_redirection(self):
  220. with FakeYDL() as ydl:
  221. r = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
  222. self.assertEqual(r.url, f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html')
  223. r.close()
  224. def test_redirect(self):
  225. with FakeYDL() as ydl:
  226. def do_req(redirect_status, method):
  227. data = b'testdata' if method in ('POST', 'PUT') else None
  228. res = ydl.urlopen(sanitized_Request(
  229. f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data))
  230. return res.read().decode('utf-8'), res.headers.get('method', '')
  231. # A 303 must either use GET or HEAD for subsequent request
  232. self.assertEqual(do_req(303, 'POST'), ('', 'GET'))
  233. self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD'))
  234. self.assertEqual(do_req(303, 'PUT'), ('', 'GET'))
  235. # 301 and 302 turn POST only into a GET
  236. self.assertEqual(do_req(301, 'POST'), ('', 'GET'))
  237. self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD'))
  238. self.assertEqual(do_req(302, 'POST'), ('', 'GET'))
  239. self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD'))
  240. self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT'))
  241. self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT'))
  242. # 307 and 308 should not change method
  243. for m in ('POST', 'PUT'):
  244. self.assertEqual(do_req(307, m), ('testdata', m))
  245. self.assertEqual(do_req(308, m), ('testdata', m))
  246. self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD'))
  247. self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD'))
  248. # These should not redirect and instead raise an HTTPError
  249. for code in (300, 304, 305, 306):
  250. with self.assertRaises(urllib.error.HTTPError):
  251. do_req(code, 'GET')
  252. def test_content_type(self):
  253. # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28
  254. with FakeYDL({'nocheckcertificate': True}) as ydl:
  255. # method should be auto-detected as POST
  256. r = sanitized_Request(f'https://localhost:{self.https_port}/headers', data=urlencode_postdata({'test': 'test'}))
  257. headers = ydl.urlopen(r).read().decode('utf-8')
  258. self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)
  259. # test http
  260. r = sanitized_Request(f'http://localhost:{self.http_port}/headers', data=urlencode_postdata({'test': 'test'}))
  261. headers = ydl.urlopen(r).read().decode('utf-8')
  262. self.assertIn('Content-Type: application/x-www-form-urlencoded', headers)
  263. def test_cookiejar(self):
  264. with FakeYDL() as ydl:
  265. ydl.cookiejar.set_cookie(http.cookiejar.Cookie(
  266. 0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
  267. False, '/headers', True, False, None, False, None, None, {}))
  268. data = ydl.urlopen(sanitized_Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
  269. self.assertIn(b'Cookie: test=ytdlp', data)
  270. def test_no_compression_compat_header(self):
  271. with FakeYDL() as ydl:
  272. data = ydl.urlopen(
  273. sanitized_Request(
  274. f'http://127.0.0.1:{self.http_port}/headers',
  275. headers={'Youtubedl-no-compression': True})).read()
  276. self.assertIn(b'Accept-Encoding: identity', data)
  277. self.assertNotIn(b'youtubedl-no-compression', data.lower())
  278. def test_gzip_trailing_garbage(self):
  279. # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5
  280. # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f
  281. with FakeYDL() as ydl:
  282. data = ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode('utf-8')
  283. self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')
  284. @unittest.skipUnless(brotli, 'brotli support is not installed')
  285. def test_brotli(self):
  286. with FakeYDL() as ydl:
  287. res = ydl.urlopen(
  288. sanitized_Request(
  289. f'http://127.0.0.1:{self.http_port}/content-encoding',
  290. headers={'ytdl-encoding': 'br'}))
  291. self.assertEqual(res.headers.get('Content-Encoding'), 'br')
  292. self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
  293. def test_deflate(self):
  294. with FakeYDL() as ydl:
  295. res = ydl.urlopen(
  296. sanitized_Request(
  297. f'http://127.0.0.1:{self.http_port}/content-encoding',
  298. headers={'ytdl-encoding': 'deflate'}))
  299. self.assertEqual(res.headers.get('Content-Encoding'), 'deflate')
  300. self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
  301. def test_gzip(self):
  302. with FakeYDL() as ydl:
  303. res = ydl.urlopen(
  304. sanitized_Request(
  305. f'http://127.0.0.1:{self.http_port}/content-encoding',
  306. headers={'ytdl-encoding': 'gzip'}))
  307. self.assertEqual(res.headers.get('Content-Encoding'), 'gzip')
  308. self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
  309. def test_multiple_encodings(self):
  310. # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
  311. with FakeYDL() as ydl:
  312. for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
  313. res = ydl.urlopen(
  314. sanitized_Request(
  315. f'http://127.0.0.1:{self.http_port}/content-encoding',
  316. headers={'ytdl-encoding': pair}))
  317. self.assertEqual(res.headers.get('Content-Encoding'), pair)
  318. self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
  319. def test_unsupported_encoding(self):
  320. # it should return the raw content
  321. with FakeYDL() as ydl:
  322. res = ydl.urlopen(
  323. sanitized_Request(
  324. f'http://127.0.0.1:{self.http_port}/content-encoding',
  325. headers={'ytdl-encoding': 'unsupported'}))
  326. self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')
  327. self.assertEqual(res.read(), b'raw')
  328. class TestClientCert(unittest.TestCase):
  329. def setUp(self):
  330. certfn = os.path.join(TEST_DIR, 'testcert.pem')
  331. self.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
  332. cacertfn = os.path.join(self.certdir, 'ca.crt')
  333. self.httpd = http.server.HTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
  334. sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  335. sslctx.verify_mode = ssl.CERT_REQUIRED
  336. sslctx.load_verify_locations(cafile=cacertfn)
  337. sslctx.load_cert_chain(certfn, None)
  338. self.httpd.socket = sslctx.wrap_socket(self.httpd.socket, server_side=True)
  339. self.port = http_server_port(self.httpd)
  340. self.server_thread = threading.Thread(target=self.httpd.serve_forever)
  341. self.server_thread.daemon = True
  342. self.server_thread.start()
  343. def _run_test(self, **params):
  344. ydl = YoutubeDL({
  345. 'logger': FakeLogger(),
  346. # Disable client-side validation of unacceptable self-signed testcert.pem
  347. # The test is of a check on the server side, so unaffected
  348. 'nocheckcertificate': True,
  349. **params,
  350. })
  351. r = ydl.extract_info(f'https://127.0.0.1:{self.port}/video.html')
  352. self.assertEqual(r['url'], f'https://127.0.0.1:{self.port}/vid.mp4')
  353. def test_certificate_combined_nopass(self):
  354. self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithkey.crt'))
  355. def test_certificate_nocombined_nopass(self):
  356. self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
  357. client_certificate_key=os.path.join(self.certdir, 'client.key'))
  358. def test_certificate_combined_pass(self):
  359. self._run_test(client_certificate=os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
  360. client_certificate_password='foobar')
  361. def test_certificate_nocombined_pass(self):
  362. self._run_test(client_certificate=os.path.join(self.certdir, 'client.crt'),
  363. client_certificate_key=os.path.join(self.certdir, 'clientencrypted.key'),
  364. client_certificate_password='foobar')
  365. def _build_proxy_handler(name):
  366. class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
  367. proxy_name = name
  368. def log_message(self, format, *args):
  369. pass
  370. def do_GET(self):
  371. self.send_response(200)
  372. self.send_header('Content-Type', 'text/plain; charset=utf-8')
  373. self.end_headers()
  374. self.wfile.write(f'{self.proxy_name}: {self.path}'.encode())
  375. return HTTPTestRequestHandler
  376. class TestProxy(unittest.TestCase):
  377. def setUp(self):
  378. self.proxy = http.server.HTTPServer(
  379. ('127.0.0.1', 0), _build_proxy_handler('normal'))
  380. self.port = http_server_port(self.proxy)
  381. self.proxy_thread = threading.Thread(target=self.proxy.serve_forever)
  382. self.proxy_thread.daemon = True
  383. self.proxy_thread.start()
  384. self.geo_proxy = http.server.HTTPServer(
  385. ('127.0.0.1', 0), _build_proxy_handler('geo'))
  386. self.geo_port = http_server_port(self.geo_proxy)
  387. self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever)
  388. self.geo_proxy_thread.daemon = True
  389. self.geo_proxy_thread.start()
  390. def test_proxy(self):
  391. geo_proxy = f'127.0.0.1:{self.geo_port}'
  392. ydl = YoutubeDL({
  393. 'proxy': f'127.0.0.1:{self.port}',
  394. 'geo_verification_proxy': geo_proxy,
  395. })
  396. url = 'http://foo.com/bar'
  397. response = ydl.urlopen(url).read().decode()
  398. self.assertEqual(response, f'normal: {url}')
  399. req = urllib.request.Request(url)
  400. req.add_header('Ytdl-request-proxy', geo_proxy)
  401. response = ydl.urlopen(req).read().decode()
  402. self.assertEqual(response, f'geo: {url}')
  403. def test_proxy_with_idn(self):
  404. ydl = YoutubeDL({
  405. 'proxy': f'127.0.0.1:{self.port}',
  406. })
  407. url = 'http://中文.tw/'
  408. response = ydl.urlopen(url).read().decode()
  409. # b'xn--fiq228c' is '中文'.encode('idna')
  410. self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
  411. class TestFileURL(unittest.TestCase):
  412. # See https://github.com/ytdl-org/youtube-dl/issues/8227
  413. def test_file_urls(self):
  414. tf = tempfile.NamedTemporaryFile(delete=False)
  415. tf.write(b'foobar')
  416. tf.close()
  417. url = pathlib.Path(tf.name).as_uri()
  418. with FakeYDL() as ydl:
  419. self.assertRaisesRegex(
  420. urllib.error.URLError, 'file:// URLs are explicitly disabled in yt-dlp for security reasons', ydl.urlopen, url)
  421. with FakeYDL({'enable_file_urls': True}) as ydl:
  422. res = ydl.urlopen(url)
  423. self.assertEqual(res.read(), b'foobar')
  424. res.close()
  425. os.unlink(tf.name)
# Run the tests in this module when the file is executed directly
if __name__ == '__main__':
    unittest.main()