Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_thread.py

    r2 r391  
    33import random
    44from test import test_support
    5 import thread
     5thread = test_support.import_module('thread')
    66import time
    77import sys
     8import weakref
    89
    910from test import lock_tests
     
    6566    def test_stack_size(self):
    6667        # Various stack size tests.
    67         self.assertEquals(thread.stack_size(), 0, "intial stack size is not 0")
     68        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
    6869
    6970        thread.stack_size(0)
    70         self.assertEquals(thread.stack_size(), 0, "stack_size not reset to default")
     71        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
    7172
    7273        if os.name not in ("nt", "os2", "posix"):
     
    8889            for tss in (262144, 0x100000, 0):
    8990                thread.stack_size(tss)
    90                 self.assertEquals(thread.stack_size(), tss, fail_msg % tss)
     91                self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
    9192                verbose_print("successfully set stack_size(%d)" % tss)
    9293
     
    103104
    104105            thread.stack_size(0)
     106
     107    def test__count(self):
     108        # Test the _count() function.
     109        orig = thread._count()
     110        mut = thread.allocate_lock()
     111        mut.acquire()
     112        started = []
     113        def task():
     114            started.append(None)
     115            mut.acquire()
     116            mut.release()
     117        thread.start_new_thread(task, ())
     118        while not started:
     119            time.sleep(0.01)
     120        self.assertEqual(thread._count(), orig + 1)
     121        # Allow the task to finish.
     122        mut.release()
     123        # The only reliable way to be sure that the thread ended from the
     124        # interpreter's point of view is to wait for the function object to be
     125        # destroyed.
     126        done = []
     127        wr = weakref.ref(task, lambda _: done.append(None))
     128        del task
     129        while not done:
     130            time.sleep(0.01)
     131        self.assertEqual(thread._count(), orig)
     132
     133    def test_save_exception_state_on_error(self):
     134        # See issue #14474
     135        def task():
     136            started.release()
     137            raise SyntaxError
     138        def mywrite(self, *args):
     139            try:
     140                raise ValueError
     141            except ValueError:
     142                pass
     143            real_write(self, *args)
     144        c = thread._count()
     145        started = thread.allocate_lock()
     146        with test_support.captured_output("stderr") as stderr:
     147            real_write = stderr.write
     148            stderr.write = mywrite
     149            started.acquire()
     150            thread.start_new_thread(task, ())
     151            started.acquire()
     152            while thread._count() > c:
     153                time.sleep(0.01)
     154        self.assertIn("Traceback", stderr.getvalue())
    105155
    106156
     
    175225        self.read_fd, self.write_fd = os.pipe()
    176226
    177     def _test_forkinthread(self):
     227    @unittest.skipIf(sys.platform.startswith('win'),
     228                     "This test is only appropriate for POSIX-like systems.")
     229    @test_support.reap_threads
     230    def test_forkinthread(self):
    178231        def thread1():
    179232            try:
     
    193246                         "Unable to fork() in thread")
    194247
    195     if not sys.platform.startswith('win'):
    196         test_forkinthread = _test_forkinthread
    197 
    198248    def tearDown(self):
    199249        try:
Note: See TracChangeset for help on using the changeset viewer.