1 | # Common utility functions used by various script execution tests
|
---|
2 | # e.g. test_cmd_line, test_cmd_line_script and test_runpy
|
---|
3 |
|
---|
4 | import sys
|
---|
5 | import os
|
---|
6 | import re
|
---|
7 | import os.path
|
---|
8 | import tempfile
|
---|
9 | import subprocess
|
---|
10 | import py_compile
|
---|
11 | import contextlib
|
---|
12 | import shutil
|
---|
13 | try:
|
---|
14 | import zipfile
|
---|
15 | except ImportError:
|
---|
16 | # If Python is built without Unicode support, importing _io will
|
---|
17 | # fail, which, in turn, means that zipfile cannot be imported
|
---|
18 | # Most of this module can then still be used.
|
---|
19 | pass
|
---|
20 |
|
---|
21 | from test.test_support import strip_python_stderr
|
---|
22 |
|
---|
23 | # Executing the interpreter in a subprocess
|
---|
def _assert_python(expected_success, *args, **env_vars):
    """Run the interpreter in a subprocess and check its exit status.

    `args` are passed on the command line after sys.executable.  When no
    `env_vars` are given, '-E' is inserted before `args`; otherwise the
    given variables are layered on top of a copy of os.environ.

    Returns a (return code, stdout, stderr) tuple, where stderr has been
    passed through strip_python_stderr().  Raises AssertionError when the
    exit status does not match `expected_success` (zero expected for a
    true value, non-zero expected for a false one).
    """
    isolation_flags = [] if env_vars else ['-E']
    cmd_line = [sys.executable] + isolation_flags + list(args)
    # The parent's environment has to be preserved, for in-place testing
    # of shared library builds.
    child_env = dict(os.environ)
    child_env.update(env_vars)
    proc = subprocess.Popen(cmd_line,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            env=child_env)
    try:
        out, err = proc.communicate()
    finally:
        subprocess._cleanup()
        proc.stdout.close()
        proc.stderr.close()
    rc = proc.returncode
    err = strip_python_stderr(err)
    exited_cleanly = not rc
    if exited_cleanly != bool(expected_success):
        raise AssertionError(
            "Process return code is %d, "
            "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
    return rc, out, err
|
---|
49 |
|
---|
def assert_python_ok(*args, **env_vars):
    """
    Run the interpreter with `args` and the optional environment variables
    `env_vars`, asserting that it exits successfully.

    Returns a (return code, stdout, stderr) tuple.
    """
    expect_success = True
    return _assert_python(expect_success, *args, **env_vars)
|
---|
56 |
|
---|
def assert_python_failure(*args, **env_vars):
    """
    Run the interpreter with `args` and the optional environment variables
    `env_vars`, asserting that it exits with a failure status.

    Returns a (return code, stdout, stderr) tuple.
    """
    expect_success = False
    return _assert_python(expect_success, *args, **env_vars)
|
---|
63 |
|
---|
def python_exit_code(*args):
    """Run the interpreter with '-E' plus `args` and return its exit code.

    The child's stdout and stderr are both sent to os.devnull.
    """
    command = [sys.executable, '-E'] + list(args)
    with open(os.devnull, 'w') as sink:
        status = subprocess.call(command, stdout=sink,
                                 stderr=subprocess.STDOUT)
    return status
|
---|
70 |
|
---|
def spawn_python(*args, **kwargs):
    """Start the interpreter with '-E' plus `args` and return the Popen object.

    stdin and stdout are pipes and stderr is merged into stdout.  Any
    `kwargs` are forwarded to subprocess.Popen.
    """
    command = [sys.executable, '-E'] + list(args)
    return subprocess.Popen(command,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            **kwargs)
|
---|
77 |
|
---|
def kill_python(p):
    """Shut down the child process `p` and return everything it wrote.

    `p` is expected to have pipes for stdin and stdout (as created by
    spawn_python).  stdin is closed first, then stdout is read to the end
    and closed, and finally the child is waited for.
    """
    p.stdin.close()
    output = p.stdout.read()
    p.stdout.close()
    # Reap the child so that it does not look like a leak when running
    # with regrtest -R.
    p.wait()
    subprocess._cleanup()
    return output
|
---|
87 |
|
---|
def run_python(*args, **kwargs):
    """Run the interpreter to completion; return (exit code, output).

    The child is started through spawn_python() and drained through
    kill_python().  When this process itself runs without __debug__,
    '-O' is put in front of `args` for the child as well.
    """
    if not __debug__:
        args = ('-O',) + args
    child = spawn_python(*args, **kwargs)
    output = kill_python(child)
    return child.wait(), output
|
---|
95 |
|
---|
96 | # Script creation utilities
|
---|
@contextlib.contextmanager
def temp_dir():
    """Context manager yielding the real path of a fresh temporary directory.

    The directory tree is removed again when the with-block is left,
    whether normally or through an exception.
    """
    path = os.path.realpath(tempfile.mkdtemp())
    try:
        yield path
    finally:
        shutil.rmtree(path)
|
---|
105 |
|
---|
def make_script(script_dir, script_basename, source):
    """Write `source` to <script_dir>/<script_basename>.py and return the path.

    script_dir      -- existing directory the script is created in
    script_basename -- file name without the '.py' extension
    source          -- text written to the file

    An existing file of the same name is overwritten.
    """
    script_filename = script_basename+os.extsep+'py'
    script_name = os.path.join(script_dir, script_filename)
    # The with-statement guarantees the handle is closed even when the
    # write fails, so a failing test does not leave the file open.
    with open(script_name, 'w') as script_file:
        script_file.write(source)
    return script_name
|
---|
113 |
|
---|
def compile_script(script_name):
    """Byte-compile `script_name` and return the compiled file's name.

    Compilation errors are raised (doraise=True).  The returned name is
    `script_name` with 'c' appended, or with 'o' appended when running
    without __debug__; it is derived from `script_name` rather than taken
    from py_compile.
    """
    py_compile.compile(script_name, doraise=True)
    suffix = 'c' if __debug__ else 'o'
    return script_name + suffix
|
---|
121 |
|
---|
def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
    """Create <zip_dir>/<zip_basename>.zip containing a single script.

    zip_dir      -- existing directory the archive is created in
    zip_basename -- archive file name without the '.zip' extension
    script_name  -- path of the file to store in the archive
    name_in_zip  -- name the file gets inside the archive; defaults to the
                    basename of `script_name`

    Returns a (zip_name, run_name) tuple, where run_name is the archive
    path joined with the member name.
    """
    zip_filename = zip_basename+os.extsep+'zip'
    zip_name = os.path.join(zip_dir, zip_filename)
    if name_in_zip is None:
        name_in_zip = os.path.basename(script_name)
    zip_file = zipfile.ZipFile(zip_name, 'w')
    # Close the archive even when adding the member fails, so that the
    # handle is not leaked and the file can still be removed afterwards.
    try:
        zip_file.write(script_name, name_in_zip)
    finally:
        zip_file.close()
    return zip_name, os.path.join(zip_name, name_in_zip)
|
---|
136 |
|
---|
def make_pkg(pkg_dir):
    """Create the directory `pkg_dir` and put an empty __init__ script in it.

    os.mkdir() is used, so the directory must not exist yet and its parent
    must already exist.
    """
    os.mkdir(pkg_dir)
    make_script(script_dir=pkg_dir, script_basename='__init__', source='')
|
---|
140 |
|
---|
def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
                 source, depth=1, compiled=False):
    """Create <zip_dir>/<zip_basename>.zip holding a (nested) package.

    zip_dir         -- existing directory used for the archive and for the
                       temporary files it is built from
    zip_basename    -- archive file name without the '.zip' extension
    pkg_name        -- name of the package; it is nested inside itself
                       `depth` times (pkg, pkg/pkg, ...)
    script_basename -- name (without extension) of the script placed in
                       the innermost package
    source          -- source text of that script
    depth           -- number of nested package levels
    compiled        -- when true, the compiled files produced by
                       compile_script() are stored instead of the sources

    Every package level gets an empty __init__ module.  The temporary
    files created in `zip_dir` are removed again before returning.

    Returns a (zip_name, run_name) tuple, where run_name is the archive
    path joined with the script's name inside the archive.
    """
    unlink = []
    # The outer try/finally removes the temporary files even when building
    # the archive fails part-way; each file is registered right after it
    # has been created.
    try:
        init_name = make_script(zip_dir, '__init__', '')
        unlink.append(init_name)
        script_name = make_script(zip_dir, script_basename, source)
        unlink.append(script_name)
        if compiled:
            init_name = compile_script(init_name)
            unlink.append(init_name)
            script_name = compile_script(script_name)
            unlink.append(script_name)
        # Taken after the optional compile step so that a compiled
        # __init__ is stored under its compiled file name, exactly as is
        # done for the script below, instead of under the source name.
        init_basename = os.path.basename(init_name)
        pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)]
        script_name_in_zip = os.path.join(pkg_names[-1],
                                          os.path.basename(script_name))
        zip_filename = zip_basename+os.extsep+'zip'
        zip_name = os.path.join(zip_dir, zip_filename)
        zip_file = zipfile.ZipFile(zip_name, 'w')
        # Close the archive even when adding a member fails.
        try:
            for name in pkg_names:
                init_name_in_zip = os.path.join(name, init_basename)
                zip_file.write(init_name, init_name_in_zip)
            zip_file.write(script_name, script_name_in_zip)
        finally:
            zip_file.close()
    finally:
        for name in unlink:
            os.unlink(name)
    return zip_name, os.path.join(zip_name, script_name_in_zip)
|
---|