test_downloader_http.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. #!/usr/bin/env python3
  2. # Allow direct execution
  3. import os
  4. import sys
  5. import unittest
  6. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  7. import http.server
  8. import re
  9. import threading
  10. from test.helper import http_server_port, try_rm
  11. from yt_dlp import YoutubeDL
  12. from yt_dlp.downloader.http import HttpFD
  13. from yt_dlp.utils import encodeFilename
  14. from yt_dlp.utils._utils import _YDLLogger as FakeLogger
# Directory that contains this test module.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
# Size in bytes (10 KiB) of the body the test server writes for a full
# response; a downloaded file is expected to be exactly this large.
TEST_SIZE = 10 * 1024
  17. class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
  18. def log_message(self, format, *args):
  19. pass
  20. def send_content_range(self, total=None):
  21. range_header = self.headers.get('Range')
  22. start = end = None
  23. if range_header:
  24. mobj = re.search(r'^bytes=(\d+)-(\d+)', range_header)
  25. if mobj:
  26. start = int(mobj.group(1))
  27. end = int(mobj.group(2))
  28. valid_range = start is not None and end is not None
  29. if valid_range:
  30. content_range = f'bytes {start}-{end}'
  31. if total:
  32. content_range += f'/{total}'
  33. self.send_header('Content-Range', content_range)
  34. return (end - start + 1) if valid_range else total
  35. def serve(self, range=True, content_length=True):
  36. self.send_response(200)
  37. self.send_header('Content-Type', 'video/mp4')
  38. size = TEST_SIZE
  39. if range:
  40. size = self.send_content_range(TEST_SIZE)
  41. if content_length:
  42. self.send_header('Content-Length', size)
  43. self.end_headers()
  44. self.wfile.write(b'#' * size)
  45. def do_GET(self):
  46. if self.path == '/regular':
  47. self.serve()
  48. elif self.path == '/no-content-length':
  49. self.serve(content_length=False)
  50. elif self.path == '/no-range':
  51. self.serve(range=False)
  52. elif self.path == '/no-range-no-content-length':
  53. self.serve(range=False, content_length=False)
  54. else:
  55. assert False
  56. class TestHttpFD(unittest.TestCase):
  57. def setUp(self):
  58. self.httpd = http.server.HTTPServer(
  59. ('127.0.0.1', 0), HTTPTestRequestHandler)
  60. self.port = http_server_port(self.httpd)
  61. self.server_thread = threading.Thread(target=self.httpd.serve_forever)
  62. self.server_thread.daemon = True
  63. self.server_thread.start()
  64. def download(self, params, ep):
  65. params['logger'] = FakeLogger()
  66. ydl = YoutubeDL(params)
  67. downloader = HttpFD(ydl, params)
  68. filename = 'testfile.mp4'
  69. try_rm(encodeFilename(filename))
  70. self.assertTrue(downloader.real_download(filename, {
  71. 'url': f'http://127.0.0.1:{self.port}/{ep}',
  72. }), ep)
  73. self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE, ep)
  74. try_rm(encodeFilename(filename))
  75. def download_all(self, params):
  76. for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):
  77. self.download(params, ep)
  78. def test_regular(self):
  79. self.download_all({})
  80. def test_chunked(self):
  81. self.download_all({
  82. 'http_chunk_size': 1000,
  83. })
# Run the tests in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()