_legacy.py 7.2 KB

  1. """No longer used and new code should not use. Exists only for API compat."""
  2. import platform
  3. import struct
  4. import sys
  5. import urllib.error
  6. import urllib.parse
  7. import urllib.request
  8. import zlib
  9. from ._utils import Popen, decode_base_n, preferredencoding
  10. from .networking import escape_rfc3986 # noqa: F401
  11. from .networking import normalize_url as escape_url # noqa: F401
  12. from .traversal import traverse_obj
  13. from ..dependencies import certifi, websockets
  14. from ..networking._helper import make_ssl_context
  15. from ..networking._urllib import HTTPHandler
  16. # isort: split
  17. from .networking import random_user_agent, std_headers # noqa: F401
  18. from ..cookies import YoutubeDLCookieJar # noqa: F401
  19. from ..networking._urllib import PUTRequest # noqa: F401
  20. from ..networking._urllib import SUPPORTED_ENCODINGS, HEADRequest # noqa: F401
  21. from ..networking._urllib import ProxyHandler as PerRequestProxyHandler # noqa: F401
  22. from ..networking._urllib import RedirectHandler as YoutubeDLRedirectHandler # noqa: F401
  23. from ..networking._urllib import ( # noqa: F401
  24. make_socks_conn_class,
  25. update_Request,
  26. )
  27. from ..networking.exceptions import HTTPError, network_exceptions # noqa: F401
  28. has_certifi = bool(certifi)
  29. has_websockets = bool(websockets)
  30. def load_plugins(name, suffix, namespace):
  31. from ..plugins import load_plugins
  32. ret = load_plugins(name, suffix)
  33. namespace.update(ret)
  34. return ret
  35. def traverse_dict(dictn, keys, casesense=True):
  36. return traverse_obj(dictn, keys, casesense=casesense, is_user_input=True, traverse_string=True)
  37. def decode_base(value, digits):
  38. return decode_base_n(value, table=digits)
  39. def platform_name():
  40. """ Returns the platform name as a str """
  41. return platform.platform()
  42. def get_subprocess_encoding():
  43. if sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
  44. # For subprocess calls, encode with locale encoding
  45. # Refer to http://stackoverflow.com/a/9951851/35070
  46. encoding = preferredencoding()
  47. else:
  48. encoding = sys.getfilesystemencoding()
  49. if encoding is None:
  50. encoding = 'utf-8'
  51. return encoding
  52. # UNUSED
  53. # Based on png2str() written by @gdkchan and improved by @yokrysty
  54. # Originally posted at https://github.com/ytdl-org/youtube-dl/issues/9706
  55. def decode_png(png_data):
  56. # Reference: https://www.w3.org/TR/PNG/
  57. header = png_data[8:]
  58. if png_data[:8] != b'\x89PNG\x0d\x0a\x1a\x0a' or header[4:8] != b'IHDR':
  59. raise OSError('Not a valid PNG file.')
  60. int_map = {1: '>B', 2: '>H', 4: '>I'}
  61. unpack_integer = lambda x: struct.unpack(int_map[len(x)], x)[0]
  62. chunks = []
  63. while header:
  64. length = unpack_integer(header[:4])
  65. header = header[4:]
  66. chunk_type = header[:4]
  67. header = header[4:]
  68. chunk_data = header[:length]
  69. header = header[length:]
  70. header = header[4:] # Skip CRC
  71. chunks.append({
  72. 'type': chunk_type,
  73. 'length': length,
  74. 'data': chunk_data
  75. })
  76. ihdr = chunks[0]['data']
  77. width = unpack_integer(ihdr[:4])
  78. height = unpack_integer(ihdr[4:8])
  79. idat = b''
  80. for chunk in chunks:
  81. if chunk['type'] == b'IDAT':
  82. idat += chunk['data']
  83. if not idat:
  84. raise OSError('Unable to read PNG data.')
  85. decompressed_data = bytearray(zlib.decompress(idat))
  86. stride = width * 3
  87. pixels = []
  88. def _get_pixel(idx):
  89. x = idx % stride
  90. y = idx // stride
  91. return pixels[y][x]
  92. for y in range(height):
  93. basePos = y * (1 + stride)
  94. filter_type = decompressed_data[basePos]
  95. current_row = []
  96. pixels.append(current_row)
  97. for x in range(stride):
  98. color = decompressed_data[1 + basePos + x]
  99. basex = y * stride + x
  100. left = 0
  101. up = 0
  102. if x > 2:
  103. left = _get_pixel(basex - 3)
  104. if y > 0:
  105. up = _get_pixel(basex - stride)
  106. if filter_type == 1: # Sub
  107. color = (color + left) & 0xff
  108. elif filter_type == 2: # Up
  109. color = (color + up) & 0xff
  110. elif filter_type == 3: # Average
  111. color = (color + ((left + up) >> 1)) & 0xff
  112. elif filter_type == 4: # Paeth
  113. a = left
  114. b = up
  115. c = 0
  116. if x > 2 and y > 0:
  117. c = _get_pixel(basex - stride - 3)
  118. p = a + b - c
  119. pa = abs(p - a)
  120. pb = abs(p - b)
  121. pc = abs(p - c)
  122. if pa <= pb and pa <= pc:
  123. color = (color + a) & 0xff
  124. elif pb <= pc:
  125. color = (color + b) & 0xff
  126. else:
  127. color = (color + c) & 0xff
  128. current_row.append(color)
  129. return width, height, pixels
  130. def register_socks_protocols():
  131. # "Register" SOCKS protocols
  132. # In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904
  133. # URLs with protocols not in urlparse.uses_netloc are not handled correctly
  134. for scheme in ('socks', 'socks4', 'socks4a', 'socks5'):
  135. if scheme not in urllib.parse.uses_netloc:
  136. urllib.parse.uses_netloc.append(scheme)
  137. def handle_youtubedl_headers(headers):
  138. filtered_headers = headers
  139. if 'Youtubedl-no-compression' in filtered_headers:
  140. filtered_headers = {k: v for k, v in filtered_headers.items() if k.lower() != 'accept-encoding'}
  141. del filtered_headers['Youtubedl-no-compression']
  142. return filtered_headers
  143. def request_to_url(req):
  144. if isinstance(req, urllib.request.Request):
  145. return req.get_full_url()
  146. else:
  147. return req
  148. def sanitized_Request(url, *args, **kwargs):
  149. from ..utils import extract_basic_auth, sanitize_url
  150. url, auth_header = extract_basic_auth(escape_url(sanitize_url(url)))
  151. if auth_header is not None:
  152. headers = args[1] if len(args) >= 2 else kwargs.setdefault('headers', {})
  153. headers['Authorization'] = auth_header
  154. return urllib.request.Request(url, *args, **kwargs)
  155. class YoutubeDLHandler(HTTPHandler):
  156. def __init__(self, params, *args, **kwargs):
  157. self._params = params
  158. super().__init__(*args, **kwargs)
  159. YoutubeDLHTTPSHandler = YoutubeDLHandler
  160. class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
  161. def __init__(self, cookiejar=None):
  162. urllib.request.HTTPCookieProcessor.__init__(self, cookiejar)
  163. def http_response(self, request, response):
  164. return urllib.request.HTTPCookieProcessor.http_response(self, request, response)
  165. https_request = urllib.request.HTTPCookieProcessor.http_request
  166. https_response = http_response
  167. def make_HTTPS_handler(params, **kwargs):
  168. return YoutubeDLHTTPSHandler(params, context=make_ssl_context(
  169. verify=not params.get('nocheckcertificate'),
  170. client_certificate=params.get('client_certificate'),
  171. client_certificate_key=params.get('client_certificate_key'),
  172. client_certificate_password=params.get('client_certificate_password'),
  173. legacy_support=params.get('legacyserverconnect'),
  174. use_certifi='no-certifi' not in params.get('compat_opts', []),
  175. ), **kwargs)
  176. def process_communicate_or_kill(p, *args, **kwargs):
  177. return Popen.communicate_or_kill(p, *args, **kwargs)