1 | # Test case for the os.poll() function
|
---|
2 |
|
---|
3 | import os
|
---|
4 | import random
|
---|
5 | import select
|
---|
6 | import _testcapi
|
---|
7 | try:
|
---|
8 | import threading
|
---|
9 | except ImportError:
|
---|
10 | threading = None
|
---|
11 | import time
|
---|
12 | import unittest
|
---|
13 | from test.test_support import TESTFN, run_unittest, reap_threads
|
---|
14 |
|
---|
15 | try:
|
---|
16 | select.poll
|
---|
17 | except AttributeError:
|
---|
18 | raise unittest.SkipTest, "select.poll not defined -- skipping test_poll"
|
---|
19 |
|
---|
20 |
|
---|
21 | def find_ready_matching(ready, flag):
|
---|
22 | match = []
|
---|
23 | for fd, mode in ready:
|
---|
24 | if mode & flag:
|
---|
25 | match.append(fd)
|
---|
26 | return match
|
---|
27 |
|
---|
28 | class PollTests(unittest.TestCase):
|
---|
29 |
|
---|
30 | def test_poll1(self):
|
---|
31 | # Basic functional test of poll object
|
---|
32 | # Create a bunch of pipe and test that poll works with them.
|
---|
33 |
|
---|
34 | p = select.poll()
|
---|
35 |
|
---|
36 | NUM_PIPES = 12
|
---|
37 | MSG = " This is a test."
|
---|
38 | MSG_LEN = len(MSG)
|
---|
39 | readers = []
|
---|
40 | writers = []
|
---|
41 | r2w = {}
|
---|
42 | w2r = {}
|
---|
43 |
|
---|
44 | for i in range(NUM_PIPES):
|
---|
45 | rd, wr = os.pipe()
|
---|
46 | p.register(rd)
|
---|
47 | p.modify(rd, select.POLLIN)
|
---|
48 | p.register(wr, select.POLLOUT)
|
---|
49 | readers.append(rd)
|
---|
50 | writers.append(wr)
|
---|
51 | r2w[rd] = wr
|
---|
52 | w2r[wr] = rd
|
---|
53 |
|
---|
54 | bufs = []
|
---|
55 |
|
---|
56 | while writers:
|
---|
57 | ready = p.poll()
|
---|
58 | ready_writers = find_ready_matching(ready, select.POLLOUT)
|
---|
59 | if not ready_writers:
|
---|
60 | raise RuntimeError, "no pipes ready for writing"
|
---|
61 | wr = random.choice(ready_writers)
|
---|
62 | os.write(wr, MSG)
|
---|
63 |
|
---|
64 | ready = p.poll()
|
---|
65 | ready_readers = find_ready_matching(ready, select.POLLIN)
|
---|
66 | if not ready_readers:
|
---|
67 | raise RuntimeError, "no pipes ready for reading"
|
---|
68 | rd = random.choice(ready_readers)
|
---|
69 | buf = os.read(rd, MSG_LEN)
|
---|
70 | self.assertEqual(len(buf), MSG_LEN)
|
---|
71 | bufs.append(buf)
|
---|
72 | os.close(r2w[rd]) ; os.close( rd )
|
---|
73 | p.unregister( r2w[rd] )
|
---|
74 | p.unregister( rd )
|
---|
75 | writers.remove(r2w[rd])
|
---|
76 |
|
---|
77 | self.assertEqual(bufs, [MSG] * NUM_PIPES)
|
---|
78 |
|
---|
79 | def poll_unit_tests(self):
|
---|
80 | # returns NVAL for invalid file descriptor
|
---|
81 | FD = 42
|
---|
82 | try:
|
---|
83 | os.close(FD)
|
---|
84 | except OSError:
|
---|
85 | pass
|
---|
86 | p = select.poll()
|
---|
87 | p.register(FD)
|
---|
88 | r = p.poll()
|
---|
89 | self.assertEqual(r[0], (FD, select.POLLNVAL))
|
---|
90 |
|
---|
91 | f = open(TESTFN, 'w')
|
---|
92 | fd = f.fileno()
|
---|
93 | p = select.poll()
|
---|
94 | p.register(f)
|
---|
95 | r = p.poll()
|
---|
96 | self.assertEqual(r[0][0], fd)
|
---|
97 | f.close()
|
---|
98 | r = p.poll()
|
---|
99 | self.assertEqual(r[0], (fd, select.POLLNVAL))
|
---|
100 | os.unlink(TESTFN)
|
---|
101 |
|
---|
102 | # type error for invalid arguments
|
---|
103 | p = select.poll()
|
---|
104 | self.assertRaises(TypeError, p.register, p)
|
---|
105 | self.assertRaises(TypeError, p.unregister, p)
|
---|
106 |
|
---|
107 | # can't unregister non-existent object
|
---|
108 | p = select.poll()
|
---|
109 | self.assertRaises(KeyError, p.unregister, 3)
|
---|
110 |
|
---|
111 | # Test error cases
|
---|
112 | pollster = select.poll()
|
---|
113 | class Nope:
|
---|
114 | pass
|
---|
115 |
|
---|
116 | class Almost:
|
---|
117 | def fileno(self):
|
---|
118 | return 'fileno'
|
---|
119 |
|
---|
120 | self.assertRaises(TypeError, pollster.register, Nope(), 0)
|
---|
121 | self.assertRaises(TypeError, pollster.register, Almost(), 0)
|
---|
122 |
|
---|
123 | # Another test case for poll(). This is copied from the test case for
|
---|
124 | # select(), modified to use poll() instead.
|
---|
125 |
|
---|
126 | def test_poll2(self):
|
---|
127 | cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
|
---|
128 | p = os.popen(cmd, 'r')
|
---|
129 | pollster = select.poll()
|
---|
130 | pollster.register( p, select.POLLIN )
|
---|
131 | for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
|
---|
132 | fdlist = pollster.poll(tout)
|
---|
133 | if (fdlist == []):
|
---|
134 | continue
|
---|
135 | fd, flags = fdlist[0]
|
---|
136 | if flags & select.POLLHUP:
|
---|
137 | line = p.readline()
|
---|
138 | if line != "":
|
---|
139 | self.fail('error: pipe seems to be closed, but still returns data')
|
---|
140 | continue
|
---|
141 |
|
---|
142 | elif flags & select.POLLIN:
|
---|
143 | line = p.readline()
|
---|
144 | if not line:
|
---|
145 | break
|
---|
146 | continue
|
---|
147 | else:
|
---|
148 | self.fail('Unexpected return value from select.poll: %s' % fdlist)
|
---|
149 | p.close()
|
---|
150 |
|
---|
151 | def test_poll3(self):
|
---|
152 | # test int overflow
|
---|
153 | pollster = select.poll()
|
---|
154 | pollster.register(1)
|
---|
155 |
|
---|
156 | self.assertRaises(OverflowError, pollster.poll, 1L << 64)
|
---|
157 |
|
---|
158 | x = 2 + 3
|
---|
159 | if x != 5:
|
---|
160 | self.fail('Overflow must have occurred')
|
---|
161 |
|
---|
162 | pollster = select.poll()
|
---|
163 | # Issue 15989
|
---|
164 | self.assertRaises(OverflowError, pollster.register, 0,
|
---|
165 | _testcapi.SHRT_MAX + 1)
|
---|
166 | self.assertRaises(OverflowError, pollster.register, 0,
|
---|
167 | _testcapi.USHRT_MAX + 1)
|
---|
168 | self.assertRaises(OverflowError, pollster.poll, _testcapi.INT_MAX + 1)
|
---|
169 | self.assertRaises(OverflowError, pollster.poll, _testcapi.UINT_MAX + 1)
|
---|
170 |
|
---|
171 | @unittest.skipUnless(threading, 'Threading required for this test.')
|
---|
172 | @reap_threads
|
---|
173 | def test_threaded_poll(self):
|
---|
174 | r, w = os.pipe()
|
---|
175 | self.addCleanup(os.close, r)
|
---|
176 | self.addCleanup(os.close, w)
|
---|
177 | rfds = []
|
---|
178 | for i in range(10):
|
---|
179 | fd = os.dup(r)
|
---|
180 | self.addCleanup(os.close, fd)
|
---|
181 | rfds.append(fd)
|
---|
182 | pollster = select.poll()
|
---|
183 | for fd in rfds:
|
---|
184 | pollster.register(fd, select.POLLIN)
|
---|
185 |
|
---|
186 | t = threading.Thread(target=pollster.poll)
|
---|
187 | t.start()
|
---|
188 | try:
|
---|
189 | time.sleep(0.5)
|
---|
190 | # trigger ufds array reallocation
|
---|
191 | for fd in rfds:
|
---|
192 | pollster.unregister(fd)
|
---|
193 | pollster.register(w, select.POLLOUT)
|
---|
194 | self.assertRaises(RuntimeError, pollster.poll)
|
---|
195 | finally:
|
---|
196 | # and make the call to poll() from the thread return
|
---|
197 | os.write(w, b'spam')
|
---|
198 | t.join()
|
---|
199 |
|
---|
200 |
|
---|
201 | def test_main():
|
---|
202 | run_unittest(PollTests)
|
---|
203 |
|
---|
204 | if __name__ == '__main__':
|
---|
205 | test_main()
|
---|