Browse Source

[core] Prevent RCE when using `--exec` with `%q` (CVE-2023-40581)

The shell escape function is now using `""` instead of `\"`. `utils.Popen` has been patched to properly quote commands.

Prior to this fix using `--exec` together with `%q` when on Windows could cause remote code to execute. See https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-42h4-v29r-42qg for reference.

Authored by: Grub4K
Simon Sawicki 1 year ago
parent
commit
de015e9307

+ 5 - 0
devscripts/changelog_override.json

@@ -93,5 +93,10 @@
         "action": "add",
         "when": "c1d71d0d9f41db5e4306c86af232f5f6220a130b",
         "short": "[priority] **The minimum *recommended* Python version has been raised to 3.8**\nSince Python 3.7 has reached end-of-life, support for it will be dropped soon. [Read more](https://github.com/yt-dlp/yt-dlp/issues/7803)"
+    },
+    {
+        "action": "add",
+        "when": "61bdf15fc7400601c3da1aa7a43917310a5bf391",
+        "short": "[priority] Security: [[CVE-2023-40581](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-40581)] [Prevent RCE when using `--exec` with `%q` on Windows](https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-42h4-v29r-42qg)\n    - The shell escape function is now using `\"\"` instead of `\\\"`.\n    - `utils.Popen` has been patched to properly quote commands."
     }
 ]

+ 3 - 3
test/test_YoutubeDL.py

@@ -784,9 +784,9 @@ class TestYoutubeDL(unittest.TestCase):
         test('%(title4)#S', 'foo_bar_test')
         test('%(title4).10S', ('foo "bar" ', 'foo "bar"' + ('#' if compat_os_name == 'nt' else ' ')))
         if compat_os_name == 'nt':
-            test('%(title4)q', ('"foo \\"bar\\" test"', ""foo ⧹"bar⧹" test""))
-            test('%(formats.:.id)#q', ('"id 1" "id 2" "id 3"', '"id 1" "id 2" "id 3"'))
-            test('%(formats.0.id)#q', ('"id 1"', '"id 1"'))
+            test('%(title4)q', ('"foo ""bar"" test"', None))
+            test('%(formats.:.id)#q', ('"id 1" "id 2" "id 3"', None))
+            test('%(formats.0.id)#q', ('"id 1"', None))
         else:
             test('%(title4)q', ('\'foo "bar" test\'', '\'foo "bar" test\''))
             test('%(formats.:.id)#q', "'id 1' 'id 2' 'id 3'")

+ 16 - 0
test/test_utils.py

@@ -14,6 +14,7 @@ import contextlib
 import io
 import itertools
 import json
+import subprocess
 import xml.etree.ElementTree
 
 from yt_dlp.compat import (
@@ -28,6 +29,7 @@ from yt_dlp.utils import (
     InAdvancePagedList,
     LazyList,
     OnDemandPagedList,
+    Popen,
     age_restricted,
     args_to_str,
     base_url,
@@ -2388,6 +2390,20 @@ Line 1
         assert extract_basic_auth('http://user:@foo.bar') == ('http://foo.bar', 'Basic dXNlcjo=')
         assert extract_basic_auth('http://user:pass@foo.bar') == ('http://foo.bar', 'Basic dXNlcjpwYXNz')
 
+    @unittest.skipUnless(compat_os_name == 'nt', 'Only relevant on Windows')
+    def test_Popen_windows_escaping(self):
+        def run_shell(args):
+            stdout, stderr, error = Popen.run(
+                args, text=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            assert not stderr
+            assert not error
+            return stdout
+
+        # Test escaping
+        assert run_shell(['echo', 'test"&']) == '"test""&"\n'
+        # Test if delayed expansion is disabled
+        assert run_shell(['echo', '^!']) == '"^!"\n'
+        assert run_shell('echo "^!"') == '"^!"\n'
 
 if __name__ == '__main__':
     unittest.main()

+ 1 - 1
yt_dlp/compat/__init__.py

@@ -30,7 +30,7 @@ compat_os_name = os._name if os.name == 'java' else os.name
 if compat_os_name == 'nt':
     def compat_shlex_quote(s):
         import re
-        return s if re.match(r'^[-_\w./]+$', s) else '"%s"' % s.replace('"', '\\"')
+        return s if re.match(r'^[-_\w./]+$', s) else s.replace('"', '""').join('""')
 else:
     from shlex import quote as compat_shlex_quote  # noqa: F401
 

+ 5 - 7
yt_dlp/postprocessor/exec.py

@@ -1,8 +1,6 @@
-import subprocess
-
 from .common import PostProcessor
 from ..compat import compat_shlex_quote
-from ..utils import PostProcessingError, encodeArgument, variadic
+from ..utils import Popen, PostProcessingError, variadic
 
 
 class ExecPP(PostProcessor):
@@ -27,10 +25,10 @@ class ExecPP(PostProcessor):
     def run(self, info):
         for tmpl in self.exec_cmd:
             cmd = self.parse_cmd(tmpl, info)
-            self.to_screen('Executing command: %s' % cmd)
-            retCode = subprocess.call(encodeArgument(cmd), shell=True)
-            if retCode != 0:
-                raise PostProcessingError('Command returned error code %d' % retCode)
+            self.to_screen(f'Executing command: {cmd}')
+            _, _, return_code = Popen.run(cmd, shell=True)
+            if return_code != 0:
+                raise PostProcessingError(f'Command returned error code {return_code}')
         return [], info
 
 

+ 16 - 2
yt_dlp/utils/_utils.py

@@ -825,7 +825,7 @@ class Popen(subprocess.Popen):
         _fix('LD_LIBRARY_PATH')  # Linux
         _fix('DYLD_LIBRARY_PATH')  # macOS
 
-    def __init__(self, *args, env=None, text=False, **kwargs):
+    def __init__(self, args, *remaining, env=None, text=False, shell=False, **kwargs):
         if env is None:
             env = os.environ.copy()
         self._fix_pyinstaller_ld_path(env)
@@ -835,7 +835,21 @@ class Popen(subprocess.Popen):
             kwargs['universal_newlines'] = True  # For 3.6 compatibility
             kwargs.setdefault('encoding', 'utf-8')
             kwargs.setdefault('errors', 'replace')
-        super().__init__(*args, env=env, **kwargs, startupinfo=self._startupinfo)
+
+        if shell and compat_os_name == 'nt' and kwargs.get('executable') is None:
+            if not isinstance(args, str):
+                args = ' '.join(compat_shlex_quote(a) for a in args)
+            shell = False
+            args = f'{self.__comspec()} /Q /S /D /V:OFF /C "{args}"'
+
+        super().__init__(args, *remaining, env=env, shell=shell, **kwargs, startupinfo=self._startupinfo)
+
+    def __comspec(self):
+        comspec = os.environ.get('ComSpec') or os.path.join(
+            os.environ.get('SystemRoot', ''), 'System32', 'cmd.exe')
+        if os.path.isabs(comspec):
+            return comspec
+        raise FileNotFoundError('shell not found: neither %ComSpec% nor %SystemRoot% is set')
 
     def communicate_or_kill(self, *args, **kwargs):
         try: