1 | import array
|
---|
2 | import unittest
|
---|
3 | from test.test_support import run_unittest, import_module, get_attribute
|
---|
4 | import os, struct
|
---|
# Module-level prerequisites: load the platform-specific modules and make sure
# this process can actually query its controlling terminal before any test
# is defined.
fcntl = import_module('fcntl')
termios = import_module('termios')
get_attribute(termios, 'TIOCGPGRP') #Can't run tests without this feature

try:
    tty = open("/dev/tty", "r")
except IOError:
    raise unittest.SkipTest("Unable to open /dev/tty")
else:
    # Skip if another process is in foreground
    with tty:
        # With a string argument ioctl() returns a string of the same length,
        # so the buffer must be exactly sizeof(int) bytes for unpack("i").
        # The `with` block closes the tty even when the ioctl call fails.
        r = fcntl.ioctl(tty, termios.TIOCGPGRP, " " * struct.calcsize("i"))
    rpgrp = struct.unpack("i", r)[0]
    if rpgrp not in (os.getpgrp(), os.getsid(0)):
        raise unittest.SkipTest("Neither the process group nor the session "
                                "are attached to /dev/tty")
    # Do not leave the probe's temporaries in the module namespace.
    del tty, r, rpgrp
|
---|
22 |
|
---|
# pty is optional: the name stays None when the module cannot be imported,
# which lets pty-dependent tests detect its absence and skip.
pty = None
try:
    import pty
except ImportError:
    pass
|
---|
27 |
|
---|
28 | class IoctlTests(unittest.TestCase):
|
---|
29 | def test_ioctl(self):
|
---|
30 | # If this process has been put into the background, TIOCGPGRP returns
|
---|
31 | # the session ID instead of the process group id.
|
---|
32 | ids = (os.getpgrp(), os.getsid(0))
|
---|
33 | tty = open("/dev/tty", "r")
|
---|
34 | r = fcntl.ioctl(tty, termios.TIOCGPGRP, " ")
|
---|
35 | rpgrp = struct.unpack("i", r)[0]
|
---|
36 | self.assertIn(rpgrp, ids)
|
---|
37 |
|
---|
38 | def _check_ioctl_mutate_len(self, nbytes=None):
|
---|
39 | buf = array.array('i')
|
---|
40 | intsize = buf.itemsize
|
---|
41 | ids = (os.getpgrp(), os.getsid(0))
|
---|
42 | # A fill value unlikely to be in `ids`
|
---|
43 | fill = -12345
|
---|
44 | if nbytes is not None:
|
---|
45 | # Extend the buffer so that it is exactly `nbytes` bytes long
|
---|
46 | buf.extend([fill] * (nbytes // intsize))
|
---|
47 | self.assertEqual(len(buf) * intsize, nbytes) # sanity check
|
---|
48 | else:
|
---|
49 | buf.append(fill)
|
---|
50 | with open("/dev/tty", "r") as tty:
|
---|
51 | r = fcntl.ioctl(tty, termios.TIOCGPGRP, buf, 1)
|
---|
52 | rpgrp = buf[0]
|
---|
53 | self.assertEqual(r, 0)
|
---|
54 | self.assertIn(rpgrp, ids)
|
---|
55 |
|
---|
56 | def test_ioctl_mutate(self):
|
---|
57 | self._check_ioctl_mutate_len()
|
---|
58 |
|
---|
59 | def test_ioctl_mutate_1024(self):
|
---|
60 | # Issue #9758: a mutable buffer of exactly 1024 bytes wouldn't be
|
---|
61 | # copied back after the system call.
|
---|
62 | self._check_ioctl_mutate_len(1024)
|
---|
63 |
|
---|
64 | def test_ioctl_mutate_2048(self):
|
---|
65 | # Test with a larger buffer, just for the record.
|
---|
66 | self._check_ioctl_mutate_len(2048)
|
---|
67 |
|
---|
68 | def test_ioctl_signed_unsigned_code_param(self):
|
---|
69 | if not pty:
|
---|
70 | raise unittest.SkipTest('pty module required')
|
---|
71 | mfd, sfd = pty.openpty()
|
---|
72 | try:
|
---|
73 | if termios.TIOCSWINSZ < 0:
|
---|
74 | set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
|
---|
75 | set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffffL
|
---|
76 | else:
|
---|
77 | set_winsz_opcode_pos = termios.TIOCSWINSZ
|
---|
78 | set_winsz_opcode_maybe_neg, = struct.unpack("i",
|
---|
79 | struct.pack("I", termios.TIOCSWINSZ))
|
---|
80 |
|
---|
81 | our_winsz = struct.pack("HHHH",80,25,0,0)
|
---|
82 | # test both with a positive and potentially negative ioctl code
|
---|
83 | new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
|
---|
84 | new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
|
---|
85 | finally:
|
---|
86 | os.close(mfd)
|
---|
87 | os.close(sfd)
|
---|
88 |
|
---|
def test_main():
    """Run the IoctlTests case through test_support's run_unittest()."""
    cases = (IoctlTests,)
    run_unittest(*cases)


# Run the suite when this file is executed directly.
if __name__ == "__main__":
    test_main()
|
---|