|
@@ -17,9 +17,11 @@ import tempfile
|
|
|
import threading
|
|
|
import urllib.error
|
|
|
import urllib.request
|
|
|
+import zlib
|
|
|
|
|
|
from test.helper import http_server_port
|
|
|
from yt_dlp import YoutubeDL
|
|
|
+from yt_dlp.dependencies import brotli
|
|
|
from yt_dlp.utils import sanitized_Request, urlencode_postdata
|
|
|
|
|
|
from .helper import FakeYDL
|
|
@@ -148,6 +150,31 @@ class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
|
|
|
self.send_header('Location', new_url)
|
|
|
self.send_header('Content-Length', '0')
|
|
|
self.end_headers()
|
|
|
+ elif self.path == '/content-encoding':
|
|
|
+ encodings = self.headers.get('ytdl-encoding', '')
|
|
|
+ payload = b'<html><video src="/vid.mp4" /></html>'
|
|
|
+ for encoding in filter(None, (e.strip() for e in encodings.split(','))):
|
|
|
+ if encoding == 'br' and brotli:
|
|
|
+ payload = brotli.compress(payload)
|
|
|
+ elif encoding == 'gzip':
|
|
|
+ buf = io.BytesIO()
|
|
|
+ with gzip.GzipFile(fileobj=buf, mode='wb') as f:
|
|
|
+ f.write(payload)
|
|
|
+ payload = buf.getvalue()
|
|
|
+ elif encoding == 'deflate':
|
|
|
+ payload = zlib.compress(payload)
|
|
|
+ elif encoding == 'unsupported':
|
|
|
+ payload = b'raw'
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ self._status(415)
|
|
|
+ return
|
|
|
+ self.send_response(200)
|
|
|
+ self.send_header('Content-Encoding', encodings)
|
|
|
+ self.send_header('Content-Length', str(len(payload)))
|
|
|
+ self.end_headers()
|
|
|
+ self.wfile.write(payload)
|
|
|
+
|
|
|
else:
|
|
|
self._status(404)
|
|
|
|
|
@@ -302,6 +329,55 @@ class TestHTTP(unittest.TestCase):
|
|
|
data = ydl.urlopen(sanitized_Request(f'http://localhost:{self.http_port}/trailing_garbage')).read().decode('utf-8')
|
|
|
self.assertEqual(data, '<html><video src="/vid.mp4" /></html>')
|
|
|
|
|
|
+ @unittest.skipUnless(brotli, 'brotli support is not installed')
|
|
|
+ def test_brotli(self):
|
|
|
+ with FakeYDL() as ydl:
|
|
|
+ res = ydl.urlopen(
|
|
|
+ sanitized_Request(
|
|
|
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
|
|
|
+ headers={'ytdl-encoding': 'br'}))
|
|
|
+ self.assertEqual(res.headers.get('Content-Encoding'), 'br')
|
|
|
+ self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
|
|
|
+
|
|
|
+ def test_deflate(self):
|
|
|
+ with FakeYDL() as ydl:
|
|
|
+ res = ydl.urlopen(
|
|
|
+ sanitized_Request(
|
|
|
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
|
|
|
+ headers={'ytdl-encoding': 'deflate'}))
|
|
|
+ self.assertEqual(res.headers.get('Content-Encoding'), 'deflate')
|
|
|
+ self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
|
|
|
+
|
|
|
+ def test_gzip(self):
|
|
|
+ with FakeYDL() as ydl:
|
|
|
+ res = ydl.urlopen(
|
|
|
+ sanitized_Request(
|
|
|
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
|
|
|
+ headers={'ytdl-encoding': 'gzip'}))
|
|
|
+ self.assertEqual(res.headers.get('Content-Encoding'), 'gzip')
|
|
|
+ self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
|
|
|
+
|
|
|
+ def test_multiple_encodings(self):
|
|
|
+ # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4
|
|
|
+ with FakeYDL() as ydl:
|
|
|
+ for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'):
|
|
|
+ res = ydl.urlopen(
|
|
|
+ sanitized_Request(
|
|
|
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
|
|
|
+ headers={'ytdl-encoding': pair}))
|
|
|
+ self.assertEqual(res.headers.get('Content-Encoding'), pair)
|
|
|
+ self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>')
|
|
|
+
|
|
|
+ def test_unsupported_encoding(self):
|
|
|
+ # it should return the raw content
|
|
|
+ with FakeYDL() as ydl:
|
|
|
+ res = ydl.urlopen(
|
|
|
+ sanitized_Request(
|
|
|
+ f'http://127.0.0.1:{self.http_port}/content-encoding',
|
|
|
+ headers={'ytdl-encoding': 'unsupported'}))
|
|
|
+ self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported')
|
|
|
+ self.assertEqual(res.read(), b'raw')
|
|
|
+
|
|
|
|
|
|
class TestClientCert(unittest.TestCase):
|
|
|
def setUp(self):
|