Changeset 391 for python/trunk/Lib/test/test_subprocess.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_subprocess.py
r2 r391 9 9 import time 10 10 import re 11 import sysconfig 12 13 try: 14 import resource 15 except ImportError: 16 resource = None 17 try: 18 import threading 19 except ImportError: 20 threading = None 11 21 12 22 mswindows = (sys.platform == "win32") … … 22 32 SETBINARY = '' 23 33 24 # In a debug build, stuff like "[6580 refs]" is printed to stderr at 25 # shutdown time. That frustrates tests trying to check stderr produced 26 # from a spawned Python process. 27 def remove_stderr_debug_decorations(stderr): 28 return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr) 29 30 class ProcessTestCase(unittest.TestCase): 34 35 try: 36 mkstemp = tempfile.mkstemp 37 except AttributeError: 38 # tempfile.mkstemp is not available 39 def mkstemp(): 40 """Replacement for mkstemp, calling mktemp.""" 41 fname = tempfile.mktemp() 42 return os.open(fname, os.O_RDWR|os.O_CREAT), fname 43 44 45 class BaseTestCase(unittest.TestCase): 31 46 def setUp(self): 32 47 # Try to minimize the number of children we have so this test 33 48 # doesn't crash on some buildbots (Alphas in particular). 34 if hasattr(test_support, "reap_children"): 35 test_support.reap_children() 49 test_support.reap_children() 36 50 37 51 def tearDown(self): 38 # Try to minimize the number of children we have so this test 39 # doesn't crash on some buildbots (Alphas in particular). 40 if hasattr(test_support, "reap_children"): 41 test_support.reap_children() 42 43 def mkstemp(self): 44 """wrapper for mkstemp, calling mktemp if mkstemp is not available""" 45 if hasattr(tempfile, "mkstemp"): 46 return tempfile.mkstemp() 47 else: 48 fname = tempfile.mktemp() 49 return os.open(fname, os.O_RDWR|os.O_CREAT), fname 50 51 # 52 # Generic tests 53 # 52 for inst in subprocess._active: 53 inst.wait() 54 subprocess._cleanup() 55 self.assertFalse(subprocess._active, "subprocess._active not empty") 56 57 def assertStderrEqual(self, stderr, expected, msg=None): 58 # In a debug build, stuff like "[6580 refs]" is printed to stderr at 59 # shutdown time. That frustrates tests trying to check stderr produced 60 # from a spawned Python process. 61 actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr) 62 self.assertEqual(actual, expected, msg) 63 64 65 class PopenTestException(Exception): 66 pass 67 68 69 class PopenExecuteChildRaises(subprocess.Popen): 70 """Popen subclass for testing cleanup of subprocess.PIPE filehandles when 71 _execute_child fails. 72 """ 73 def _execute_child(self, *args, **kwargs): 74 raise PopenTestException("Forced Exception for Test") 75 76 77 class ProcessTestCase(BaseTestCase): 78 54 79 def test_call_seq(self): 55 80 # call() function with sequence argument … … 66 91 def test_check_call_nonzero(self): 67 92 # check_call() function with non-zero return code 68 try:93 with self.assertRaises(subprocess.CalledProcessError) as c: 69 94 subprocess.check_call([sys.executable, "-c", 70 95 "import sys; sys.exit(47)"]) 71 except subprocess.CalledProcessError, e: 72 self.assertEqual(e.returncode, 47) 73 else: 74 self.fail("Expected CalledProcessError") 96 self.assertEqual(c.exception.returncode, 47) 97 98 def test_check_output(self): 99 # check_output() function with zero return code 100 output = subprocess.check_output( 101 [sys.executable, "-c", "print 'BDFL'"]) 102 self.assertIn('BDFL', output) 103 104 def test_check_output_nonzero(self): 105 # check_call() function with non-zero return code 106 with self.assertRaises(subprocess.CalledProcessError) as c: 107 subprocess.check_output( 108 [sys.executable, "-c", "import sys; sys.exit(5)"]) 109 self.assertEqual(c.exception.returncode, 5) 110 111 def test_check_output_stderr(self): 112 # check_output() function stderr redirected to stdout 113 output = subprocess.check_output( 114 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"], 115 stderr=subprocess.STDOUT) 116 self.assertIn('BDFL', output) 117 118 def test_check_output_stdout_arg(self): 119 # check_output() function stderr redirected to stdout 120 with self.assertRaises(ValueError) as c: 121 output = subprocess.check_output( 122 [sys.executable, "-c", "print 'will not be run'"], 123 stdout=sys.stdout) 124 self.fail("Expected ValueError when stdout arg supplied.") 125 self.assertIn('stdout', c.exception.args[0]) 75 126 76 127 def test_call_kwargs(self): … … 79 130 newenv["FRUIT"] = "banana" 80 131 rc = subprocess.call([sys.executable, "-c", 81 'import sys, os;' \82 'sys.exit(os.getenv("FRUIT")=="banana")'],83 env=newenv)132 'import sys, os;' 133 'sys.exit(os.getenv("FRUIT")=="banana")'], 134 env=newenv) 84 135 self.assertEqual(rc, 1) 136 137 def test_invalid_args(self): 138 # Popen() called with invalid arguments should raise TypeError 139 # but Popen.__del__ should not complain (issue #12085) 140 with test_support.captured_stderr() as s: 141 self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1) 142 argcount = subprocess.Popen.__init__.__code__.co_argcount 143 too_many_args = [0] * (argcount + 1) 144 self.assertRaises(TypeError, subprocess.Popen, *too_many_args) 145 self.assertEqual(s.getvalue(), '') 85 146 86 147 def test_stdin_none(self): … … 88 149 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], 89 150 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 151 self.addCleanup(p.stdout.close) 152 self.addCleanup(p.stderr.close) 90 153 p.wait() 91 154 self.assertEqual(p.stdin, None) 92 155 93 156 def test_stdout_none(self): 94 # .stdout is None when not redirected 95 p = subprocess.Popen([sys.executable, "-c", 96 'print " this bit of output is from a ' 97 'test of stdout in a different ' 98 'process ..."'], 99 stdin=subprocess.PIPE, stderr=subprocess.PIPE) 100 p.wait() 101 self.assertEqual(p.stdout, None) 157 # .stdout is None when not redirected, and the child's stdout will 158 # be inherited from the parent. In order to test this we run a 159 # subprocess in a subprocess: 160 # this_test 161 # \-- subprocess created by this test (parent) 162 # \-- subprocess created by the parent subprocess (child) 163 # The parent doesn't specify stdout, so the child will use the 164 # parent's stdout. This test checks that the message printed by the 165 # child goes to the parent stdout. The parent also checks that the 166 # child's stdout is None. See #11963. 167 code = ('import sys; from subprocess import Popen, PIPE;' 168 'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],' 169 ' stdin=PIPE, stderr=PIPE);' 170 'p.wait(); assert p.stdout is None;') 171 p = subprocess.Popen([sys.executable, "-c", code], 172 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 173 self.addCleanup(p.stdout.close) 174 self.addCleanup(p.stderr.close) 175 out, err = p.communicate() 176 self.assertEqual(p.returncode, 0, err) 177 self.assertEqual(out.rstrip(), 'test_stdout_none') 102 178 103 179 def test_stderr_none(self): … … 105 181 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], 106 182 stdin=subprocess.PIPE, stdout=subprocess.PIPE) 183 self.addCleanup(p.stdout.close) 184 self.addCleanup(p.stdin.close) 107 185 p.wait() 108 186 self.assertEqual(p.stderr, None) 109 187 110 def test_executable(self): 111 p = subprocess.Popen(["somethingyoudonthave", 112 "-c", "import sys; sys.exit(47)"], 188 def test_executable_with_cwd(self): 189 python_dir = os.path.dirname(os.path.realpath(sys.executable)) 190 p = subprocess.Popen(["somethingyoudonthave", "-c", 191 "import sys; sys.exit(47)"], 192 executable=sys.executable, cwd=python_dir) 193 p.wait() 194 self.assertEqual(p.returncode, 47) 195 196 @unittest.skipIf(sysconfig.is_python_build(), 197 "need an installed Python. See #7774") 198 def test_executable_without_cwd(self): 199 # For a normal installation, it should work without 'cwd' 200 # argument. For test runs in the build directory, see #7774. 201 p = subprocess.Popen(["somethingyoudonthave", "-c", 202 "import sys; sys.exit(47)"], 113 203 executable=sys.executable) 114 204 p.wait() … … 153 243 'import sys; sys.stdout.write("orange")'], 154 244 stdout=subprocess.PIPE) 245 self.addCleanup(p.stdout.close) 155 246 self.assertEqual(p.stdout.read(), "orange") 156 247 … … 181 272 'import sys; sys.stderr.write("strawberry")'], 182 273 stderr=subprocess.PIPE) 183 self.a ssertEqual(remove_stderr_debug_decorations(p.stderr.read()),184 274 self.addCleanup(p.stderr.close) 275 self.assertStderrEqual(p.stderr.read(), "strawberry") 185 276 186 277 def test_stderr_filedes(self): … … 193 284 p.wait() 194 285 os.lseek(d, 0, 0) 195 self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)), 196 "strawberry") 286 self.assertStderrEqual(os.read(d, 1024), "strawberry") 197 287 198 288 def test_stderr_fileobj(self): … … 204 294 p.wait() 205 295 tf.seek(0) 206 self.assertEqual(remove_stderr_debug_decorations(tf.read()), 207 "strawberry") 296 self.assertStderrEqual(tf.read(), "strawberry") 208 297 209 298 def test_stdout_stderr_pipe(self): 210 299 # capture stdout and stderr to the same pipe 211 300 p = subprocess.Popen([sys.executable, "-c", 212 'import sys;' \213 'sys.stdout.write("apple");' \214 'sys.stdout.flush();' \301 'import sys;' 302 'sys.stdout.write("apple");' 303 'sys.stdout.flush();' 215 304 'sys.stderr.write("orange")'], 216 305 stdout=subprocess.PIPE, 217 306 stderr=subprocess.STDOUT) 218 output = p.stdout.read() 219 stripped = remove_stderr_debug_decorations(output) 220 self.assertEqual(stripped, "appleorange") 307 self.addCleanup(p.stdout.close) 308 self.assertStderrEqual(p.stdout.read(), "appleorange") 221 309 222 310 def test_stdout_stderr_file(self): … … 224 312 tf = tempfile.TemporaryFile() 225 313 p = subprocess.Popen([sys.executable, "-c", 226 'import sys;' \227 'sys.stdout.write("apple");' \228 'sys.stdout.flush();' \314 'import sys;' 315 'sys.stdout.write("apple");' 316 'sys.stdout.flush();' 229 317 'sys.stderr.write("orange")'], 230 318 stdout=tf, … … 232 320 p.wait() 233 321 tf.seek(0) 234 output = tf.read() 235 stripped = remove_stderr_debug_decorations(output) 236 self.assertEqual(stripped, "appleorange") 322 self.assertStderrEqual(tf.read(), "appleorange") 237 323 238 324 def test_stdout_filedes_of_stdout(self): 239 325 # stdout is set to 1 (#1531862). 240 cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))" 241 rc = subprocess.call([sys.executable, "-c", cmd], stdout=1) 242 self.assertEquals(rc, 2) 326 # To avoid printing the text on stdout, we do something similar to 327 # test_stdout_none (see above). The parent subprocess calls the child 328 # subprocess passing stdout=1, and this test uses stdout=PIPE in 329 # order to capture and check the output of the parent. See #11963. 330 code = ('import sys, subprocess; ' 331 'rc = subprocess.call([sys.executable, "-c", ' 332 ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), ' 333 '\'test with stdout=1\'))"], stdout=1); ' 334 'assert rc == 18') 335 p = subprocess.Popen([sys.executable, "-c", code], 336 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 337 self.addCleanup(p.stdout.close) 338 self.addCleanup(p.stderr.close) 339 out, err = p.communicate() 340 self.assertEqual(p.returncode, 0, err) 341 self.assertEqual(out.rstrip(), 'test with stdout=1') 243 342 244 343 def test_cwd(self): … … 251 350 os.chdir(cwd) 252 351 p = subprocess.Popen([sys.executable, "-c", 253 'import sys,os;' \352 'import sys,os;' 254 353 'sys.stdout.write(os.getcwd())'], 255 354 stdout=subprocess.PIPE, 256 355 cwd=tmpdir) 356 self.addCleanup(p.stdout.close) 257 357 normcase = os.path.normcase 258 358 self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir)) … … 262 362 newenv["FRUIT"] = "orange" 263 363 p = subprocess.Popen([sys.executable, "-c", 264 'import sys,os;' \364 'import sys,os;' 265 365 'sys.stdout.write(os.getenv("FRUIT"))'], 266 366 stdout=subprocess.PIPE, 267 367 env=newenv) 368 self.addCleanup(p.stdout.close) 268 369 self.assertEqual(p.stdout.read(), "orange") 269 370 270 371 def test_communicate_stdin(self): 271 372 p = subprocess.Popen([sys.executable, "-c", 272 'import sys; sys.exit(sys.stdin.read() == "pear")'], 373 'import sys;' 374 'sys.exit(sys.stdin.read() == "pear")'], 273 375 stdin=subprocess.PIPE) 274 376 p.communicate("pear") … … 289 391 (stdout, stderr) = p.communicate() 290 392 self.assertEqual(stdout, None) 291 self.assert Equal(remove_stderr_debug_decorations(stderr), "pineapple")393 self.assertStderrEqual(stderr, "pineapple") 292 394 293 395 def test_communicate(self): … … 299 401 stdout=subprocess.PIPE, 300 402 stderr=subprocess.PIPE) 403 self.addCleanup(p.stdout.close) 404 self.addCleanup(p.stderr.close) 405 self.addCleanup(p.stdin.close) 301 406 (stdout, stderr) = p.communicate("banana") 302 407 self.assertEqual(stdout, "banana") 303 self.assertEqual(remove_stderr_debug_decorations(stderr), 304 "pineapple") 408 self.assertStderrEqual(stderr, "pineapple") 305 409 306 410 # This test is Linux specific for simplicity to at least have 307 411 # some coverage. It is not a platform specific bug. 308 if os.path.isdir('/proc/%d/fd' % os.getpid()): 309 # Test for the fd leak reported in http://bugs.python.org/issue2791. 310 def test_communicate_pipe_fd_leak(self): 311 fd_directory = '/proc/%d/fd' % os.getpid() 312 num_fds_before_popen = len(os.listdir(fd_directory)) 313 p = subprocess.Popen([sys.executable, '-c', 'print()'], 314 stdout=subprocess.PIPE) 315 p.communicate() 316 num_fds_after_communicate = len(os.listdir(fd_directory)) 317 del p 318 num_fds_after_destruction = len(os.listdir(fd_directory)) 319 self.assertEqual(num_fds_before_popen, num_fds_after_destruction) 320 self.assertEqual(num_fds_before_popen, num_fds_after_communicate) 412 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 413 "Linux specific") 414 # Test for the fd leak reported in http://bugs.python.org/issue2791. 415 def test_communicate_pipe_fd_leak(self): 416 fd_directory = '/proc/%d/fd' % os.getpid() 417 num_fds_before_popen = len(os.listdir(fd_directory)) 418 p = subprocess.Popen([sys.executable, "-c", "print()"], 419 stdout=subprocess.PIPE) 420 p.communicate() 421 num_fds_after_communicate = len(os.listdir(fd_directory)) 422 del p 423 num_fds_after_destruction = len(os.listdir(fd_directory)) 424 self.assertEqual(num_fds_before_popen, num_fds_after_destruction) 425 self.assertEqual(num_fds_before_popen, num_fds_after_communicate) 321 426 322 427 def test_communicate_returns(self): … … 341 446 p = subprocess.Popen([sys.executable, "-c", 342 447 'import sys,os;' 343 'sys.stdout.write(sys.stdin.read(47));' \344 'sys.stderr.write("xyz"*%d);' \448 'sys.stdout.write(sys.stdin.read(47));' 449 'sys.stderr.write("xyz"*%d);' 345 450 'sys.stdout.write(sys.stdin.read())' % pipe_buf], 346 451 stdin=subprocess.PIPE, 347 452 stdout=subprocess.PIPE, 348 453 stderr=subprocess.PIPE) 454 self.addCleanup(p.stdout.close) 455 self.addCleanup(p.stderr.close) 456 self.addCleanup(p.stdin.close) 349 457 string_to_write = "abc"*pipe_buf 350 458 (stdout, stderr) = p.communicate(string_to_write) … … 354 462 # stdin.write before communicate() 355 463 p = subprocess.Popen([sys.executable, "-c", 356 'import sys,os;' \464 'import sys,os;' 357 465 'sys.stdout.write(sys.stdin.read())'], 358 466 stdin=subprocess.PIPE, 359 467 stdout=subprocess.PIPE, 360 468 stderr=subprocess.PIPE) 469 self.addCleanup(p.stdout.close) 470 self.addCleanup(p.stderr.close) 471 self.addCleanup(p.stdin.close) 361 472 p.stdin.write("banana") 362 473 (stdout, stderr) = p.communicate("split") 363 474 self.assertEqual(stdout, "bananasplit") 364 self.assert Equal(remove_stderr_debug_decorations(stderr), "")475 self.assertStderrEqual(stderr, "") 365 476 366 477 def test_universal_newlines(self): … … 380 491 stdout=subprocess.PIPE, 381 492 universal_newlines=1) 493 self.addCleanup(p.stdout.close) 382 494 stdout = p.stdout.read() 383 495 if hasattr(file, 'newlines'): … … 407 519 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 408 520 universal_newlines=1) 521 self.addCleanup(p.stdout.close) 522 self.addCleanup(p.stderr.close) 409 523 (stdout, stderr) = p.communicate() 410 524 if hasattr(file, 'newlines'): … … 414 528 else: 415 529 # Interpreter without universal newline support 416 self.assertEqual(stdout, "line1\nline2\rline3\r\nline4\r\nline5\nline6") 530 self.assertEqual(stdout, 531 "line1\nline2\rline3\r\nline4\r\nline5\nline6") 417 532 418 533 def test_no_leaking(self): 419 534 # Make sure we leak no resources 420 if not hasattr(test_support, "is_resource_enabled") \ 421 or test_support.is_resource_enabled("subprocess") and not mswindows: 535 if not mswindows: 422 536 max_handles = 1026 # too much for most UNIX systems 423 537 else: 424 max_handles = 65 425 for i in range(max_handles): 426 p = subprocess.Popen([sys.executable, "-c", 427 "import sys;sys.stdout.write(sys.stdin.read())"], 428 stdin=subprocess.PIPE, 429 stdout=subprocess.PIPE, 430 stderr=subprocess.PIPE) 431 data = p.communicate("lime")[0] 432 self.assertEqual(data, "lime") 433 538 max_handles = 2050 # too much for (at least some) Windows setups 539 handles = [] 540 try: 541 for i in range(max_handles): 542 try: 543 handles.append(os.open(test_support.TESTFN, 544 os.O_WRONLY | os.O_CREAT)) 545 except OSError as e: 546 if e.errno != errno.EMFILE: 547 raise 548 break 549 else: 550 self.skipTest("failed to reach the file descriptor limit " 551 "(tried %d)" % max_handles) 552 # Close a couple of them (should be enough for a subprocess) 553 for i in range(10): 554 os.close(handles.pop()) 555 # Loop creating some subprocesses. If one of them leaks some fds, 556 # the next loop iteration will fail by reaching the max fd limit. 557 for i in range(15): 558 p = subprocess.Popen([sys.executable, "-c", 559 "import sys;" 560 "sys.stdout.write(sys.stdin.read())"], 561 stdin=subprocess.PIPE, 562 stdout=subprocess.PIPE, 563 stderr=subprocess.PIPE) 564 data = p.communicate(b"lime")[0] 565 self.assertEqual(data, b"lime") 566 finally: 567 for h in handles: 568 os.close(h) 569 test_support.unlink(test_support.TESTFN) 434 570 435 571 def test_list2cmdline(self): … … 450 586 self.assertEqual(subprocess.list2cmdline(['ab', '']), 451 587 'ab ""') 452 self.assertEqual(subprocess.list2cmdline(['echo', 'foo|bar']),453 'echo "foo|bar"')454 588 455 589 … … 465 599 # poll() never returned None. It "should be" very rare that it 466 600 # didn't go around at least twice. 467 self.assert _(count >=2)601 self.assertGreaterEqual(count, 2) 468 602 # Subsequent invocations should just return the returncode 469 603 self.assertEqual(p.poll(), 0) … … 481 615 # an invalid type of the bufsize argument should raise 482 616 # TypeError. 483 try:617 with self.assertRaises(TypeError): 484 618 subprocess.Popen([sys.executable, "-c", "pass"], "orange") 485 except TypeError:486 pass487 else:488 self.fail("Expected TypeError")489 619 490 620 def test_leaking_fds_on_error(self): … … 495 625 # 1024 times (each call leaked two fds). 496 626 for i in range(1024): 497 try: 627 # Windows raises IOError. Others raise OSError. 628 with self.assertRaises(EnvironmentError) as c: 498 629 subprocess.Popen(['nonexisting_i_hope'], 499 630 stdout=subprocess.PIPE, 500 631 stderr=subprocess.PIPE) 501 # Windows raises IOError 502 except (IOError, OSError), err: 503 if err.errno != 2: # ignore "no such file" 504 raise 505 506 # 507 # POSIX tests 508 # 509 if not mswindows: 510 def test_exceptions(self): 511 # catched & re-raised exceptions 632 # ignore errors that indicate the command was not found 633 if c.exception.errno not in (errno.ENOENT, errno.EACCES): 634 raise c.exception 635 636 @unittest.skipIf(threading is None, "threading required") 637 def test_double_close_on_error(self): 638 # Issue #18851 639 fds = [] 640 def open_fds(): 641 for i in range(20): 642 fds.extend(os.pipe()) 643 time.sleep(0.001) 644 t = threading.Thread(target=open_fds) 645 t.start() 646 try: 647 with self.assertRaises(EnvironmentError): 648 subprocess.Popen(['nonexisting_i_hope'], 649 stdin=subprocess.PIPE, 650 stdout=subprocess.PIPE, 651 stderr=subprocess.PIPE) 652 finally: 653 t.join() 654 exc = None 655 for fd in fds: 656 # If a double close occurred, some of those fds will 657 # already have been closed by mistake, and os.close() 658 # here will raise. 659 try: 660 os.close(fd) 661 except OSError as e: 662 exc = e 663 if exc is not None: 664 raise exc 665 666 def test_handles_closed_on_exception(self): 667 # If CreateProcess exits with an error, ensure the 668 # duplicate output handles are released 669 ifhandle, ifname = mkstemp() 670 ofhandle, ofname = mkstemp() 671 efhandle, efname = mkstemp() 672 try: 673 subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, 674 stderr=efhandle) 675 except OSError: 676 os.close(ifhandle) 677 os.remove(ifname) 678 os.close(ofhandle) 679 os.remove(ofname) 680 os.close(efhandle) 681 os.remove(efname) 682 self.assertFalse(os.path.exists(ifname)) 683 self.assertFalse(os.path.exists(ofname)) 684 self.assertFalse(os.path.exists(efname)) 685 686 def test_communicate_epipe(self): 687 # Issue 10963: communicate() should hide EPIPE 688 p = subprocess.Popen([sys.executable, "-c", 'pass'], 689 stdin=subprocess.PIPE, 690 stdout=subprocess.PIPE, 691 stderr=subprocess.PIPE) 692 self.addCleanup(p.stdout.close) 693 self.addCleanup(p.stderr.close) 694 self.addCleanup(p.stdin.close) 695 p.communicate("x" * 2**20) 696 697 def test_communicate_epipe_only_stdin(self): 698 # Issue 10963: communicate() should hide EPIPE 699 p = subprocess.Popen([sys.executable, "-c", 'pass'], 700 stdin=subprocess.PIPE) 701 self.addCleanup(p.stdin.close) 702 time.sleep(2) 703 p.communicate("x" * 2**20) 704 705 # This test is Linux-ish specific for simplicity to at least have 706 # some coverage. It is not a platform specific bug. 707 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 708 "Linux specific") 709 def test_failed_child_execute_fd_leak(self): 710 """Test for the fork() failure fd leak reported in issue16327.""" 711 fd_directory = '/proc/%d/fd' % os.getpid() 712 fds_before_popen = os.listdir(fd_directory) 713 with self.assertRaises(PopenTestException): 714 PopenExecuteChildRaises( 715 [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE, 716 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 717 718 # NOTE: This test doesn't verify that the real _execute_child 719 # does not close the file descriptors itself on the way out 720 # during an exception. Code inspection has confirmed that. 721 722 fds_after_exception = os.listdir(fd_directory) 723 self.assertEqual(fds_before_popen, fds_after_exception) 724 725 726 # context manager 727 class _SuppressCoreFiles(object): 728 """Try to prevent core files from being created.""" 729 old_limit = None 730 731 def __enter__(self): 732 """Try to save previous ulimit, then set it to (0, 0).""" 733 if resource is not None: 512 734 try: 513 p = subprocess.Popen([sys.executable, "-c", ""], 735 self.old_limit = resource.getrlimit(resource.RLIMIT_CORE) 736 resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) 737 except (ValueError, resource.error): 738 pass 739 740 if sys.platform == 'darwin': 741 # Check if the 'Crash Reporter' on OSX was configured 742 # in 'Developer' mode and warn that it will get triggered 743 # when it is. 744 # 745 # This assumes that this context manager is used in tests 746 # that might trigger the next manager. 747 value = subprocess.Popen(['/usr/bin/defaults', 'read', 748 'com.apple.CrashReporter', 'DialogType'], 749 stdout=subprocess.PIPE).communicate()[0] 750 if value.strip() == b'developer': 751 print "this tests triggers the Crash Reporter, that is intentional" 752 sys.stdout.flush() 753 754 def __exit__(self, *args): 755 """Return core file behavior to default.""" 756 if self.old_limit is None: 757 return 758 if resource is not None: 759 try: 760 resource.setrlimit(resource.RLIMIT_CORE, self.old_limit) 761 except (ValueError, resource.error): 762 pass 763 764 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 765 "Requires signal.SIGALRM") 766 def test_communicate_eintr(self): 767 # Issue #12493: communicate() should handle EINTR 768 def handler(signum, frame): 769 pass 770 old_handler = signal.signal(signal.SIGALRM, handler) 771 self.addCleanup(signal.signal, signal.SIGALRM, old_handler) 772 773 # the process is running for 2 seconds 774 args = [sys.executable, "-c", 'import time; time.sleep(2)'] 775 for stream in ('stdout', 'stderr'): 776 kw = {stream: subprocess.PIPE} 777 with subprocess.Popen(args, **kw) as process: 778 signal.alarm(1) 779 # communicate() will be interrupted by SIGALRM 780 process.communicate() 781 782 783 @unittest.skipIf(mswindows, "POSIX specific tests") 784 class POSIXProcessTestCase(BaseTestCase): 785 786 def test_exceptions(self): 787 # caught & re-raised exceptions 788 with self.assertRaises(OSError) as c: 789 p = subprocess.Popen([sys.executable, "-c", ""], 514 790 cwd="/this/path/does/not/exist") 515 except OSError, e: 516 # The attribute child_traceback should contain "os.chdir" 517 # somewhere. 518 self.assertNotEqual(e.child_traceback.find("os.chdir"), -1) 519 else: 520 self.fail("Expected OSError") 521 522 def _suppress_core_files(self): 523 """Try to prevent core files from being created. 524 Returns previous ulimit if successful, else None. 525 """ 526 try: 527 import resource 528 old_limit = resource.getrlimit(resource.RLIMIT_CORE) 529 resource.setrlimit(resource.RLIMIT_CORE, (0,0)) 530 return old_limit 531 except (ImportError, ValueError, resource.error): 532 return None 533 534 def _unsuppress_core_files(self, old_limit): 535 """Return core file behavior to default.""" 536 if old_limit is None: 537 return 538 try: 539 import resource 540 resource.setrlimit(resource.RLIMIT_CORE, old_limit) 541 except (ImportError, ValueError, resource.error): 542 return 543 544 def test_run_abort(self): 545 # returncode handles signal termination 546 old_limit = self._suppress_core_files() 547 try: 548 p = subprocess.Popen([sys.executable, 549 "-c", "import os; os.abort()"]) 550 finally: 551 self._unsuppress_core_files(old_limit) 791 # The attribute child_traceback should contain "os.chdir" somewhere. 792 self.assertIn("os.chdir", c.exception.child_traceback) 793 794 def test_run_abort(self): 795 # returncode handles signal termination 796 with _SuppressCoreFiles(): 797 p = subprocess.Popen([sys.executable, "-c", 798 "import os; os.abort()"]) 552 799 p.wait() 553 554 555 556 557 558 'import sys,os;' \559 'sys.stdout.write(os.getenv("FRUIT"))'],800 self.assertEqual(-p.returncode, signal.SIGABRT) 801 802 def test_preexec(self): 803 # preexec function 804 p = subprocess.Popen([sys.executable, "-c", 805 "import sys, os;" 806 "sys.stdout.write(os.getenv('FRUIT'))"], 560 807 stdout=subprocess.PIPE, 561 808 preexec_fn=lambda: os.putenv("FRUIT", "apple")) 562 self.assertEqual(p.stdout.read(), "apple") 563 564 def test_args_string(self): 565 # args is a string 566 f, fname = self.mkstemp() 567 os.write(f, "#!/bin/sh\n") 568 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % 569 sys.executable) 570 os.close(f) 571 os.chmod(fname, 0700) 572 p = subprocess.Popen(fname) 573 p.wait() 574 os.remove(fname) 575 self.assertEqual(p.returncode, 47) 576 577 def test_invalid_args(self): 578 # invalid arguments should raise ValueError 579 self.assertRaises(ValueError, subprocess.call, 580 [sys.executable, 581 "-c", "import sys; sys.exit(47)"], 582 startupinfo=47) 583 self.assertRaises(ValueError, subprocess.call, 584 [sys.executable, 585 "-c", "import sys; sys.exit(47)"], 586 creationflags=47) 587 588 def test_shell_sequence(self): 589 # Run command through the shell (sequence) 590 newenv = os.environ.copy() 591 newenv["FRUIT"] = "apple" 592 p = subprocess.Popen(["echo $FRUIT"], shell=1, 593 stdout=subprocess.PIPE, 594 env=newenv) 595 self.assertEqual(p.stdout.read().strip(), "apple") 596 597 def test_shell_string(self): 598 # Run command through the shell (string) 599 newenv = os.environ.copy() 600 newenv["FRUIT"] = "apple" 601 p = subprocess.Popen("echo $FRUIT", shell=1, 602 stdout=subprocess.PIPE, 603 env=newenv) 604 self.assertEqual(p.stdout.read().strip(), "apple") 605 606 def test_call_string(self): 607 # call() function with string argument on UNIX 608 f, fname = self.mkstemp() 609 os.write(f, "#!/bin/sh\n") 610 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % 611 sys.executable) 612 os.close(f) 613 os.chmod(fname, 0700) 614 rc = subprocess.call(fname) 615 os.remove(fname) 616 self.assertEqual(rc, 47) 617 618 def DISABLED_test_send_signal(self): 619 p = subprocess.Popen([sys.executable, 620 "-c", "input()"]) 621 622 self.assert_(p.poll() is None, p.poll()) 623 p.send_signal(signal.SIGINT) 624 self.assertNotEqual(p.wait(), 0) 625 626 def DISABLED_test_kill(self): 627 p = subprocess.Popen([sys.executable, 628 "-c", "input()"]) 629 630 self.assert_(p.poll() is None, p.poll()) 631 p.kill() 632 self.assertEqual(p.wait(), -signal.SIGKILL) 633 634 def DISABLED_test_terminate(self): 635 p = subprocess.Popen([sys.executable, 636 "-c", "input()"]) 637 638 self.assert_(p.poll() is None, p.poll()) 639 p.terminate() 640 self.assertEqual(p.wait(), -signal.SIGTERM) 641 642 # 643 # Windows tests 644 # 645 if mswindows: 646 def test_startupinfo(self): 647 # startupinfo argument 648 # We uses hardcoded constants, because we do not want to 649 # depend on win32all. 650 STARTF_USESHOWWINDOW = 1 651 SW_MAXIMIZE = 3 652 startupinfo = subprocess.STARTUPINFO() 653 startupinfo.dwFlags = STARTF_USESHOWWINDOW 654 startupinfo.wShowWindow = SW_MAXIMIZE 655 # Since Python is a console process, it won't be affected 656 # by wShowWindow, but the argument should be silently 657 # ignored 658 subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"], 809 self.addCleanup(p.stdout.close) 810 self.assertEqual(p.stdout.read(), "apple") 811 812 class _TestExecuteChildPopen(subprocess.Popen): 813 """Used to test behavior at the end of _execute_child.""" 814 def __init__(self, testcase, *args, **kwargs): 815 self._testcase = testcase 816 subprocess.Popen.__init__(self, *args, **kwargs) 817 818 def _execute_child( 819 self, args, executable, preexec_fn, close_fds, cwd, env, 820 universal_newlines, startupinfo, creationflags, shell, to_close, 821 p2cread, p2cwrite, 822 c2pread, c2pwrite, 823 errread, errwrite): 824 try: 825 subprocess.Popen._execute_child( 826 self, args, executable, preexec_fn, close_fds, 827 cwd, env, universal_newlines, 828 startupinfo, creationflags, shell, to_close, 829 p2cread, p2cwrite, 830 c2pread, c2pwrite, 831 errread, errwrite) 832 finally: 833 # Open a bunch of file descriptors and verify that 834 # none of them are the same as the ones the Popen 835 # instance is using for stdin/stdout/stderr. 836 devzero_fds = [os.open("/dev/zero", os.O_RDONLY) 837 for _ in range(8)] 838 try: 839 for fd in devzero_fds: 840 self._testcase.assertNotIn( 841 fd, (p2cwrite, c2pread, errread)) 842 finally: 843 for fd in devzero_fds: 844 os.close(fd) 845 846 @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.") 847 def test_preexec_errpipe_does_not_double_close_pipes(self): 848 """Issue16140: Don't double close pipes on preexec error.""" 849 850 def raise_it(): 851 raise RuntimeError("force the _execute_child() errpipe_data path.") 852 853 with self.assertRaises(RuntimeError): 854 self._TestExecuteChildPopen( 855 self, [sys.executable, "-c", "pass"], 856 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 857 stderr=subprocess.PIPE, preexec_fn=raise_it) 858 859 def test_args_string(self): 860 # args is a string 861 f, fname = mkstemp() 862 os.write(f, "#!/bin/sh\n") 863 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % 864 sys.executable) 865 os.close(f) 866 os.chmod(fname, 0o700) 867 p = subprocess.Popen(fname) 868 p.wait() 869 os.remove(fname) 870 self.assertEqual(p.returncode, 47) 871 872 def test_invalid_args(self): 873 # invalid arguments should raise ValueError 874 self.assertRaises(ValueError, subprocess.call, 875 [sys.executable, "-c", 876 "import sys; sys.exit(47)"], 877 startupinfo=47) 878 self.assertRaises(ValueError, subprocess.call, 879 [sys.executable, "-c", 880 "import sys; sys.exit(47)"], 881 creationflags=47) 882 883 def test_shell_sequence(self): 884 # Run command through the shell (sequence) 885 newenv = os.environ.copy() 886 newenv["FRUIT"] = "apple" 887 p = subprocess.Popen(["echo $FRUIT"], shell=1, 888 stdout=subprocess.PIPE, 889 env=newenv) 890 self.addCleanup(p.stdout.close) 891 self.assertEqual(p.stdout.read().strip(), "apple") 892 893 def test_shell_string(self): 894 # Run command through the shell (string) 895 newenv = os.environ.copy() 896 newenv["FRUIT"] = "apple" 897 p = subprocess.Popen("echo $FRUIT", shell=1, 898 stdout=subprocess.PIPE, 899 env=newenv) 900 self.addCleanup(p.stdout.close) 901 self.assertEqual(p.stdout.read().strip(), "apple") 902 903 def test_call_string(self): 904 # call() function with string argument on UNIX 905 f, fname = mkstemp() 906 os.write(f, "#!/bin/sh\n") 907 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % 908 sys.executable) 909 os.close(f) 910 os.chmod(fname, 0700) 911 rc = subprocess.call(fname) 912 os.remove(fname) 913 self.assertEqual(rc, 47) 914 915 def test_specific_shell(self): 916 # Issue #9265: Incorrect name passed as arg[0]. 917 shells = [] 918 for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']: 919 for name in ['bash', 'ksh']: 920 sh = os.path.join(prefix, name) 921 if os.path.isfile(sh): 922 shells.append(sh) 923 if not shells: # Will probably work for any shell but csh. 924 self.skipTest("bash or ksh required for this test") 925 sh = '/bin/sh' 926 if os.path.isfile(sh) and not os.path.islink(sh): 927 # Test will fail if /bin/sh is a symlink to csh. 928 shells.append(sh) 929 for sh in shells: 930 p = subprocess.Popen("echo $0", executable=sh, shell=True, 931 stdout=subprocess.PIPE) 932 self.addCleanup(p.stdout.close) 933 self.assertEqual(p.stdout.read().strip(), sh) 934 935 def _kill_process(self, method, *args): 936 # Do not inherit file handles from the parent. 937 # It should fix failures on some platforms. 938 p = subprocess.Popen([sys.executable, "-c", """if 1: 939 import sys, time 940 sys.stdout.write('x\\n') 941 sys.stdout.flush() 942 time.sleep(30) 943 """], 944 close_fds=True, 945 stdin=subprocess.PIPE, 946 stdout=subprocess.PIPE, 947 stderr=subprocess.PIPE) 948 # Wait for the interpreter to be completely initialized before 949 # sending any signal. 950 p.stdout.read(1) 951 getattr(p, method)(*args) 952 return p 953 954 @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')), 955 "Due to known OS bug (issue #16762)") 956 def _kill_dead_process(self, method, *args): 957 # Do not inherit file handles from the parent. 958 # It should fix failures on some platforms. 959 p = subprocess.Popen([sys.executable, "-c", """if 1: 960 import sys, time 961 sys.stdout.write('x\\n') 962 sys.stdout.flush() 963 """], 964 close_fds=True, 965 stdin=subprocess.PIPE, 966 stdout=subprocess.PIPE, 967 stderr=subprocess.PIPE) 968 # Wait for the interpreter to be completely initialized before 969 # sending any signal. 970 p.stdout.read(1) 971 # The process should end after this 972 time.sleep(1) 973 # This shouldn't raise even though the child is now dead 974 getattr(p, method)(*args) 975 p.communicate() 976 977 def test_send_signal(self): 978 p = self._kill_process('send_signal', signal.SIGINT) 979 _, stderr = p.communicate() 980 self.assertIn('KeyboardInterrupt', stderr) 981 self.assertNotEqual(p.wait(), 0) 982 983 def test_kill(self): 984 p = self._kill_process('kill') 985 _, stderr = p.communicate() 986 self.assertStderrEqual(stderr, '') 987 self.assertEqual(p.wait(), -signal.SIGKILL) 988 989 def test_terminate(self): 990 p = self._kill_process('terminate') 991 _, stderr = p.communicate() 992 self.assertStderrEqual(stderr, '') 993 self.assertEqual(p.wait(), -signal.SIGTERM) 994 995 def test_send_signal_dead(self): 996 # Sending a signal to a dead process 997 self._kill_dead_process('send_signal', signal.SIGINT) 998 999 def test_kill_dead(self): 1000 # Killing a dead process 1001 self._kill_dead_process('kill') 1002 1003 def test_terminate_dead(self): 1004 # Terminating a dead process 1005 self._kill_dead_process('terminate') 1006 1007 def check_close_std_fds(self, fds): 1008 # Issue #9905: test that subprocess pipes still work properly with 1009 # some standard fds closed 1010 stdin = 0 1011 newfds = [] 1012 for a in fds: 1013 b = os.dup(a) 1014 newfds.append(b) 1015 if a == 0: 1016 stdin = b 1017 try: 1018 for fd in fds: 1019 os.close(fd) 1020 out, err = subprocess.Popen([sys.executable, "-c", 1021 'import sys;' 1022 'sys.stdout.write("apple");' 1023 'sys.stdout.flush();' 1024 'sys.stderr.write("orange")'], 1025 stdin=stdin, 1026 stdout=subprocess.PIPE, 1027 stderr=subprocess.PIPE).communicate() 1028 err = test_support.strip_python_stderr(err) 1029 self.assertEqual((out, err), (b'apple', b'orange')) 1030 finally: 1031 for b, a in zip(newfds, fds): 1032 os.dup2(b, a) 1033 for b in newfds: 1034 os.close(b) 1035 1036 def test_close_fd_0(self): 1037 self.check_close_std_fds([0]) 1038 1039 def test_close_fd_1(self): 1040 self.check_close_std_fds([1]) 1041 1042 def test_close_fd_2(self): 1043 self.check_close_std_fds([2]) 1044 1045 def test_close_fds_0_1(self): 1046 self.check_close_std_fds([0, 1]) 1047 1048 def test_close_fds_0_2(self): 1049 self.check_close_std_fds([0, 2]) 1050 1051 def test_close_fds_1_2(self): 1052 self.check_close_std_fds([1, 2]) 1053 1054 def test_close_fds_0_1_2(self): 1055 # Issue #10806: test that subprocess pipes still work properly with 1056 # all standard fds closed. 1057 self.check_close_std_fds([0, 1, 2]) 1058 1059 def check_swap_fds(self, stdin_no, stdout_no, stderr_no): 1060 # open up some temporary files 1061 temps = [mkstemp() for i in range(3)] 1062 temp_fds = [fd for fd, fname in temps] 1063 try: 1064 # unlink the files -- we won't need to reopen them 1065 for fd, fname in temps: 1066 os.unlink(fname) 1067 1068 # save a copy of the standard file descriptors 1069 saved_fds = [os.dup(fd) for fd in range(3)] 1070 try: 1071 # duplicate the temp files over the standard fd's 0, 1, 2 1072 for fd, temp_fd in enumerate(temp_fds): 1073 os.dup2(temp_fd, fd) 1074 1075 # write some data to what will become stdin, and rewind 1076 os.write(stdin_no, b"STDIN") 1077 os.lseek(stdin_no, 0, 0) 1078 1079 # now use those files in the given order, so that subprocess 1080 # has to rearrange them in the child 1081 p = subprocess.Popen([sys.executable, "-c", 1082 'import sys; got = sys.stdin.read();' 1083 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 1084 stdin=stdin_no, 1085 stdout=stdout_no, 1086 stderr=stderr_no) 1087 p.wait() 1088 1089 for fd in temp_fds: 1090 os.lseek(fd, 0, 0) 1091 1092 out = os.read(stdout_no, 1024) 1093 err = test_support.strip_python_stderr(os.read(stderr_no, 1024)) 1094 finally: 1095 for std, saved in enumerate(saved_fds): 1096 os.dup2(saved, std) 1097 os.close(saved) 1098 1099 self.assertEqual(out, b"got STDIN") 1100 self.assertEqual(err, b"err") 1101 1102 finally: 1103 for fd in temp_fds: 1104 os.close(fd) 1105 1106 # When duping fds, if there arises a situation where one of the fds is 1107 # either 0, 1 or 2, it is possible that it is overwritten (#12607). 1108 # This tests all combinations of this. 1109 def test_swap_fds(self): 1110 self.check_swap_fds(0, 1, 2) 1111 self.check_swap_fds(0, 2, 1) 1112 self.check_swap_fds(1, 0, 2) 1113 self.check_swap_fds(1, 2, 0) 1114 self.check_swap_fds(2, 0, 1) 1115 self.check_swap_fds(2, 1, 0) 1116 1117 def test_wait_when_sigchild_ignored(self): 1118 # NOTE: sigchild_ignore.py may not be an effective test on all OSes. 1119 sigchild_ignore = test_support.findfile("sigchild_ignore.py", 1120 subdir="subprocessdata") 1121 p = subprocess.Popen([sys.executable, sigchild_ignore], 1122 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1123 stdout, stderr = p.communicate() 1124 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" 1125 " non-zero with this error:\n%s" % stderr) 1126 1127 def test_zombie_fast_process_del(self): 1128 # Issue #12650: on Unix, if Popen.__del__() was called before the 1129 # process exited, it wouldn't be added to subprocess._active, and would 1130 # remain a zombie. 1131 # spawn a Popen, and delete its reference before it exits 1132 p = subprocess.Popen([sys.executable, "-c", 1133 'import sys, time;' 1134 'time.sleep(0.2)'], 1135 stdout=subprocess.PIPE, 1136 stderr=subprocess.PIPE) 1137 self.addCleanup(p.stdout.close) 1138 self.addCleanup(p.stderr.close) 1139 ident = id(p) 1140 pid = p.pid 1141 del p 1142 # check that p is in the active processes list 1143 self.assertIn(ident, [id(o) for o in subprocess._active]) 1144 1145 def test_leak_fast_process_del_killed(self): 1146 # Issue #12650: on Unix, if Popen.__del__() was called before the 1147 # process exited, and the process got killed by a signal, it would never 1148 # be removed from subprocess._active, which triggered a FD and memory 1149 # leak. 1150 # spawn a Popen, delete its reference and kill it 1151 p = subprocess.Popen([sys.executable, "-c", 1152 'import time;' 1153 'time.sleep(3)'], 1154 stdout=subprocess.PIPE, 1155 stderr=subprocess.PIPE) 1156 self.addCleanup(p.stdout.close) 1157 self.addCleanup(p.stderr.close) 1158 ident = id(p) 1159 pid = p.pid 1160 del p 1161 os.kill(pid, signal.SIGKILL) 1162 # check that p is in the active processes list 1163 self.assertIn(ident, [id(o) for o in subprocess._active]) 1164 1165 # let some time for the process to exit, and create a new Popen: this 1166 # should trigger the wait() of p 1167 time.sleep(0.2) 1168 with self.assertRaises(EnvironmentError) as c: 1169 with subprocess.Popen(['nonexisting_i_hope'], 1170 stdout=subprocess.PIPE, 1171 stderr=subprocess.PIPE) as proc: 1172 pass 1173 # p should have been wait()ed on, and removed from the _active list 1174 self.assertRaises(OSError, os.waitpid, pid, 0) 1175 self.assertNotIn(ident, [id(o) for o in subprocess._active]) 1176 1177 def test_pipe_cloexec(self): 1178 # Issue 12786: check that the communication pipes' FDs are set CLOEXEC, 1179 # and are not inherited by another child process. 1180 p1 = subprocess.Popen([sys.executable, "-c", 1181 'import os;' 1182 'os.read(0, 1)' 1183 ], 1184 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1185 stderr=subprocess.PIPE) 1186 1187 p2 = subprocess.Popen([sys.executable, "-c", """if True: 1188 import os, errno, sys 1189 for fd in %r: 1190 try: 1191 os.close(fd) 1192 except OSError as e: 1193 if e.errno != errno.EBADF: 1194 raise 1195 else: 1196 sys.exit(1) 1197 sys.exit(0) 1198 """ % [f.fileno() for f in (p1.stdin, p1.stdout, 1199 p1.stderr)] 1200 ], 1201 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1202 stderr=subprocess.PIPE, close_fds=False) 1203 p1.communicate('foo') 1204 _, stderr = p2.communicate() 1205 1206 self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr)) 1207 1208 1209 @unittest.skipUnless(mswindows, "Windows specific tests") 1210 class Win32ProcessTestCase(BaseTestCase): 1211 1212 def test_startupinfo(self): 1213 # startupinfo argument 1214 # We uses hardcoded constants, because we do not want to 1215 # depend on win32all. 1216 STARTF_USESHOWWINDOW = 1 1217 SW_MAXIMIZE = 3 1218 startupinfo = subprocess.STARTUPINFO() 1219 startupinfo.dwFlags = STARTF_USESHOWWINDOW 1220 startupinfo.wShowWindow = SW_MAXIMIZE 1221 # Since Python is a console process, it won't be affected 1222 # by wShowWindow, but the argument should be silently 1223 # ignored 1224 subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"], 659 1225 startupinfo=startupinfo) 660 1226 661 def test_creationflags(self): 662 # creationflags argument 663 CREATE_NEW_CONSOLE = 16 664 sys.stderr.write(" a DOS box should flash briefly ...\n") 665 subprocess.call(sys.executable + 666 ' -c "import time; time.sleep(0.25)"', 667 creationflags=CREATE_NEW_CONSOLE) 668 669 def test_invalid_args(self): 670 # invalid arguments should raise ValueError 671 self.assertRaises(ValueError, subprocess.call, 672 [sys.executable, 673 "-c", "import sys; sys.exit(47)"], 674 preexec_fn=lambda: 1) 675 self.assertRaises(ValueError, subprocess.call, 676 [sys.executable, 677 "-c", "import sys; sys.exit(47)"], 678 stdout=subprocess.PIPE, 1227 def test_creationflags(self): 1228 # creationflags argument 1229 CREATE_NEW_CONSOLE = 16 1230 sys.stderr.write(" a DOS box should flash briefly ...\n") 1231 subprocess.call(sys.executable + 1232 ' -c "import time; time.sleep(0.25)"', 1233 creationflags=CREATE_NEW_CONSOLE) 1234 1235 def test_invalid_args(self): 1236 # invalid arguments should raise ValueError 1237 self.assertRaises(ValueError, subprocess.call, 1238 [sys.executable, "-c", 1239 "import sys; sys.exit(47)"], 1240 preexec_fn=lambda: 1) 1241 self.assertRaises(ValueError, subprocess.call, 1242 [sys.executable, "-c", 1243 "import sys; sys.exit(47)"], 1244 stdout=subprocess.PIPE, 1245 close_fds=True) 1246 1247 def test_close_fds(self): 1248 # close file descriptors 1249 rc = subprocess.call([sys.executable, "-c", 1250 "import sys; sys.exit(47)"], 679 1251 close_fds=True) 680 681 def test_close_fds(self): 682 # close file descriptors 683 rc = subprocess.call([sys.executable, "-c", 684 "import sys; sys.exit(47)"], 685 close_fds=True) 686 self.assertEqual(rc, 47) 687 688 def test_shell_sequence(self): 689 # Run command through the shell (sequence) 690 newenv = os.environ.copy() 691 newenv["FRUIT"] = "physalis" 692 p = subprocess.Popen(["set"], shell=1, 693 stdout=subprocess.PIPE, 694 env=newenv) 695 self.assertNotEqual(p.stdout.read().find("physalis"), -1) 696 697 def test_shell_string(self): 698 # Run command through the shell (string) 699 newenv = os.environ.copy() 700 newenv["FRUIT"] = "physalis" 701 p = subprocess.Popen("set", shell=1, 702 stdout=subprocess.PIPE, 703 env=newenv) 704 self.assertNotEqual(p.stdout.read().find("physalis"), -1) 705 706 def test_call_string(self): 707 # call() function with string argument on Windows 708 rc = subprocess.call(sys.executable + 709 ' -c "import sys; sys.exit(47)"') 710 self.assertEqual(rc, 47) 711 712 def DISABLED_test_send_signal(self): 713 p = subprocess.Popen([sys.executable, 714 "-c", "input()"]) 715 716 self.assert_(p.poll() is None, p.poll()) 717 p.send_signal(signal.SIGTERM) 718 self.assertNotEqual(p.wait(), 0) 719 720 def DISABLED_test_kill(self): 721 p = subprocess.Popen([sys.executable, 722 "-c", "input()"]) 723 724 self.assert_(p.poll() is None, p.poll()) 725 p.kill() 726 self.assertNotEqual(p.wait(), 0) 727 728 def DISABLED_test_terminate(self): 729 p = subprocess.Popen([sys.executable, 730 "-c", "input()"]) 731 732 self.assert_(p.poll() is None, p.poll()) 733 p.terminate() 734 self.assertNotEqual(p.wait(), 0) 1252 self.assertEqual(rc, 47) 1253 1254 def test_shell_sequence(self): 1255 # Run command through the shell (sequence) 1256 newenv = os.environ.copy() 1257 newenv["FRUIT"] = "physalis" 1258 p = subprocess.Popen(["set"], shell=1, 1259 stdout=subprocess.PIPE, 1260 env=newenv) 1261 self.addCleanup(p.stdout.close) 1262 self.assertIn("physalis", p.stdout.read()) 1263 1264 def test_shell_string(self): 1265 # Run command through the shell (string) 1266 newenv = os.environ.copy() 1267 newenv["FRUIT"] = "physalis" 1268 p = subprocess.Popen("set", shell=1, 1269 stdout=subprocess.PIPE, 1270 env=newenv) 1271 self.addCleanup(p.stdout.close) 1272 self.assertIn("physalis", p.stdout.read()) 1273 1274 def test_call_string(self): 1275 # call() function with string argument on Windows 1276 rc = subprocess.call(sys.executable + 1277 ' -c "import sys; sys.exit(47)"') 1278 self.assertEqual(rc, 47) 1279 1280 def _kill_process(self, method, *args): 1281 # Some win32 buildbot raises EOFError if stdin is inherited 1282 p = subprocess.Popen([sys.executable, "-c", """if 1: 1283 import sys, time 1284 sys.stdout.write('x\\n') 1285 sys.stdout.flush() 1286 time.sleep(30) 1287 """], 1288 stdin=subprocess.PIPE, 1289 stdout=subprocess.PIPE, 1290 stderr=subprocess.PIPE) 1291 self.addCleanup(p.stdout.close) 1292 self.addCleanup(p.stderr.close) 1293 self.addCleanup(p.stdin.close) 1294 # Wait for the interpreter to be completely initialized before 1295 # sending any signal. 1296 p.stdout.read(1) 1297 getattr(p, method)(*args) 1298 _, stderr = p.communicate() 1299 self.assertStderrEqual(stderr, '') 1300 returncode = p.wait() 1301 self.assertNotEqual(returncode, 0) 1302 1303 def _kill_dead_process(self, method, *args): 1304 p = subprocess.Popen([sys.executable, "-c", """if 1: 1305 import sys, time 1306 sys.stdout.write('x\\n') 1307 sys.stdout.flush() 1308 sys.exit(42) 1309 """], 1310 stdin=subprocess.PIPE, 1311 stdout=subprocess.PIPE, 1312 stderr=subprocess.PIPE) 1313 self.addCleanup(p.stdout.close) 1314 self.addCleanup(p.stderr.close) 1315 self.addCleanup(p.stdin.close) 1316 # Wait for the interpreter to be completely initialized before 1317 # sending any signal. 1318 p.stdout.read(1) 1319 # The process should end after this 1320 time.sleep(1) 1321 # This shouldn't raise even though the child is now dead 1322 getattr(p, method)(*args) 1323 _, stderr = p.communicate() 1324 self.assertStderrEqual(stderr, b'') 1325 rc = p.wait() 1326 self.assertEqual(rc, 42) 1327 1328 def test_send_signal(self): 1329 self._kill_process('send_signal', signal.SIGTERM) 1330 1331 def test_kill(self): 1332 self._kill_process('kill') 1333 1334 def test_terminate(self): 1335 self._kill_process('terminate') 1336 1337 def test_send_signal_dead(self): 1338 self._kill_dead_process('send_signal', signal.SIGTERM) 1339 1340 def test_kill_dead(self): 1341 self._kill_dead_process('kill') 1342 1343 def test_terminate_dead(self): 1344 self._kill_dead_process('terminate') 1345 1346 1347 @unittest.skipUnless(getattr(subprocess, '_has_poll', False), 1348 "poll system call not supported") 1349 class ProcessTestCaseNoPoll(ProcessTestCase): 1350 def setUp(self): 1351 subprocess._has_poll = False 1352 ProcessTestCase.setUp(self) 1353 1354 def tearDown(self): 1355 subprocess._has_poll = True 1356 ProcessTestCase.tearDown(self) 1357 735 1358 736 1359 class HelperFunctionTests(unittest.TestCase): 737 def _test_eintr_retry_call(self): 1360 @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows") 1361 def test_eintr_retry_call(self): 738 1362 record_calls = [] 739 1363 def fake_os_func(*args): … … 751 1375 self.assertEqual([(256, 999), (666,), (666,)], record_calls) 752 1376 753 if not mswindows: 754 test_eintr_retry_call = _test_eintr_retry_call 755 1377 @unittest.skipUnless(mswindows, "mswindows only") 1378 class CommandsWithSpaces (BaseTestCase): 1379 1380 def setUp(self): 1381 super(CommandsWithSpaces, self).setUp() 1382 f, fname = mkstemp(".py", "te st") 1383 self.fname = fname.lower () 1384 os.write(f, b"import sys;" 1385 b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))" 1386 ) 1387 os.close(f) 1388 1389 def tearDown(self): 1390 os.remove(self.fname) 1391 super(CommandsWithSpaces, self).tearDown() 1392 1393 def with_spaces(self, *args, **kwargs): 1394 kwargs['stdout'] = subprocess.PIPE 1395 p = subprocess.Popen(*args, **kwargs) 1396 self.addCleanup(p.stdout.close) 1397 self.assertEqual( 1398 p.stdout.read ().decode("mbcs"), 1399 "2 [%r, 'ab cd']" % self.fname 1400 ) 1401 1402 def test_shell_string_with_spaces(self): 1403 # call() function with string argument with spaces on Windows 1404 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 1405 "ab cd"), shell=1) 1406 1407 def test_shell_sequence_with_spaces(self): 1408 # call() function with sequence argument with spaces on Windows 1409 self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) 1410 1411 def test_noshell_string_with_spaces(self): 1412 # call() function with string argument with spaces on Windows 1413 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 1414 "ab cd")) 1415 1416 def test_noshell_sequence_with_spaces(self): 1417 # call() function with sequence argument with spaces on Windows 1418 self.with_spaces([sys.executable, self.fname, "ab cd"]) 756 1419 757 1420 def test_main(): 758 test_support.run_unittest(ProcessTestCase, 759 HelperFunctionTests) 760 if hasattr(test_support, "reap_children"): 761 test_support.reap_children() 1421 unit_tests = (ProcessTestCase, 1422 POSIXProcessTestCase, 1423 Win32ProcessTestCase, 1424 ProcessTestCaseNoPoll, 1425 HelperFunctionTests, 1426 CommandsWithSpaces) 1427 1428 test_support.run_unittest(*unit_tests) 1429 test_support.reap_children() 762 1430 763 1431 if __name__ == "__main__":
Note:
See TracChangeset
for help on using the changeset viewer.