Changeset 391 for python/trunk/Lib/test/test_zlib.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_zlib.py
r2 r391 1 1 import unittest 2 from test import test_support 3 import zlib 2 from test.test_support import TESTFN, run_unittest, import_module, unlink, requires 4 3 import binascii 5 4 import random 5 from test.test_support import precisionbigmemtest, _1G, _4G 6 import sys 7 8 try: 9 import mmap 10 except ImportError: 11 mmap = None 12 13 zlib = import_module('zlib') 6 14 7 15 … … 10 18 def test_crc32start(self): 11 19 self.assertEqual(zlib.crc32(""), zlib.crc32("", 0)) 12 self.assert _(zlib.crc32("abc", 0xffffffff))20 self.assertTrue(zlib.crc32("abc", 0xffffffff)) 13 21 14 22 def test_crc32empty(self): … … 19 27 def test_adler32start(self): 20 28 self.assertEqual(zlib.adler32(""), zlib.adler32("", 1)) 21 self.assert _(zlib.adler32("abc", 0xffffffff))29 self.assertTrue(zlib.adler32("abc", 0xffffffff)) 22 30 23 31 def test_adler32empty(self): … … 82 90 def test_baddecompressobj(self): 83 91 # verify failure on building decompress object with bad params 84 self.assertRaises(ValueError, zlib.decompressobj, 0)92 self.assertRaises(ValueError, zlib.decompressobj, -1) 85 93 86 94 def test_decompressobj_badflush(self): … … 90 98 91 99 92 93 class CompressTestCase(unittest.TestCase): 100 class BaseCompressTestCase(object): 101 def check_big_compress_buffer(self, size, compress_func): 102 _1M = 1024 * 1024 103 fmt = "%%0%dx" % (2 * _1M) 104 # Generate 10MB worth of random, and expand it by repeating it. 105 # The assumption is that zlib's memory is not big enough to exploit 106 # such spread out redundancy. 107 data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M)) 108 for i in range(10)]) 109 data = data * (size // len(data) + 1) 110 try: 111 compress_func(data) 112 finally: 113 # Release memory 114 data = None 115 116 def check_big_decompress_buffer(self, size, decompress_func): 117 data = 'x' * size 118 try: 119 compressed = zlib.compress(data, 1) 120 finally: 121 # Release memory 122 data = None 123 data = decompress_func(compressed) 124 # Sanity check 125 try: 126 self.assertEqual(len(data), size) 127 self.assertEqual(len(data.strip('x')), 0) 128 finally: 129 data = None 130 131 132 class CompressTestCase(BaseCompressTestCase, unittest.TestCase): 94 133 # Test compression in one go (whole message compression) 95 134 def test_speech(self): … … 103 142 self.assertEqual(zlib.decompress(x), data) 104 143 105 106 107 108 class CompressObjectTestCase(unittest.TestCase): 144 def test_incomplete_stream(self): 145 # An useful error message is given 146 x = zlib.compress(HAMLET_SCENE) 147 self.assertRaisesRegexp(zlib.error, 148 "Error -5 while decompressing data: incomplete or truncated stream", 149 zlib.decompress, x[:-1]) 150 151 # Memory use of the following functions takes into account overallocation 152 153 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3) 154 def test_big_compress_buffer(self, size): 155 compress = lambda s: zlib.compress(s, 1) 156 self.check_big_compress_buffer(size, compress) 157 158 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2) 159 def test_big_decompress_buffer(self, size): 160 self.check_big_decompress_buffer(size, zlib.decompress) 161 162 163 class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): 109 164 # Test compression object 110 165 def test_pair(self): … … 208 263 #max_length = 1 + len(cb)//10 209 264 chunk = dco.decompress(cb, dcx) 210 self. failIf(len(chunk) > dcx,265 self.assertFalse(len(chunk) > dcx, 211 266 'chunk too big (%d>%d)' % (len(chunk), dcx)) 212 267 bufs.append(chunk) … … 233 288 max_length = 1 + len(cb)//10 234 289 chunk = dco.decompress(cb, max_length) 235 self. failIf(len(chunk) > max_length,290 self.assertFalse(len(chunk) > max_length, 236 291 'chunk too big (%d>%d)' % (len(chunk),max_length)) 237 292 bufs.append(chunk) … … 242 297 while chunk: 243 298 chunk = dco.decompress('', max_length) 244 self. failIf(len(chunk) > max_length,299 self.assertFalse(len(chunk) > max_length, 245 300 'chunk too big (%d>%d)' % (len(chunk),max_length)) 246 301 bufs.append(chunk) … … 255 310 self.assertRaises(ValueError, dco.decompress, "", -1) 256 311 self.assertEqual('', dco.unconsumed_tail) 312 313 def test_clear_unconsumed_tail(self): 314 # Issue #12050: calling decompress() without providing max_length 315 # should clear the unconsumed_tail attribute. 316 cdata = "x\x9cKLJ\x06\x00\x02M\x01" # "abc" 317 dco = zlib.decompressobj() 318 ddata = dco.decompress(cdata, 1) 319 ddata += dco.decompress(dco.unconsumed_tail) 320 self.assertEqual(dco.unconsumed_tail, "") 257 321 258 322 def test_flushes(self): … … 316 380 317 381 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 318 self. failUnless(co.flush()) # Returns a zlib header382 self.assertTrue(co.flush()) # Returns a zlib header 319 383 dco = zlib.decompressobj() 320 384 self.assertEqual(dco.flush(), "") # Returns nothing 385 386 def test_decompress_incomplete_stream(self): 387 # This is 'foo', deflated 388 x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' 389 # For the record 390 self.assertEqual(zlib.decompress(x), 'foo') 391 self.assertRaises(zlib.error, zlib.decompress, x[:-5]) 392 # Omitting the stream end works with decompressor objects 393 # (see issue #8672). 394 dco = zlib.decompressobj() 395 y = dco.decompress(x[:-5]) 396 y += dco.flush() 397 self.assertEqual(y, 'foo') 398 399 def test_flush_with_freed_input(self): 400 # Issue #16411: decompressor accesses input to last decompress() call 401 # in flush(), even if this object has been freed in the meanwhile. 402 input1 = 'abcdefghijklmnopqrstuvwxyz' 403 input2 = 'QWERTYUIOPASDFGHJKLZXCVBNM' 404 data = zlib.compress(input1) 405 dco = zlib.decompressobj() 406 dco.decompress(data, 1) 407 del data 408 data = zlib.compress(input2) 409 self.assertEqual(dco.flush(), input1[1:]) 321 410 322 411 if hasattr(zlib.compressobj(), "copy"): … … 350 439 self.assertRaises(ValueError, c.copy) 351 440 441 def test_decompress_unused_data(self): 442 # Repeated calls to decompress() after EOF should accumulate data in 443 # dco.unused_data, instead of just storing the arg to the last call. 444 source = b'abcdefghijklmnopqrstuvwxyz' 445 remainder = b'0123456789' 446 y = zlib.compress(source) 447 x = y + remainder 448 for maxlen in 0, 1000: 449 for step in 1, 2, len(y), len(x): 450 dco = zlib.decompressobj() 451 data = b'' 452 for i in range(0, len(x), step): 453 if i < len(y): 454 self.assertEqual(dco.unused_data, b'') 455 if maxlen == 0: 456 data += dco.decompress(x[i : i + step]) 457 self.assertEqual(dco.unconsumed_tail, b'') 458 else: 459 data += dco.decompress( 460 dco.unconsumed_tail + x[i : i + step], maxlen) 461 data += dco.flush() 462 self.assertEqual(data, source) 463 self.assertEqual(dco.unconsumed_tail, b'') 464 self.assertEqual(dco.unused_data, remainder) 465 352 466 if hasattr(zlib.decompressobj(), "copy"): 353 467 def test_decompresscopy(self): … … 379 493 d.flush() 380 494 self.assertRaises(ValueError, d.copy) 495 496 # Memory use of the following functions takes into account overallocation 497 498 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3) 499 def test_big_compress_buffer(self, size): 500 c = zlib.compressobj(1) 501 compress = lambda s: c.compress(s) + c.flush() 502 self.check_big_compress_buffer(size, compress) 503 504 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2) 505 def test_big_decompress_buffer(self, size): 506 d = zlib.decompressobj() 507 decompress = lambda s: d.decompress(s) + d.flush() 508 self.check_big_decompress_buffer(size, decompress) 509 381 510 382 511 def genblock(seed, length, step=1024, generator=random): … … 470 599 471 600 def test_main(): 472 test_support.run_unittest(601 run_unittest( 473 602 ChecksumTestCase, 474 603 ExceptionTestCase,
Note:
See TracChangeset
for help on using the changeset viewer.