source: python/trunk/Lib/test/test_subprocess.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 57.4 KB
Line 
1import unittest
2from test import test_support
3import subprocess
4import sys
5import signal
6import os
7import errno
8import tempfile
9import time
10import re
11import sysconfig
12
13try:
14 import resource
15except ImportError:
16 resource = None
17try:
18 import threading
19except ImportError:
20 threading = None
21
# True when the interpreter reports the native Windows platform.
mswindows = sys.platform == "win32"

#
# Depends on the following external programs: Python
#

# Snippet spliced into child scripts: on Windows it puts the child's stdout
# into binary mode via msvcrt.setmode(); elsewhere it is empty.
SETBINARY = ''
if mswindows:
    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
                 'os.O_BINARY);')
33
34
# Use tempfile.mkstemp when the running interpreter provides it; otherwise
# install a small stand-in built on tempfile.mktemp.
if hasattr(tempfile, 'mkstemp'):
    mkstemp = tempfile.mkstemp
else:
    def mkstemp():
        """Replacement for mkstemp, calling mktemp."""
        path = tempfile.mktemp()
        fd = os.open(path, os.O_RDWR | os.O_CREAT)
        return fd, path
43
44
class BaseTestCase(unittest.TestCase):
    # Common fixture for the subprocess tests: reaps children before each
    # test and checks afterwards that subprocess._active has been emptied.

    def setUp(self):
        # Try to minimize the number of children we have so this test
        # doesn't crash on some buildbots (Alphas in particular).
        test_support.reap_children()

    def tearDown(self):
        # Wait for every Popen still tracked by the module, let the module
        # clean up, then verify nothing is left behind.
        for leftover in subprocess._active:
            leftover.wait()
        subprocess._cleanup()
        self.assertFalse(subprocess._active, "subprocess._active not empty")

    def assertStderrEqual(self, stderr, expected, msg=None):
        # In a debug build, stuff like "[6580 refs]" is printed to stderr at
        # shutdown time.  That frustrates tests trying to check stderr
        # produced from a spawned Python process, so strip that trailer
        # before comparing.
        refs_trailer = re.compile(r"\[\d+ refs\]\r?\n?$")
        cleaned = refs_trailer.sub("", stderr)
        self.assertEqual(cleaned, expected, msg)
63
64
class PopenTestException(Exception):
    """Marker exception raised on purpose by the Popen test subclasses."""
67
68
class PopenExecuteChildRaises(subprocess.Popen):
    """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
    _execute_child fails.
    """

    def _execute_child(self, *_args, **_kwargs):
        # Never start a child: always fail with the test-specific exception,
        # whatever arguments Popen.__init__ passes in.
        raise PopenTestException("Forced Exception for Test")
75
76
77class ProcessTestCase(BaseTestCase):
78
79 def test_call_seq(self):
80 # call() function with sequence argument
81 rc = subprocess.call([sys.executable, "-c",
82 "import sys; sys.exit(47)"])
83 self.assertEqual(rc, 47)
84
85 def test_check_call_zero(self):
86 # check_call() function with zero return code
87 rc = subprocess.check_call([sys.executable, "-c",
88 "import sys; sys.exit(0)"])
89 self.assertEqual(rc, 0)
90
91 def test_check_call_nonzero(self):
92 # check_call() function with non-zero return code
93 with self.assertRaises(subprocess.CalledProcessError) as c:
94 subprocess.check_call([sys.executable, "-c",
95 "import sys; sys.exit(47)"])
96 self.assertEqual(c.exception.returncode, 47)
97
98 def test_check_output(self):
99 # check_output() function with zero return code
100 output = subprocess.check_output(
101 [sys.executable, "-c", "print 'BDFL'"])
102 self.assertIn('BDFL', output)
103
104 def test_check_output_nonzero(self):
105 # check_call() function with non-zero return code
106 with self.assertRaises(subprocess.CalledProcessError) as c:
107 subprocess.check_output(
108 [sys.executable, "-c", "import sys; sys.exit(5)"])
109 self.assertEqual(c.exception.returncode, 5)
110
111 def test_check_output_stderr(self):
112 # check_output() function stderr redirected to stdout
113 output = subprocess.check_output(
114 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
115 stderr=subprocess.STDOUT)
116 self.assertIn('BDFL', output)
117
118 def test_check_output_stdout_arg(self):
119 # check_output() function stderr redirected to stdout
120 with self.assertRaises(ValueError) as c:
121 output = subprocess.check_output(
122 [sys.executable, "-c", "print 'will not be run'"],
123 stdout=sys.stdout)
124 self.fail("Expected ValueError when stdout arg supplied.")
125 self.assertIn('stdout', c.exception.args[0])
126
127 def test_call_kwargs(self):
128 # call() function with keyword args
129 newenv = os.environ.copy()
130 newenv["FRUIT"] = "banana"
131 rc = subprocess.call([sys.executable, "-c",
132 'import sys, os;'
133 'sys.exit(os.getenv("FRUIT")=="banana")'],
134 env=newenv)
135 self.assertEqual(rc, 1)
136
137 def test_invalid_args(self):
138 # Popen() called with invalid arguments should raise TypeError
139 # but Popen.__del__ should not complain (issue #12085)
140 with test_support.captured_stderr() as s:
141 self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
142 argcount = subprocess.Popen.__init__.__code__.co_argcount
143 too_many_args = [0] * (argcount + 1)
144 self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
145 self.assertEqual(s.getvalue(), '')
146
147 def test_stdin_none(self):
148 # .stdin is None when not redirected
149 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
150 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
151 self.addCleanup(p.stdout.close)
152 self.addCleanup(p.stderr.close)
153 p.wait()
154 self.assertEqual(p.stdin, None)
155
156 def test_stdout_none(self):
157 # .stdout is None when not redirected, and the child's stdout will
158 # be inherited from the parent. In order to test this we run a
159 # subprocess in a subprocess:
160 # this_test
161 # \-- subprocess created by this test (parent)
162 # \-- subprocess created by the parent subprocess (child)
163 # The parent doesn't specify stdout, so the child will use the
164 # parent's stdout. This test checks that the message printed by the
165 # child goes to the parent stdout. The parent also checks that the
166 # child's stdout is None. See #11963.
167 code = ('import sys; from subprocess import Popen, PIPE;'
168 'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],'
169 ' stdin=PIPE, stderr=PIPE);'
170 'p.wait(); assert p.stdout is None;')
171 p = subprocess.Popen([sys.executable, "-c", code],
172 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
173 self.addCleanup(p.stdout.close)
174 self.addCleanup(p.stderr.close)
175 out, err = p.communicate()
176 self.assertEqual(p.returncode, 0, err)
177 self.assertEqual(out.rstrip(), 'test_stdout_none')
178
179 def test_stderr_none(self):
180 # .stderr is None when not redirected
181 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
182 stdin=subprocess.PIPE, stdout=subprocess.PIPE)
183 self.addCleanup(p.stdout.close)
184 self.addCleanup(p.stdin.close)
185 p.wait()
186 self.assertEqual(p.stderr, None)
187
188 def test_executable_with_cwd(self):
189 python_dir = os.path.dirname(os.path.realpath(sys.executable))
190 p = subprocess.Popen(["somethingyoudonthave", "-c",
191 "import sys; sys.exit(47)"],
192 executable=sys.executable, cwd=python_dir)
193 p.wait()
194 self.assertEqual(p.returncode, 47)
195
196 @unittest.skipIf(sysconfig.is_python_build(),
197 "need an installed Python. See #7774")
198 def test_executable_without_cwd(self):
199 # For a normal installation, it should work without 'cwd'
200 # argument. For test runs in the build directory, see #7774.
201 p = subprocess.Popen(["somethingyoudonthave", "-c",
202 "import sys; sys.exit(47)"],
203 executable=sys.executable)
204 p.wait()
205 self.assertEqual(p.returncode, 47)
206
207 def test_stdin_pipe(self):
208 # stdin redirection
209 p = subprocess.Popen([sys.executable, "-c",
210 'import sys; sys.exit(sys.stdin.read() == "pear")'],
211 stdin=subprocess.PIPE)
212 p.stdin.write("pear")
213 p.stdin.close()
214 p.wait()
215 self.assertEqual(p.returncode, 1)
216
217 def test_stdin_filedes(self):
218 # stdin is set to open file descriptor
219 tf = tempfile.TemporaryFile()
220 d = tf.fileno()
221 os.write(d, "pear")
222 os.lseek(d, 0, 0)
223 p = subprocess.Popen([sys.executable, "-c",
224 'import sys; sys.exit(sys.stdin.read() == "pear")'],
225 stdin=d)
226 p.wait()
227 self.assertEqual(p.returncode, 1)
228
229 def test_stdin_fileobj(self):
230 # stdin is set to open file object
231 tf = tempfile.TemporaryFile()
232 tf.write("pear")
233 tf.seek(0)
234 p = subprocess.Popen([sys.executable, "-c",
235 'import sys; sys.exit(sys.stdin.read() == "pear")'],
236 stdin=tf)
237 p.wait()
238 self.assertEqual(p.returncode, 1)
239
240 def test_stdout_pipe(self):
241 # stdout redirection
242 p = subprocess.Popen([sys.executable, "-c",
243 'import sys; sys.stdout.write("orange")'],
244 stdout=subprocess.PIPE)
245 self.addCleanup(p.stdout.close)
246 self.assertEqual(p.stdout.read(), "orange")
247
248 def test_stdout_filedes(self):
249 # stdout is set to open file descriptor
250 tf = tempfile.TemporaryFile()
251 d = tf.fileno()
252 p = subprocess.Popen([sys.executable, "-c",
253 'import sys; sys.stdout.write("orange")'],
254 stdout=d)
255 p.wait()
256 os.lseek(d, 0, 0)
257 self.assertEqual(os.read(d, 1024), "orange")
258
259 def test_stdout_fileobj(self):
260 # stdout is set to open file object
261 tf = tempfile.TemporaryFile()
262 p = subprocess.Popen([sys.executable, "-c",
263 'import sys; sys.stdout.write("orange")'],
264 stdout=tf)
265 p.wait()
266 tf.seek(0)
267 self.assertEqual(tf.read(), "orange")
268
269 def test_stderr_pipe(self):
270 # stderr redirection
271 p = subprocess.Popen([sys.executable, "-c",
272 'import sys; sys.stderr.write("strawberry")'],
273 stderr=subprocess.PIPE)
274 self.addCleanup(p.stderr.close)
275 self.assertStderrEqual(p.stderr.read(), "strawberry")
276
277 def test_stderr_filedes(self):
278 # stderr is set to open file descriptor
279 tf = tempfile.TemporaryFile()
280 d = tf.fileno()
281 p = subprocess.Popen([sys.executable, "-c",
282 'import sys; sys.stderr.write("strawberry")'],
283 stderr=d)
284 p.wait()
285 os.lseek(d, 0, 0)
286 self.assertStderrEqual(os.read(d, 1024), "strawberry")
287
288 def test_stderr_fileobj(self):
289 # stderr is set to open file object
290 tf = tempfile.TemporaryFile()
291 p = subprocess.Popen([sys.executable, "-c",
292 'import sys; sys.stderr.write("strawberry")'],
293 stderr=tf)
294 p.wait()
295 tf.seek(0)
296 self.assertStderrEqual(tf.read(), "strawberry")
297
298 def test_stdout_stderr_pipe(self):
299 # capture stdout and stderr to the same pipe
300 p = subprocess.Popen([sys.executable, "-c",
301 'import sys;'
302 'sys.stdout.write("apple");'
303 'sys.stdout.flush();'
304 'sys.stderr.write("orange")'],
305 stdout=subprocess.PIPE,
306 stderr=subprocess.STDOUT)
307 self.addCleanup(p.stdout.close)
308 self.assertStderrEqual(p.stdout.read(), "appleorange")
309
310 def test_stdout_stderr_file(self):
311 # capture stdout and stderr to the same open file
312 tf = tempfile.TemporaryFile()
313 p = subprocess.Popen([sys.executable, "-c",
314 'import sys;'
315 'sys.stdout.write("apple");'
316 'sys.stdout.flush();'
317 'sys.stderr.write("orange")'],
318 stdout=tf,
319 stderr=tf)
320 p.wait()
321 tf.seek(0)
322 self.assertStderrEqual(tf.read(), "appleorange")
323
324 def test_stdout_filedes_of_stdout(self):
325 # stdout is set to 1 (#1531862).
326 # To avoid printing the text on stdout, we do something similar to
327 # test_stdout_none (see above). The parent subprocess calls the child
328 # subprocess passing stdout=1, and this test uses stdout=PIPE in
329 # order to capture and check the output of the parent. See #11963.
330 code = ('import sys, subprocess; '
331 'rc = subprocess.call([sys.executable, "-c", '
332 ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
333 '\'test with stdout=1\'))"], stdout=1); '
334 'assert rc == 18')
335 p = subprocess.Popen([sys.executable, "-c", code],
336 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
337 self.addCleanup(p.stdout.close)
338 self.addCleanup(p.stderr.close)
339 out, err = p.communicate()
340 self.assertEqual(p.returncode, 0, err)
341 self.assertEqual(out.rstrip(), 'test with stdout=1')
342
343 def test_cwd(self):
344 tmpdir = tempfile.gettempdir()
345 # We cannot use os.path.realpath to canonicalize the path,
346 # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
347 cwd = os.getcwd()
348 os.chdir(tmpdir)
349 tmpdir = os.getcwd()
350 os.chdir(cwd)
351 p = subprocess.Popen([sys.executable, "-c",
352 'import sys,os;'
353 'sys.stdout.write(os.getcwd())'],
354 stdout=subprocess.PIPE,
355 cwd=tmpdir)
356 self.addCleanup(p.stdout.close)
357 normcase = os.path.normcase
358 self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
359
360 def test_env(self):
361 newenv = os.environ.copy()
362 newenv["FRUIT"] = "orange"
363 p = subprocess.Popen([sys.executable, "-c",
364 'import sys,os;'
365 'sys.stdout.write(os.getenv("FRUIT"))'],
366 stdout=subprocess.PIPE,
367 env=newenv)
368 self.addCleanup(p.stdout.close)
369 self.assertEqual(p.stdout.read(), "orange")
370
371 def test_communicate_stdin(self):
372 p = subprocess.Popen([sys.executable, "-c",
373 'import sys;'
374 'sys.exit(sys.stdin.read() == "pear")'],
375 stdin=subprocess.PIPE)
376 p.communicate("pear")
377 self.assertEqual(p.returncode, 1)
378
379 def test_communicate_stdout(self):
380 p = subprocess.Popen([sys.executable, "-c",
381 'import sys; sys.stdout.write("pineapple")'],
382 stdout=subprocess.PIPE)
383 (stdout, stderr) = p.communicate()
384 self.assertEqual(stdout, "pineapple")
385 self.assertEqual(stderr, None)
386
387 def test_communicate_stderr(self):
388 p = subprocess.Popen([sys.executable, "-c",
389 'import sys; sys.stderr.write("pineapple")'],
390 stderr=subprocess.PIPE)
391 (stdout, stderr) = p.communicate()
392 self.assertEqual(stdout, None)
393 self.assertStderrEqual(stderr, "pineapple")
394
395 def test_communicate(self):
396 p = subprocess.Popen([sys.executable, "-c",
397 'import sys,os;'
398 'sys.stderr.write("pineapple");'
399 'sys.stdout.write(sys.stdin.read())'],
400 stdin=subprocess.PIPE,
401 stdout=subprocess.PIPE,
402 stderr=subprocess.PIPE)
403 self.addCleanup(p.stdout.close)
404 self.addCleanup(p.stderr.close)
405 self.addCleanup(p.stdin.close)
406 (stdout, stderr) = p.communicate("banana")
407 self.assertEqual(stdout, "banana")
408 self.assertStderrEqual(stderr, "pineapple")
409
410 # This test is Linux specific for simplicity to at least have
411 # some coverage. It is not a platform specific bug.
412 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
413 "Linux specific")
414 # Test for the fd leak reported in http://bugs.python.org/issue2791.
415 def test_communicate_pipe_fd_leak(self):
416 fd_directory = '/proc/%d/fd' % os.getpid()
417 num_fds_before_popen = len(os.listdir(fd_directory))
418 p = subprocess.Popen([sys.executable, "-c", "print()"],
419 stdout=subprocess.PIPE)
420 p.communicate()
421 num_fds_after_communicate = len(os.listdir(fd_directory))
422 del p
423 num_fds_after_destruction = len(os.listdir(fd_directory))
424 self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
425 self.assertEqual(num_fds_before_popen, num_fds_after_communicate)
426
427 def test_communicate_returns(self):
428 # communicate() should return None if no redirection is active
429 p = subprocess.Popen([sys.executable, "-c",
430 "import sys; sys.exit(47)"])
431 (stdout, stderr) = p.communicate()
432 self.assertEqual(stdout, None)
433 self.assertEqual(stderr, None)
434
435 def test_communicate_pipe_buf(self):
436 # communicate() with writes larger than pipe_buf
437 # This test will probably deadlock rather than fail, if
438 # communicate() does not work properly.
439 x, y = os.pipe()
440 if mswindows:
441 pipe_buf = 512
442 else:
443 pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
444 os.close(x)
445 os.close(y)
446 p = subprocess.Popen([sys.executable, "-c",
447 'import sys,os;'
448 'sys.stdout.write(sys.stdin.read(47));'
449 'sys.stderr.write("xyz"*%d);'
450 'sys.stdout.write(sys.stdin.read())' % pipe_buf],
451 stdin=subprocess.PIPE,
452 stdout=subprocess.PIPE,
453 stderr=subprocess.PIPE)
454 self.addCleanup(p.stdout.close)
455 self.addCleanup(p.stderr.close)
456 self.addCleanup(p.stdin.close)
457 string_to_write = "abc"*pipe_buf
458 (stdout, stderr) = p.communicate(string_to_write)
459 self.assertEqual(stdout, string_to_write)
460
461 def test_writes_before_communicate(self):
462 # stdin.write before communicate()
463 p = subprocess.Popen([sys.executable, "-c",
464 'import sys,os;'
465 'sys.stdout.write(sys.stdin.read())'],
466 stdin=subprocess.PIPE,
467 stdout=subprocess.PIPE,
468 stderr=subprocess.PIPE)
469 self.addCleanup(p.stdout.close)
470 self.addCleanup(p.stderr.close)
471 self.addCleanup(p.stdin.close)
472 p.stdin.write("banana")
473 (stdout, stderr) = p.communicate("split")
474 self.assertEqual(stdout, "bananasplit")
475 self.assertStderrEqual(stderr, "")
476
477 def test_universal_newlines(self):
478 p = subprocess.Popen([sys.executable, "-c",
479 'import sys,os;' + SETBINARY +
480 'sys.stdout.write("line1\\n");'
481 'sys.stdout.flush();'
482 'sys.stdout.write("line2\\r");'
483 'sys.stdout.flush();'
484 'sys.stdout.write("line3\\r\\n");'
485 'sys.stdout.flush();'
486 'sys.stdout.write("line4\\r");'
487 'sys.stdout.flush();'
488 'sys.stdout.write("\\nline5");'
489 'sys.stdout.flush();'
490 'sys.stdout.write("\\nline6");'],
491 stdout=subprocess.PIPE,
492 universal_newlines=1)
493 self.addCleanup(p.stdout.close)
494 stdout = p.stdout.read()
495 if hasattr(file, 'newlines'):
496 # Interpreter with universal newline support
497 self.assertEqual(stdout,
498 "line1\nline2\nline3\nline4\nline5\nline6")
499 else:
500 # Interpreter without universal newline support
501 self.assertEqual(stdout,
502 "line1\nline2\rline3\r\nline4\r\nline5\nline6")
503
504 def test_universal_newlines_communicate(self):
505 # universal newlines through communicate()
506 p = subprocess.Popen([sys.executable, "-c",
507 'import sys,os;' + SETBINARY +
508 'sys.stdout.write("line1\\n");'
509 'sys.stdout.flush();'
510 'sys.stdout.write("line2\\r");'
511 'sys.stdout.flush();'
512 'sys.stdout.write("line3\\r\\n");'
513 'sys.stdout.flush();'
514 'sys.stdout.write("line4\\r");'
515 'sys.stdout.flush();'
516 'sys.stdout.write("\\nline5");'
517 'sys.stdout.flush();'
518 'sys.stdout.write("\\nline6");'],
519 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
520 universal_newlines=1)
521 self.addCleanup(p.stdout.close)
522 self.addCleanup(p.stderr.close)
523 (stdout, stderr) = p.communicate()
524 if hasattr(file, 'newlines'):
525 # Interpreter with universal newline support
526 self.assertEqual(stdout,
527 "line1\nline2\nline3\nline4\nline5\nline6")
528 else:
529 # Interpreter without universal newline support
530 self.assertEqual(stdout,
531 "line1\nline2\rline3\r\nline4\r\nline5\nline6")
532
533 def test_no_leaking(self):
534 # Make sure we leak no resources
535 if not mswindows:
536 max_handles = 1026 # too much for most UNIX systems
537 else:
538 max_handles = 2050 # too much for (at least some) Windows setups
539 handles = []
540 try:
541 for i in range(max_handles):
542 try:
543 handles.append(os.open(test_support.TESTFN,
544 os.O_WRONLY | os.O_CREAT))
545 except OSError as e:
546 if e.errno != errno.EMFILE:
547 raise
548 break
549 else:
550 self.skipTest("failed to reach the file descriptor limit "
551 "(tried %d)" % max_handles)
552 # Close a couple of them (should be enough for a subprocess)
553 for i in range(10):
554 os.close(handles.pop())
555 # Loop creating some subprocesses. If one of them leaks some fds,
556 # the next loop iteration will fail by reaching the max fd limit.
557 for i in range(15):
558 p = subprocess.Popen([sys.executable, "-c",
559 "import sys;"
560 "sys.stdout.write(sys.stdin.read())"],
561 stdin=subprocess.PIPE,
562 stdout=subprocess.PIPE,
563 stderr=subprocess.PIPE)
564 data = p.communicate(b"lime")[0]
565 self.assertEqual(data, b"lime")
566 finally:
567 for h in handles:
568 os.close(h)
569 test_support.unlink(test_support.TESTFN)
570
571 def test_list2cmdline(self):
572 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
573 '"a b c" d e')
574 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
575 'ab\\"c \\ d')
576 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
577 'ab\\"c " \\\\" d')
578 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
579 'a\\\\\\b "de fg" h')
580 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
581 'a\\\\\\"b c d')
582 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
583 '"a\\\\b c" d e')
584 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
585 '"a\\\\b\\ c" d e')
586 self.assertEqual(subprocess.list2cmdline(['ab', '']),
587 'ab ""')
588
589
590 def test_poll(self):
591 p = subprocess.Popen([sys.executable,
592 "-c", "import time; time.sleep(1)"])
593 count = 0
594 while p.poll() is None:
595 time.sleep(0.1)
596 count += 1
597 # We expect that the poll loop probably went around about 10 times,
598 # but, based on system scheduling we can't control, it's possible
599 # poll() never returned None. It "should be" very rare that it
600 # didn't go around at least twice.
601 self.assertGreaterEqual(count, 2)
602 # Subsequent invocations should just return the returncode
603 self.assertEqual(p.poll(), 0)
604
605
606 def test_wait(self):
607 p = subprocess.Popen([sys.executable,
608 "-c", "import time; time.sleep(2)"])
609 self.assertEqual(p.wait(), 0)
610 # Subsequent invocations should just return the returncode
611 self.assertEqual(p.wait(), 0)
612
613
614 def test_invalid_bufsize(self):
615 # an invalid type of the bufsize argument should raise
616 # TypeError.
617 with self.assertRaises(TypeError):
618 subprocess.Popen([sys.executable, "-c", "pass"], "orange")
619
620 def test_leaking_fds_on_error(self):
621 # see bug #5179: Popen leaks file descriptors to PIPEs if
622 # the child fails to execute; this will eventually exhaust
623 # the maximum number of open fds. 1024 seems a very common
624 # value for that limit, but Windows has 2048, so we loop
625 # 1024 times (each call leaked two fds).
626 for i in range(1024):
627 # Windows raises IOError. Others raise OSError.
628 with self.assertRaises(EnvironmentError) as c:
629 subprocess.Popen(['nonexisting_i_hope'],
630 stdout=subprocess.PIPE,
631 stderr=subprocess.PIPE)
632 # ignore errors that indicate the command was not found
633 if c.exception.errno not in (errno.ENOENT, errno.EACCES):
634 raise c.exception
635
636 @unittest.skipIf(threading is None, "threading required")
637 def test_double_close_on_error(self):
638 # Issue #18851
639 fds = []
640 def open_fds():
641 for i in range(20):
642 fds.extend(os.pipe())
643 time.sleep(0.001)
644 t = threading.Thread(target=open_fds)
645 t.start()
646 try:
647 with self.assertRaises(EnvironmentError):
648 subprocess.Popen(['nonexisting_i_hope'],
649 stdin=subprocess.PIPE,
650 stdout=subprocess.PIPE,
651 stderr=subprocess.PIPE)
652 finally:
653 t.join()
654 exc = None
655 for fd in fds:
656 # If a double close occurred, some of those fds will
657 # already have been closed by mistake, and os.close()
658 # here will raise.
659 try:
660 os.close(fd)
661 except OSError as e:
662 exc = e
663 if exc is not None:
664 raise exc
665
666 def test_handles_closed_on_exception(self):
667 # If CreateProcess exits with an error, ensure the
668 # duplicate output handles are released
669 ifhandle, ifname = mkstemp()
670 ofhandle, ofname = mkstemp()
671 efhandle, efname = mkstemp()
672 try:
673 subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
674 stderr=efhandle)
675 except OSError:
676 os.close(ifhandle)
677 os.remove(ifname)
678 os.close(ofhandle)
679 os.remove(ofname)
680 os.close(efhandle)
681 os.remove(efname)
682 self.assertFalse(os.path.exists(ifname))
683 self.assertFalse(os.path.exists(ofname))
684 self.assertFalse(os.path.exists(efname))
685
686 def test_communicate_epipe(self):
687 # Issue 10963: communicate() should hide EPIPE
688 p = subprocess.Popen([sys.executable, "-c", 'pass'],
689 stdin=subprocess.PIPE,
690 stdout=subprocess.PIPE,
691 stderr=subprocess.PIPE)
692 self.addCleanup(p.stdout.close)
693 self.addCleanup(p.stderr.close)
694 self.addCleanup(p.stdin.close)
695 p.communicate("x" * 2**20)
696
697 def test_communicate_epipe_only_stdin(self):
698 # Issue 10963: communicate() should hide EPIPE
699 p = subprocess.Popen([sys.executable, "-c", 'pass'],
700 stdin=subprocess.PIPE)
701 self.addCleanup(p.stdin.close)
702 time.sleep(2)
703 p.communicate("x" * 2**20)
704
705 # This test is Linux-ish specific for simplicity to at least have
706 # some coverage. It is not a platform specific bug.
707 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
708 "Linux specific")
709 def test_failed_child_execute_fd_leak(self):
710 """Test for the fork() failure fd leak reported in issue16327."""
711 fd_directory = '/proc/%d/fd' % os.getpid()
712 fds_before_popen = os.listdir(fd_directory)
713 with self.assertRaises(PopenTestException):
714 PopenExecuteChildRaises(
715 [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE,
716 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
717
718 # NOTE: This test doesn't verify that the real _execute_child
719 # does not close the file descriptors itself on the way out
720 # during an exception. Code inspection has confirmed that.
721
722 fds_after_exception = os.listdir(fd_directory)
723 self.assertEqual(fds_before_popen, fds_after_exception)
724
725
# context manager
class _SuppressCoreFiles(object):
    """Try to prevent core files from being created."""
    # RLIMIT_CORE value saved by __enter__; None means nothing to restore.
    old_limit = None

    def __enter__(self):
        """Try to save previous ulimit, then set it to (0, 0)."""
        if resource is not None:
            try:
                self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
            except (ValueError, resource.error):
                # Best effort only: carry on if the limit cannot be changed.
                pass

        if sys.platform == 'darwin':
            # Check if the 'Crash Reporter' on OSX was configured
            # in 'Developer' mode and warn that it will get triggered
            # when it is.
            #
            # This assumes that this context manager is used in tests
            # that might trigger the next manager.
            value = subprocess.Popen(['/usr/bin/defaults', 'read',
                    'com.apple.CrashReporter', 'DialogType'],
                    stdout=subprocess.PIPE).communicate()[0]
            if value.strip() == b'developer':
                # Single-argument call form: prints the same text under the
                # Python 2 print statement and the print() function.
                print("this tests triggers the Crash Reporter, that is intentional")
                sys.stdout.flush()

    def __exit__(self, *args):
        """Return core file behavior to default."""
        if self.old_limit is None:
            return
        if resource is not None:
            try:
                resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
            except (ValueError, resource.error):
                # Best effort only, mirroring __enter__.
                pass

    # NOTE(review): this test lives on _SuppressCoreFiles, which is not a
    # unittest.TestCase, so a test runner never collects it and
    # self.addCleanup does not exist here.  It looks like it was meant to be
    # a ProcessTestCase method -- move it there to make it run.
    @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
                         "Requires signal.SIGALRM")
    def test_communicate_eintr(self):
        # Issue #12493: communicate() should handle EINTR
        def handler(signum, frame):
            pass
        old_handler = signal.signal(signal.SIGALRM, handler)
        self.addCleanup(signal.signal, signal.SIGALRM, old_handler)

        # the process is running for 2 seconds
        args = [sys.executable, "-c", 'import time; time.sleep(2)']
        for stream in ('stdout', 'stderr'):
            kw = {stream: subprocess.PIPE}
            # Python 2.7's Popen is not a context manager, so the pipe is
            # closed and the child reaped by hand.
            process = subprocess.Popen(args, **kw)
            try:
                signal.alarm(1)
                # communicate() will be interrupted by SIGALRM
                process.communicate()
            finally:
                getattr(process, stream).close()
                process.wait()
781
782
783@unittest.skipIf(mswindows, "POSIX specific tests")
784class POSIXProcessTestCase(BaseTestCase):
785
786 def test_exceptions(self):
787 # caught & re-raised exceptions
788 with self.assertRaises(OSError) as c:
789 p = subprocess.Popen([sys.executable, "-c", ""],
790 cwd="/this/path/does/not/exist")
791 # The attribute child_traceback should contain "os.chdir" somewhere.
792 self.assertIn("os.chdir", c.exception.child_traceback)
793
794 def test_run_abort(self):
795 # returncode handles signal termination
796 with _SuppressCoreFiles():
797 p = subprocess.Popen([sys.executable, "-c",
798 "import os; os.abort()"])
799 p.wait()
800 self.assertEqual(-p.returncode, signal.SIGABRT)
801
802 def test_preexec(self):
803 # preexec function
804 p = subprocess.Popen([sys.executable, "-c",
805 "import sys, os;"
806 "sys.stdout.write(os.getenv('FRUIT'))"],
807 stdout=subprocess.PIPE,
808 preexec_fn=lambda: os.putenv("FRUIT", "apple"))
809 self.addCleanup(p.stdout.close)
810 self.assertEqual(p.stdout.read(), "apple")
811
812 class _TestExecuteChildPopen(subprocess.Popen):
813 """Used to test behavior at the end of _execute_child."""
814 def __init__(self, testcase, *args, **kwargs):
815 self._testcase = testcase
816 subprocess.Popen.__init__(self, *args, **kwargs)
817
818 def _execute_child(
819 self, args, executable, preexec_fn, close_fds, cwd, env,
820 universal_newlines, startupinfo, creationflags, shell, to_close,
821 p2cread, p2cwrite,
822 c2pread, c2pwrite,
823 errread, errwrite):
824 try:
825 subprocess.Popen._execute_child(
826 self, args, executable, preexec_fn, close_fds,
827 cwd, env, universal_newlines,
828 startupinfo, creationflags, shell, to_close,
829 p2cread, p2cwrite,
830 c2pread, c2pwrite,
831 errread, errwrite)
832 finally:
833 # Open a bunch of file descriptors and verify that
834 # none of them are the same as the ones the Popen
835 # instance is using for stdin/stdout/stderr.
836 devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
837 for _ in range(8)]
838 try:
839 for fd in devzero_fds:
840 self._testcase.assertNotIn(
841 fd, (p2cwrite, c2pread, errread))
842 finally:
843 for fd in devzero_fds:
844 os.close(fd)
845
846 @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
847 def test_preexec_errpipe_does_not_double_close_pipes(self):
848 """Issue16140: Don't double close pipes on preexec error."""
849
850 def raise_it():
851 raise RuntimeError("force the _execute_child() errpipe_data path.")
852
853 with self.assertRaises(RuntimeError):
854 self._TestExecuteChildPopen(
855 self, [sys.executable, "-c", "pass"],
856 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
857 stderr=subprocess.PIPE, preexec_fn=raise_it)
858
859 def test_args_string(self):
860 # args is a string
861 f, fname = mkstemp()
862 os.write(f, "#!/bin/sh\n")
863 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
864 sys.executable)
865 os.close(f)
866 os.chmod(fname, 0o700)
867 p = subprocess.Popen(fname)
868 p.wait()
869 os.remove(fname)
870 self.assertEqual(p.returncode, 47)
871
872 def test_invalid_args(self):
873 # invalid arguments should raise ValueError
874 self.assertRaises(ValueError, subprocess.call,
875 [sys.executable, "-c",
876 "import sys; sys.exit(47)"],
877 startupinfo=47)
878 self.assertRaises(ValueError, subprocess.call,
879 [sys.executable, "-c",
880 "import sys; sys.exit(47)"],
881 creationflags=47)
882
883 def test_shell_sequence(self):
884 # Run command through the shell (sequence)
885 newenv = os.environ.copy()
886 newenv["FRUIT"] = "apple"
887 p = subprocess.Popen(["echo $FRUIT"], shell=1,
888 stdout=subprocess.PIPE,
889 env=newenv)
890 self.addCleanup(p.stdout.close)
891 self.assertEqual(p.stdout.read().strip(), "apple")
892
893 def test_shell_string(self):
894 # Run command through the shell (string)
895 newenv = os.environ.copy()
896 newenv["FRUIT"] = "apple"
897 p = subprocess.Popen("echo $FRUIT", shell=1,
898 stdout=subprocess.PIPE,
899 env=newenv)
900 self.addCleanup(p.stdout.close)
901 self.assertEqual(p.stdout.read().strip(), "apple")
902
903 def test_call_string(self):
904 # call() function with string argument on UNIX
905 f, fname = mkstemp()
906 os.write(f, "#!/bin/sh\n")
907 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
908 sys.executable)
909 os.close(f)
910 os.chmod(fname, 0700)
911 rc = subprocess.call(fname)
912 os.remove(fname)
913 self.assertEqual(rc, 47)
914
def test_specific_shell(self):
    # Issue #9265: Incorrect name passed as arg[0].
    candidates = [os.path.join(prefix, name)
                  for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']
                  for name in ['bash', 'ksh']]
    shells = [path for path in candidates if os.path.isfile(path)]
    if not shells:  # Will probably work for any shell but csh.
        self.skipTest("bash or ksh required for this test")
    bourne = '/bin/sh'
    # Test will fail if /bin/sh is a symlink to csh, so only a regular
    # file is added to the list.
    if os.path.isfile(bourne) and not os.path.islink(bourne):
        shells.append(bourne)
    for shell_path in shells:
        child = subprocess.Popen("echo $0", executable=shell_path,
                                 shell=True, stdout=subprocess.PIPE)
        self.addCleanup(child.stdout.close)
        reported = child.stdout.read().strip()
        self.assertEqual(reported, shell_path)
934
def _kill_process(self, method, *args):
    # Start a child that prints one line and then sleeps, invoke the
    # named Popen method on it with *args, and return the Popen object.
    script = """if 1:
        import sys, time
        sys.stdout.write('x\\n')
        sys.stdout.flush()
        time.sleep(30)
        """
    # Do not inherit file handles from the parent.
    # It should fix failures on some platforms.
    child = subprocess.Popen([sys.executable, "-c", script],
                             close_fds=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    # Wait for the interpreter to be completely initialized before
    # sending any signal.
    child.stdout.read(1)
    action = getattr(child, method)
    action(*args)
    return child
953
@unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
                 "Due to known OS bug (issue #16762)")
def _kill_dead_process(self, method, *args):
    # Start a short-lived child, let it finish, then invoke the named
    # Popen method on it with *args.
    script = """if 1:
        import sys, time
        sys.stdout.write('x\\n')
        sys.stdout.flush()
        """
    # Do not inherit file handles from the parent.
    # It should fix failures on some platforms.
    child = subprocess.Popen([sys.executable, "-c", script],
                             close_fds=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    # Wait for the interpreter to be completely initialized before
    # sending any signal.
    child.stdout.read(1)
    # The process should end after this
    time.sleep(1)
    # This shouldn't raise even though the child is now dead
    action = getattr(child, method)
    action(*args)
    child.communicate()
976
def test_send_signal(self):
    # SIGINT sent to the child should show up as a KeyboardInterrupt
    # on its stderr and a non-zero exit status.
    child = self._kill_process('send_signal', signal.SIGINT)
    err_output = child.communicate()[1]
    self.assertIn('KeyboardInterrupt', err_output)
    exit_status = child.wait()
    self.assertNotEqual(exit_status, 0)
982
def test_kill(self):
    # kill() should leave stderr empty and report -SIGKILL from wait().
    child = self._kill_process('kill')
    err_output = child.communicate()[1]
    self.assertStderrEqual(err_output, '')
    exit_status = child.wait()
    self.assertEqual(exit_status, -signal.SIGKILL)
988
def test_terminate(self):
    # terminate() should leave stderr empty and report -SIGTERM from
    # wait().
    child = self._kill_process('terminate')
    err_output = child.communicate()[1]
    self.assertStderrEqual(err_output, '')
    exit_status = child.wait()
    self.assertEqual(exit_status, -signal.SIGTERM)
994
def test_send_signal_dead(self):
    # Sending a signal to a dead process
    signum = signal.SIGINT
    self._kill_dead_process('send_signal', signum)
998
def test_kill_dead(self):
    # Killing a dead process
    method_name = 'kill'
    self._kill_dead_process(method_name)
1002
def test_terminate_dead(self):
    # Terminating a dead process
    method_name = 'terminate'
    self._kill_dead_process(method_name)
1006
def check_close_std_fds(self, fds):
    # Issue #9905: test that subprocess pipes still work properly with
    # some standard fds closed.
    #
    # fds lists the descriptors to close in this process while the child
    # runs; they are duplicated first and put back afterwards.
    backups = [os.dup(std_fd) for std_fd in fds]
    # If fd 0 is among the closed ones, the child gets the duplicate of
    # the original stdin instead.
    child_stdin = 0
    for std_fd, saved in zip(fds, backups):
        if std_fd == 0:
            child_stdin = saved
    try:
        for std_fd in fds:
            os.close(std_fd)
        child_code = ('import sys;'
                      'sys.stdout.write("apple");'
                      'sys.stdout.flush();'
                      'sys.stderr.write("orange")')
        child = subprocess.Popen([sys.executable, "-c", child_code],
                                 stdin=child_stdin,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        out, err = child.communicate()
        err = test_support.strip_python_stderr(err)
        self.assertEqual((out, err), (b'apple', b'orange'))
    finally:
        # Put the standard descriptors back, then drop the duplicates.
        for saved, std_fd in zip(backups, fds):
            os.dup2(saved, std_fd)
        for saved in backups:
            os.close(saved)
1035
def test_close_fd_0(self):
    # Pipes must still work with fd 0 closed.
    closed = [0]
    self.check_close_std_fds(closed)
1038
def test_close_fd_1(self):
    # Pipes must still work with fd 1 closed.
    closed = [1]
    self.check_close_std_fds(closed)
1041
def test_close_fd_2(self):
    # Pipes must still work with fd 2 closed.
    closed = [2]
    self.check_close_std_fds(closed)
1044
def test_close_fds_0_1(self):
    # Pipes must still work with fds 0 and 1 closed.
    closed = [0, 1]
    self.check_close_std_fds(closed)
1047
def test_close_fds_0_2(self):
    # Pipes must still work with fds 0 and 2 closed.
    closed = [0, 2]
    self.check_close_std_fds(closed)
1050
def test_close_fds_1_2(self):
    # Pipes must still work with fds 1 and 2 closed.
    closed = [1, 2]
    self.check_close_std_fds(closed)
1053
def test_close_fds_0_1_2(self):
    # Issue #10806: test that subprocess pipes still work properly with
    # all standard fds closed.
    closed = [0, 1, 2]
    self.check_close_std_fds(closed)
1058
def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
    # Temporarily replace fds 0, 1 and 2 with scratch files, run a child
    # whose stdin/stdout/stderr are the given descriptor numbers, and
    # check that its input and output went through the right files.

    # open up some temporary files
    scratch = [mkstemp() for _ in range(3)]
    scratch_fds = [pair[0] for pair in scratch]
    try:
        # unlink the files -- we won't need to reopen them
        for _, path in scratch:
            os.unlink(path)

        # save a copy of the standard file descriptors
        originals = [os.dup(std) for std in range(3)]
        try:
            # duplicate the temp files over the standard fd's 0, 1, 2
            for std, scratch_fd in enumerate(scratch_fds):
                os.dup2(scratch_fd, std)

            # write some data to what will become stdin, and rewind
            os.write(stdin_no, b"STDIN")
            os.lseek(stdin_no, 0, 0)

            # now use those files in the given order, so that subprocess
            # has to rearrange them in the child
            child_code = (
                'import sys; got = sys.stdin.read();'
                'sys.stdout.write("got %s"%got); sys.stderr.write("err")')
            child = subprocess.Popen([sys.executable, "-c", child_code],
                                     stdin=stdin_no,
                                     stdout=stdout_no,
                                     stderr=stderr_no)
            child.wait()

            # rewind every scratch file before reading the results back
            for scratch_fd in scratch_fds:
                os.lseek(scratch_fd, 0, 0)

            out = os.read(stdout_no, 1024)
            raw_err = os.read(stderr_no, 1024)
            err = test_support.strip_python_stderr(raw_err)
        finally:
            # restore the real standard descriptors and drop the copies
            for std, saved in enumerate(originals):
                os.dup2(saved, std)
                os.close(saved)

        self.assertEqual(out, b"got STDIN")
        self.assertEqual(err, b"err")

    finally:
        for scratch_fd in scratch_fds:
            os.close(scratch_fd)
1105
# When duping fds, if there arises a situation where one of the fds is
# either 0, 1 or 2, it is possible that it is overwritten (#12607).
# This tests all combinations of this.
def test_swap_fds(self):
    orderings = [
        (0, 1, 2),
        (0, 2, 1),
        (1, 0, 2),
        (1, 2, 0),
        (2, 0, 1),
        (2, 1, 0),
    ]
    for stdin_no, stdout_no, stderr_no in orderings:
        self.check_swap_fds(stdin_no, stdout_no, stderr_no)
1116
def test_wait_when_sigchild_ignored(self):
    # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
    script = test_support.findfile("sigchild_ignore.py",
                                   subdir="subprocessdata")
    child = subprocess.Popen([sys.executable, script],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    _, err_output = child.communicate()
    # Include the helper's stderr in the failure message.
    failure_msg = ("sigchild_ignore.py exited"
                   " non-zero with this error:\n%s" % err_output)
    self.assertEqual(0, child.returncode, failure_msg)
1126
def test_zombie_fast_process_del(self):
    # Issue #12650: on Unix, if Popen.__del__() was called before the
    # process exited, it wouldn't be added to subprocess._active, and would
    # remain a zombie.
    child_code = ('import sys, time;'
                  'time.sleep(0.2)')
    # spawn a Popen, and delete its reference before it exits
    child = subprocess.Popen([sys.executable, "-c", child_code],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    for stream in (child.stdout, child.stderr):
        self.addCleanup(stream.close)
    child_id = id(child)
    del child
    # check that the deleted object is in the active processes list
    active_ids = [id(obj) for obj in subprocess._active]
    self.assertIn(child_id, active_ids)
1144
def test_leak_fast_process_del_killed(self):
    # Issue #12650: on Unix, if Popen.__del__() was called before the
    # process exited, and the process got killed by a signal, it would never
    # be removed from subprocess._active, which triggered a FD and memory
    # leak.
    # spawn a Popen, delete its reference and kill it
    p = subprocess.Popen([sys.executable, "-c",
                          'import time;'
                          'time.sleep(3)'],
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    self.addCleanup(p.stdout.close)
    self.addCleanup(p.stderr.close)
    ident = id(p)
    pid = p.pid
    del p
    os.kill(pid, signal.SIGKILL)
    # check that p is in the active processes list
    self.assertIn(ident, [id(o) for o in subprocess._active])

    # let some time for the process to exit, and create a new Popen: this
    # should trigger the wait() of p
    time.sleep(0.2)
    # Call the constructor through assertRaises directly instead of
    # wrapping it in a "with" statement: Popen is not used as a context
    # manager here, so if the spawn ever succeeded the test should fail
    # with a clear "EnvironmentError not raised" message.
    self.assertRaises(EnvironmentError, subprocess.Popen,
                      ['nonexisting_i_hope'],
                      stdout=subprocess.PIPE,
                      stderr=subprocess.PIPE)
    # p should have been wait()ed on, and removed from the _active list
    self.assertRaises(OSError, os.waitpid, pid, 0)
    self.assertNotIn(ident, [id(o) for o in subprocess._active])
1176
def test_pipe_cloexec(self):
    # Issue 12786: check that the communication pipes' FDs are set CLOEXEC,
    # and are not inherited by another child process.
    pipes = dict(stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                 stderr=subprocess.PIPE)
    first = subprocess.Popen([sys.executable, "-c",
                              'import os;'
                              'os.read(0, 1)'],
                             **pipes)

    # The second child tries to close each of the first child's pipe fds.
    # It exits with 1 as soon as one close succeeds and with 0 when every
    # close fails with EBADF.
    probe_template = """if True:
        import os, errno, sys
        for fd in %r:
            try:
                os.close(fd)
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
            else:
                sys.exit(1)
        sys.exit(0)
        """
    parent_fds = [f.fileno()
                  for f in (first.stdin, first.stdout, first.stderr)]
    second = subprocess.Popen([sys.executable, "-c",
                               probe_template % parent_fds],
                              close_fds=False, **pipes)
    first.communicate('foo')
    _, probe_err = second.communicate()

    self.assertEqual(second.returncode, 0,
                     "Unexpected error: " + repr(probe_err))
1207
1208
@unittest.skipUnless(mswindows, "Windows specific tests")
class Win32ProcessTestCase(BaseTestCase):
    # Tests for the Windows-only behaviour of subprocess.

    def test_startupinfo(self):
        # startupinfo argument
        # Hardcoded constants are used because we do not want to
        # depend on win32all.
        STARTF_USESHOWWINDOW = 1
        SW_MAXIMIZE = 3
        info = subprocess.STARTUPINFO()
        info.dwFlags = STARTF_USESHOWWINDOW
        info.wShowWindow = SW_MAXIMIZE
        # Since Python is a console process, it won't be affected
        # by wShowWindow, but the argument should be silently
        # ignored
        cmd = [sys.executable, "-c", "import sys; sys.exit(0)"]
        subprocess.call(cmd, startupinfo=info)

    def test_creationflags(self):
        # creationflags argument
        CREATE_NEW_CONSOLE = 16
        sys.stderr.write(" a DOS box should flash briefly ...\n")
        cmdline = sys.executable + ' -c "import time; time.sleep(0.25)"'
        subprocess.call(cmdline, creationflags=CREATE_NEW_CONSOLE)

    def test_invalid_args(self):
        # invalid arguments should raise ValueError
        cmd = [sys.executable, "-c", "import sys; sys.exit(47)"]
        self.assertRaises(ValueError, subprocess.call, cmd,
                          preexec_fn=lambda: 1)
        self.assertRaises(ValueError, subprocess.call, cmd,
                          stdout=subprocess.PIPE,
                          close_fds=True)

    def test_close_fds(self):
        # close file descriptors
        cmd = [sys.executable, "-c", "import sys; sys.exit(47)"]
        exit_status = subprocess.call(cmd, close_fds=True)
        self.assertEqual(exit_status, 47)

    def test_shell_sequence(self):
        # Run command through the shell (sequence)
        child_env = dict(os.environ, FRUIT="physalis")
        child = subprocess.Popen(["set"], shell=1,
                                 stdout=subprocess.PIPE, env=child_env)
        self.addCleanup(child.stdout.close)
        listing = child.stdout.read()
        self.assertIn("physalis", listing)

    def test_shell_string(self):
        # Run command through the shell (string)
        child_env = dict(os.environ, FRUIT="physalis")
        child = subprocess.Popen("set", shell=1,
                                 stdout=subprocess.PIPE, env=child_env)
        self.addCleanup(child.stdout.close)
        listing = child.stdout.read()
        self.assertIn("physalis", listing)

    def test_call_string(self):
        # call() function with string argument on Windows
        cmdline = sys.executable + ' -c "import sys; sys.exit(47)"'
        exit_status = subprocess.call(cmdline)
        self.assertEqual(exit_status, 47)

    def _kill_process(self, method, *args):
        # Start a child that prints one line and then sleeps, invoke the
        # named Popen method on it, and check that it ends with an empty
        # stderr and a non-zero exit status.
        script = """if 1:
            import sys, time
            sys.stdout.write('x\\n')
            sys.stdout.flush()
            time.sleep(30)
            """
        # Some win32 buildbot raises EOFError if stdin is inherited
        child = subprocess.Popen([sys.executable, "-c", script],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        for stream in (child.stdout, child.stderr, child.stdin):
            self.addCleanup(stream.close)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        child.stdout.read(1)
        action = getattr(child, method)
        action(*args)
        err_output = child.communicate()[1]
        self.assertStderrEqual(err_output, '')
        exit_status = child.wait()
        self.assertNotEqual(exit_status, 0)

    def _kill_dead_process(self, method, *args):
        # Start a child that exits with status 42 on its own, wait for
        # it to finish, then invoke the named Popen method on it; the
        # call must not raise and the exit status must still be 42.
        script = """if 1:
            import sys, time
            sys.stdout.write('x\\n')
            sys.stdout.flush()
            sys.exit(42)
            """
        child = subprocess.Popen([sys.executable, "-c", script],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        for stream in (child.stdout, child.stderr, child.stdin):
            self.addCleanup(stream.close)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        child.stdout.read(1)
        # The process should end after this
        time.sleep(1)
        # This shouldn't raise even though the child is now dead
        action = getattr(child, method)
        action(*args)
        err_output = child.communicate()[1]
        self.assertStderrEqual(err_output, b'')
        exit_status = child.wait()
        self.assertEqual(exit_status, 42)

    def test_send_signal(self):
        signum = signal.SIGTERM
        self._kill_process('send_signal', signum)

    def test_kill(self):
        method_name = 'kill'
        self._kill_process(method_name)

    def test_terminate(self):
        method_name = 'terminate'
        self._kill_process(method_name)

    def test_send_signal_dead(self):
        signum = signal.SIGTERM
        self._kill_dead_process('send_signal', signum)

    def test_kill_dead(self):
        method_name = 'kill'
        self._kill_dead_process(method_name)

    def test_terminate_dead(self):
        method_name = 'terminate'
        self._kill_dead_process(method_name)
1345
1346
@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
                     "poll system call not supported")
class ProcessTestCaseNoPoll(ProcessTestCase):
    # Re-runs every ProcessTestCase test with subprocess._has_poll
    # switched off for the duration of each test.

    def setUp(self):
        subprocess._has_poll = False
        super(ProcessTestCaseNoPoll, self).setUp()

    def tearDown(self):
        # The class is only run when _has_poll was true to begin with,
        # so putting it back to True restores the initial state.
        subprocess._has_poll = True
        super(ProcessTestCaseNoPoll, self).tearDown()
1357
1358
class HelperFunctionTests(unittest.TestCase):
    # Tests for private helper functions of the subprocess module.

    @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
    def test_eintr_retry_call(self):
        calls = []

        def flaky_reverse(*args):
            # Records every invocation; the second one fails with EINTR,
            # all others return the arguments in reverse order.
            calls.append(args)
            if len(calls) == 2:
                raise OSError(errno.EINTR, "fake interrupted system call")
            return args[::-1]

        first = subprocess._eintr_retry_call(flaky_reverse, 256, 999)
        self.assertEqual((999, 256), first)
        self.assertEqual([(256, 999)], calls)
        # This time there will be an EINTR so it will loop once.
        second = subprocess._eintr_retry_call(flaky_reverse, 666)
        self.assertEqual((666,), second)
        self.assertEqual([(256, 999), (666,), (666,)], calls)
1376
@unittest.skipUnless(mswindows, "mswindows only")
class CommandsWithSpaces(BaseTestCase):
    # Runs a helper script whose file name contains a space, passing it
    # an argument that contains a space, through each combination of
    # shell / no shell and string / sequence command forms.

    def setUp(self):
        super(CommandsWithSpaces, self).setUp()
        handle, path = mkstemp(".py", "te st")
        self.fname = path.lower()
        # The helper prints its argument count and lower-cased argv.
        script = (
            b"import sys;"
            b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
        )
        os.write(handle, script)
        os.close(handle)

    def tearDown(self):
        os.remove(self.fname)
        super(CommandsWithSpaces, self).tearDown()

    def with_spaces(self, *args, **kwargs):
        # Run Popen(*args, **kwargs) with stdout captured and check that
        # the helper saw exactly two arguments: its own path and 'ab cd'.
        kwargs['stdout'] = subprocess.PIPE
        child = subprocess.Popen(*args, **kwargs)
        self.addCleanup(child.stdout.close)
        actual = child.stdout.read().decode("mbcs")
        expected = "2 [%r, 'ab cd']" % self.fname
        self.assertEqual(actual, expected)

    def test_shell_string_with_spaces(self):
        # call() function with string argument with spaces on Windows
        cmdline = '"%s" "%s" "%s"' % (sys.executable, self.fname, "ab cd")
        self.with_spaces(cmdline, shell=1)

    def test_shell_sequence_with_spaces(self):
        # call() function with sequence argument with spaces on Windows
        argv = [sys.executable, self.fname, "ab cd"]
        self.with_spaces(argv, shell=1)

    def test_noshell_string_with_spaces(self):
        # call() function with string argument with spaces on Windows
        cmdline = '"%s" "%s" "%s"' % (sys.executable, self.fname, "ab cd")
        self.with_spaces(cmdline)

    def test_noshell_sequence_with_spaces(self):
        # call() function with sequence argument with spaces on Windows
        argv = [sys.executable, self.fname, "ab cd"]
        self.with_spaces(argv)
1419
def test_main():
    # Run every test case class of this module, then reap any child
    # processes that are left over.
    test_classes = [
        ProcessTestCase,
        POSIXProcessTestCase,
        Win32ProcessTestCase,
        ProcessTestCaseNoPoll,
        HelperFunctionTests,
        CommandsWithSpaces,
    ]
    test_support.run_unittest(*test_classes)
    test_support.reap_children()
1430
# Allow running this test module directly as a script.
if "__main__" == __name__:
    test_main()
Note: See TracBrowser for help on using the repository browser.