Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_queue.py

    r2 r391  
    22# to ensure the Queue locks remain stable.
    33import Queue
    4 import sys
    5 import threading
    64import time
    75import unittest
    86from test import test_support
     7threading = test_support.import_module('threading')
    98
    109QUEUE_SIZE = 5
     
    4443
    4544class BlockingTestMixin:
     45
     46    def tearDown(self):
     47        self.t = None
    4648
    4749    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
     
    8183
    8284
    83 class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
     85class BaseQueueTest(BlockingTestMixin):
    8486    def setUp(self):
    8587        self.cum = 0
     
    9799                            PriorityQueue = [111, 222, 333])
    98100        actual_order = [q.get(), q.get(), q.get()]
    99         self.assertEquals(actual_order, target_order[q.__class__.__name__],
    100                           "Didn't seem to queue the correct data!")
     101        self.assertEqual(actual_order, target_order[q.__class__.__name__],
     102                         "Didn't seem to queue the correct data!")
    101103        for i in range(QUEUE_SIZE-1):
    102104            q.put(i)
    103             self.assert_(not q.empty(), "Queue should not be empty")
    104         self.assert_(not q.full(), "Queue should not be full")
    105         q.put("last")
    106         self.assert_(q.full(), "Queue should be full")
    107         try:
    108             q.put("full", block=0)
     105            self.assertTrue(not q.empty(), "Queue should not be empty")
     106        self.assertTrue(not q.full(), "Queue should not be full")
     107        last = 2 * QUEUE_SIZE
     108        full = 3 * 2 * QUEUE_SIZE
     109        q.put(last)
     110        self.assertTrue(q.full(), "Queue should be full")
     111        try:
     112            q.put(full, block=0)
    109113            self.fail("Didn't appear to block with a full queue")
    110114        except Queue.Full:
    111115            pass
    112116        try:
    113             q.put("full", timeout=0.01)
     117            q.put(full, timeout=0.01)
    114118            self.fail("Didn't appear to time-out with a full queue")
    115119        except Queue.Full:
    116120            pass
    117121        # Test a blocking put
    118         self.do_blocking_test(q.put, ("full",), q.get, ())
    119         self.do_blocking_test(q.put, ("full", True, 10), q.get, ())
     122        self.do_blocking_test(q.put, (full,), q.get, ())
     123        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
    120124        # Empty it
    121125        for i in range(QUEUE_SIZE):
    122126            q.get()
    123         self.assert_(q.empty(), "Queue should be empty")
     127        self.assertTrue(q.empty(), "Queue should be empty")
    124128        try:
    125129            q.get(block=0)
     
    154158            q.put(i)
    155159        q.join()
    156         self.assertEquals(self.cum, sum(range(100)),
    157                           "q.join() did not block until all tasks were done")
     160        self.assertEqual(self.cum, sum(range(100)),
     161                         "q.join() did not block until all tasks were done")
    158162        for i in (0,1):
    159163            q.put(None)         # instruct the threads to close
     
    191195
    192196
    193 class QueueTest(BaseQueueTest):
     197class QueueTest(BaseQueueTest, unittest.TestCase):
    194198    type2test = Queue.Queue
    195199
    196 class LifoQueueTest(BaseQueueTest):
     200class LifoQueueTest(BaseQueueTest, unittest.TestCase):
    197201    type2test = Queue.LifoQueue
    198202
    199 class PriorityQueueTest(BaseQueueTest):
     203class PriorityQueueTest(BaseQueueTest, unittest.TestCase):
    200204    type2test = Queue.PriorityQueue
    201205
     
    222226        return Queue.Queue._get(self)
    223227
    224 class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
     228class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
    225229
    226230    def failing_queue_test(self, q):
     
    243247            pass
    244248        q.put("last")
    245         self.assert_(q.full(), "Queue should be full")
     249        self.assertTrue(q.full(), "Queue should be full")
    246250        # Test a failing blocking put
    247251        q.fail_next_put = True
     
    265269        # put failed, but get succeeded - re-add
    266270        q.put("last")
    267         self.assert_(q.full(), "Queue should be full")
     271        self.assertTrue(q.full(), "Queue should be full")
    268272        q.get()
    269         self.assert_(not q.full(), "Queue should not be full")
     273        self.assertTrue(not q.full(), "Queue should not be full")
    270274        q.put("last")
    271         self.assert_(q.full(), "Queue should be full")
     275        self.assertTrue(q.full(), "Queue should be full")
    272276        # Test a blocking put
    273277        self.do_blocking_test(q.put, ("full",), q.get, ())
     
    275279        for i in range(QUEUE_SIZE):
    276280            q.get()
    277         self.assert_(q.empty(), "Queue should be empty")
     281        self.assertTrue(q.empty(), "Queue should be empty")
    278282        q.put("first")
    279283        q.fail_next_get = True
     
    283287        except FailingQueueException:
    284288            pass
    285         self.assert_(not q.empty(), "Queue should not be empty")
     289        self.assertTrue(not q.empty(), "Queue should not be empty")
    286290        q.fail_next_get = True
    287291        try:
     
    290294        except FailingQueueException:
    291295            pass
    292         self.assert_(not q.empty(), "Queue should not be empty")
     296        self.assertTrue(not q.empty(), "Queue should not be empty")
    293297        q.get()
    294         self.assert_(q.empty(), "Queue should be empty")
     298        self.assertTrue(q.empty(), "Queue should be empty")
    295299        q.fail_next_get = True
    296300        try:
     
    301305            pass
    302306        # put succeeded, but get failed.
    303         self.assert_(not q.empty(), "Queue should not be empty")
     307        self.assertTrue(not q.empty(), "Queue should not be empty")
    304308        q.get()
    305         self.assert_(q.empty(), "Queue should be empty")
     309        self.assertTrue(q.empty(), "Queue should be empty")
    306310
    307311    def test_failing_queue(self):
Note: See TracChangeset for help on using the changeset viewer.