Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_poll.py

    r2 r391  
    11# Test case for the os.poll() function
    22
    3 import os, select, random, unittest
    4 from test.test_support import TestSkipped, TESTFN, run_unittest
     3import os
     4import random
     5import select
     6import _testcapi
     7try:
     8    import threading
     9except ImportError:
     10    threading = None
     11import time
     12import unittest
     13from test.test_support import TESTFN, run_unittest, reap_threads
    514
    615try:
    716    select.poll
    817except AttributeError:
    9     raise TestSkipped, "select.poll not defined -- skipping test_poll"
     18    raise unittest.SkipTest, "select.poll not defined -- skipping test_poll"
    1019
    1120
     
    151160            self.fail('Overflow must have occurred')
    152161
     162        pollster = select.poll()
     163        # Issue 15989
     164        self.assertRaises(OverflowError, pollster.register, 0,
     165                          _testcapi.SHRT_MAX + 1)
     166        self.assertRaises(OverflowError, pollster.register, 0,
     167                          _testcapi.USHRT_MAX + 1)
     168        self.assertRaises(OverflowError, pollster.poll, _testcapi.INT_MAX + 1)
     169        self.assertRaises(OverflowError, pollster.poll, _testcapi.UINT_MAX + 1)
     170
     171    @unittest.skipUnless(threading, 'Threading required for this test.')
     172    @reap_threads
     173    def test_threaded_poll(self):
     174        r, w = os.pipe()
     175        self.addCleanup(os.close, r)
     176        self.addCleanup(os.close, w)
     177        rfds = []
     178        for i in range(10):
     179            fd = os.dup(r)
     180            self.addCleanup(os.close, fd)
     181            rfds.append(fd)
     182        pollster = select.poll()
     183        for fd in rfds:
     184            pollster.register(fd, select.POLLIN)
     185
     186        t = threading.Thread(target=pollster.poll)
     187        t.start()
     188        try:
     189            time.sleep(0.5)
     190            # trigger ufds array reallocation
     191            for fd in rfds:
     192                pollster.unregister(fd)
     193            pollster.register(w, select.POLLOUT)
     194            self.assertRaises(RuntimeError, pollster.poll)
     195        finally:
     196            # and make the call to poll() from the thread return
     197            os.write(w, b'spam')
     198            t.join()
     199
     200
    153201def test_main():
    154202    run_unittest(PollTests)
Note: See TracChangeset for help on using the changeset viewer.