12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485 |
- import unittest
- from test import test_support
- import subprocess32
- subprocess = subprocess32
- import sys
- try:
- import ctypes
- except ImportError:
- ctypes = None
- else:
- import ctypes.util
- import signal
- import os
- import errno
- import tempfile
- import textwrap
- import time
- try:
- import threading
- except ImportError:
- threading = None
- import re
- #import sysconfig
- import select
- import shutil
- try:
- import gc
- except ImportError:
- gc = None
- import pickle
# True when the tests run on native Windows.
mswindows = sys.platform == "win32"

# Prelude prepended to the "-c" scripts handed to child interpreters; it sets
# Y_PYTHON_ENTRY_POINT to ':main' in the child's own os.environ.
yenv = '''
import os
os.environ['Y_PYTHON_ENTRY_POINT'] = ':main'
'''

#
# Depends on the following external programs: Python
#

# Snippet spliced into child scripts: switches the child's stdout to binary
# mode on Windows, and is empty everywhere else.
SETBINARY = ''
if mswindows:
    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
                 'os.O_BINARY);')
# Use the real tempfile.mkstemp when the running Python provides it.
if hasattr(tempfile, 'mkstemp'):
    mkstemp = tempfile.mkstemp
else:
    # tempfile.mkstemp is not available
    def mkstemp():
        """Stand-in for tempfile.mkstemp built on tempfile.mktemp.

        Returns an (fd, filename) pair; the file is opened read/write and
        created if missing.
        """
        fname = tempfile.mktemp()
        fd = os.open(fname, os.O_RDWR | os.O_CREAT)
        return fd, fname
# Prefer the helper shipped with test_support; fall back to a local copy.
if hasattr(test_support, 'strip_python_stderr'):
    strip_python_stderr = test_support.strip_python_stderr
else:
    # Copied from the test.test_support module in 2.7.
    def strip_python_stderr(stderr):
        """Remove interpreter debug noise from a child Python's stderr.

        A trailing "[NNN refs]" marker is dropped and the result is stripped
        of surrounding whitespace.  Typically applied to the stderr value
        returned by the communicate() method of a subprocess.Popen object.
        """
        return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr).strip()
class BaseTestCase(unittest.TestCase):
    """Shared fixture for the subprocess tests.

    Sets Y_PYTHON_ENTRY_POINT before every test, checks after every test that
    subprocess._active is empty, and backfills assertIn / assertNotIn /
    skipTest / addCleanup when unittest.TestCase does not provide them.
    """

    def setUp(self):
        os.environ['Y_PYTHON_ENTRY_POINT'] = ':main'
        # Try to minimize the number of children we have so this test
        # doesn't crash on some buildbots (Alphas in particular).
        reap_children()
        if not hasattr(unittest.TestCase, 'addCleanup'):
            # Storage for the fallback addCleanup implementation below.
            self._cleanups = []

    def tearDown(self):
        try:
            for child in subprocess._active:
                child.wait()
            subprocess._cleanup()
            self.assertFalse(subprocess._active,
                             "subprocess._active not empty")
        finally:
            # The stock unittest runs cleanups by itself; ours needs a push.
            if self._use_our_own_cleanup_implementation:
                self._doCleanups()

    if not hasattr(unittest.TestCase, 'assertIn'):
        def assertIn(self, a, b, msg=None):
            present = a in b
            self.assert_(present, msg or ('%r not in %r' % (a, b)))

        def assertNotIn(self, a, b, msg=None):
            absent = a not in b
            self.assert_(absent, msg or ('%r in %r' % (a, b)))

    if not hasattr(unittest.TestCase, 'skipTest'):
        def skipTest(self, message):
            """These will still fail but it'll be clear that it is okay."""
            self.fail('SKIPPED - %s\n' % (message,))

    def _addCleanup(self, function, *args, **kwargs):
        """Register function(*args, **kwargs) to run once the test is done.

        Registered functions run in LIFO order, after tearDown, whether the
        test failed or succeeded.

        Unlike unittest2 or python 2.7, cleanups are not run if setUp fails.
        That is easier to implement in this subclass and is all we need.
        """
        entry = (function, args, kwargs)
        self._cleanups.append(entry)

    def _doCleanups(self):
        """Run every registered cleanup, newest first.

        Normally called for you after tearDown.  Errors raised by a cleanup
        are ignored, except KeyboardInterrupt which is propagated.
        """
        pending = self._cleanups
        while pending:
            callback, c_args, c_kwargs = pending.pop()
            try:
                callback(*c_args, **c_kwargs)
            except KeyboardInterrupt:
                raise
            except:
                pass

    # True when unittest.TestCase has no addCleanup and ours is used instead.
    _use_our_own_cleanup_implementation = not hasattr(unittest.TestCase,
                                                      'addCleanup')
    if _use_our_own_cleanup_implementation:
        addCleanup = _addCleanup

    def assertStderrEqual(self, stderr, expected, msg=None):
        # In a debug build, stuff like "[6580 refs]" is printed to stderr at
        # shutdown time.  That frustrates tests trying to check stderr
        # produced from a spawned Python process, so it is stripped first.
        # strip_python_stderr also strips whitespace, so the expected value
        # is stripped too.
        self.assertEqual(strip_python_stderr(stderr), expected.strip(), msg)
class PopenTestException(Exception):
    """Exception raised deliberately by the Popen test doubles."""
class PopenExecuteChildRaises(subprocess32.Popen):
    """Popen subclass whose _execute_child always fails.

    Used for testing cleanup of subprocess.PIPE filehandles when
    _execute_child fails.
    """

    def _execute_child(self, *args, **kwargs):
        # Arguments are accepted and ignored; the failure is unconditional.
        forced_error = PopenTestException("Forced Exception for Test")
        raise forced_error
- class ProcessTestCase(BaseTestCase):
def test_call_seq(self):
    # call() given a sequence argument returns the child's exit status.
    argv = [sys.executable, "-c", yenv + "import sys; sys.exit(47)"]
    self.assertEqual(subprocess.call(argv), 47)
def test_call_timeout(self):
    # call() with a timeout: the child spins forever, so the only way this
    # returns is for the timeout to expire and the child to be killed.  If
    # the child isn't killed the test deadlocks, since subprocess.call
    # waits for the child.
    spinner = [sys.executable, "-c", yenv + "while True: pass"]
    self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
                      spinner, timeout=0.1)
def test_check_call_zero(self):
    # check_call() returns 0 when the child exits with status zero.
    argv = [sys.executable, "-c", yenv + "import sys; sys.exit(0)"]
    self.assertEqual(subprocess.check_call(argv), 0)
def test_check_call_nonzero(self):
    # check_call() must raise CalledProcessError carrying the child's
    # non-zero return code.
    argv = [sys.executable, "-c", yenv + "import sys; sys.exit(47)"]
    try:
        subprocess.check_call(argv)
    except subprocess.CalledProcessError, c:
        self.assertEqual(c.returncode, 47)
    else:
        # Without this branch the test would pass silently if check_call()
        # stopped raising.
        self.fail("Expected CalledProcessError.")
def test_check_output(self):
    # check_output() returns the child's stdout when it exits with zero.
    argv = [sys.executable, "-c", yenv + "print 'BDFL'"]
    captured = subprocess.check_output(argv)
    self.assertIn('BDFL', captured)
def test_check_output_nonzero(self):
    # check_output() must raise CalledProcessError carrying the child's
    # non-zero return code.
    argv = [sys.executable, "-c", yenv + "import sys; sys.exit(5)"]
    try:
        subprocess.check_output(argv)
    except subprocess.CalledProcessError, c:
        self.assertEqual(c.returncode, 5)
    else:
        # Without this branch the test would pass silently if
        # check_output() stopped raising.
        self.fail("Expected CalledProcessError.")
def test_check_output_stderr(self):
    # check_output() with the child's stderr redirected into stdout.
    argv = [sys.executable, "-c",
            yenv + "import sys; sys.stderr.write('BDFL')"]
    captured = subprocess.check_output(argv, stderr=subprocess.STDOUT)
    self.assertIn('BDFL', captured)
- def test_check_output_stdout_arg(self):
- # check_output() function stderr redirected to stdout
- try:
- output = subprocess.check_output(
- [sys.executable, "-c", yenv + "print 'will not be run'"],
- stdout=sys.stdout)
- self.fail("Expected ValueError when stdout arg supplied.")
- except ValueError, c:
- self.assertIn('stdout', c.args[0])
- def test_check_output_timeout(self):
- # check_output() function with timeout arg
- try:
- output = subprocess.check_output(
- [sys.executable, "-c", yenv +
- "import sys; sys.stdout.write('BDFL')\n"
- "sys.stdout.flush()\n"
- "while True: pass"],
- timeout=0.5)
- except subprocess.TimeoutExpired, exception:
- self.assertEqual(exception.output, 'BDFL')
- else:
- self.fail("Expected TimeoutExpired.")
def test_call_kwargs(self):
    # call() with keyword args: the env passed in must reach the child.
    child_env = os.environ.copy()
    child_env["FRUIT"] = "banana"
    script = ('import sys, os;'
              'sys.exit(os.getenv("FRUIT")=="banana")')
    status = subprocess.call([sys.executable, "-c", yenv + script],
                             env=child_env)
    # The child exits with True (1) when it saw FRUIT=banana.
    self.assertEqual(status, 1)
def test_stdin_none(self):
    # .stdin is None when not redirected
    p = subprocess.Popen([sys.executable, "-c", yenv + 'print "banana"'],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Close both pipe objects once the test is over; they were previously
    # left open.
    self.addCleanup(p.stdout.close)
    self.addCleanup(p.stderr.close)
    p.wait()
    self.assertEqual(p.stdin, None)
def test_stdout_none(self):
    # .stdout is None when not redirected, and the child's stdout will
    # be inherited from the parent.  In order to test this we run a
    # subprocess in a subprocess:
    # this_test
    #   \-- subprocess created by this test (parent)
    #          \-- subprocess created by the parent subprocess (child)
    # The parent doesn't specify stdout, so the child will use the
    # parent's stdout.  This test checks that the message printed by the
    # child goes to the parent stdout.  The parent also checks that the
    # child's stdout is None.  See #11963.
    parent_script = (
        'import sys; from subprocess32 import Popen, PIPE;'
        'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],'
        ' stdin=PIPE, stderr=PIPE);'
        'p.wait(); assert p.stdout is None;')
    parent = subprocess.Popen(
        [sys.executable, "-c", yenv + parent_script],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    for stream in (parent.stdout, parent.stderr):
        self.addCleanup(stream.close)
    out, err = parent.communicate()
    self.assertEqual(parent.returncode, 0, err)
    self.assertEqual(out.rstrip(), 'test_stdout_none')
def test_stderr_none(self):
    # .stderr is None when not redirected
    p = subprocess.Popen([sys.executable, "-c", yenv + 'print "banana"'],
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # Close both pipe objects once the test is over; they were previously
    # left open.
    self.addCleanup(p.stdout.close)
    self.addCleanup(p.stdin.close)
    p.wait()
    self.assertEqual(p.stderr, None)
- # For use in the test_cwd* tests below.
- def _normalize_cwd(self, cwd):
- # Normalize an expected cwd (for Tru64 support).
- # We can't use os.path.realpath since it doesn't expand Tru64 {memb}
- # strings. See bug #1063571.
- original_cwd = os.getcwd()
- os.chdir(cwd)
- cwd = os.getcwd()
- os.chdir(original_cwd)
- return cwd
- # For use in the test_cwd* tests below.
- def _split_python_path(self):
- # Return normalized (python_dir, python_base).
- python_path = os.path.realpath(sys.executable)
- return os.path.split(python_path)
# For use in the test_cwd* tests below.
def _assert_cwd(self, expected_cwd, python_arg, **kwargs):
    """Run Python through Popen and check the child's working directory.

    Asserts that (1) the call succeeds (the child exits with 47) and
    (2) the current working directory reported by the child matches
    *expected_cwd*.  Extra keyword arguments are forwarded to Popen.
    """
    script = yenv + ("import os, sys; "
                     "sys.stdout.write(os.getcwd()); "
                     "sys.exit(47)")
    child = subprocess.Popen([python_arg, "-c", script],
                             stdout=subprocess.PIPE,
                             **kwargs)
    self.addCleanup(child.stdout.close)
    child.wait()
    self.assertEqual(47, child.returncode)
    normcase = os.path.normcase
    wanted = normcase(expected_cwd)
    reported = normcase(child.stdout.read().decode("utf-8"))
    self.assertEqual(wanted, reported)
def test_cwd(self):
    # Check that cwd changes the cwd for the child process.
    target = self._normalize_cwd(tempfile.gettempdir())
    self._assert_cwd(target, sys.executable, cwd=target)
if not mswindows:  # pending resolution of issue #15533
    # NOTE(review): the source's indentation was lost; both relative-path
    # tests below are assumed to sit under this guard -- confirm.
    def test_cwd_with_relative_arg(self):
        # Check that Popen looks for args[0] relative to cwd if args[0]
        # is relative.
        python_dir, python_base = self._split_python_path()
        rel_python = os.path.join(os.curdir, python_base)
        scratch = 'tempcwd'
        start_dir = os.getcwd()
        os.mkdir(scratch)
        try:
            os.chdir(scratch)
            wrong_dir = os.getcwd()
            # Before calling with the correct cwd, confirm that the call
            # fails without cwd and with the wrong cwd.
            for extra in ({}, {'cwd': wrong_dir}):
                self.assertRaises(OSError, subprocess.Popen,
                                  [rel_python], **extra)
            python_dir = self._normalize_cwd(python_dir)
            self._assert_cwd(python_dir, rel_python, cwd=python_dir)
        finally:
            os.chdir(start_dir)
            shutil.rmtree(scratch)

    def test_cwd_with_relative_executable(self):
        # Check that Popen looks for executable relative to cwd if
        # executable is relative (and that executable takes precedence
        # over args[0]).
        python_dir, python_base = self._split_python_path()
        rel_python = os.path.join(os.curdir, python_base)
        doesntexist = "somethingyoudonthave"
        scratch = 'tempcwd'
        start_dir = os.getcwd()
        os.mkdir(scratch)
        try:
            os.chdir(scratch)
            wrong_dir = os.getcwd()
            # Before calling with the correct cwd, confirm that the call
            # fails without cwd and with the wrong cwd.
            for extra in ({}, {'cwd': wrong_dir}):
                self.assertRaises(OSError, subprocess.Popen,
                                  [doesntexist], executable=rel_python,
                                  **extra)
            python_dir = self._normalize_cwd(python_dir)
            self._assert_cwd(python_dir, doesntexist,
                             executable=rel_python, cwd=python_dir)
        finally:
            os.chdir(start_dir)
            shutil.rmtree(scratch)
def test_cwd_with_absolute_arg(self):
    # Check that Popen can find the executable when the cwd is wrong
    # if args[0] is an absolute path.
    python_dir, python_base = self._split_python_path()
    abs_python = os.path.join(python_dir, python_base)
    rel_python = os.path.join(os.curdir, python_base)
    wrong_dir = os.path.realpath(tempfile.mkdtemp())
    try:
        # Before calling with an absolute path, confirm that using a
        # relative path fails.
        self.assertRaises(OSError, subprocess.Popen,
                          [rel_python], cwd=wrong_dir)
        wrong_dir = self._normalize_cwd(wrong_dir)
        self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir)
    finally:
        shutil.rmtree(wrong_dir)
def test_executable_with_cwd(self):
    # executable= is used even though args[0] names a program that does
    # not exist, and cwd= is honoured for the child.
    python_dir = self._split_python_path()[0]
    python_dir = self._normalize_cwd(python_dir)
    self._assert_cwd(python_dir, "somethingyoudonthave",
                     executable=sys.executable, cwd=python_dir)
- #@unittest.skipIf(sysconfig.is_python_build(),
- # "need an installed Python. See #7774")
- #def test_executable_without_cwd(self):
- # # For a normal installation, it should work without 'cwd'
- # # argument. For test runs in the build directory, see #7774.
- # self._assert_cwd('', "somethingyoudonthave", executable=sys.executable)
def test_stdin_pipe(self):
    # stdin redirection
    script = 'import sys; sys.exit(sys.stdin.read() == "pear")'
    child = subprocess.Popen([sys.executable, "-c", yenv + script],
                             stdin=subprocess.PIPE)
    child.stdin.write("pear")
    child.stdin.close()
    child.wait()
    # The child exits with True (1) when it read exactly "pear".
    self.assertEqual(child.returncode, 1)
def test_stdin_filedes(self):
    # stdin is set to open file descriptor
    tf = tempfile.TemporaryFile()
    # Close the temporary file once the test is over, even when an
    # assertion fails; it was previously left open.
    self.addCleanup(tf.close)
    d = tf.fileno()
    os.write(d, "pear")
    os.lseek(d, 0, 0)
    p = subprocess.Popen([sys.executable, "-c", yenv +
                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=d)
    p.wait()
    # The child exits with True (1) when it read exactly "pear".
    self.assertEqual(p.returncode, 1)
def test_stdin_fileobj(self):
    # stdin is set to open file object
    tf = tempfile.TemporaryFile()
    # Close the temporary file once the test is over, even when an
    # assertion fails; it was previously left open.
    self.addCleanup(tf.close)
    tf.write("pear")
    tf.seek(0)
    p = subprocess.Popen([sys.executable, "-c", yenv +
                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
                         stdin=tf)
    p.wait()
    # The child exits with True (1) when it read exactly "pear".
    self.assertEqual(p.returncode, 1)
def test_stdout_pipe(self):
    # stdout redirection
    p = subprocess.Popen([sys.executable, "-c", yenv +
                          'import sys; sys.stdout.write("orange")'],
                         stdout=subprocess.PIPE)
    # Close the pipe once the test is over; it was previously left open.
    self.addCleanup(p.stdout.close)
    self.assertEqual(p.stdout.read(), "orange")
def test_stdout_filedes(self):
    # stdout is set to open file descriptor
    tf = tempfile.TemporaryFile()
    # Close the temporary file once the test is over, even when an
    # assertion fails; it was previously left open.
    self.addCleanup(tf.close)
    d = tf.fileno()
    p = subprocess.Popen([sys.executable, "-c", yenv +
                          'import sys; sys.stdout.write("orange")'],
                         stdout=d)
    p.wait()
    os.lseek(d, 0, 0)
    self.assertEqual(os.read(d, 1024), "orange")
def test_stdout_fileobj(self):
    # stdout is set to open file object
    tf = tempfile.TemporaryFile()
    # Close the temporary file once the test is over, even when an
    # assertion fails; it was previously left open.
    self.addCleanup(tf.close)
    p = subprocess.Popen([sys.executable, "-c", yenv +
                          'import sys; sys.stdout.write("orange")'],
                         stdout=tf)
    p.wait()
    tf.seek(0)
    self.assertEqual(tf.read(), "orange")
def test_stderr_pipe(self):
    # stderr redirection
    p = subprocess.Popen([sys.executable, "-c", yenv +
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=subprocess.PIPE)
    # Close the pipe once the test is over; it was previously left open.
    self.addCleanup(p.stderr.close)
    self.assertStderrEqual(p.stderr.read(), "strawberry")
def test_stderr_filedes(self):
    # stderr is set to open file descriptor
    tf = tempfile.TemporaryFile()
    # Close the temporary file once the test is over, even when an
    # assertion fails; it was previously left open.
    self.addCleanup(tf.close)
    d = tf.fileno()
    p = subprocess.Popen([sys.executable, "-c", yenv +
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=d)
    p.wait()
    os.lseek(d, 0, 0)
    self.assertStderrEqual(os.read(d, 1024), "strawberry")
def test_stderr_fileobj(self):
    # stderr is set to open file object
    tf = tempfile.TemporaryFile()
    # Close the temporary file once the test is over, even when an
    # assertion fails; it was previously left open.
    self.addCleanup(tf.close)
    p = subprocess.Popen([sys.executable, "-c", yenv +
                          'import sys; sys.stderr.write("strawberry")'],
                         stderr=tf)
    p.wait()
    tf.seek(0)
    self.assertStderrEqual(tf.read(), "strawberry")
def test_stderr_redirect_with_no_stdout_redirect(self):
    # test stderr=STDOUT while stdout=None (not set)
    # - grandchild prints to stderr
    # - child redirects grandchild's stderr to its stdout
    # - the parent should get grandchild's stderr in child's stdout
    child_script = ('import sys, subprocess32 as subprocess;'
                    'rc = subprocess.call([sys.executable, "-c",'
                    ' "import sys;"'
                    ' "sys.stderr.write(\'42\')"],'
                    ' stderr=subprocess.STDOUT);'
                    'sys.exit(rc)')
    child = subprocess.Popen([sys.executable, "-c", yenv + child_script],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    stdout, stderr = child.communicate()
    #NOTE: stdout should get stderr from grandchild
    self.assertStderrEqual(stdout, '42')
    self.assertStderrEqual(stderr, '')  # should be empty
    self.assertEqual(child.returncode, 0)
def test_stdout_stderr_pipe(self):
    # capture stdout and stderr to the same pipe
    p = subprocess.Popen([sys.executable, "-c", yenv +
                          'import sys;'
                          'sys.stdout.write("apple");'
                          'sys.stdout.flush();'
                          'sys.stderr.write("orange")'],
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    # Close the pipe once the test is over; it was previously left open.
    self.addCleanup(p.stdout.close)
    self.assertStderrEqual(p.stdout.read(), "appleorange")
def test_stdout_stderr_file(self):
    # capture stdout and stderr to the same open file
    tf = tempfile.TemporaryFile()
    # Close the temporary file once the test is over, even when an
    # assertion fails; it was previously left open.
    self.addCleanup(tf.close)
    p = subprocess.Popen([sys.executable, "-c", yenv +
                          'import sys;'
                          'sys.stdout.write("apple");'
                          'sys.stdout.flush();'
                          'sys.stderr.write("orange")'],
                         stdout=tf,
                         stderr=tf)
    p.wait()
    tf.seek(0)
    self.assertStderrEqual(tf.read(), "appleorange")
def test_stdout_filedes_of_stdout(self):
    # stdout is set to 1 (#1531862).
    # To avoid printing the text on stdout, we do something similar to
    # test_stdout_none (see above).  The parent subprocess calls the child
    # subprocess passing stdout=1, and this test uses stdout=PIPE in
    # order to capture and check the output of the parent.  See #11963.
    parent_script = (
        'import sys, subprocess32; '
        'rc = subprocess32.call([sys.executable, "-c", '
        ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
        '\'test with stdout=1\'))"], stdout=1); '
        'assert rc == 18')
    parent = subprocess.Popen(
        [sys.executable, "-c", yenv + parent_script],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    for stream in (parent.stdout, parent.stderr):
        self.addCleanup(stream.close)
    out, err = parent.communicate()
    self.assertEqual(parent.returncode, 0, err)
    self.assertEqual(out.rstrip(), 'test with stdout=1')
def test_stdout_devnull(self):
    # With stdout=DEVNULL the child can write a lot of output and the
    # Popen object exposes no stdout stream.
    script = ('for i in range(10240):'
              'print("x" * 1024)')
    child = subprocess.Popen([sys.executable, "-c", yenv + script],
                             stdout=subprocess.DEVNULL)
    child.wait()
    self.assertEqual(child.stdout, None)
def test_stderr_devnull(self):
    # With stderr=DEVNULL the child can write a lot of output and the
    # Popen object exposes no stderr stream.
    script = ('import sys\n'
              'for i in range(10240):'
              'sys.stderr.write("x" * 1024)')
    child = subprocess.Popen([sys.executable, "-c", yenv + script],
                             stderr=subprocess.DEVNULL)
    child.wait()
    self.assertEqual(child.stderr, None)
def test_stdin_devnull(self):
    # With stdin=DEVNULL a read in the child returns instead of blocking,
    # and the Popen object exposes no stdin stream.
    script = ('import sys;'
              'sys.stdin.read(1)')
    child = subprocess.Popen([sys.executable, "-c", yenv + script],
                             stdin=subprocess.DEVNULL)
    child.wait()
    self.assertEqual(child.stdin, None)
def test_env(self):
    # A variable added to the env passed to Popen is visible in the child.
    child_env = os.environ.copy()
    child_env["FRUIT"] = "orange"
    script = ('import sys,os;'
              'sys.stdout.write(os.getenv("FRUIT"))')
    child = subprocess.Popen([sys.executable, "-c", yenv + script],
                             stdout=subprocess.PIPE,
                             env=child_env)
    try:
        stdout, stderr = child.communicate()
        self.assertEqual(stdout, "orange")
    finally:
        # Explicit context-manager exit, spelled out instead of "with".
        child.__exit__(None, None, None)
def test_empty_env(self):
    """test_empty_env() - verify that env={} is as empty as possible."""

    def is_env_var_to_ignore(n):
        """Determine if an environment variable is under our control."""
        # This excludes some __CF_* and VERSIONER_* keys MacOS insists
        # on adding even when the environment in exec is empty.
        if 'VERSIONER' in n or '__CF' in n:  # MacOS
            return True
        # Gentoo sandboxes also force LD_PRELOAD and SANDBOX_* to exist.
        return n == 'LD_PRELOAD' or n.startswith('SANDBOX')  # Gentoo

    argv = [sys.executable, '-c',
            'import os; print(list(os.environ.keys()))']
    child = subprocess.Popen(argv, stdout=subprocess.PIPE,
                             env={'Y_PYTHON_ENTRY_POINT': ':main'})
    try:
        stdout, stderr = child.communicate()
        # The child prints a list literal; it is parsed back with eval().
        child_env_names = eval(stdout.strip())
        self.assertTrue(isinstance(child_env_names, list),
                        msg=repr(child_env_names))
        leftover = []
        for key in child_env_names:
            if not is_env_var_to_ignore(key):
                leftover.append(key)
        self.assertEqual(leftover, [])
    finally:
        child.__exit__(None, None, None)
def test_communicate_stdin(self):
    # communicate() feeds its argument to the child's stdin.
    script = ('import sys;'
              'sys.exit(sys.stdin.read() == "pear")')
    child = subprocess.Popen([sys.executable, "-c", yenv + script],
                             stdin=subprocess.PIPE)
    child.communicate("pear")
    # The child exits with True (1) when it read exactly "pear".
    self.assertEqual(child.returncode, 1)
def test_communicate_stdout(self):
    # Only stdout is piped: communicate() returns its data and None for
    # stderr.
    script = 'import sys; sys.stdout.write("pineapple")'
    child = subprocess.Popen([sys.executable, "-c", yenv + script],
                             stdout=subprocess.PIPE)
    stdout, stderr = child.communicate()
    self.assertEqual(stdout, "pineapple")
    self.assertEqual(stderr, None)
def test_communicate_stderr(self):
    # Only stderr is piped: communicate() returns its data and None for
    # stdout.
    script = 'import sys; sys.stderr.write("pineapple")'
    child = subprocess.Popen([sys.executable, "-c", yenv + script],
                             stderr=subprocess.PIPE)
    stdout, stderr = child.communicate()
    self.assertEqual(stdout, None)
    self.assertStderrEqual(stderr, "pineapple")
def test_communicate(self):
    # All three streams piped: stdin is echoed to stdout while a fixed
    # message goes to stderr.
    script = ('import sys,os;'
              'sys.stderr.write("pineapple");'
              'sys.stdout.write(sys.stdin.read())')
    child = subprocess.Popen([sys.executable, "-c", yenv + script],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    stdout, stderr = child.communicate("banana")
    self.assertEqual(stdout, "banana")
    self.assertStderrEqual(stderr, "pineapple")
def test_communicate_timeout(self):
    # The child sleeps for a second between its two stderr writes, so a
    # 0.3s communicate() must time out.
    script = ('import sys,os,time;'
              'sys.stderr.write("pineapple\\n");'
              'time.sleep(1);'
              'sys.stderr.write("pear\\n");'
              'sys.stdout.write(sys.stdin.read())')
    child = subprocess.Popen([sys.executable, "-c", yenv + script],
                             universal_newlines=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    self.assertRaises(subprocess.TimeoutExpired, child.communicate,
                      u"banana", timeout=0.3)
    # Make sure we can keep waiting for it, and that we get the whole
    # output after it completes.
    stdout, stderr = child.communicate()
    self.assertEqual(stdout, "banana")
    self.assertStderrEqual(stderr.encode(), "pineapple\npear\n")
def test_communicate_timeout_large_ouput(self):
    # Test an expiring timeout while the child is outputting lots of data.
    script = ('import sys,os,time;'
              'sys.stdout.write("a" * (64 * 1024));'
              'time.sleep(0.2);'
              'sys.stdout.write("a" * (64 * 1024));'
              'time.sleep(0.2);'
              'sys.stdout.write("a" * (64 * 1024));'
              'time.sleep(0.2);'
              'sys.stdout.write("a" * (64 * 1024));')
    child = subprocess.Popen([sys.executable, "-c", yenv + script],
                             stdout=subprocess.PIPE)
    self.assertRaises(subprocess.TimeoutExpired, child.communicate,
                      timeout=0.4)
    # A second communicate() must still deliver all four chunks.
    stdout, _ = child.communicate()
    self.assertEqual(len(stdout), 4 * 64 * 1024)
# Test for the fd leak reported in http://bugs.python.org/issue2791.
def test_communicate_pipe_fd_leak(self):
    # Try every combination of piped streams (except "none piped") and
    # check that communicate() closes each pipe it was given.
    flags = (False, True)
    combos = [(i, o, e) for i in flags for o in flags for e in flags]
    for stdin_pipe, stdout_pipe, stderr_pipe in combos:
        options = {}
        requested = (('stdin', stdin_pipe),
                     ('stdout', stdout_pipe),
                     ('stderr', stderr_pipe))
        for stream_name, wanted in requested:
            if wanted:
                options[stream_name] = subprocess.PIPE
        if not options:
            continue
        child = subprocess.Popen((sys.executable, "-c", yenv + "pass"),
                                 **options)
        child.communicate()
        for stream in (child.stdin, child.stdout, child.stderr):
            if stream is not None:
                self.assertTrue(stream.closed)
def test_communicate_returns(self):
    # communicate() should return None if no redirection is active
    argv = [sys.executable, "-c", yenv + "import sys; sys.exit(47)"]
    stdout, stderr = subprocess.Popen(argv).communicate()
    self.assertEqual(stdout, None)
    self.assertEqual(stderr, None)
def test_communicate_pipe_buf(self):
    # Push more data than one pipe buffer holds through communicate().
    # If communicate() is broken this test is more likely to deadlock
    # than to report a failure.
    read_end, write_end = os.pipe()
    if not mswindows:
        pipe_buf = os.fpathconf(read_end, "PC_PIPE_BUF")
    else:
        pipe_buf = 512
    os.close(read_end)
    os.close(write_end)
    # The child echoes 47 bytes, writes 3 * pipe_buf bytes to stderr,
    # then echoes the remainder of stdin.
    child_code = yenv + ('import sys,os;'
                         'sys.stdout.write(sys.stdin.read(47));'
                         'sys.stderr.write("xyz"*%d);'
                         'sys.stdout.write(sys.stdin.read())' % pipe_buf)
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    payload = "abc" * pipe_buf
    out, err = proc.communicate(payload)
    self.assertEqual(out, payload)
def test_writes_before_communicate(self):
    # Data written straight to stdin must come before the input that is
    # later handed to communicate().
    echo_code = yenv + ('import sys,os;'
                        'sys.stdout.write(sys.stdin.read())')
    proc = subprocess.Popen([sys.executable, "-c", echo_code],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    proc.stdin.write("banana")
    out, err = proc.communicate("split")
    self.assertEqual(out, "bananasplit")
    self.assertStderrEqual(err, "")
def test_universal_newlines(self):
    # Read a mix of \n, \r and \r\n line endings directly from the
    # child's stdout pipe with universal_newlines enabled.
    child_code = yenv + 'import sys,os;' + SETBINARY + (
        'sys.stdout.write("line1\\n");'
        'sys.stdout.flush();'
        'sys.stdout.write("line2\\r");'
        'sys.stdout.flush();'
        'sys.stdout.write("line3\\r\\n");'
        'sys.stdout.flush();'
        'sys.stdout.write("line4\\r");'
        'sys.stdout.flush();'
        'sys.stdout.write("\\nline5");'
        'sys.stdout.flush();'
        'sys.stdout.write("\\nline6");')
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            stdout=subprocess.PIPE,
                            universal_newlines=1)
    text = proc.stdout.read()
    if hasattr(file, 'newlines'):
        # Interpreter with universal newline support
        expected = "line1\nline2\nline3\nline4\nline5\nline6"
    else:
        # Interpreter without universal newline support
        expected = "line1\nline2\rline3\r\nline4\r\nline5\nline6"
    self.assertEqual(text, expected)
def test_universal_newlines_communicate(self):
    # Same mixed line endings as the direct-read test, but collected
    # through communicate().
    child_code = yenv + 'import sys,os;' + SETBINARY + (
        'sys.stdout.write("line1\\n");'
        'sys.stdout.flush();'
        'sys.stdout.write("line2\\r");'
        'sys.stdout.flush();'
        'sys.stdout.write("line3\\r\\n");'
        'sys.stdout.flush();'
        'sys.stdout.write("line4\\r");'
        'sys.stdout.flush();'
        'sys.stdout.write("\\nline5");'
        'sys.stdout.flush();'
        'sys.stdout.write("\\nline6");')
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=1)
    out, err = proc.communicate()
    if hasattr(file, 'newlines'):
        # Interpreter with universal newline support
        expected = "line1\nline2\nline3\nline4\nline5\nline6"
    else:
        # Interpreter without universal newline support
        expected = "line1\nline2\rline3\r\nline4\r\nline5\nline6"
    self.assertEqual(out, expected)
def test_universal_newlines_communicate_input_none(self):
    # Exercise communicate(input=None) with universal newlines.
    #
    # stdout is piped in addition to stdin because, at the time this test
    # was written, zero or one pipe took a different code path.
    argv = [sys.executable, "-c", yenv + "pass"]
    proc = subprocess.Popen(argv,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            universal_newlines=True)
    proc.communicate()
    self.assertEqual(proc.returncode, 0)
def test_no_leaking(self):
    # Run many children back to back; each one echoes its stdin, and the
    # echoed data is checked every time.
    if (not hasattr(test_support, "is_resource_enabled")
            or (test_support.is_resource_enabled("subprocess")
                and not mswindows)):
        max_handles = 1026  # too much for most UNIX systems
    else:
        max_handles = 65
    echo_argv = [sys.executable, "-c", yenv +
                 "import sys;sys.stdout.write(sys.stdin.read())"]
    for _ in range(max_handles):
        proc = subprocess.Popen(echo_argv,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        echoed = proc.communicate("lime")[0]
        self.assertEqual(echoed, "lime")
def test_universal_newlines_communicate_stdin_stdout_stderr(self):
    # universal newlines through communicate(), with stdin, stdout, stderr
    child_code = yenv + 'import sys,os;' + SETBINARY + '''\nif True:
        s = sys.stdin.readline()
        sys.stdout.write(s)
        sys.stdout.write("line2\\r")
        sys.stderr.write("eline2\\n")
        s = sys.stdin.read()
        sys.stdout.write(s)
        sys.stdout.write("line4\\n")
        sys.stdout.write("line5\\r\\n")
        sys.stderr.write("eline6\\r")
        sys.stderr.write("eline7\\r\\nz")
    '''
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            stdin=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            universal_newlines=True)
    self.addCleanup(proc.stdout.close)
    self.addCleanup(proc.stderr.close)
    out, err = proc.communicate(u"line1\nline3\n")
    self.assertEqual(proc.returncode, 0)
    self.assertEqual(u"line1\nline2\nline3\nline4\nline5\n", out)
    # A Python debug build appends something like "[42442 refs]\n" to
    # stderr when the child exits, so only the prefix is compared.
    # assertStderrEqual is not used because it strips CR and LF.
    self.assertTrue(err.startswith(u"eline2\neline6\neline7\n"))
- def test_list2cmdline(self):
- self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
- '"a b c" d e')
- self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
- 'ab\\"c \\ d')
- self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
- 'ab\\"c " \\\\" d')
- self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
- 'a\\\\\\b "de fg" h')
- self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
- 'a\\\\\\"b c d')
- self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
- '"a\\\\b c" d e')
- self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
- '"a\\\\b\\ c" d e')
- self.assertEqual(subprocess.list2cmdline(['ab', '']),
- 'ab ""')
def test_poll(self):
    # poll() must return None while the child runs and the exit status
    # once it has finished.
    proc = subprocess.Popen([sys.executable,
                             "-c", yenv + "import time; time.sleep(1)"])
    count = 0
    while proc.poll() is None:
        time.sleep(0.1)
        count += 1
    # We expect that the poll loop probably went around about 10 times,
    # but, based on system scheduling we can't control, it's possible
    # poll() never returned None.  It "should be" very rare that it
    # didn't go around at least twice.
    # assertTrue replaces the deprecated assert_ alias, matching the
    # other tests in this file.
    self.assertTrue(count >= 2)
    # Subsequent invocations should just return the returncode
    self.assertEqual(proc.poll(), 0)
def test_wait(self):
    # wait() returns the exit status, and keeps returning it when called
    # again after the child has been reaped.
    argv = [sys.executable, "-c", yenv + "import time; time.sleep(2)"]
    proc = subprocess.Popen(argv)
    self.assertEqual(proc.wait(), 0)
    self.assertEqual(proc.wait(), 0)
- def test_wait_timeout(self):
- p = subprocess.Popen([sys.executable,
- "-c", yenv + "import time; time.sleep(0.1)"])
- try:
- p.wait(timeout=0.01)
- except subprocess.TimeoutExpired, e:
- self.assertIn("0.01", str(e)) # For coverage of __str__.
- else:
- self.fail("subprocess.TimeoutExpired expected but not raised.")
- self.assertEqual(p.wait(timeout=2), 0)
def test_invalid_bufsize(self):
    # An invalid type for the bufsize argument (the second positional
    # parameter of Popen) should raise TypeError.  assertRaises makes the
    # test fail when no exception is raised; the previous try/except
    # form passed silently in that case.
    self.assertRaises(TypeError, subprocess.Popen,
                      [sys.executable, "-c", yenv + "pass"], "orange")
- def test_leaking_fds_on_error(self):
- # see bug #5179: Popen leaks file descriptors to PIPEs if
- # the child fails to execute; this will eventually exhaust
- # the maximum number of open fds. 1024 seems a very common
- # value for that limit, but Windows has 2048, so we loop
- # 1024 times (each call leaked two fds).
- for i in range(1024):
- # Windows raises IOError. Others raise OSError.
- try:
- subprocess.Popen(['nonexisting_i_hope'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- except EnvironmentError, c:
- if c.errno != 2: # ignore "no such file"
- raise
#@unittest.skipIf(threading is None, "threading required")
def test_threadsafe_wait(self):
    """Issue21291: Popen.wait() needs to be threadsafe for returncode."""
    child = subprocess.Popen([sys.executable, '-c', yenv +
                              'import time; time.sleep(12)'])
    self.assertEqual(child.returncode, None)
    events = []

    def kill_proc_timer_thread():
        events.append(('thread-start-poll-result', child.poll()))
        # Kill the child from this thread and wait for it here.
        child.kill()
        child.wait()
        events.append(('thread-after-kill-and-wait', child.returncode))
        # The child is already reaped, so this wait should be a no-op.
        child.wait()
        events.append(('thread-after-second-wait', child.returncode))

    # This is a timing sensitive test, the failure mode is
    # triggered when both the main thread and this thread are in
    # the wait() call at once.  The delay here is to allow the
    # main thread to most likely be blocked in its wait() call.
    killer = threading.Timer(0.2, kill_proc_timer_thread)
    killer.start()

    if not mswindows:
        # Should be -9 because of the proc.kill() from the thread.
        expected_errorcode = -9
    else:
        expected_errorcode = 1

    # Wait for the process to finish; the thread should kill it
    # long before it finishes on its own.  Supplying a timeout
    # triggers a different code path for better coverage.
    child.wait(timeout=20)
    self.assertEqual(child.returncode, expected_errorcode,
                     msg="unexpected result in wait from main thread")

    # This should be a no-op with no change in returncode.
    child.wait()
    self.assertEqual(child.returncode, expected_errorcode,
                     msg="unexpected result in second main wait.")

    killer.join()
    # Ensure that all of the thread results are as expected.
    # When a race condition occurs in wait(), the returncode could
    # be set by the wrong thread that doesn't actually have it
    # leading to an incorrect value.
    expected_events = [
        ('thread-start-poll-result', None),
        ('thread-after-kill-and-wait', expected_errorcode),
        ('thread-after-second-wait', expected_errorcode),
    ]
    self.assertEqual(expected_events, events)
def test_issue8780(self):
    # Ensure that stdout is inherited from the parent
    # if stdout=PIPE is not used.  The outer child runs an inner child
    # without redirecting stdout; the inner child's greeting must show up
    # in the output captured from the outer child.
    code = ';'.join((
        'import subprocess32, sys',
        'retcode = subprocess32.call('
        "[sys.executable, '-c', 'print(\"Hello World!\")'])",
        'assert retcode == 0'))
    output = subprocess.check_output([sys.executable, '-c', yenv + code])
    # assertTrue replaces the deprecated assert_ alias, matching the
    # other tests in this file.
    self.assertTrue(output.startswith('Hello World!'), output)
def test_communicate_epipe(self):
    # Issue 10963: communicate() should hide EPIPE.  The child exits
    # without reading while the parent tries to send 1 MiB.
    argv = [sys.executable, "-c", yenv + 'pass']
    proc = subprocess.Popen(argv,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    for stream in (proc.stdout, proc.stderr, proc.stdin):
        self.addCleanup(stream.close)
    proc.communicate(b"x" * 2**20)
def test_communicate_epipe_only_stdin(self):
    # Issue 10963: communicate() should hide EPIPE.  Here only stdin is
    # piped and the child has already exited before input is sent.
    argv = [sys.executable, "-c", yenv + 'pass']
    proc = subprocess.Popen(argv, stdin=subprocess.PIPE)
    self.addCleanup(proc.stdin.close)
    proc.wait()
    proc.communicate(b"x" * 2**20)
if not mswindows:  # Signal tests are POSIX specific.
    def test_communicate_eintr(self):
        # Issue #12493: communicate() should handle EINTR
        def ignore_alarm(signum, frame):
            pass
        previous = signal.signal(signal.SIGALRM, ignore_alarm)
        self.addCleanup(signal.signal, signal.SIGALRM, previous)

        # The child sleeps for 2 seconds; the alarm fires after 1.
        argv = [sys.executable, "-c", yenv + 'import time; time.sleep(2)']
        for stream_name in ('stdout', 'stderr'):
            popen_kwargs = dict([(stream_name, subprocess.PIPE)])
            child = subprocess.Popen(argv, **popen_kwargs)
            try:
                signal.alarm(1)
                # communicate() will be interrupted by SIGALRM
                child.communicate()
            finally:
                child.__exit__(None, None, None)
# This test is Linux-ish specific for simplicity to at least have
# some coverage.  It is not a platform specific bug.
#@unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
#                     "Linux specific")
def test_failed_child_execute_fd_leak(self):
    """Test for the fork() failure fd leak reported in issue16327."""
    fd_directory = '/proc/%d/fd' % os.getpid()
    if not os.path.isdir(fd_directory):
        self.skipTest("Linux specific")
    fds_before_popen = os.listdir(fd_directory)
    try:
        PopenExecuteChildRaises(
            [sys.executable, '-c', yenv + 'pass'], stdin=subprocess.PIPE,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except PopenTestException:
        pass  # Expected; try/except is used because 2.4 has no "with".
    else:
        self.fail("PopenTestException expected but not raised.")

    # NOTE: This test doesn't verify that the real _execute_child
    # does not close the file descriptors itself on the way out
    # during an exception.  Code inspection has confirmed that.
    self.assertEqual(fds_before_popen, os.listdir(fd_directory))
class RunFuncTestCase(BaseTestCase):
    # Tests for subprocess.run() and the object it returns.

    def run_python(self, code, **kwargs):
        """Run Python code in a subprocess using subprocess.run"""
        return subprocess.run([sys.executable, "-c", yenv + code], **kwargs)

    def test_returncode(self):
        # The exit status is exposed as returncode, and
        # check_returncode() raises for a non-zero status.
        completed = self.run_python("import sys; sys.exit(47)")
        self.assertEqual(completed.returncode, 47)
        try:
            completed.check_returncode()
        except subprocess.CalledProcessError:
            pass
        else:
            self.fail("CalledProcessError not raised")

    def test_check(self):
        # check=True turns a non-zero exit status into an exception.
        try:
            self.run_python("import sys; sys.exit(47)", check=True)
        except subprocess.CalledProcessError, err:
            self.assertEqual(err.returncode, 47)
        else:
            self.fail("CalledProcessError not raised")

    def test_check_zero(self):
        # check_returncode shouldn't raise when returncode is zero
        completed = self.run_python("import sys; sys.exit(0)", check=True)
        self.assertEqual(completed.returncode, 0)

    def test_timeout(self):
        # run() function with timeout argument; we want to test that the
        # child process gets killed when the timeout expires.  If the child
        # isn't killed, this call will deadlock since subprocess.run waits
        # for the child.
        try:
            self.run_python("while True: pass", timeout=0.0001)
        except subprocess.TimeoutExpired:
            pass
        else:
            self.fail("TimeoutExpired not raised")

    def test_capture_stdout(self):
        # capture stdout with zero return code
        completed = self.run_python("print('BDFL')", stdout=subprocess.PIPE)
        self.assertIn('BDFL', completed.stdout)

    def test_capture_stderr(self):
        # Output written to a piped stderr is available as .stderr.
        completed = self.run_python("import sys; sys.stderr.write('BDFL')",
                                    stderr=subprocess.PIPE)
        self.assertIn('BDFL', completed.stderr)

    def test_check_output_stdin_arg(self):
        # run() can be called with stdin set to a file
        source = tempfile.TemporaryFile()
        self.addCleanup(source.close)
        source.write('pear')
        source.seek(0)
        completed = self.run_python(
            "import sys; sys.stdout.write(sys.stdin.read().upper())",
            stdin=source, stdout=subprocess.PIPE)
        self.assertIn('PEAR', completed.stdout)

    def test_check_output_input_arg(self):
        # run() can be called with input set to a string
        completed = self.run_python(
            "import sys; sys.stdout.write(sys.stdin.read().upper())",
            input='pear', stdout=subprocess.PIPE)
        self.assertIn('PEAR', completed.stdout)

    def test_check_output_stdin_with_input_arg(self):
        # run() refuses to accept 'stdin' with 'input'
        source = tempfile.TemporaryFile()
        self.addCleanup(source.close)
        source.write('pear')
        source.seek(0)
        try:
            output = self.run_python("print('will not be run')",
                                     stdin=source, input='hare')
        except ValueError, err:
            message = err.args[0]
            self.assertIn('stdin', message)
            self.assertIn('input', message)
        else:
            self.fail("Expected ValueError when stdin and input args supplied.")

    def test_check_output_timeout(self):
        # Output produced before the timeout is attached to the exception.
        child_code = ("import sys, time\n"
                      "sys.stdout.write('BDFL')\n"
                      "sys.stdout.flush()\n"
                      "time.sleep(3600)")
        try:
            # Some heavily loaded buildbots (sparc Debian 3.x) require
            # this much time to start and print.
            cp = self.run_python(child_code,
                                 timeout=3, stdout=subprocess.PIPE)
        except subprocess.TimeoutExpired, expired:
            self.assertEqual(expired.output, 'BDFL')
            # output is aliased to stdout
            self.assertEqual(expired.stdout, 'BDFL')
        else:
            self.fail("TimeoutExpired not raised")

    def test_run_kwargs(self):
        # Extra keyword arguments (here env) are forwarded to the child.
        child_env = os.environ.copy()
        child_env["FRUIT"] = "banana"
        child_code = ('import sys, os;'
                      'os.getenv("FRUIT")=="banana" and sys.exit(33) or sys.exit(31)')
        completed = self.run_python(child_code, env=child_env)
        self.assertEqual(completed.returncode, 33)
# context manager
class _SuppressCoreFiles(object):
    """Try to prevent core files from being created.

    On entry the soft RLIMIT_CORE limit is set to 0 and the previous
    (soft, hard) pair is remembered in ``old_limit``; on exit that pair is
    restored.  Everything is best effort: when the ``resource`` module is
    missing or the limit cannot be changed, the manager does nothing.
    """
    # Previous (soft, hard) RLIMIT_CORE pair, or None when nothing was saved.
    old_limit = None

    def __enter__(self):
        """Try to save the previous ulimit, then set the soft limit to 0.

        Returns self so the object can also be used in a ``with`` statement.
        """
        # Import separately: naming resource.error in an except clause
        # would itself fail if the import had raised ImportError.
        try:
            import resource
        except ImportError:
            return self
        try:
            self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
            # Only lower the soft limit.  Lowering the hard limit to 0
            # could not be undone by an unprivileged process, so __exit__
            # would silently fail to restore the original setting.
            resource.setrlimit(resource.RLIMIT_CORE, (0, self.old_limit[1]))
        except (ValueError, resource.error):
            pass
        return self

    def __exit__(self, *args):
        """Return core file behavior to default."""
        if self.old_limit is None:
            return
        try:
            import resource
        except ImportError:
            return
        try:
            resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
        except (ValueError, resource.error):
            pass
- #@unittest.skipIf(mswindows, "POSIX specific tests")
- class POSIXProcessTestCase(BaseTestCase):
def setUp(self):
    # Run the shared fixture setup first, then record a path that the
    # exception tests use as a directory/executable that does not exist.
    BaseTestCase.setUp(self)
    missing_path = "/_this/pa.th/does/not/exist"
    self._nonexistent_dir = missing_path
- def _get_chdir_exception(self):
- try:
- os.chdir(self._nonexistent_dir)
- except OSError, e:
- # This avoids hard coding the errno value or the OS perror()
- # string and instead capture the exception that we want to see
- # below for comparison.
- desired_exception = e
- desired_exception.strerror += ': ' + repr(self._nonexistent_dir)
- else:
- self.fail("chdir to nonexistant directory %s succeeded." %
- self._nonexistent_dir)
- return desired_exception
- def test_exception_cwd(self):
- """Test error in the child raised in the parent for a bad cwd."""
- desired_exception = self._get_chdir_exception()
- try:
- p = subprocess.Popen([sys.executable, "-c", yenv + ""],
- cwd=self._nonexistent_dir)
- except OSError, e:
- # Test that the child process chdir failure actually makes
- # it up to the parent process as the correct exception.
- self.assertEqual(desired_exception.errno, e.errno)
- self.assertEqual(desired_exception.strerror, e.strerror)
- else:
- self.fail("Expected OSError: %s" % desired_exception)
- def test_exception_bad_executable(self):
- """Test error in the child raised in the parent for a bad executable."""
- desired_exception = self._get_chdir_exception()
- try:
- p = subprocess.Popen([sys.executable, "-c", yenv + ""],
- executable=self._nonexistent_dir)
- except OSError, e:
- # Test that the child process exec failure actually makes
- # it up to the parent process as the correct exception.
- self.assertEqual(desired_exception.errno, e.errno)
- self.assertEqual(desired_exception.strerror, e.strerror)
- else:
- self.fail("Expected OSError: %s" % desired_exception)
- def test_exception_bad_args_0(self):
- """Test error in the child raised in the parent for a bad args[0]."""
- desired_exception = self._get_chdir_exception()
- try:
- p = subprocess.Popen([self._nonexistent_dir, "-c", yenv + ""])
- except OSError, e:
- # Test that the child process exec failure actually makes
- # it up to the parent process as the correct exception.
- self.assertEqual(desired_exception.errno, e.errno)
- self.assertEqual(desired_exception.strerror, e.strerror)
- else:
- self.fail("Expected OSError: %s" % desired_exception)
#@unittest.skipIf(not os.path.exists('/proc/self/status'))
def test_restore_signals(self):
    # Compare the SigIgn line that `cat /proc/self/status` reports when
    # run with restore_signals=False against the one reported with
    # restore_signals=True; the two must differ.
    if not os.path.exists('/proc/self/status'):
        # Report a real skip (as the SigIgn check below does) instead of
        # printing a message and counting the test as passed.
        self.skipTest("Functional test requires /proc/self/status.")

    def find_sig_ign(status_output):
        # Return the first line starting with SigIgn, or None.
        for line in status_output.splitlines():
            if line.startswith(b'SigIgn'):
                return line
        return None

    # Blindly assume that cat exists on systems with /proc/self/status...
    default_proc_status = subprocess.check_output(
        ['cat', '/proc/self/status'],
        restore_signals=False)
    default_sig_ign_mask = find_sig_ign(default_proc_status)
    if default_sig_ign_mask is None:
        self.skipTest("SigIgn not found in /proc/self/status.")

    restored_proc_status = subprocess.check_output(
        ['cat', '/proc/self/status'],
        restore_signals=True)
    restored_sig_ign_mask = find_sig_ign(restored_proc_status)
    if restored_sig_ign_mask is None:
        # Previously this case surfaced as a NameError on an unbound local.
        self.fail("SigIgn not found in /proc/self/status "
                  "with restore_signals=True.")

    # restore_signals=True should've unblocked SIGPIPE and friends.
    self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask)
- def test_start_new_session(self):
- # For code coverage of calling setsid(). We don't care if we get an
- # EPERM error from it depending on the test execution environment, that
- # still indicates that it was called.
- try:
- output = subprocess.check_output(
- [sys.executable, "-c", yenv +
- "import os; print(os.getpgid(os.getpid()))"],
- start_new_session=True)
- except OSError, e:
- if e.errno != errno.EPERM:
- raise
- else:
- parent_pgid = os.getpgid(os.getpid())
- child_pgid = int(output)
- self.assertNotEqual(parent_pgid, child_pgid)
def test_run_abort(self):
    # A child that dies from os.abort() must report the negated SIGABRT
    # number as its returncode.  Core files are suppressed while it runs.
    core_guard = _SuppressCoreFiles()
    core_guard.__enter__()
    try:
        argv = [sys.executable, "-c", yenv + "import os; os.abort()"]
        child = subprocess.Popen(argv)
        child.wait()
    finally:
        core_guard.__exit__()
    self.assertEqual(-child.returncode, signal.SIGABRT)
def test_preexec(self):
    # DISCLAIMER: Setting environment variables is *not* a good use
    # of a preexec_fn.  This is merely a test.
    def set_fruit():
        return os.putenv("FRUIT", "apple")

    child_code = yenv + ("import sys, os;"
                         "sys.stdout.write(os.getenv('FRUIT'))")
    child = subprocess.Popen([sys.executable, "-c", child_code],
                             stdout=subprocess.PIPE,
                             preexec_fn=set_fruit)
    self.assertEqual(child.stdout.read(), "apple")
- def test_preexec_exception(self):
- def raise_it():
- raise ValueError("What if two swallows carried a coconut?")
- try:
- p = subprocess.Popen([sys.executable, "-c", yenv + ""],
- preexec_fn=raise_it)
- except RuntimeError, e:
- self.assertTrue(
- subprocess._posixsubprocess,
- "Expected a ValueError from the preexec_fn")
- except ValueError, e:
- self.assertIn("coconut", e.args[0])
- else:
- self.fail("Exception raised by preexec_fn did not make it "
- "to the parent process.")
class _TestExecuteChildPopen(subprocess.Popen):
    """Used to test behavior at the end of _execute_child."""

    def __init__(self, testcase, *args, **kwargs):
        # Keep the test case so _execute_child can make assertions on it.
        self._testcase = testcase
        subprocess.Popen.__init__(self, *args, **kwargs)

    def _execute_child(self, *args, **kwargs):
        try:
            subprocess.Popen._execute_child(self, *args, **kwargs)
        finally:
            # Open a bunch of file descriptors and verify that
            # none of them are the same as the ones the Popen
            # instance is using for stdin/stdout/stderr.
            probe_fds = [os.open("/dev/zero", os.O_RDONLY)
                         for _ in range(8)]
            try:
                pipe_fds = (self.stdin.fileno(), self.stdout.fileno(),
                            self.stderr.fileno())
                for probe in probe_fds:
                    self._testcase.assertNotIn(
                        probe, pipe_fds,
                        msg="At least one fd was closed early.")
            finally:
                for probe in probe_fds:
                    os.close(probe)
#@unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
def test_preexec_errpipe_does_not_double_close_pipes(self):
    """Issue16140: Don't double close pipes on preexec error."""

    def raise_it():
        raise RuntimeError("force the _execute_child() errpipe_data path.")

    argv = [sys.executable, "-c", yenv + "pass"]
    try:
        self._TestExecuteChildPopen(
            self, argv,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, preexec_fn=raise_it)
    except RuntimeError:
        pass  # Expected; try/except is used because 2.4 has no "with".
    else:
        self.fail("RuntimeError expected but not raised.")
#@unittest.skipUnless(gc, "Requires a gc module.")
def test_preexec_gc_module_failure(self):
    # This tests the code that disables garbage collection if the child
    # process will execute any Python.
    def raise_runtime_error():
        raise RuntimeError("this shouldn't escape")

    argv = [sys.executable, '-c', yenv + '']
    gc_was_enabled = gc.isenabled()
    real_gc_disable = gc.disable
    real_gc_isenabled = gc.isenabled
    try:
        # Popen must leave a disabled collector disabled ...
        gc.disable()
        self.assertFalse(gc.isenabled())
        subprocess.call(argv, preexec_fn=lambda: None)
        self.assertFalse(gc.isenabled(),
                         "Popen enabled gc when it shouldn't.")

        # ... and an enabled collector enabled.
        gc.enable()
        self.assertTrue(gc.isenabled())
        subprocess.call(argv, preexec_fn=lambda: None)
        self.assertTrue(gc.isenabled(), "Popen left gc disabled.")

        # A failing gc.disable() is expected to propagate out of Popen.
        gc.disable = raise_runtime_error
        self.assertRaises(RuntimeError, subprocess.Popen,
                          argv, preexec_fn=lambda: None)

        del gc.isenabled  # force an AttributeError
        self.assertRaises(AttributeError, subprocess.Popen,
                          argv, preexec_fn=lambda: None)
    finally:
        # Put the real gc functions and the original state back.
        gc.disable = real_gc_disable
        gc.isenabled = real_gc_isenabled
        if not gc_was_enabled:
            gc.disable()
- def test_args_string(self):
- # args is a string
- f, fname = mkstemp()
- os.write(f, "#!/bin/sh\n")
- os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
- sys.executable)
- os.close(f)
- os.chmod(fname, 0700)
- p = subprocess.Popen(fname)
- p.wait()
- os.remove(fname)
- self.assertEqual(p.returncode, 47)
def test_invalid_args(self):
    # The startupinfo and creationflags arguments should raise
    # ValueError when given in this (POSIX) test case.
    argv = [sys.executable, "-c", yenv + "import sys; sys.exit(47)"]
    for bad_kwargs in ({'startupinfo': 47}, {'creationflags': 47}):
        self.assertRaises(ValueError, subprocess.call, argv, **bad_kwargs)
def test_shell_sequence(self):
    # Run command through the shell (sequence)
    shell_env = os.environ.copy()
    shell_env["FRUIT"] = "apple"
    child = subprocess.Popen(["echo $FRUIT"], shell=1,
                             stdout=subprocess.PIPE,
                             env=shell_env)
    echoed = child.stdout.read().strip()
    self.assertEqual(echoed, "apple")
def test_shell_string(self):
    # Run command through the shell (string)
    shell_env = os.environ.copy()
    shell_env["FRUIT"] = "apple"
    child = subprocess.Popen("echo $FRUIT", shell=1,
                             stdout=subprocess.PIPE,
                             env=shell_env)
    echoed = child.stdout.read().strip()
    self.assertEqual(echoed, "apple")
- def test_call_string(self):
- # call() function with string argument on UNIX
- f, fname = mkstemp()
- os.write(f, "#!/bin/sh\n")
- os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
- sys.executable)
- os.close(f)
- os.chmod(fname, 0700)
- rc = subprocess.call(fname)
- os.remove(fname)
- self.assertEqual(rc, 47)
def test_specific_shell(self):
    # Issue #9265: Incorrect name passed as arg[0].
    candidates = [os.path.join(prefix, name)
                  for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']
                  for name in ['bash', 'ksh']]
    shells = [path for path in candidates if os.path.isfile(path)]
    if not shells:  # Will probably work for any shell but csh.
        self.skipTest("bash or ksh required for this test")
    sh = '/bin/sh'
    if os.path.isfile(sh) and not os.path.islink(sh):
        # Test will fail if /bin/sh is a symlink to csh.
        shells.append(sh)
    # Each shell must report its own path as $0.
    for sh in shells:
        child = subprocess.Popen("echo $0", executable=sh, shell=True,
                                 stdout=subprocess.PIPE)
        self.assertEqual(child.stdout.read().strip(), sh)
def _kill_process(self, method, *args):
    # Start a long-sleeping child, wait until it has written its first
    # byte, then call the named Popen method on it with *args and return
    # the Popen object.
    #
    # Do not inherit file handles from the parent.
    # It should fix failures on some platforms.
    child_code = yenv + """if 1:
        import sys, time
        sys.stdout.write('x\\n')
        sys.stdout.flush()
        time.sleep(30)
    """
    child = subprocess.Popen([sys.executable, "-c", child_code],
                             close_fds=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    # Wait for the interpreter to be completely initialized before
    # sending any signal.
    child.stdout.read(1)
    action = getattr(child, method)
    action(*args)
    return child
def test_send_signal(self):
    # SIGINT must surface in the child as a KeyboardInterrupt traceback
    # on stderr and a non-zero exit status.
    child = self._kill_process('send_signal', signal.SIGINT)
    err = child.communicate()[1]
    self.assertIn('KeyboardInterrupt', err)
    self.assertNotEqual(child.wait(), 0)
def test_kill(self):
    # kill() must end the child with SIGKILL and leave stderr empty.
    child = self._kill_process('kill')
    err = child.communicate()[1]
    self.assertStderrEqual(err, '')
    self.assertEqual(child.wait(), -signal.SIGKILL)
def test_terminate(self):
    # terminate() must end the child with SIGTERM and leave stderr empty.
    child = self._kill_process('terminate')
    err = child.communicate()[1]
    self.assertStderrEqual(err, '')
    self.assertEqual(child.wait(), -signal.SIGTERM)
def check_close_std_fds(self, fds):
    # Issue #9905: test that subprocess pipes still work properly with
    # some standard fds closed
    child_stdin = 0
    backups = []
    for std_fd in fds:
        saved = os.dup(std_fd)
        backups.append(saved)
        if std_fd == 0:
            # fd 0 is about to be closed; give the child the duplicate.
            child_stdin = saved
    try:
        for std_fd in fds:
            os.close(std_fd)
        child_code = yenv + ('import sys;'
                             'sys.stdout.write("apple");'
                             'sys.stdout.flush();'
                             'sys.stderr.write("orange")')
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdin=child_stdin,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        out, err = child.communicate()
        err = strip_python_stderr(err)
        self.assertEqual((out, err), ('apple', 'orange'))
    finally:
        # Put the standard descriptors back, then drop the duplicates.
        for saved, std_fd in zip(backups, fds):
            os.dup2(saved, std_fd)
        for saved in backups:
            os.close(saved)
# Each test below runs check_close_std_fds() with one combination of
# closed standard descriptors.
def test_close_fd_0(self):
    closed = [0]
    self.check_close_std_fds(closed)

def test_close_fd_1(self):
    closed = [1]
    self.check_close_std_fds(closed)

def test_close_fd_2(self):
    closed = [2]
    self.check_close_std_fds(closed)

def test_close_fds_0_1(self):
    closed = [0, 1]
    self.check_close_std_fds(closed)

def test_close_fds_0_2(self):
    closed = [0, 2]
    self.check_close_std_fds(closed)

def test_close_fds_1_2(self):
    closed = [1, 2]
    self.check_close_std_fds(closed)

def test_close_fds_0_1_2(self):
    # Issue #10806: test that subprocess pipes still work properly with
    # all standard fds closed.
    closed = [0, 1, 2]
    self.check_close_std_fds(closed)
def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
    # Replace fds 0, 1 and 2 with temporary files, then start a child
    # whose stdin/stdout/stderr are the given descriptor numbers.
    temps = [mkstemp() for _ in range(3)]
    temp_fds = [temp_fd for temp_fd, _ in temps]
    try:
        # unlink the files -- we won't need to reopen them
        for _, temp_name in temps:
            os.unlink(temp_name)

        # save a copy of the standard file descriptors
        saved_fds = [os.dup(std) for std in range(3)]
        try:
            # duplicate the temp files over the standard fd's 0, 1, 2
            for std, temp_fd in enumerate(temp_fds):
                os.dup2(temp_fd, std)

            # write some data to what will become stdin, and rewind
            os.write(stdin_no, "STDIN")
            os.lseek(stdin_no, 0, 0)

            # now use those files in the given order, so that subprocess
            # has to rearrange them in the child
            child_code = yenv + (
                'import sys; got = sys.stdin.read();'
                'sys.stdout.write("got %s"%got); sys.stderr.write("err")')
            child = subprocess.Popen([sys.executable, "-c", child_code],
                                     stdin=stdin_no,
                                     stdout=stdout_no,
                                     stderr=stderr_no)
            child.wait()

            # Rewind every temp file before reading the results back.
            for temp_fd in temp_fds:
                os.lseek(temp_fd, 0, 0)

            out = os.read(stdout_no, 1024)
            err = os.read(stderr_no, 1024)
        finally:
            # Restore the real standard descriptors.
            for std, saved in enumerate(saved_fds):
                os.dup2(saved, std)
                os.close(saved)

        self.assertEqual(out, "got STDIN")
        self.assertStderrEqual(err, "err")
    finally:
        for temp_fd in temp_fds:
            os.close(temp_fd)
# When duping fds, if there arises a situation where one of the fds is
# either 0, 1 or 2, it is possible that it is overwritten (#12607).
# This tests all combinations of this.
def test_swap_fds(self):
    orderings = [
        (0, 1, 2),
        (0, 2, 1),
        (1, 0, 2),
        (1, 2, 0),
        (2, 0, 1),
        (2, 1, 0),
    ]
    for stdin_no, stdout_no, stderr_no in orderings:
        self.check_swap_fds(stdin_no, stdout_no, stderr_no)
def test_small_errpipe_write_fd(self):
    """Issue #15798: Popen should work when stdio fds are available."""
    saved_stdin = os.dup(0)
    saved_stdout = os.dup(1)
    try:
        # Free descriptors 0 and 1 before launching the child.
        for std in (0, 1):
            os.close(std)
        argv = [sys.executable, "-c", yenv + "pass"]
        subprocess.Popen(argv).wait()
    finally:
        # Restore original stdin and stdout
        os.dup2(saved_stdin, 0)
        os.dup2(saved_stdout, 1)
        os.close(saved_stdin)
        os.close(saved_stdout)
def test_remapping_std_fds(self):
    # Replace fds 0, 1 and 2 with temporary files and hand them to the
    # child in a rotated order.
    temps = [mkstemp() for _ in range(3)]
    try:
        temp_fds = [temp_fd for temp_fd, _ in temps]

        # unlink the files -- we won't need to reopen them
        for _, temp_name in temps:
            os.unlink(temp_name)

        # write some data to what will become stdin, and rewind
        os.write(temp_fds[1], "STDIN")
        os.lseek(temp_fds[1], 0, 0)

        # move the standard file descriptors out of the way
        saved_fds = [os.dup(std) for std in range(3)]
        try:
            # duplicate the file objects over the standard fd's
            for std, temp_fd in enumerate(temp_fds):
                os.dup2(temp_fd, std)

            # now use those files in the "wrong" order, so that subprocess
            # has to rearrange them in the child
            child_code = yenv + (
                'import sys; got = sys.stdin.read();'
                'sys.stdout.write("got %s"%got); sys.stderr.write("err")')
            child = subprocess.Popen([sys.executable, "-c", child_code],
                                     stdin=temp_fds[1],
                                     stdout=temp_fds[2],
                                     stderr=temp_fds[0])
            child.wait()
        finally:
            # restore the original fd's underneath sys.stdin, etc.
            for std, saved in enumerate(saved_fds):
                os.dup2(saved, std)
                os.close(saved)

        # Rewind every temp file before reading the results back.
        for temp_fd in temp_fds:
            os.lseek(temp_fd, 0, 0)

        out = os.read(temp_fds[2], 1024)
        err = os.read(temp_fds[0], 1024)
        self.assertEqual(out, "got STDIN")
        self.assertStderrEqual(err, "err")
    finally:
        for temp_fd in temp_fds:
            os.close(temp_fd)
- # NOTE: test_surrogates_error_message makes no sense on python 2.x. omitted.
- # NOTE: test_undecodable_env makes no sense on python 2.x. omitted.
- # NOTE: test_bytes_program makes no sense on python 2.x. omitted.
- if sys.version_info[:2] >= (2,7):
- # Disabling this test on 2.6 and earlier as it fails on Travis CI regardless
- # of LANG=C being set and is not worth the time to figure out why in such a
- # legacy environment..
- # https://travis-ci.org/google/python-subprocess32/jobs/290065729
- def test_fs_encode_unicode_error(self):
- fs_encoding = sys.getfilesystemencoding()
- if fs_encoding.upper() not in ("ANSI_X3.4-1968", "ASCII"):
- self.skipTest(
- "Requires a restictive sys.filesystemencoding(), "
- "not %s. Run python with LANG=C" % fs_encoding)
- highbit_executable_name = os.path.join(
- test_support.findfile("testdata"), u"Does\\Not\uDCff\\Exist")
- try:
- subprocess.call([highbit_executable_name])
- except UnicodeEncodeError:
- return
- except RuntimeError, e:
- # The ProcessTestCasePOSIXPurePython version ends up here. It
- # can't re-construct the unicode error from the child because it
- # doesn't have all the arguments. BFD. One doesn't use
- # subprocess32 for the old pure python implementation...
- if "UnicodeEncodeError" not in str(e):
- self.fail("Expected a RuntimeError whining about how a "
- "UnicodeEncodeError from the child could not "
- "be reraised. Not: %s" % e)
- return
- self.fail("Expected a UnicodeEncodeError to be raised.")
def test_pipe_cloexec(self):
    # The pipe fds created for one child must not show up in a second child,
    # even though both are started with close_fds=False.
    reader_script = test_support.findfile("testdata/input_reader.py")
    status_script = test_support.findfile("testdata/fd_status.py")

    holder = subprocess.Popen([sys.executable, reader_script],
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              close_fds=False)
    # communicate('') closes the holder's pipes and reaps it at cleanup time.
    self.addCleanup(holder.communicate, '')

    reporter = subprocess.Popen([sys.executable, status_script],
                                stdout=subprocess.PIPE, close_fds=False)
    output, error = reporter.communicate()

    inherited = set(int(token) for token in output.split(','))
    pipe_fds = set(stream.fileno() for stream in
                   (holder.stdin, holder.stdout, holder.stderr))
    leaked = inherited & pipe_fds
    self.assertFalse(leaked,
                     "Expected no fds from %r to be open in child, "
                     "found %r" % (pipe_fds, leaked))
def test_pipe_cloexec_real_tools(self):
    # Chain two helper scripts through a pipe and check that data written to
    # the first one comes out of the second without the pipeline hanging.
    cat_script = test_support.findfile("testdata/qcat.py")
    grep_script = test_support.findfile("testdata/qgrep.py")

    subdata = 'zxcvbn'
    data = subdata * 4 + '\n'

    producer = subprocess.Popen([sys.executable, cat_script],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                close_fds=False)
    consumer = subprocess.Popen([sys.executable, grep_script, subdata],
                                stdin=producer.stdout,
                                stdout=subprocess.PIPE,
                                close_fds=False)

    def terminate_quietly(proc):
        # The process may already be gone by the time cleanup runs.
        try:
            proc.terminate()
        except ProcessLookupError:
            pass

    # Cleanups run in reverse registration order: terminate both children
    # first, then wait on them.
    self.addCleanup(producer.wait)
    self.addCleanup(consumer.wait)
    self.addCleanup(terminate_quietly, producer)
    self.addCleanup(terminate_quietly, consumer)

    producer.stdin.write(data)
    producer.stdin.close()

    ready, _, _ = select.select([consumer.stdout], [], [], 10)
    self.assertTrue(ready, "The child hung")
    self.assertEqual(consumer.stdout.read(), data)

    producer.stdout.close()
    consumer.stdout.close()
def test_close_fds(self):
    # Check which parent fds reach the child with close_fds off, with
    # close_fds on, and with close_fds on plus an explicit pass_fds set.
    fd_status = test_support.findfile("testdata/fd_status.py")

    def fds_reported_by_child(**popen_kwargs):
        # Run fd_status.py and parse its comma-separated output into a set
        # of integer fds.
        p = subprocess.Popen([sys.executable, fd_status],
                             stdout=subprocess.PIPE, **popen_kwargs)
        output, ignored = p.communicate()
        return set(map(int, output.split(',')))

    fds = os.pipe()
    self.addCleanup(os.close, fds[0])
    self.addCleanup(os.close, fds[1])
    open_fds = set(fds)
    # add a bunch more fds
    for _ in range(9):
        fd = os.open("/dev/null", os.O_RDONLY)
        self.addCleanup(os.close, fd)
        open_fds.add(fd)

    # close_fds=False: every fd opened above must be inherited.
    remaining_fds = fds_reported_by_child(close_fds=False)
    self.assertEqual(remaining_fds & open_fds, open_fds,
                     "Some fds were closed")

    # close_fds=True: none of them may be inherited.
    remaining_fds = fds_reported_by_child(close_fds=True)
    self.assertFalse(remaining_fds & open_fds,
                     "Some fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")

    # Keep some of the fd's we opened open in the subprocess.
    # This tests _posixsubprocess.c's proper handling of fds_to_keep.
    # The kept fds are moved out of open_fds, so open_fds now holds only
    # the fds that must still be closed in the child.
    fds_to_keep = set(open_fds.pop() for _ in range(8))
    remaining_fds = fds_reported_by_child(close_fds=True,
                                          pass_fds=fds_to_keep)
    self.assertFalse((remaining_fds - fds_to_keep) & open_fds,
                     "Some fds not in pass_fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")
def test_close_fds_when_max_fd_is_lowered(self):
    """Confirm that issue21618 is fixed (may fail under valgrind)."""
    fd_status = test_support.findfile("testdata/fd_status.py")

    # Open a batch of extra fds to pass down.
    opened = [os.open("/dev/null", os.O_RDONLY) for _ in range(40)]
    opened.sort()

    # Release the 14 lowest again: two pairs for the internal child error
    # pipe and the stdout pipe, plus 10 more for the Python 2 startup
    # import machinery, which tends to consume several at once.
    for low_fd in opened[:14]:
        os.close(low_fd)
    open_fds = set(opened[14:])

    for fd in open_fds:
        self.addCleanup(os.close, fd)

    max_fd_open = max(open_fds)

    # The intermediate interpreter runs fd_status.py through subprocess32
    # with close_fds=True.  An explicit list of fds to check is passed to
    # fd_status.py as letting it rely on its default logic would miss the
    # fds above rlim_cur, since it normally only checks up to that limit.
    child_script = textwrap.dedent("""
        import subprocess32, sys
        subprocess32.Popen([sys.executable, %(fd_status)r] +
                           [str(x) for x in range(%(max_fd)d)],
                           close_fds=True).wait()
        """ % dict(fd_status=fd_status, max_fd=max_fd_open+1))

    import resource
    rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        # 29 is lower than the highest fds we are leaving open.
        resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
        # Launch a new Python interpreter with our low fd rlim_cur that
        # inherits open fds above that limit.
        p = subprocess.Popen(
            [sys.executable, '-c', yenv + child_script],
            stdout=subprocess.PIPE, close_fds=False)
    finally:
        resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))

    output, unused_stderr = p.communicate()
    remaining_fds = set(int(token) for token in output.strip().split(','))

    self.assertFalse(remaining_fds & open_fds,
                     msg="Some fds were left open.")
def test_pass_fds(self):
    # Pass each of ten pipe fds on its own and check that exactly that one
    # is reported by the child while the other nine are not.
    status_script = test_support.findfile("testdata/fd_status.py")

    candidate_fds = set()
    for _ in range(5):
        pipe_ends = os.pipe()
        self.addCleanup(os.close, pipe_ends[0])
        self.addCleanup(os.close, pipe_ends[1])
        candidate_fds.update(pipe_ends)

    for passed_fd in candidate_fds:
        child = subprocess.Popen([sys.executable, status_script],
                                 stdout=subprocess.PIPE, close_fds=True,
                                 pass_fds=(passed_fd, ))
        output, ignored = child.communicate()

        child_fds = set(int(token) for token in output.split(','))
        must_be_closed = candidate_fds - set((passed_fd,))

        self.assertIn(passed_fd, child_fds, "fd to be passed not passed")
        self.assertFalse(child_fds & must_be_closed,
                         "fd to be closed passed")

        # Disabled check that pass_fds overrides close_fds with a warning:
        # the with statement requires Python 2.5 and assertWarns requires
        # Python 2.7.
        #with self.assertWarns(RuntimeWarning) as context:
        #    self.assertFalse(subprocess.call(
        #            [sys.executable, "-c", yenv + "import sys; sys.exit(0)"],
        #            close_fds=False, pass_fds=(fd, )))
        #self.assertIn('overriding close_fds', str(context.warning))
def test_stdout_stdin_are_single_inout_fd(self):
    # One read/write file object serves as both stdin and stdout of the
    # child; the test passes if the child can be started and waited on.
    shared = open(os.devnull, "r+")
    try:
        child = subprocess.Popen(
            [sys.executable, "-c", yenv + "import sys; sys.exit(0)"],
            stdout=shared,
            stdin=shared)
        child.wait()
    finally:
        shared.close()
def test_stdout_stderr_are_single_inout_fd(self):
    # One read/write file object serves as both stdout and stderr of the
    # child; the test passes if the child can be started and waited on.
    shared = open(os.devnull, "r+")
    try:
        child = subprocess.Popen(
            [sys.executable, "-c", yenv + "import sys; sys.exit(0)"],
            stdout=shared,
            stderr=shared)
        child.wait()
    finally:
        shared.close()
def test_stderr_stdin_are_single_inout_fd(self):
    # One read/write file object serves as both stderr and stdin of the
    # child; the test passes if the child can be started and waited on.
    shared = open(os.devnull, "r+")
    try:
        child = subprocess.Popen(
            [sys.executable, "-c", yenv + "import sys; sys.exit(0)"],
            stderr=shared,
            stdin=shared)
        child.wait()
    finally:
        shared.close()
def test_wait_when_sigchild_ignored(self):
    # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
    helper_script = test_support.findfile("testdata/sigchild_ignore.py")
    child = subprocess.Popen([sys.executable, helper_script],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    stdout, stderr = child.communicate()
    # Include the helper's stderr in the message so a failure is diagnosable.
    failure_note = ("sigchild_ignore.py exited"
                    " non-zero with this error:\n%s" % stderr)
    self.assertEqual(0, child.returncode, failure_note)
def test_select_unbuffered(self):
    # Issue #11459: with bufsize=0 the pipe must really be unbuffered, so
    # that select() still reports it readable after a partial read.
    child = subprocess.Popen([sys.executable, "-c", yenv +
                              'import sys;'
                              'sys.stdout.write("apple")'],
                             stdout=subprocess.PIPE,
                             bufsize=0)
    pipe = child.stdout
    self.addCleanup(pipe.close)
    try:
        # Read four of the five bytes; the last one must remain selectable.
        self.assertEqual(pipe.read(4), "appl")
        readable, _, _ = select.select([pipe], [], [], 0.0)
        self.assertIn(pipe, readable)
    finally:
        child.wait()
def test_zombie_fast_process_del(self):
    # Issue #12650: on Unix, if Popen.__del__() was called before the
    # process exited, it wouldn't be added to subprocess._active, and would
    # remain a zombie.
    child = subprocess.Popen([sys.executable, "-c", yenv +
                              'import sys, time;'
                              'time.sleep(0.2)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    self.addCleanup(child.stdout.close)
    self.addCleanup(child.stderr.close)
    # Remember the object's identity, then drop the only reference to it
    # while the process is still running.
    ident = id(child)
    del child
    # The still-running process must now be tracked in the active list.
    self.assertIn(ident, [id(o) for o in subprocess._active])
def test_leak_fast_process_del_killed(self):
    # Issue #12650: on Unix, if Popen.__del__() was called before the
    # process exited, and the process got killed by a signal, it would never
    # be removed from subprocess._active, which triggered a FD and memory
    # leak.
    child = subprocess.Popen([sys.executable, "-c", yenv +
                              'import time;'
                              'time.sleep(3)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    self.addCleanup(child.stdout.close)
    self.addCleanup(child.stderr.close)
    ident = id(child)
    pid = child.pid
    # Drop the only reference first, then kill the process by signal.
    del child
    os.kill(pid, signal.SIGKILL)
    # The object must now be tracked in the active processes list.
    self.assertIn(ident, [id(o) for o in subprocess._active])

    # Give the process time to exit, then create a new Popen: this should
    # trigger the wait() of the killed process.
    time.sleep(0.2)
    try:
        proc = subprocess.Popen(['nonexisting_i_hope'],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        proc.__exit__(None, None, None)
    except EnvironmentError:
        pass
    else:
        self.fail("EnvironmentError not raised.")
    # The killed process should have been wait()ed on already, and removed
    # from the _active list.
    self.assertRaises(OSError, os.waitpid, pid, 0)
    self.assertNotIn(ident, [id(o) for o in subprocess._active])
def test_close_fds_after_preexec(self):
    status_script = test_support.findfile("testdata/fd_status.py")

    # preexec_fn uses this fd number as its dup2() target, and it should be
    # closed in the child process.
    spare_fd = os.dup(1)
    self.addCleanup(os.close, spare_fd)

    def recreate_spare_fd():
        os.dup2(1, spare_fd)

    child = subprocess.Popen([sys.executable, status_script],
                             stdout=subprocess.PIPE, close_fds=True,
                             preexec_fn=recreate_spare_fd)
    output, ignored = child.communicate()

    child_fds = set(int(token) for token in output.split(','))
    self.assertNotIn(spare_fd, child_fds)
def test_child_terminated_in_stopped_state(self):
    """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
    # Each precondition below writes a note to stderr and returns early
    # when it is not met, so the test is silently skipped in that case.
    if not ctypes:
        sys.stderr.write('ctypes module required.\n')
        return
    if not sys.executable:
        sys.stderr.write('Test requires sys.executable.\n')
        return
    PTRACE_TRACEME = 0  # From glibc and MacOS (PT_TRACE_ME).
    libc_name = ctypes.util.find_library('c')
    libc = ctypes.CDLL(libc_name)
    if not hasattr(libc, 'ptrace'):
        sys.stderr.write('ptrace() required.\n')
        return

    # First confirm that a child is able to call ptrace(PTRACE_TRACEME) at
    # all; if it exits non-zero there is nothing to test.
    test_ptrace = subprocess.Popen(
        [sys.executable, '-c', yenv + """if True:
            import ctypes
            libc = ctypes.CDLL({libc_name!r})
            libc.ptrace({PTRACE_TRACEME}, 0, 0)
            """.format(libc_name=libc_name, PTRACE_TRACEME=PTRACE_TRACEME)
        ])
    if test_ptrace.wait() != 0:
        sys.stderr.write('ptrace() failed - unable to test.\n')
        return

    # The real child makes the same ptrace() call and then crashes itself.
    child = subprocess.Popen(
        [sys.executable, '-c', yenv + """if True:
            import ctypes
            libc = ctypes.CDLL({libc_name!r})
            libc.ptrace({PTRACE_TRACEME}, 0, 0)
            libc.printf(ctypes.c_char_p(0xdeadbeef))  # Crash the process.
            """.format(libc_name=libc_name, PTRACE_TRACEME=PTRACE_TRACEME)
        ])
    try:
        returncode = child.wait()
    except Exception:
        child.kill()  # Clean up the hung stopped process.
        # Bare raise keeps the original traceback.
        raise
    self.assertNotEqual(0, returncode)
    # signal death, likely SIGSEGV.
    self.assertTrue(returncode < 0, msg=repr(returncode))
if mswindows:
    # On Windows, rebind the name to an empty test case so that it
    # contributes no tests there.
    class POSIXProcessTestCase(unittest.TestCase):
        pass
#@unittest.skipUnless(mswindows, "Windows specific tests")
class Win32ProcessTestCase(BaseTestCase):
    # Windows-specific Popen tests.  On other platforms the name is rebound
    # to an empty test case at the bottom of this block.

    def test_startupinfo(self):
        # startupinfo argument
        # We uses hardcoded constants, because we do not want to
        # depend on win32all.
        STARTF_USESHOWWINDOW = 1
        SW_MAXIMIZE = 3
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags = STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = SW_MAXIMIZE
        # Since Python is a console process, it won't be affected
        # by wShowWindow, but the argument should be silently
        # ignored
        subprocess.call([sys.executable, "-c", yenv + "import sys; sys.exit(0)"],
                        startupinfo=startupinfo)

    def test_creationflags(self):
        # creationflags argument
        CREATE_NEW_CONSOLE = 16
        sys.stderr.write(" a DOS box should flash briefly ...\n")
        subprocess.call(sys.executable +
                        ' -c "import time; time.sleep(0.25)"',
                        creationflags=CREATE_NEW_CONSOLE)

    def test_invalid_args(self):
        # invalid arguments should raise ValueError
        self.assertRaises(ValueError, subprocess.call,
                          [sys.executable, "-c", yenv +
                           "import sys; sys.exit(47)"],
                          preexec_fn=lambda: 1)
        self.assertRaises(ValueError, subprocess.call,
                          [sys.executable, "-c", yenv +
                           "import sys; sys.exit(47)"],
                          stdout=subprocess.PIPE,
                          close_fds=True)

    def test_close_fds(self):
        # close file descriptors
        rc = subprocess.call([sys.executable, "-c", yenv +
                              "import sys; sys.exit(47)"],
                             close_fds=True)
        self.assertEqual(rc, 47)

    def test_shell_sequence(self):
        # Run command through the shell (sequence)
        newenv = os.environ.copy()
        newenv["FRUIT"] = "physalis"
        p = subprocess.Popen(["set"], shell=1,
                             stdout=subprocess.PIPE,
                             env=newenv)
        try:
            self.assertIn("physalis", p.stdout.read())
        finally:
            # Release the pipe and reap the shell even if the assertion
            # fails, so that no handle or process is left behind.
            p.stdout.close()
            p.wait()

    def test_shell_string(self):
        # Run command through the shell (string)
        newenv = os.environ.copy()
        newenv["FRUIT"] = "physalis"
        p = subprocess.Popen("set", shell=1,
                             stdout=subprocess.PIPE,
                             env=newenv)
        try:
            self.assertIn("physalis", p.stdout.read())
        finally:
            # Release the pipe and reap the shell even if the assertion
            # fails, so that no handle or process is left behind.
            p.stdout.close()
            p.wait()

    def test_call_string(self):
        # call() function with string argument on Windows
        rc = subprocess.call(sys.executable +
                             ' -c "import sys; sys.exit(47)"')
        self.assertEqual(rc, 47)

    def _kill_process(self, method, *args):
        # Start a child blocked on input(), invoke the Popen method named
        # `method` with `args` until the child dies (at most three tries),
        # then check that it exited non-zero with nothing on stderr.

        # Some win32 buildbot raises EOFError if stdin is inherited
        p = subprocess.Popen([sys.executable, "-c", yenv + "input()"],
                             stdin=subprocess.PIPE, stderr=subprocess.PIPE)

        # Let the process initialize (Issue #3137)
        time.sleep(0.1)
        # The process should not terminate prematurely
        self.assertTrue(p.poll() is None)
        # Retry if the process do not receive the signal.
        count, maxcount = 0, 3
        while count < maxcount and p.poll() is None:
            getattr(p, method)(*args)
            time.sleep(0.1)
            count += 1

        returncode = p.poll()
        self.assertTrue(returncode is not None,
                        "the subprocess did not terminate")
        if count > 1:
            # %-formatting is used because auto-numbered "{}" fields are
            # not supported by str.format() on Python 2.6.
            sys.stderr.write("p.%s%s succeeded after %s attempts\n"
                             % (method, args, count))
        _, stderr = p.communicate()
        self.assertStderrEqual(stderr, '')
        self.assertEqual(p.wait(), returncode)
        self.assertNotEqual(returncode, 0)

    def test_send_signal(self):
        self._kill_process('send_signal', signal.SIGTERM)

    def test_kill(self):
        self._kill_process('kill')

    def test_terminate(self):
        self._kill_process('terminate')


if not mswindows:
    # Off Windows, rebind the name to an empty test case so that it
    # contributes no tests there.
    class Win32ProcessTestCase(unittest.TestCase): pass
#@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
#                     "poll system call not supported")
class ProcessTestCaseNoPoll(ProcessTestCase):
    # Runs the inherited ProcessTestCase tests with subprocess._has_poll
    # switched off for the duration of each test.

    def setUp(self):
        # Clear the flag before the base class fixture is set up.
        subprocess._has_poll = False
        ProcessTestCase.setUp(self)

    def tearDown(self):
        # Set the flag again, then let the base class tear down.
        subprocess._has_poll = True
        ProcessTestCase.tearDown(self)


if not getattr(subprocess, '_has_poll', False):
    # Without poll support there is nothing to switch off; rebind the name
    # to an empty test case.
    class ProcessTestCaseNoPoll(unittest.TestCase):
        pass
#@unittest.skipUnless(getattr(subprocess, '_posixsubprocess', False),
#                     "_posixsubprocess extension module not found.")
class ProcessTestCasePOSIXPurePython(ProcessTestCase, POSIXProcessTestCase):
    # Runs the inherited tests with subprocess._posixsubprocess set to None
    # for the duration of each test.

    def setUp(self):
        # Hide the extension module before either base fixture is set up.
        subprocess._posixsubprocess = None
        ProcessTestCase.setUp(self)
        POSIXProcessTestCase.setUp(self)

    def tearDown(self):
        # Re-attach the extension module from sys.modules, then tear the
        # base fixtures down in the reverse of the setUp order.
        extension = sys.modules['_posixsubprocess32']
        subprocess._posixsubprocess = extension
        POSIXProcessTestCase.tearDown(self)
        ProcessTestCase.tearDown(self)
- class POSIXSubprocessModuleTestCase(unittest.TestCase):
- def test_fork_exec_sorted_fd_sanity_check(self):
- # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check.
- _posixsubprocess = subprocess._posixsubprocess
- gc_enabled = gc.isenabled()
- try:
- gc.enable()
- for fds_to_keep in (
- (-1, 2, 3, 4, 5), # Negative number.
- ('str', 4), # Not an int.
- (18, 23, 42, 2**63), # Out of range.
- (5, 4), # Not sorted.
- (6, 7, 7, 8), # Duplicate.
- ):
- try:
- _posixsubprocess.fork_exec(
- ["false"], ["false"],
- True, fds_to_keep, None, ["env"],
- -1, -1, -1, -1,
- 1, 2, 3, 4,
- True, True, None)
- except ValueError, exception:
- self.assertTrue('fds_to_keep' in str(exception),
- msg=str(exception))
- else:
- self.fail("ValueError not raised, fds_to_keep=%s" %
- (fds_to_keep,))
- finally:
- if not gc_enabled:
- gc.disable()
- def test_cloexec_pass_fds(self):
- if not os.path.exists('/dev/null') or not os.path.isdir('/dev/fd'):
- print("Skipped - This test requires /dev/null and /dev/fd/*.")
- return
- null_reader_proc = subprocess.Popen(
- ["cat"],
- stdin=open('/dev/null', 'rb'),
- stdout=subprocess.PIPE)
- try:
- data = null_reader_proc.stdout
- fd_name = '/dev/fd/%d' % data.fileno()
- fd_reader_proc = subprocess.Popen(
- ["cat", fd_name],
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT, # Capture any error from cat.
- pass_fds=(data.fileno(),))
- try:
- fddata = fd_reader_proc.stdout
- self.assertEqual('', fddata.read())
- finally:
- fd_reader_proc.wait()
- finally:
- null_reader_proc.wait()
- if not getattr(subprocess, '_posixsubprocess', False):
- print >>sys.stderr, "_posixsubprocess extension module not found."
- class ProcessTestCasePOSIXPurePython(unittest.TestCase): pass
- class POSIXSubprocessModuleTestCase(unittest.TestCase): pass
class HelperFunctionTests(unittest.TestCase):
    # Tests for module-level helpers and for pickling of the exception
    # types.

    #@unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
    def test_eintr_retry_call(self):
        seen_calls = []

        def flaky_os_func(*args):
            # Fails with EINTR on its second invocation only; every other
            # invocation returns its arguments reversed.
            seen_calls.append(args)
            if len(seen_calls) == 2:
                raise OSError(errno.EINTR, "fake interrupted system call")
            return args[::-1]

        first = subprocess._eintr_retry_call(flaky_os_func, 256, 999)
        self.assertEqual((999, 256), first)
        self.assertEqual([(256, 999)], seen_calls)
        # This time there will be an EINTR so it will loop once.
        second = subprocess._eintr_retry_call(flaky_os_func, 666)
        self.assertEqual((666,), second)
        self.assertEqual([(256, 999), (666,), (666,)], seen_calls)

    if mswindows:
        del test_eintr_retry_call

    if not hasattr(unittest.TestCase, 'assertSequenceEqual'):
        # Minimal fallback for unittest versions without this assertion.
        def assertSequenceEqual(self, seq1, seq2):
            self.assertEqual(list(seq1), list(seq2))

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        path_dirs = ['/monty', '/python', '', '/flying/circus']
        env_with_path = {'PATH': os.pathsep.join(path_dirs)}
        lookup = subprocess._get_exec_path

        original_environ = os.environ
        try:
            os.environ = dict(env_with_path)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(path_dirs, lookup())
            self.assertSequenceEqual(path_dirs, lookup(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, lookup({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), lookup({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(path_dirs, lookup(env_with_path))

    def test_args_from_interpreter_flags(self):
        if sys.version_info[:2] < (2,6):
            print("Skipped - only useful on 2.6 and higher.")
            return
        # Mostly just to call it for code coverage.
        flags = subprocess32._args_from_interpreter_flags()
        self.assertTrue(isinstance(flags, list), msg=repr(flags))

    def test_timeout_expired_unpickling(self):
        """https://github.com/google/python-subprocess32/issues/57"""
        original = subprocess32.TimeoutExpired(['command', 'arg1'], 5,
                                               output='stdout!', stderr='err')
        restored = pickle.loads(pickle.dumps(original))
        self.assertEqual(original.cmd, restored.cmd)
        self.assertEqual(original.timeout, restored.timeout)
        self.assertEqual(original.output, restored.output)
        self.assertEqual(original.stderr, restored.stderr)

    def test_called_process_error_unpickling(self):
        """https://github.com/google/python-subprocess32/issues/57"""
        original = subprocess32.CalledProcessError(
            2, ['command', 'arg1'], output='stdout!', stderr='err')
        restored = pickle.loads(pickle.dumps(original))
        self.assertEqual(original.returncode, restored.returncode)
        self.assertEqual(original.cmd, restored.cmd)
        self.assertEqual(original.output, restored.output)
        self.assertEqual(original.stderr, restored.stderr)
def reap_children():
    """Reap every child process that has already exited.

    Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.

    Returns None.  Does nothing on platforms that lack os.waitpid() or
    os.WNOHANG.
    """
    # Both names are needed for a non-blocking wait; where either is missing
    # there is nothing this function can do.
    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
        return

    any_process = -1
    while True:
        try:
            pid, status = os.waitpid(any_process, os.WNOHANG)
        except OSError:
            # Raised once no children remain.  Only OSError is caught so
            # that KeyboardInterrupt and SystemExit still propagate.
            break
        if pid == 0:
            # Children exist, but none of them has exited yet.
            break
- class ContextManagerTests(BaseTestCase):
- def test_pipe(self):
- proc = subprocess.Popen([sys.executable, "-c", yenv +
- "import sys;"
- "sys.stdout.write('stdout');"
- "sys.stderr.write('stderr');"],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- try:
- self.assertEqual(proc.stdout.read(), "stdout")
- self.assertStderrEqual(proc.stderr.read(), "stderr")
- finally:
- proc.__exit__(None, None, None)
- self.assertTrue(proc.stdout.closed)
- self.assertTrue(proc.stderr.closed)
- def test_returncode(self):
- proc = subprocess.Popen([sys.executable, "-c", yenv +
- "import sys; sys.exit(100)"])
- proc.__exit__(None, None, None)
- # __exit__ calls wait(), so the returncode should be set
- self.assertEqual(proc.returncode, 100)
- def test_communicate_stdin(self):
- proc = subprocess.Popen([sys.executable, "-c", yenv +
- "import sys;"
- "sys.exit(sys.stdin.read() == 'context')"],
- stdin=subprocess.PIPE)
- try:
- proc.communicate("context")
- self.assertEqual(proc.returncode, 1)
- finally:
- proc.__exit__(None, None, None)
- def test_invalid_args(self):
- try:
- proc = subprocess.Popen(['nonexisting_i_hope'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc.__exit__(None, None, None)
- except EnvironmentError, exception:
- # ignore errors that indicate the command was not found
- if exception.errno not in (errno.ENOENT, errno.EACCES):
- raise
- else:
- self.fail("Expected an EnvironmentError exception.")
if sys.version_info[:2] <= (2,4):
    # On 2.4 the test suite hangs during the pure python test, for reasons
    # unknown.  Nobody uses this module for that implementation anyway, so
    # the name is rebound to an empty test case.
    class ProcessTestCasePOSIXPurePython(unittest.TestCase):
        pass
def main():
    """Run every test case class in this file, then reap child processes.

    Exited children are reaped even when run_unittest() raises, so that a
    failing run does not leave zombies behind.
    """
    unit_tests = (ProcessTestCase,
                  POSIXProcessTestCase,
                  POSIXSubprocessModuleTestCase,
                  Win32ProcessTestCase,
                  ProcessTestCasePOSIXPurePython,
                  ProcessTestCaseNoPoll,
                  HelperFunctionTests,
                  ContextManagerTests,
                  RunFuncTestCase,
                  )

    try:
        test_support.run_unittest(*unit_tests)
    finally:
        reap_children()


if __name__ == "__main__":
    main()
|