Changeset 391 for python/trunk/Lib/test/test_queue.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_queue.py
r2 r391 2 2 # to ensure the Queue locks remain stable. 3 3 import Queue 4 import sys5 import threading6 4 import time 7 5 import unittest 8 6 from test import test_support 7 threading = test_support.import_module('threading') 9 8 10 9 QUEUE_SIZE = 5 … … 44 43 45 44 class BlockingTestMixin: 45 46 def tearDown(self): 47 self.t = None 46 48 47 49 def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args): … … 81 83 82 84 83 class BaseQueueTest( unittest.TestCase,BlockingTestMixin):85 class BaseQueueTest(BlockingTestMixin): 84 86 def setUp(self): 85 87 self.cum = 0 … … 97 99 PriorityQueue = [111, 222, 333]) 98 100 actual_order = [q.get(), q.get(), q.get()] 99 self.assertEqual s(actual_order, target_order[q.__class__.__name__],100 101 self.assertEqual(actual_order, target_order[q.__class__.__name__], 102 "Didn't seem to queue the correct data!") 101 103 for i in range(QUEUE_SIZE-1): 102 104 q.put(i) 103 self.assert_(not q.empty(), "Queue should not be empty") 104 self.assert_(not q.full(), "Queue should not be full") 105 q.put("last") 106 self.assert_(q.full(), "Queue should be full") 107 try: 108 q.put("full", block=0) 105 self.assertTrue(not q.empty(), "Queue should not be empty") 106 self.assertTrue(not q.full(), "Queue should not be full") 107 last = 2 * QUEUE_SIZE 108 full = 3 * 2 * QUEUE_SIZE 109 q.put(last) 110 self.assertTrue(q.full(), "Queue should be full") 111 try: 112 q.put(full, block=0) 109 113 self.fail("Didn't appear to block with a full queue") 110 114 except Queue.Full: 111 115 pass 112 116 try: 113 q.put( "full", timeout=0.01)117 q.put(full, timeout=0.01) 114 118 self.fail("Didn't appear to time-out with a full queue") 115 119 except Queue.Full: 116 120 pass 117 121 # Test a blocking put 118 self.do_blocking_test(q.put, ( "full",), q.get, ())119 self.do_blocking_test(q.put, ( "full", True, 10), q.get, ())122 self.do_blocking_test(q.put, (full,), q.get, ()) 123 self.do_blocking_test(q.put, (full, True, 10), q.get, ()) 120 124 # Empty it 121 125 for i in range(QUEUE_SIZE): 122 126 q.get() 123 self.assert _(q.empty(), "Queue should be empty")127 self.assertTrue(q.empty(), "Queue should be empty") 124 128 try: 125 129 q.get(block=0) … … 154 158 q.put(i) 155 159 q.join() 156 self.assertEqual s(self.cum, sum(range(100)),157 160 self.assertEqual(self.cum, sum(range(100)), 161 "q.join() did not block until all tasks were done") 158 162 for i in (0,1): 159 163 q.put(None) # instruct the threads to close … … 191 195 192 196 193 class QueueTest(BaseQueueTest ):197 class QueueTest(BaseQueueTest, unittest.TestCase): 194 198 type2test = Queue.Queue 195 199 196 class LifoQueueTest(BaseQueueTest ):200 class LifoQueueTest(BaseQueueTest, unittest.TestCase): 197 201 type2test = Queue.LifoQueue 198 202 199 class PriorityQueueTest(BaseQueueTest ):203 class PriorityQueueTest(BaseQueueTest, unittest.TestCase): 200 204 type2test = Queue.PriorityQueue 201 205 … … 222 226 return Queue.Queue._get(self) 223 227 224 class FailingQueueTest( unittest.TestCase, BlockingTestMixin):228 class FailingQueueTest(BlockingTestMixin, unittest.TestCase): 225 229 226 230 def failing_queue_test(self, q): … … 243 247 pass 244 248 q.put("last") 245 self.assert _(q.full(), "Queue should be full")249 self.assertTrue(q.full(), "Queue should be full") 246 250 # Test a failing blocking put 247 251 q.fail_next_put = True … … 265 269 # put failed, but get succeeded - re-add 266 270 q.put("last") 267 self.assert _(q.full(), "Queue should be full")271 self.assertTrue(q.full(), "Queue should be full") 268 272 q.get() 269 self.assert _(not q.full(), "Queue should not be full")273 self.assertTrue(not q.full(), "Queue should not be full") 270 274 q.put("last") 271 self.assert _(q.full(), "Queue should be full")275 self.assertTrue(q.full(), "Queue should be full") 272 276 # Test a blocking put 273 277 self.do_blocking_test(q.put, ("full",), q.get, ()) … … 275 279 for i in range(QUEUE_SIZE): 276 280 q.get() 277 self.assert _(q.empty(), "Queue should be empty")281 self.assertTrue(q.empty(), "Queue should be empty") 278 282 q.put("first") 279 283 q.fail_next_get = True … … 283 287 except FailingQueueException: 284 288 pass 285 self.assert _(not q.empty(), "Queue should not be empty")289 self.assertTrue(not q.empty(), "Queue should not be empty") 286 290 q.fail_next_get = True 287 291 try: … … 290 294 except FailingQueueException: 291 295 pass 292 self.assert _(not q.empty(), "Queue should not be empty")296 self.assertTrue(not q.empty(), "Queue should not be empty") 293 297 q.get() 294 self.assert _(q.empty(), "Queue should be empty")298 self.assertTrue(q.empty(), "Queue should be empty") 295 299 q.fail_next_get = True 296 300 try: … … 301 305 pass 302 306 # put succeeded, but get failed. 303 self.assert _(not q.empty(), "Queue should not be empty")307 self.assertTrue(not q.empty(), "Queue should not be empty") 304 308 q.get() 305 self.assert _(q.empty(), "Queue should be empty")309 self.assertTrue(q.empty(), "Queue should be empty") 306 310 307 311 def test_failing_queue(self):
Note:
See TracChangeset
for help on using the changeset viewer.