fragment.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. import concurrent.futures
  2. import contextlib
  3. import http.client
  4. import json
  5. import math
  6. import os
  7. import time
  8. from .common import FileDownloader
  9. from .http import HttpFD
  10. from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7
  11. from ..compat import compat_os_name, compat_struct_pack, compat_urllib_error
  12. from ..utils import (
  13. DownloadError,
  14. encodeFilename,
  15. error_to_compat_str,
  16. sanitized_Request,
  17. traverse_obj,
  18. )
  19. class HttpQuietDownloader(HttpFD):
  20. def to_screen(self, *args, **kargs):
  21. pass
  22. console_title = to_screen
class FragmentFD(FileDownloader):
    """
    A base file downloader class for fragmented media (e.g. f4m/m3u8 manifests).

    Available options:

    fragment_retries:   Number of times to retry a fragment for HTTP error (DASH
                        and hlsnative only)
    skip_unavailable_fragments:
                        Skip unavailable fragments (DASH and hlsnative only)
    keep_fragments:     Keep downloaded fragments on disk after downloading is
                        finished
    concurrent_fragment_downloads:  The number of threads to use for native hls and dash downloads
    _no_ytdl_file:      Don't use .ytdl file

    For each incomplete fragment download yt-dlp keeps on disk a special
    bookkeeping file with download state and metadata (in future such files will
    be used for any incomplete download handled by yt-dlp). This file is
    used to properly handle resuming, check download file consistency and detect
    potential errors. The file has a .ytdl extension and represents a standard
    JSON file of the following format:

    extractor:
        Dictionary of extractor related data. TBD.

    downloader:
        Dictionary of downloader related data. May contain following data:
            current_fragment:
                Dictionary with current (being downloaded) fragment data:
                index:  0-based index of current fragment among all fragments
            fragment_count:
                Total count of fragments

    This feature is experimental and file format may change in future.
    """

    def report_retry_fragment(self, err, frag_index, count, retries):
        """Print a retry notice for a failed fragment and sleep per retry policy."""
        self.to_screen(
            '\r[download] Got server HTTP error: %s. Retrying fragment %d (attempt %d of %s) ...'
            % (error_to_compat_str(err), frag_index, count, self.format_retries(retries)))
        self.sleep_retry('fragment', count)

    def report_skip_fragment(self, frag_index, err=None):
        """Print a notice that fragment `frag_index` is being skipped (with optional reason)."""
        err = f' {err};' if err else ''
        self.to_screen(f'[download]{err} Skipping fragment {frag_index:d} ...')

    def _prepare_url(self, info_dict, url):
        # Wrap the URL in a Request only when custom headers must be attached.
        headers = info_dict.get('http_headers')
        return sanitized_Request(url, None, headers) if headers else url

    def _prepare_and_start_frag_download(self, ctx, info_dict):
        """Convenience wrapper: set up `ctx` then install the progress hook."""
        self._prepare_frag_download(ctx)
        self._start_frag_download(ctx, info_dict)

    def __do_ytdl_file(self, ctx):
        # The .ytdl bookkeeping file is only usable for resumable, file-backed,
        # non-live downloads, and can be disabled via the _no_ytdl_file param.
        return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file')

    def _read_ytdl_file(self, ctx):
        """Load resume state (fragment index, extra_state) from the .ytdl file.

        On any parse/shape error the file is considered corrupt and
        ctx['ytdl_corrupt'] is set instead of raising.
        """
        assert 'ytdl_corrupt' not in ctx
        stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'r')
        try:
            ytdl_data = json.loads(stream.read())
            ctx['fragment_index'] = ytdl_data['downloader']['current_fragment']['index']
            if 'extra_state' in ytdl_data['downloader']:
                ctx['extra_state'] = ytdl_data['downloader']['extra_state']
        except Exception:
            # Mark as corrupt; the caller decides whether to restart from scratch.
            ctx['ytdl_corrupt'] = True
        finally:
            stream.close()

    def _write_ytdl_file(self, ctx):
        """Persist current download state to the .ytdl bookkeeping file as JSON."""
        frag_index_stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'w')
        try:
            downloader = {
                'current_fragment': {
                    'index': ctx['fragment_index'],
                },
            }
            if 'extra_state' in ctx:
                downloader['extra_state'] = ctx['extra_state']
            if ctx.get('fragment_count') is not None:
                downloader['fragment_count'] = ctx['fragment_count']
            frag_index_stream.write(json.dumps({'downloader': downloader}))
        finally:
            frag_index_stream.close()

    def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None):
        """Download one fragment to its own temp file via the quiet HTTP downloader.

        Returns True on success; on success also records the fragment's
        filename (and filetime, if reported) in `ctx`.
        """
        fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index'])
        fragment_info_dict = {
            'url': frag_url,
            'http_headers': headers or info_dict.get('http_headers'),
            'request_data': request_data,
            'ctx_id': ctx.get('ctx_id'),
        }
        success, _ = ctx['dl'].download(fragment_filename, fragment_info_dict)
        if not success:
            return False
        if fragment_info_dict.get('filetime'):
            # Remembered so the final file's mtime can be set in _finish_frag_download.
            ctx['fragment_filetime'] = fragment_info_dict.get('filetime')
        ctx['fragment_filename_sanitized'] = fragment_filename
        return True

    def _read_fragment(self, ctx):
        """Read back the most recently downloaded fragment's bytes (or None)."""
        if not ctx.get('fragment_filename_sanitized'):
            return None
        try:
            down, frag_sanitized = self.sanitize_open(ctx['fragment_filename_sanitized'], 'rb')
        except FileNotFoundError:
            # A live fragment may have expired/never been written; tolerate that.
            if ctx.get('live'):
                return None
            raise
        ctx['fragment_filename_sanitized'] = frag_sanitized
        frag_content = down.read()
        down.close()
        return frag_content

    def _append_fragment(self, ctx, frag_content):
        """Append fragment bytes to the destination stream and update bookkeeping.

        The .ytdl state is written even if the stream write fails, and the
        per-fragment temp file is removed unless keep_fragments is set.
        """
        try:
            ctx['dest_stream'].write(frag_content)
            ctx['dest_stream'].flush()
        finally:
            if self.__do_ytdl_file(ctx):
                self._write_ytdl_file(ctx)
            if not self.params.get('keep_fragments', False):
                self.try_remove(encodeFilename(ctx['fragment_filename_sanitized']))
            del ctx['fragment_filename_sanitized']

    def _prepare_frag_download(self, ctx):
        """Initialize `ctx` for a fragment download: open the destination file,
        create the quiet per-fragment downloader, and restore resume state
        from the .ytdl file if present and consistent.
        """
        if 'live' not in ctx:
            ctx['live'] = False
        if not ctx['live']:
            total_frags_str = '%d' % ctx['total_frags']
            ad_frags = ctx.get('ad_frags', 0)
            if ad_frags:
                total_frags_str += ' (not including %d ad)' % ad_frags
        else:
            total_frags_str = 'unknown (live)'
        self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}')
        self.report_destination(ctx['filename'])
        # Silent downloader used for the individual fragments; progress is
        # reported by this class's own hook instead.
        dl = HttpQuietDownloader(self.ydl, {
            **self.params,
            'noprogress': True,
            'test': False,
        })
        tmpfilename = self.temp_name(ctx['filename'])
        open_mode = 'wb'
        resume_len = 0

        # Establish possible resume length
        if os.path.isfile(encodeFilename(tmpfilename)):
            open_mode = 'ab'
            resume_len = os.path.getsize(encodeFilename(tmpfilename))

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

        if self.__do_ytdl_file(ctx):
            if os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename']))):
                self._read_ytdl_file(ctx)
                is_corrupt = ctx.get('ytdl_corrupt') is True
                # A recorded fragment index with no partial file on disk means
                # the two sources of truth disagree; restart from scratch.
                is_inconsistent = ctx['fragment_index'] > 0 and resume_len == 0
                if is_corrupt or is_inconsistent:
                    message = (
                        '.ytdl file is corrupt' if is_corrupt else
                        'Inconsistent state of incomplete fragment download')
                    self.report_warning(
                        '%s. Restarting from the beginning ...' % message)
                    ctx['fragment_index'] = resume_len = 0
                    if 'ytdl_corrupt' in ctx:
                        del ctx['ytdl_corrupt']
                    self._write_ytdl_file(ctx)
            else:
                self._write_ytdl_file(ctx)
                assert ctx['fragment_index'] == 0

        dest_stream, tmpfilename = self.sanitize_open(tmpfilename, open_mode)

        ctx.update({
            'dl': dl,
            'dest_stream': dest_stream,
            'tmpfilename': tmpfilename,
            # Total complete fragments downloaded so far in bytes
            'complete_frags_downloaded_bytes': resume_len,
        })

    def _start_frag_download(self, ctx, info_dict):
        """Install a progress hook on the fragment downloader that aggregates
        per-fragment progress into whole-download progress. Returns the start time.
        """
        resume_len = ctx['complete_frags_downloaded_bytes']
        total_frags = ctx['total_frags']
        ctx_id = ctx.get('ctx_id')
        # This dict stores the download progress, it's updated by the progress
        # hook
        state = {
            'status': 'downloading',
            'downloaded_bytes': resume_len,
            'fragment_index': ctx['fragment_index'],
            'fragment_count': total_frags,
            'filename': ctx['filename'],
            'tmpfilename': ctx['tmpfilename'],
        }

        start = time.time()
        ctx.update({
            'started': start,
            'fragment_started': start,
            # Amount of fragment's bytes downloaded by the time of the previous
            # frag progress hook invocation
            'prev_frag_downloaded_bytes': 0,
        })

        def frag_progress_hook(s):
            if s['status'] not in ('downloading', 'finished'):
                return

            # Fragment count may only become known mid-download (e.g. live).
            if not total_frags and ctx.get('fragment_count'):
                state['fragment_count'] = ctx['fragment_count']

            # Ignore events from other concurrent downloads sharing this dl.
            if ctx_id is not None and s.get('ctx_id') != ctx_id:
                return

            state['max_progress'] = ctx.get('max_progress')
            state['progress_idx'] = ctx.get('progress_idx')

            time_now = time.time()
            state['elapsed'] = time_now - start
            frag_total_bytes = s.get('total_bytes') or 0
            s['fragment_info_dict'] = s.pop('info_dict', {})
            if not ctx['live']:
                # Linear extrapolation: average completed-fragment size * total count.
                estimated_size = (
                    (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
                    / (state['fragment_index'] + 1) * total_frags)
                state['total_bytes_estimate'] = estimated_size

            if s['status'] == 'finished':
                state['fragment_index'] += 1
                ctx['fragment_index'] = state['fragment_index']
                # Add only the delta not yet counted by 'downloading' events.
                state['downloaded_bytes'] += frag_total_bytes - ctx['prev_frag_downloaded_bytes']
                ctx['complete_frags_downloaded_bytes'] = state['downloaded_bytes']
                ctx['speed'] = state['speed'] = self.calc_speed(
                    ctx['fragment_started'], time_now, frag_total_bytes)
                ctx['fragment_started'] = time.time()
                ctx['prev_frag_downloaded_bytes'] = 0
            else:
                frag_downloaded_bytes = s['downloaded_bytes']
                state['downloaded_bytes'] += frag_downloaded_bytes - ctx['prev_frag_downloaded_bytes']
                if not ctx['live']:
                    state['eta'] = self.calc_eta(
                        start, time_now, estimated_size - resume_len,
                        state['downloaded_bytes'] - resume_len)
                ctx['speed'] = state['speed'] = self.calc_speed(
                    ctx['fragment_started'], time_now, frag_downloaded_bytes)
                ctx['prev_frag_downloaded_bytes'] = frag_downloaded_bytes
            self._hook_progress(state, info_dict)

        ctx['dl'].add_progress_hook(frag_progress_hook)

        return start

    def _finish_frag_download(self, ctx, info_dict):
        """Close and finalize the download: remove the .ytdl file, rename the
        temp file to its final name, restore mtime, and fire the final hook.
        """
        ctx['dest_stream'].close()
        if self.__do_ytdl_file(ctx):
            ytdl_filename = encodeFilename(self.ytdl_filename(ctx['filename']))
            if os.path.isfile(ytdl_filename):
                self.try_remove(ytdl_filename)
        elapsed = time.time() - ctx['started']

        if ctx['tmpfilename'] == '-':
            # Writing to stdout: size must come from our own byte accounting.
            downloaded_bytes = ctx['complete_frags_downloaded_bytes']
        else:
            self.try_rename(ctx['tmpfilename'], ctx['filename'])
            if self.params.get('updatetime', True):
                filetime = ctx.get('fragment_filetime')
                if filetime:
                    # Best effort only; mtime restore failure is not fatal.
                    with contextlib.suppress(Exception):
                        os.utime(ctx['filename'], (time.time(), filetime))
            downloaded_bytes = os.path.getsize(encodeFilename(ctx['filename']))

        self._hook_progress({
            'downloaded_bytes': downloaded_bytes,
            'total_bytes': downloaded_bytes,
            'filename': ctx['filename'],
            'status': 'finished',
            'elapsed': elapsed,
            'ctx_id': ctx.get('ctx_id'),
            'max_progress': ctx.get('max_progress'),
            'progress_idx': ctx.get('progress_idx'),
        }, info_dict)

    def _prepare_external_frag_download(self, ctx):
        """Minimal ctx setup (no dest stream / dl) for external downloaders."""
        if 'live' not in ctx:
            ctx['live'] = False
        if not ctx['live']:
            total_frags_str = '%d' % ctx['total_frags']
            ad_frags = ctx.get('ad_frags', 0)
            if ad_frags:
                total_frags_str += ' (not including %d ad)' % ad_frags
        else:
            total_frags_str = 'unknown (live)'
        self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}')

        tmpfilename = self.temp_name(ctx['filename'])

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

    def decrypter(self, info_dict):
        """Return a decrypt_fragment(fragment, frag_content) callable for
        AES-128 (CBC) encrypted HLS fragments; non-encrypted content passes through.
        Decryption keys are fetched once per key URL and cached.
        """
        _key_cache = {}

        def _get_key(url):
            if url not in _key_cache:
                _key_cache[url] = self.ydl.urlopen(self._prepare_url(info_dict, url)).read()
            return _key_cache[url]

        def decrypt_fragment(fragment, frag_content):
            decrypt_info = fragment.get('decrypt_info')
            if not decrypt_info or decrypt_info['METHOD'] != 'AES-128':
                return frag_content
            # Default IV per HLS spec: big-endian media sequence number padded to 16 bytes.
            iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', fragment['media_sequence'])
            decrypt_info['KEY'] = decrypt_info.get('KEY') or _get_key(info_dict.get('_decryption_key_url') or decrypt_info['URI'])
            # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
            # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
            # not what it decrypts to.
            if self.params.get('test', False):
                return frag_content
            return unpad_pkcs7(aes_cbc_decrypt_bytes(frag_content, decrypt_info['KEY'], iv))

        return decrypt_fragment

    def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_func=None):
        '''
        @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
        all args must be either tuple or list
        '''
        # Shared single-element flag, flipped to False to tell workers to stop.
        interrupt_trigger = [True]
        max_progress = len(args)
        if max_progress == 1:
            return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
        max_workers = self.params.get('concurrent_fragment_downloads', 1)
        if max_progress > 1:
            self._prepare_multiline_status(max_progress)
        is_live = any(traverse_obj(args, (..., 2, 'is_live'), default=[]))

        def thread_func(idx, ctx, fragments, info_dict, tpe):
            ctx['max_progress'] = max_progress
            ctx['progress_idx'] = idx
            return self.download_and_append_fragments(
                ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func,
                tpe=tpe, interrupt_trigger=interrupt_trigger)

        class FTPE(concurrent.futures.ThreadPoolExecutor):
            # has to stop this or it's going to wait on the worker thread itself
            def __exit__(self, exc_type, exc_val, exc_tb):
                pass

        if compat_os_name == 'nt':
            # On Windows, a blocking future.result() cannot be interrupted by
            # Ctrl+C, so poll with a short timeout instead.
            def future_result(future):
                while True:
                    try:
                        return future.result(0.1)
                    except KeyboardInterrupt:
                        raise
                    except concurrent.futures.TimeoutError:
                        continue
        else:
            def future_result(future):
                return future.result()

        def interrupt_trigger_iter(fg):
            # Stop yielding fragments once an interrupt has been requested.
            for f in fg:
                if not interrupt_trigger[0]:
                    break
                yield f

        spins = []
        for idx, (ctx, fragments, info_dict) in enumerate(args):
            # Split the worker budget evenly across the parallel downloads.
            tpe = FTPE(math.ceil(max_workers / max_progress))
            job = tpe.submit(thread_func, idx, ctx, interrupt_trigger_iter(fragments), info_dict, tpe)
            spins.append((tpe, job))

        result = True
        for tpe, job in spins:
            try:
                result = result and future_result(job)
            except KeyboardInterrupt:
                interrupt_trigger[0] = False
            finally:
                tpe.shutdown(wait=True)
        if not interrupt_trigger[0] and not is_live:
            raise KeyboardInterrupt()
        # we expect the user wants to stop and DO WANT the preceding postprocessors to run;
        # so returning a intermediate result here instead of KeyboardInterrupt on live
        return result

    def download_and_append_fragments(
            self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None,
            tpe=None, interrupt_trigger=None):
        """Download every fragment (optionally in parallel), decrypt/pack each,
        and append them in order to the destination stream.

        Returns True on success, False if a fatal fragment could not be appended.
        """
        if not interrupt_trigger:
            interrupt_trigger = (True, )

        fragment_retries = self.params.get('fragment_retries', 0)
        # A missing fragment is fatal only for the first one (index 0), always
        # non-fatal for live, and always fatal when skipping is disabled.
        is_fatal = (
            ((lambda _: False) if info_dict.get('is_live') else (lambda idx: idx == 0))
            if self.params.get('skip_unavailable_fragments', True) else (lambda _: True))

        if not pack_func:
            pack_func = lambda frag_content, _: frag_content

        def download_fragment(fragment, ctx):
            if not interrupt_trigger[0]:
                return

            frag_index = ctx['fragment_index'] = fragment['frag_index']
            ctx['last_error'] = None
            headers = info_dict.get('http_headers', {}).copy()
            byte_range = fragment.get('byte_range')
            if byte_range:
                headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)

            # Never skip the first fragment
            fatal, count = is_fatal(fragment.get('index') or (frag_index - 1)), 0
            while count <= fragment_retries:
                try:
                    ctx['fragment_count'] = fragment.get('fragment_count')
                    if self._download_fragment(ctx, fragment['url'], info_dict, headers):
                        break
                    return
                except (compat_urllib_error.HTTPError, http.client.IncompleteRead) as err:
                    # Unavailable (possibly temporary) fragments may be served.
                    # First we try to retry then either skip or abort.
                    # See https://github.com/ytdl-org/youtube-dl/issues/10165,
                    # https://github.com/ytdl-org/youtube-dl/issues/10448).
                    count += 1
                    ctx['last_error'] = err
                    if count <= fragment_retries:
                        self.report_retry_fragment(err, frag_index, count, fragment_retries)
                except DownloadError:
                    # Don't retry fragment if error occurred during HTTP downloading
                    # itself since it has own retry settings
                    if not fatal:
                        break
                    raise

            if count > fragment_retries and fatal:
                ctx['dest_stream'].close()
                self.report_error('Giving up after %s fragment retries' % fragment_retries)

        def append_fragment(frag_content, frag_index, ctx):
            # Empty content means the fragment was skipped/unavailable.
            if frag_content:
                self._append_fragment(ctx, pack_func(frag_content, frag_index))
            elif not is_fatal(frag_index - 1):
                self.report_skip_fragment(frag_index, 'fragment not found')
            else:
                ctx['dest_stream'].close()
                self.report_error(f'fragment {frag_index} not found, unable to continue')
                return False
            return True

        decrypt_fragment = self.decrypter(info_dict)

        max_workers = math.ceil(
            self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1))
        if max_workers > 1:
            def _download_fragment(fragment):
                # Each worker gets its own ctx copy so concurrent downloads don't
                # clobber each other's per-fragment state; results are merged back
                # into the shared ctx in download order below.
                ctx_copy = ctx.copy()
                download_fragment(fragment, ctx_copy)
                return fragment, fragment['frag_index'], ctx_copy.get('fragment_filename_sanitized')

            self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
            with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                try:
                    # pool.map preserves input order, so fragments are appended sequentially.
                    for fragment, frag_index, frag_filename in pool.map(_download_fragment, fragments):
                        ctx.update({
                            'fragment_filename_sanitized': frag_filename,
                            'fragment_index': frag_index,
                        })
                        if not append_fragment(decrypt_fragment(fragment, self._read_fragment(ctx)), frag_index, ctx):
                            return False
                except KeyboardInterrupt:
                    self._finish_multiline_status()
                    self.report_error(
                        'Interrupted by user. Waiting for all threads to shutdown...', is_error=False, tb=False)
                    pool.shutdown(wait=False)
                    raise
        else:
            for fragment in fragments:
                if not interrupt_trigger[0]:
                    break
                try:
                    download_fragment(fragment, ctx)
                    result = append_fragment(
                        decrypt_fragment(fragment, self._read_fragment(ctx)), fragment['frag_index'], ctx)
                except KeyboardInterrupt:
                    # For live streams a Ctrl+C just finalizes what was downloaded.
                    if info_dict.get('is_live'):
                        break
                    raise
                if not result:
                    return False

        if finish_func is not None:
            ctx['dest_stream'].write(finish_func())
            ctx['dest_stream'].flush()
        self._finish_frag_download(ctx, info_dict)
        return True