websocket.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. import asyncio
  2. import contextlib
  3. import os
  4. import signal
  5. import threading
  6. from .common import FileDownloader
  7. from .external import FFmpegFD
  8. from ..dependencies import websockets
  9. class FFmpegSinkFD(FileDownloader):
  10. """ A sink to ffmpeg for downloading fragments in any form """
  11. def real_download(self, filename, info_dict):
  12. info_copy = info_dict.copy()
  13. info_copy['url'] = '-'
  14. async def call_conn(proc, stdin):
  15. try:
  16. await self.real_connection(stdin, info_dict)
  17. except OSError:
  18. pass
  19. finally:
  20. with contextlib.suppress(OSError):
  21. stdin.flush()
  22. stdin.close()
  23. os.kill(os.getpid(), signal.SIGINT)
  24. class FFmpegStdinFD(FFmpegFD):
  25. @classmethod
  26. def get_basename(cls):
  27. return FFmpegFD.get_basename()
  28. def on_process_started(self, proc, stdin):
  29. thread = threading.Thread(target=asyncio.run, daemon=True, args=(call_conn(proc, stdin), ))
  30. thread.start()
  31. return FFmpegStdinFD(self.ydl, self.params or {}).download(filename, info_copy)
  32. async def real_connection(self, sink, info_dict):
  33. """ Override this in subclasses """
  34. raise NotImplementedError('This method must be implemented by subclasses')
  35. class WebSocketFragmentFD(FFmpegSinkFD):
  36. async def real_connection(self, sink, info_dict):
  37. async with websockets.connect(info_dict['url'], extra_headers=info_dict.get('http_headers', {})) as ws:
  38. while True:
  39. recv = await ws.recv()
  40. if isinstance(recv, str):
  41. recv = recv.encode('utf8')
  42. sink.write(recv)