Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_pty.py

    r2 r391  
     1from test.test_support import verbose, run_unittest, import_module
     2
     3#Skip these tests if either fcntl or termios is not available
     4fcntl = import_module('fcntl')
     5import_module('termios')
     6
    17import errno
    2 import fcntl
    38import pty
    49import os
    510import sys
     11import select
    612import signal
    7 from test.test_support import verbose, TestSkipped, run_unittest
     13import socket
    814import unittest
    915
     
    7076        except OSError:
    7177            # " An optional feature could not be imported " ... ?
    72             raise TestSkipped, "Pseudo-terminals (seemingly) not functional."
     78            raise unittest.SkipTest, "Pseudo-terminals (seemingly) not functional."
    7379
    7480        self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
     
    8389        try:
    8490            s1 = os.read(master_fd, 1024)
    85             self.assertEquals('', s1)
     91            self.assertEqual('', s1)
    8692        except OSError, e:
    8793            if e.errno != errno.EAGAIN:
     
    9399        os.write(slave_fd, TEST_STRING_1)
    94100        s1 = os.read(master_fd, 1024)
    95         self.assertEquals('I wish to buy a fish license.\n',
    96                           normalize_output(s1))
     101        self.assertEqual('I wish to buy a fish license.\n',
     102                         normalize_output(s1))
    97103
    98104        debug("Writing chunked output")
     
    100106        os.write(slave_fd, TEST_STRING_2[5:])
    101107        s2 = os.read(master_fd, 1024)
    102         self.assertEquals('For my pet fish, Eric.\n', normalize_output(s2))
     108        self.assertEqual('For my pet fish, Eric.\n', normalize_output(s2))
    103109
    104110        os.close(slave_fd)
     
    147153            # Linux 2.6, it's 4000 bytes and the child won't block, but on OS
    148154            # X even the small writes in the child above will block it.  Also
    149             # on Linux, the read() will throw an OSError (input/output error)
     155            # on Linux, the read() will raise an OSError (input/output error)
    150156            # when it tries to read past the end of the buffer but the child's
    151157            # already exited, so catch and discard those exceptions.  It's not
     
    190196        # pty.fork() passed.
    191197
     198
     199class SmallPtyTests(unittest.TestCase):
     200    """These tests don't spawn children or hang."""
     201
     202    def setUp(self):
     203        self.orig_stdin_fileno = pty.STDIN_FILENO
     204        self.orig_stdout_fileno = pty.STDOUT_FILENO
     205        self.orig_pty_select = pty.select
     206        self.fds = []  # A list of file descriptors to close.
     207        self.select_rfds_lengths = []
     208        self.select_rfds_results = []
     209
     210    def tearDown(self):
     211        pty.STDIN_FILENO = self.orig_stdin_fileno
     212        pty.STDOUT_FILENO = self.orig_stdout_fileno
     213        pty.select = self.orig_pty_select
     214        for fd in self.fds:
     215            try:
     216                os.close(fd)
     217            except:
     218                pass
     219
     220    def _pipe(self):
     221        pipe_fds = os.pipe()
     222        self.fds.extend(pipe_fds)
     223        return pipe_fds
     224
     225    def _mock_select(self, rfds, wfds, xfds):
     226        # This will raise IndexError when no more expected calls exist.
     227        self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
     228        return self.select_rfds_results.pop(0), [], []
     229
     230    def test__copy_to_each(self):
     231        """Test the normal data case on both master_fd and stdin."""
     232        read_from_stdout_fd, mock_stdout_fd = self._pipe()
     233        pty.STDOUT_FILENO = mock_stdout_fd
     234        mock_stdin_fd, write_to_stdin_fd = self._pipe()
     235        pty.STDIN_FILENO = mock_stdin_fd
     236        socketpair = socket.socketpair()
     237        masters = [s.fileno() for s in socketpair]
     238        self.fds.extend(masters)
     239
     240        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
     241        os.write(masters[1], b'from master')
     242        os.write(write_to_stdin_fd, b'from stdin')
     243
     244        # Expect two select calls, the last one will cause IndexError
     245        pty.select = self._mock_select
     246        self.select_rfds_lengths.append(2)
     247        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
     248        self.select_rfds_lengths.append(2)
     249
     250        with self.assertRaises(IndexError):
     251            pty._copy(masters[0])
     252
     253        # Test that the right data went to the right places.
     254        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
     255        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
     256        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
     257        self.assertEqual(os.read(masters[1], 20), b'from stdin')
     258
     259    def test__copy_eof_on_all(self):
     260        """Test the empty read EOF case on both master_fd and stdin."""
     261        read_from_stdout_fd, mock_stdout_fd = self._pipe()
     262        pty.STDOUT_FILENO = mock_stdout_fd
     263        mock_stdin_fd, write_to_stdin_fd = self._pipe()
     264        pty.STDIN_FILENO = mock_stdin_fd
     265        socketpair = socket.socketpair()
     266        masters = [s.fileno() for s in socketpair]
     267        self.fds.extend(masters)
     268
     269        os.close(masters[1])
     270        socketpair[1].close()
     271        os.close(write_to_stdin_fd)
     272
     273        # Expect two select calls, the last one will cause IndexError
     274        pty.select = self._mock_select
     275        self.select_rfds_lengths.append(2)
     276        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
     277        # We expect that both fds were removed from the fds list as they
     278        # both encountered an EOF before the second select call.
     279        self.select_rfds_lengths.append(0)
     280
     281        with self.assertRaises(IndexError):
     282            pty._copy(masters[0])
     283
     284
    192285def test_main(verbose=None):
    193     run_unittest(PtyTest)
     286    run_unittest(SmallPtyTests, PtyTest)
    194287
    195288if __name__ == "__main__":
Note: See TracChangeset for help on using the changeset viewer.