fragment.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. import concurrent.futures
  2. import contextlib
  3. import json
  4. import math
  5. import os
  6. import struct
  7. import time
  8. from .common import FileDownloader
  9. from .http import HttpFD
  10. from ..aes import aes_cbc_decrypt_bytes, unpad_pkcs7
  11. from ..compat import compat_os_name
  12. from ..networking import Request
  13. from ..networking.exceptions import HTTPError, IncompleteRead
  14. from ..utils import DownloadError, RetryManager, encodeFilename, traverse_obj
  15. from ..utils.networking import HTTPHeaderDict
  16. from ..utils.progress import ProgressCalculator
  17. class HttpQuietDownloader(HttpFD):
  18. def to_screen(self, *args, **kargs):
  19. pass
  20. to_console_title = to_screen
class FragmentFD(FileDownloader):
    """
    A base file downloader class for fragmented media (e.g. f4m/m3u8 manifests).

    Available options:

    fragment_retries:   Number of times to retry a fragment for HTTP error
                        (DASH and hlsnative only). Default is 0 for API, but 10 for CLI
    skip_unavailable_fragments:
                        Skip unavailable fragments (DASH and hlsnative only)
    keep_fragments:     Keep downloaded fragments on disk after downloading is
                        finished
    concurrent_fragment_downloads:  The number of threads to use for native hls and dash downloads
    _no_ytdl_file:      Don't use .ytdl file

    For each incomplete fragment download yt-dlp keeps on disk a special
    bookkeeping file with download state and metadata (in future such files will
    be used for any incomplete download handled by yt-dlp). This file is
    used to properly handle resuming, check download file consistency and detect
    potential errors. The file has a .ytdl extension and represents a standard
    JSON file of the following format:

    extractor:
        Dictionary of extractor related data. TBD.

    downloader:
        Dictionary of downloader related data. May contain following data:
        current_fragment:
            Dictionary with current (being downloaded) fragment data:
            index:  0-based index of current fragment among all fragments
        fragment_count:
            Total count of fragments

    This feature is experimental and file format may change in future.
    """
  50. def report_retry_fragment(self, err, frag_index, count, retries):
  51. self.deprecation_warning('yt_dlp.downloader.FragmentFD.report_retry_fragment is deprecated. '
  52. 'Use yt_dlp.downloader.FileDownloader.report_retry instead')
  53. return self.report_retry(err, count, retries, frag_index)
  54. def report_skip_fragment(self, frag_index, err=None):
  55. err = f' {err};' if err else ''
  56. self.to_screen(f'[download]{err} Skipping fragment {frag_index:d} ...')
  57. def _prepare_url(self, info_dict, url):
  58. headers = info_dict.get('http_headers')
  59. return Request(url, None, headers) if headers else url
    def _prepare_and_start_frag_download(self, ctx, info_dict):
        # Convenience wrapper: build the shared download context, then attach
        # the aggregate progress hook and begin the fragment session.
        self._prepare_frag_download(ctx)
        self._start_frag_download(ctx, info_dict)
    def __do_ytdl_file(self, ctx):
        # A .ytdl bookkeeping file only makes sense for resumable, non-live
        # downloads going to a real file (not stdout '-'), and can be turned
        # off via the private '_no_ytdl_file' option.
        return ctx['live'] is not True and ctx['tmpfilename'] != '-' and not self.params.get('_no_ytdl_file')
  65. def _read_ytdl_file(self, ctx):
  66. assert 'ytdl_corrupt' not in ctx
  67. stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'r')
  68. try:
  69. ytdl_data = json.loads(stream.read())
  70. ctx['fragment_index'] = ytdl_data['downloader']['current_fragment']['index']
  71. if 'extra_state' in ytdl_data['downloader']:
  72. ctx['extra_state'] = ytdl_data['downloader']['extra_state']
  73. except Exception:
  74. ctx['ytdl_corrupt'] = True
  75. finally:
  76. stream.close()
  77. def _write_ytdl_file(self, ctx):
  78. frag_index_stream, _ = self.sanitize_open(self.ytdl_filename(ctx['filename']), 'w')
  79. try:
  80. downloader = {
  81. 'current_fragment': {
  82. 'index': ctx['fragment_index'],
  83. },
  84. }
  85. if 'extra_state' in ctx:
  86. downloader['extra_state'] = ctx['extra_state']
  87. if ctx.get('fragment_count') is not None:
  88. downloader['fragment_count'] = ctx['fragment_count']
  89. frag_index_stream.write(json.dumps({'downloader': downloader}))
  90. finally:
  91. frag_index_stream.close()
    def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_data=None):
        """Download one fragment to '<tmpfilename>-Frag<N>' via the inner HTTP downloader.

        Returns True on success; also records the fragment's filename (and
        mtime, when the server reported one) in ctx for the append step.
        """
        fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], ctx['fragment_index'])
        fragment_info_dict = {
            'url': frag_url,
            'http_headers': headers or info_dict.get('http_headers'),
            'request_data': request_data,
            'ctx_id': ctx.get('ctx_id'),
        }
        # When resuming is enabled, let the inner downloader continue a
        # partially-downloaded fragment (.part file) instead of restarting it
        frag_resume_len = 0
        if ctx['dl'].params.get('continuedl', True):
            frag_resume_len = self.filesize_or_none(self.temp_name(fragment_filename))
        fragment_info_dict['frag_resume_len'] = ctx['frag_resume_len'] = frag_resume_len

        success, _ = ctx['dl'].download(fragment_filename, fragment_info_dict)
        if not success:
            return False
        if fragment_info_dict.get('filetime'):
            ctx['fragment_filetime'] = fragment_info_dict.get('filetime')
        ctx['fragment_filename_sanitized'] = fragment_filename
        return True
  111. def _read_fragment(self, ctx):
  112. if not ctx.get('fragment_filename_sanitized'):
  113. return None
  114. try:
  115. down, frag_sanitized = self.sanitize_open(ctx['fragment_filename_sanitized'], 'rb')
  116. except FileNotFoundError:
  117. if ctx.get('live'):
  118. return None
  119. raise
  120. ctx['fragment_filename_sanitized'] = frag_sanitized
  121. frag_content = down.read()
  122. down.close()
  123. return frag_content
    def _append_fragment(self, ctx, frag_content):
        """Append fragment bytes to the destination stream, then drop the fragment file."""
        try:
            ctx['dest_stream'].write(frag_content)
            ctx['dest_stream'].flush()
        finally:
            # Even if the write fails, keep the .ytdl state file in sync and
            # clean up the per-fragment temp file (unless asked to keep it)
            if self.__do_ytdl_file(ctx):
                self._write_ytdl_file(ctx)
            if not self.params.get('keep_fragments', False):
                self.try_remove(encodeFilename(ctx['fragment_filename_sanitized']))
            del ctx['fragment_filename_sanitized']
    def _prepare_frag_download(self, ctx):
        """Set up the shared download context: destination file, resume state,
        the quiet inner HTTP downloader, and the .ytdl bookkeeping file."""
        if not ctx.setdefault('live', False):
            total_frags_str = '%d' % ctx['total_frags']
            ad_frags = ctx.get('ad_frags', 0)
            if ad_frags:
                total_frags_str += ' (not including %d ad)' % ad_frags
        else:
            total_frags_str = 'unknown (live)'
        self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}')
        self.report_destination(ctx['filename'])
        # All fragments share one silent HttpFD so per-fragment progress does
        # not interleave with the aggregate progress report
        dl = HttpQuietDownloader(self.ydl, {
            **self.params,
            'noprogress': True,
            'test': False,
            'sleep_interval': 0,
            'max_sleep_interval': 0,
            'sleep_interval_subtitles': 0,
        })
        tmpfilename = self.temp_name(ctx['filename'])
        open_mode = 'wb'

        # Establish possible resume length
        resume_len = self.filesize_or_none(tmpfilename)
        if resume_len > 0:
            open_mode = 'ab'

        # Should be initialized before ytdl file check
        ctx.update({
            'tmpfilename': tmpfilename,
            'fragment_index': 0,
        })

        if self.__do_ytdl_file(ctx):
            ytdl_file_exists = os.path.isfile(encodeFilename(self.ytdl_filename(ctx['filename'])))
            continuedl = self.params.get('continuedl', True)
            if continuedl and ytdl_file_exists:
                self._read_ytdl_file(ctx)
                is_corrupt = ctx.get('ytdl_corrupt') is True
                # A recorded fragment index with no partial file on disk means
                # state and data are out of sync; restart to be safe
                is_inconsistent = ctx['fragment_index'] > 0 and resume_len == 0
                if is_corrupt or is_inconsistent:
                    message = (
                        '.ytdl file is corrupt' if is_corrupt else
                        'Inconsistent state of incomplete fragment download')
                    self.report_warning(
                        f'{message}. Restarting from the beginning ...')
                    ctx['fragment_index'] = resume_len = 0
                    if 'ytdl_corrupt' in ctx:
                        del ctx['ytdl_corrupt']
                    self._write_ytdl_file(ctx)
            else:
                if not continuedl:
                    if ytdl_file_exists:
                        self._read_ytdl_file(ctx)
                    ctx['fragment_index'] = resume_len = 0
                self._write_ytdl_file(ctx)
                assert ctx['fragment_index'] == 0

        dest_stream, tmpfilename = self.sanitize_open(tmpfilename, open_mode)

        ctx.update({
            'dl': dl,
            'dest_stream': dest_stream,
            'tmpfilename': tmpfilename,
            # Total complete fragments downloaded so far in bytes
            'complete_frags_downloaded_bytes': resume_len,
        })
    def _start_frag_download(self, ctx, info_dict):
        """Install a progress hook that aggregates per-fragment progress into
        whole-download progress and keeps ctx byte counters up to date.

        Returns the download start timestamp (also stored in ctx['started']).
        """
        resume_len = ctx['complete_frags_downloaded_bytes']
        total_frags = ctx['total_frags']
        ctx_id = ctx.get('ctx_id')
        # Stores the download progress, updated by the progress hook
        state = {
            'status': 'downloading',
            'downloaded_bytes': resume_len,
            'fragment_index': ctx['fragment_index'],
            'fragment_count': total_frags,
            'filename': ctx['filename'],
            'tmpfilename': ctx['tmpfilename'],
        }

        ctx['started'] = time.time()
        progress = ProgressCalculator(resume_len)

        def frag_progress_hook(s):
            if s['status'] not in ('downloading', 'finished'):
                return

            if not total_frags and ctx.get('fragment_count'):
                state['fragment_count'] = ctx['fragment_count']

            # Ignore events coming from a different (parallel) download context
            if ctx_id is not None and s.get('ctx_id') != ctx_id:
                return

            state['max_progress'] = ctx.get('max_progress')
            state['progress_idx'] = ctx.get('progress_idx')

            state['elapsed'] = progress.elapsed
            frag_total_bytes = s.get('total_bytes') or 0
            s['fragment_info_dict'] = s.pop('info_dict', {})

            # XXX: Fragment resume is not accounted for here
            if not ctx['live']:
                # Estimate total size from the average size of fragments so far
                estimated_size = (
                    (ctx['complete_frags_downloaded_bytes'] + frag_total_bytes)
                    / (state['fragment_index'] + 1) * total_frags)
                progress.total = estimated_size
                progress.update(s.get('downloaded_bytes'))
                state['total_bytes_estimate'] = progress.total
            else:
                progress.update(s.get('downloaded_bytes'))

            if s['status'] == 'finished':
                state['fragment_index'] += 1
                ctx['fragment_index'] = state['fragment_index']
                progress.thread_reset()

            state['downloaded_bytes'] = ctx['complete_frags_downloaded_bytes'] = progress.downloaded
            state['speed'] = ctx['speed'] = progress.speed.smooth
            state['eta'] = progress.eta.smooth
            self._hook_progress(state, info_dict)

        ctx['dl'].add_progress_hook(frag_progress_hook)
        return ctx['started']
    def _finish_frag_download(self, ctx, info_dict):
        """Close the destination, clean up the .ytdl file, move the temp file
        into place and emit the final 'finished' progress event.

        Returns True on success, False if the resulting file is empty.
        """
        ctx['dest_stream'].close()
        if self.__do_ytdl_file(ctx):
            self.try_remove(self.ytdl_filename(ctx['filename']))
        elapsed = time.time() - ctx['started']

        to_file = ctx['tmpfilename'] != '-'
        if to_file:
            downloaded_bytes = self.filesize_or_none(ctx['tmpfilename'])
        else:
            downloaded_bytes = ctx['complete_frags_downloaded_bytes']

        if not downloaded_bytes:
            if to_file:
                self.try_remove(ctx['tmpfilename'])
            self.report_error('The downloaded file is empty')
            return False
        elif to_file:
            self.try_rename(ctx['tmpfilename'], ctx['filename'])
            filetime = ctx.get('fragment_filetime')
            if self.params.get('updatetime', True) and filetime:
                # Best effort only; an mtime update failure is not fatal
                with contextlib.suppress(Exception):
                    os.utime(ctx['filename'], (time.time(), filetime))

        self._hook_progress({
            'downloaded_bytes': downloaded_bytes,
            'total_bytes': downloaded_bytes,
            'filename': ctx['filename'],
            'status': 'finished',
            'elapsed': elapsed,
            'ctx_id': ctx.get('ctx_id'),
            'max_progress': ctx.get('max_progress'),
            'progress_idx': ctx.get('progress_idx'),
        }, info_dict)
        return True
  274. def _prepare_external_frag_download(self, ctx):
  275. if 'live' not in ctx:
  276. ctx['live'] = False
  277. if not ctx['live']:
  278. total_frags_str = '%d' % ctx['total_frags']
  279. ad_frags = ctx.get('ad_frags', 0)
  280. if ad_frags:
  281. total_frags_str += ' (not including %d ad)' % ad_frags
  282. else:
  283. total_frags_str = 'unknown (live)'
  284. self.to_screen(f'[{self.FD_NAME}] Total fragments: {total_frags_str}')
  285. tmpfilename = self.temp_name(ctx['filename'])
  286. # Should be initialized before ytdl file check
  287. ctx.update({
  288. 'tmpfilename': tmpfilename,
  289. 'fragment_index': 0,
  290. })
    def decrypter(self, info_dict):
        """Return a decrypt_fragment(fragment, frag_content) callable for this download.

        HLS AES-128 keys are fetched lazily and cached per key URL.
        """
        _key_cache = {}

        def _get_key(url):
            # Many fragments typically share a single key; cache by URL
            if url not in _key_cache:
                _key_cache[url] = self.ydl.urlopen(self._prepare_url(info_dict, url)).read()
            return _key_cache[url]

        def decrypt_fragment(fragment, frag_content):
            if frag_content is None:
                return
            decrypt_info = fragment.get('decrypt_info')
            if not decrypt_info or decrypt_info['METHOD'] != 'AES-128':
                return frag_content
            # Default HLS IV: the media sequence number as a 16-byte big-endian integer
            iv = decrypt_info.get('IV') or struct.pack('>8xq', fragment['media_sequence'])
            decrypt_info['KEY'] = (decrypt_info.get('KEY')
                                   or _get_key(traverse_obj(info_dict, ('hls_aes', 'uri')) or decrypt_info['URI']))
            # Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
            # size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
            # not what it decrypts to.
            if self.params.get('test', False):
                return frag_content
            return unpad_pkcs7(aes_cbc_decrypt_bytes(frag_content, decrypt_info['KEY'], iv))

        return decrypt_fragment
    def download_and_append_fragments_multiple(self, *args, **kwargs):
        """
        @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
        all args must be either tuple or list
        """
        interrupt_trigger = [True]
        max_progress = len(args)
        if max_progress == 1:
            # Single format: no threading layer needed
            return self.download_and_append_fragments(*args[0], **kwargs)
        max_workers = self.params.get('concurrent_fragment_downloads', 1)
        if max_progress > 1:
            self._prepare_multiline_status(max_progress)
        is_live = any(traverse_obj(args, (..., 2, 'is_live')))

        def thread_func(idx, ctx, fragments, info_dict, tpe):
            ctx['max_progress'] = max_progress
            ctx['progress_idx'] = idx
            return self.download_and_append_fragments(
                ctx, fragments, info_dict, **kwargs, tpe=tpe, interrupt_trigger=interrupt_trigger)

        class FTPE(concurrent.futures.ThreadPoolExecutor):
            # has to stop this or it's going to wait on the worker thread itself
            def __exit__(self, exc_type, exc_val, exc_tb):
                pass

        if compat_os_name == 'nt':
            def future_result(future):
                # On Windows, poll with a timeout so KeyboardInterrupt can
                # still reach the main thread while workers run
                while True:
                    try:
                        return future.result(0.1)
                    except KeyboardInterrupt:
                        raise
                    except concurrent.futures.TimeoutError:
                        continue
        else:
            def future_result(future):
                return future.result()

        def interrupt_trigger_iter(fg):
            # Stop handing out fragments once an interrupt has been requested
            for f in fg:
                if not interrupt_trigger[0]:
                    break
                yield f

        spins = []
        for idx, (ctx, fragments, info_dict) in enumerate(args):
            # Split the worker budget evenly across the parallel downloads
            tpe = FTPE(math.ceil(max_workers / max_progress))
            job = tpe.submit(thread_func, idx, ctx, interrupt_trigger_iter(fragments), info_dict, tpe)
            spins.append((tpe, job))

        result = True
        for tpe, job in spins:
            try:
                result = result and future_result(job)
            except KeyboardInterrupt:
                interrupt_trigger[0] = False
            finally:
                tpe.shutdown(wait=True)
        if not interrupt_trigger[0] and not is_live:
            raise KeyboardInterrupt
        # we expect the user wants to stop and DO WANT the preceding postprocessors to run;
        # so returning a intermediate result here instead of KeyboardInterrupt on live
        return result
    def download_and_append_fragments(
            self, ctx, fragments, info_dict, *, is_fatal=(lambda idx: False),
            pack_func=(lambda content, idx: content), finish_func=None,
            tpe=None, interrupt_trigger=(True, )):
        """Download each fragment, decrypt/transform it, and append it to the
        destination file (optionally with a thread pool for the downloads).

        Returns the result of _finish_frag_download, or False if a fatal
        fragment could not be downloaded/appended.
        """
        if not self.params.get('skip_unavailable_fragments', True):
            is_fatal = lambda _: True

        def download_fragment(fragment, ctx):
            if not interrupt_trigger[0]:
                return

            frag_index = ctx['fragment_index'] = fragment['frag_index']
            ctx['last_error'] = None
            headers = HTTPHeaderDict(info_dict.get('http_headers'))
            byte_range = fragment.get('byte_range')
            if byte_range:
                headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)

            # Never skip the first fragment
            fatal = is_fatal(fragment.get('index') or (frag_index - 1))

            def error_callback(err, count, retries):
                # Once retries are exhausted on a fatal fragment, close the
                # output so no further data is appended after the failure
                if fatal and count > retries:
                    ctx['dest_stream'].close()
                self.report_retry(err, count, retries, frag_index, fatal)
                ctx['last_error'] = err

            for retry in RetryManager(self.params.get('fragment_retries'), error_callback):
                try:
                    ctx['fragment_count'] = fragment.get('fragment_count')
                    if not self._download_fragment(
                            ctx, fragment['url'], info_dict, headers, info_dict.get('request_data')):
                        return
                except (HTTPError, IncompleteRead) as err:
                    retry.error = err
                    continue
                except DownloadError:  # has own retry settings
                    if fatal:
                        raise

        def append_fragment(frag_content, frag_index, ctx):
            # Returns False only when a missing fragment is fatal
            if frag_content:
                self._append_fragment(ctx, pack_func(frag_content, frag_index))
            elif not is_fatal(frag_index - 1):
                self.report_skip_fragment(frag_index, 'fragment not found')
            else:
                ctx['dest_stream'].close()
                self.report_error(f'fragment {frag_index} not found, unable to continue')
                return False
            return True

        decrypt_fragment = self.decrypter(info_dict)

        max_workers = math.ceil(
            self.params.get('concurrent_fragment_downloads', 1) / ctx.get('max_progress', 1))
        if max_workers > 1:
            def _download_fragment(fragment):
                # Each worker uses its own ctx copy so concurrent downloads
                # don't clobber each other's per-fragment state
                ctx_copy = ctx.copy()
                download_fragment(fragment, ctx_copy)
                return fragment, fragment['frag_index'], ctx_copy.get('fragment_filename_sanitized')

            with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                try:
                    # pool.map preserves fragment order, so appends stay sequential
                    for fragment, frag_index, frag_filename in pool.map(_download_fragment, fragments):
                        ctx.update({
                            'fragment_filename_sanitized': frag_filename,
                            'fragment_index': frag_index,
                        })
                        if not append_fragment(decrypt_fragment(fragment, self._read_fragment(ctx)), frag_index, ctx):
                            return False
                except KeyboardInterrupt:
                    self._finish_multiline_status()
                    self.report_error(
                        'Interrupted by user. Waiting for all threads to shutdown...', is_error=False, tb=False)
                    pool.shutdown(wait=False)
                    raise
        else:
            for fragment in fragments:
                if not interrupt_trigger[0]:
                    break
                try:
                    download_fragment(fragment, ctx)
                    result = append_fragment(
                        decrypt_fragment(fragment, self._read_fragment(ctx)), fragment['frag_index'], ctx)
                except KeyboardInterrupt:
                    # On live streams an interrupt just finalizes what we have
                    if info_dict.get('is_live'):
                        break
                    raise
                if not result:
                    return False

        if finish_func is not None:
            ctx['dest_stream'].write(finish_func())
            ctx['dest_stream'].flush()
        return self._finish_frag_download(ctx, info_dict)