Changeset 391 for python/trunk/Lib/test/test_poll.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_poll.py
r2 r391 1 1 # Test case for the os.poll() function 2 2 3 import os, select, random, unittest 4 from test.test_support import TestSkipped, TESTFN, run_unittest 3 import os 4 import random 5 import select 6 import _testcapi 7 try: 8 import threading 9 except ImportError: 10 threading = None 11 import time 12 import unittest 13 from test.test_support import TESTFN, run_unittest, reap_threads 5 14 6 15 try: 7 16 select.poll 8 17 except AttributeError: 9 raise TestSkipped, "select.poll not defined -- skipping test_poll"18 raise unittest.SkipTest, "select.poll not defined -- skipping test_poll" 10 19 11 20 … … 151 160 self.fail('Overflow must have occurred') 152 161 162 pollster = select.poll() 163 # Issue 15989 164 self.assertRaises(OverflowError, pollster.register, 0, 165 _testcapi.SHRT_MAX + 1) 166 self.assertRaises(OverflowError, pollster.register, 0, 167 _testcapi.USHRT_MAX + 1) 168 self.assertRaises(OverflowError, pollster.poll, _testcapi.INT_MAX + 1) 169 self.assertRaises(OverflowError, pollster.poll, _testcapi.UINT_MAX + 1) 170 171 @unittest.skipUnless(threading, 'Threading required for this test.') 172 @reap_threads 173 def test_threaded_poll(self): 174 r, w = os.pipe() 175 self.addCleanup(os.close, r) 176 self.addCleanup(os.close, w) 177 rfds = [] 178 for i in range(10): 179 fd = os.dup(r) 180 self.addCleanup(os.close, fd) 181 rfds.append(fd) 182 pollster = select.poll() 183 for fd in rfds: 184 pollster.register(fd, select.POLLIN) 185 186 t = threading.Thread(target=pollster.poll) 187 t.start() 188 try: 189 time.sleep(0.5) 190 # trigger ufds array reallocation 191 for fd in rfds: 192 pollster.unregister(fd) 193 pollster.register(w, select.POLLOUT) 194 self.assertRaises(RuntimeError, pollster.poll) 195 finally: 196 # and make the call to poll() from the thread return 197 os.write(w, b'spam') 198 t.join() 199 200 153 201 def test_main(): 154 202 run_unittest(PollTests)
Note:
See TracChangeset
for help on using the changeset viewer.