1 | from test.test_support import verbose, run_unittest, import_module
|
---|
2 |
|
---|
3 | #Skip these tests if either fcntl or termios is not available
|
---|
4 | fcntl = import_module('fcntl')
|
---|
5 | import_module('termios')
|
---|
6 |
|
---|
7 | import errno
|
---|
8 | import pty
|
---|
9 | import os
|
---|
10 | import sys
|
---|
11 | import select
|
---|
12 | import signal
|
---|
13 | import socket
|
---|
14 | import unittest
|
---|
15 |
|
---|
# Payloads pushed through the pty by PtyTest.test_basic.
TEST_STRING_1 = "I wish to buy a fish license.\n"
TEST_STRING_2 = "For my pet fish, Eric.\n"

# Pick the debug() implementation once, at import time, from the
# `verbose` flag imported from test.test_support.
if verbose:
    def debug(msg):
        """Print *msg* (verbose test runs only)."""
        print(msg)
else:
    def debug(msg):
        """Discard *msg* (non-verbose test runs)."""
25 |
|
---|
26 |
|
---|
def normalize_output(data):
    """Return *data* with platform-specific newline expansion undone.

    If *data* ends with '\\r\\r\\n', every '\\r\\r\\n' in it is replaced by
    '\\n'; otherwise, if it ends with '\\r\\n', every '\\r\\n' is replaced by
    '\\n'.  Anything else is returned unchanged.
    """
    # Some operating systems do conversions on newline.  We could possibly
    # fix that by doing the appropriate termios.tcsetattr()s.  I couldn't
    # figure out the right combo on Tru64 and I don't have an IRIX box.
    # So just normalize the output and doc the problem O/Ses by allowing
    # certain combinations for some platforms, but avoid allowing other
    # differences (like extra whitespace, trailing garbage, etc.)

    # This is about the best we can do without getting some feedback
    # from someone more knowledgeable.

    # Checked in this order because the longer sequence also ends with the
    # shorter one:
    #   '\r\r\n' -- OSF/1 (Tru64) apparently turns \n into \r\r\n.
    #   '\r\n'   -- IRIX apparently turns \n into \r\n.
    for expanded_newline in ('\r\r\n', '\r\n'):
        if data.endswith(expanded_newline):
            return data.replace(expanded_newline, '\n')

    return data
47 |
|
---|
48 |
|
---|
# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
# because pty code is not too portable.
class PtyTest(unittest.TestCase):
    """Smoke tests for pty.master_open()/pty.slave_open() and pty.fork().

    Every file descriptor a test opens is closed in a ``finally`` clause
    inside the test method itself, so descriptors are released even when an
    assertion fails, and the close() calls still run while the SIGALRM
    watchdog armed in setUp() is active.
    """

    def setUp(self):
        # isatty() and close() can hang on some platforms.  Set an alarm
        # before running the test to make sure we don't hang forever.
        self.old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
        signal.alarm(10)

    def tearDown(self):
        # Remove the alarm, then restore the old alarm handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.old_alarm)

    def handle_sig(self, sig, frame):
        """SIGALRM handler: fail the running test instead of hanging."""
        self.fail("isatty hung")

    def test_basic(self):
        """Open a master/slave pair and read back what the slave writes."""
        skip_msg = "Pseudo-terminals (seemingly) not functional."
        try:
            debug("Calling master_open()")
            master_fd, slave_name = pty.master_open()
        except OSError:
            # " An optional feature could not be imported " ... ?
            raise unittest.SkipTest(skip_msg)

        try:
            debug("Got master_fd '%d', slave_name '%s'" %
                  (master_fd, slave_name))
            try:
                debug("Calling slave_open(%r)" % (slave_name,))
                slave_fd = pty.slave_open(slave_name)
            except OSError:
                # The master is already open here; the outer ``finally``
                # closes it before the skip propagates.
                raise unittest.SkipTest(skip_msg)

            try:
                debug("Got slave_fd '%d'" % slave_fd)
                self._check_pty_pair(master_fd, slave_fd)
            finally:
                os.close(slave_fd)
        finally:
            os.close(master_fd)

    def _check_pty_pair(self, master_fd, slave_fd):
        """Assert that data written to *slave_fd* arrives on *master_fd*.

        Neither descriptor is closed here; the caller owns them.
        """
        self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')

        # Solaris requires reading the fd before anything is returned.
        # My guess is that since we open and close the slave fd
        # in master_open(), we need to read the EOF.

        # Ensure the fd is non-blocking in case there's nothing to read.
        orig_flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
        fcntl.fcntl(master_fd, fcntl.F_SETFL, orig_flags | os.O_NONBLOCK)
        try:
            s1 = os.read(master_fd, 1024)
            self.assertEqual('', s1)
        except OSError as e:
            # EAGAIN just means there was nothing to read.
            if e.errno != errno.EAGAIN:
                raise
        # Restore the original flags.
        fcntl.fcntl(master_fd, fcntl.F_SETFL, orig_flags)

        debug("Writing to slave_fd")
        os.write(slave_fd, TEST_STRING_1)
        s1 = os.read(master_fd, 1024)
        self.assertEqual(TEST_STRING_1, normalize_output(s1))

        debug("Writing chunked output")
        os.write(slave_fd, TEST_STRING_2[:5])
        os.write(slave_fd, TEST_STRING_2[5:])
        s2 = os.read(master_fd, 1024)
        self.assertEqual(TEST_STRING_2, normalize_output(s2))

    def test_fork(self):
        """Check that pty.fork() gives the child a tty and its own session.

        The child reports its findings through its exit status:
        1 = unexpected exception from os.setsid(), 2 = setsid() succeeded
        (child was not a session leader), 3 = stdout is not a tty,
        4 = everything as expected.
        """
        debug("calling pty.fork()")
        pid, master_fd = pty.fork()
        if pid == pty.CHILD:
            # stdout should be connected to a tty.
            if not os.isatty(1):
                debug("Child's fd 1 is not a tty?!")
                os._exit(3)

            # After pty.fork(), the child should already be a session leader.
            # (on those systems that have that concept.)
            debug("In child, calling os.setsid()")
            try:
                os.setsid()
            except OSError:
                # Good, we already were session leader
                debug("Good: OSError was raised.")
            except AttributeError:
                # Have pty, but not setsid()?
                debug("No setsid() available?")
            except:
                # Deliberately bare: we don't want this error to propagate,
                # escaping the call to os._exit() and causing very peculiar
                # behavior in the calling regrtest.py !
                # Note: could add traceback printing here.
                debug("An unexpected error was raised.")
                os._exit(1)
            else:
                debug("os.setsid() succeeded! (bad!)")
                os._exit(2)
            os._exit(4)
        else:
            # Parent only: the child always leaves through os._exit(), so it
            # never reaches this ``finally``.
            try:
                debug("Waiting for child (%d) to finish." % pid)
                # In verbose mode, we have to consume the debug output from
                # the child or the child will block, causing this test to
                # hang in the parent's waitpid() call.  The child blocks
                # after a platform-dependent amount of data is written to
                # its fd.  On Linux 2.6, it's 4000 bytes and the child won't
                # block, but on OS X even the small writes in the child
                # above will block it.  Also on Linux, the read() will raise
                # an OSError (input/output error) when it tries to read past
                # the end of the buffer but the child's already exited, so
                # catch and discard those exceptions.  It's not worth
                # checking for EIO.
                while True:
                    try:
                        data = os.read(master_fd, 80)
                    except OSError:
                        break
                    if not data:
                        break
                    sys.stdout.write(data.replace('\r\n', '\n'))

                ##line = os.read(master_fd, 80)
                ##lines = line.replace('\r\n', '\n').split('\n')
                ##if False and lines != ['In child, calling os.setsid()',
                ##             'Good: OSError was raised.', '']:
                ##    raise TestFailed("Unexpected output from child: %r" % line)

                (pid, status) = os.waitpid(pid, 0)
                res = status >> 8
                debug("Child (%d) exited with status %d (%d)." %
                      (pid, res, status))
                if res == 1:
                    self.fail("Child raised an unexpected exception in os.setsid()")
                elif res == 2:
                    self.fail("pty.fork() failed to make child a session leader.")
                elif res == 3:
                    self.fail("Child spawned by pty.fork() did not have a tty as stdout")
                elif res != 4:
                    self.fail("pty.fork() failed for unknown reasons.")

                ##debug("Reading from master_fd now that the child has exited")
                ##try:
                ##    s1 = os.read(master_fd, 1024)
                ##except os.error:
                ##    pass
                ##else:
                ##    raise TestFailed("Read from master_fd did not raise exception")
            finally:
                os.close(master_fd)

            # pty.fork() passed.
197 |
|
---|
198 |
|
---|
class SmallPtyTests(unittest.TestCase):
    """These tests don't spawn children or hang.

    They drive pty._copy() with pipes and a socket pair standing in for
    stdin, stdout and the pty master, and replace pty.select with a
    scripted stand-in (_mock_select).
    """

    def setUp(self):
        # Remember the pty module globals the tests overwrite.
        self.orig_stdin_fileno = pty.STDIN_FILENO
        self.orig_stdout_fileno = pty.STDOUT_FILENO
        self.orig_pty_select = pty.select
        self.fds = []  # Raw file descriptors to close with os.close().
        self.files = []  # Objects (sockets) to close via their close().
        # Script for _mock_select: expected len(rfds) and the readable list
        # to return, one entry per anticipated select() call.
        self.select_rfds_lengths = []
        self.select_rfds_results = []

    def tearDown(self):
        pty.STDIN_FILENO = self.orig_stdin_fileno
        pty.STDOUT_FILENO = self.orig_stdout_fileno
        pty.select = self.orig_pty_select
        # Sockets are closed through their own close() so that each
        # descriptor is released exactly once by the object that owns it.
        for file_obj in self.files:
            try:
                file_obj.close()
            except (OSError, socket.error):
                pass
        for fd in self.fds:
            try:
                os.close(fd)
            except OSError:
                # Best effort: the descriptor may already be closed.
                pass

    def _pipe(self):
        """Create a pipe, register both ends for cleanup, return (r, w)."""
        pipe_fds = os.pipe()
        self.fds.extend(pipe_fds)
        return pipe_fds

    def _socketpair(self):
        """Create a socket pair and register both sockets for cleanup."""
        socketpair = socket.socketpair()
        self.files.extend(socketpair)
        return socketpair

    def _mock_select(self, rfds, wfds, xfds):
        """Scripted replacement for pty.select.

        Checks len(rfds) against the next expected length and returns the
        next scripted readable list with empty write/exception lists.
        """
        # This will raise IndexError when no more expected calls exist.
        self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
        return self.select_rfds_results.pop(0), [], []

    def test__copy_to_each(self):
        """Test the normal data case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = self._socketpair()
        # The descriptors stay owned by the socket objects; they are not
        # added to self.fds, which would close them a second time.
        masters = [s.fileno() for s in socketpair]

        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
        os.write(masters[1], b'from master')
        os.write(write_to_stdin_fd, b'from stdin')

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        self.select_rfds_lengths.append(2)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])

        # Test that the right data went to the right places.
        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
        self.assertEqual(os.read(masters[1], 20), b'from stdin')

    def test__copy_eof_on_all(self):
        """Test the empty read EOF case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = self._socketpair()
        masters = [s.fileno() for s in socketpair]

        # Close the writing ends so that reads on masters[0] and on the
        # mock stdin hit EOF.  The socket is closed only through its
        # object, never additionally with os.close() on its descriptor.
        socketpair[1].close()
        os.close(write_to_stdin_fd)
        # Already closed: keep tearDown() from closing that number again.
        self.fds.remove(write_to_stdin_fd)

        # Expect two select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_rfds_lengths.append(2)
        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
        # We expect that both fds were removed from the fds list as they
        # both encountered an EOF before the second select call.
        self.select_rfds_lengths.append(0)

        with self.assertRaises(IndexError):
            pty._copy(masters[0])
283 |
|
---|
284 |
|
---|
def test_main(verbose=None):
    """Run SmallPtyTests, then PtyTest, through run_unittest().

    The *verbose* argument is accepted but not used by this function.
    """
    test_classes = (SmallPtyTests, PtyTest)
    run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()