Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_zlib.py

    r2 r391  
    11import unittest
    2 from test import test_support
    3 import zlib
     2from test.test_support import TESTFN, run_unittest, import_module, unlink, requires
    43import binascii
    54import random
     5from test.test_support import precisionbigmemtest, _1G, _4G
     6import sys
     7
     8try:
     9    import mmap
     10except ImportError:
     11    mmap = None
     12
     13zlib = import_module('zlib')
    614
    715
     
    1018    def test_crc32start(self):
    1119        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
    12         self.assert_(zlib.crc32("abc", 0xffffffff))
     20        self.assertTrue(zlib.crc32("abc", 0xffffffff))
    1321
    1422    def test_crc32empty(self):
     
    1927    def test_adler32start(self):
    2028        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
    21         self.assert_(zlib.adler32("abc", 0xffffffff))
     29        self.assertTrue(zlib.adler32("abc", 0xffffffff))
    2230
    2331    def test_adler32empty(self):
     
    8290    def test_baddecompressobj(self):
    8391        # verify failure on building decompress object with bad params
    84         self.assertRaises(ValueError, zlib.decompressobj, 0)
     92        self.assertRaises(ValueError, zlib.decompressobj, -1)
    8593
    8694    def test_decompressobj_badflush(self):
     
    9098
    9199
    92 
    93 class CompressTestCase(unittest.TestCase):
     100class BaseCompressTestCase(object):
     101    def check_big_compress_buffer(self, size, compress_func):
     102        _1M = 1024 * 1024
     103        fmt = "%%0%dx" % (2 * _1M)
     104        # Generate 10MB worth of random, and expand it by repeating it.
     105        # The assumption is that zlib's memory is not big enough to exploit
     106        # such spread out redundancy.
     107        data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
     108                        for i in range(10)])
     109        data = data * (size // len(data) + 1)
     110        try:
     111            compress_func(data)
     112        finally:
     113            # Release memory
     114            data = None
     115
     116    def check_big_decompress_buffer(self, size, decompress_func):
     117        data = 'x' * size
     118        try:
     119            compressed = zlib.compress(data, 1)
     120        finally:
     121            # Release memory
     122            data = None
     123        data = decompress_func(compressed)
     124        # Sanity check
     125        try:
     126            self.assertEqual(len(data), size)
     127            self.assertEqual(len(data.strip('x')), 0)
     128        finally:
     129            data = None
     130
     131
     132class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    94133    # Test compression in one go (whole message compression)
    95134    def test_speech(self):
     
    103142        self.assertEqual(zlib.decompress(x), data)
    104143
    105 
    106 
    107 
    108 class CompressObjectTestCase(unittest.TestCase):
     144    def test_incomplete_stream(self):
     145        # An useful error message is given
     146        x = zlib.compress(HAMLET_SCENE)
     147        self.assertRaisesRegexp(zlib.error,
     148            "Error -5 while decompressing data: incomplete or truncated stream",
     149            zlib.decompress, x[:-1])
     150
     151    # Memory use of the following functions takes into account overallocation
     152
     153    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
     154    def test_big_compress_buffer(self, size):
     155        compress = lambda s: zlib.compress(s, 1)
     156        self.check_big_compress_buffer(size, compress)
     157
     158    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
     159    def test_big_decompress_buffer(self, size):
     160        self.check_big_decompress_buffer(size, zlib.decompress)
     161
     162
     163class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    109164    # Test compression object
    110165    def test_pair(self):
     
    208263            #max_length = 1 + len(cb)//10
    209264            chunk = dco.decompress(cb, dcx)
    210             self.failIf(len(chunk) > dcx,
     265            self.assertFalse(len(chunk) > dcx,
    211266                    'chunk too big (%d>%d)' % (len(chunk), dcx))
    212267            bufs.append(chunk)
     
    233288            max_length = 1 + len(cb)//10
    234289            chunk = dco.decompress(cb, max_length)
    235             self.failIf(len(chunk) > max_length,
     290            self.assertFalse(len(chunk) > max_length,
    236291                        'chunk too big (%d>%d)' % (len(chunk),max_length))
    237292            bufs.append(chunk)
     
    242297            while chunk:
    243298                chunk = dco.decompress('', max_length)
    244                 self.failIf(len(chunk) > max_length,
     299                self.assertFalse(len(chunk) > max_length,
    245300                            'chunk too big (%d>%d)' % (len(chunk),max_length))
    246301                bufs.append(chunk)
     
    255310        self.assertRaises(ValueError, dco.decompress, "", -1)
    256311        self.assertEqual('', dco.unconsumed_tail)
     312
     313    def test_clear_unconsumed_tail(self):
     314        # Issue #12050: calling decompress() without providing max_length
     315        # should clear the unconsumed_tail attribute.
     316        cdata = "x\x9cKLJ\x06\x00\x02M\x01"     # "abc"
     317        dco = zlib.decompressobj()
     318        ddata = dco.decompress(cdata, 1)
     319        ddata += dco.decompress(dco.unconsumed_tail)
     320        self.assertEqual(dco.unconsumed_tail, "")
    257321
    258322    def test_flushes(self):
     
    316380
    317381        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
    318         self.failUnless(co.flush())  # Returns a zlib header
     382        self.assertTrue(co.flush())  # Returns a zlib header
    319383        dco = zlib.decompressobj()
    320384        self.assertEqual(dco.flush(), "") # Returns nothing
     385
     386    def test_decompress_incomplete_stream(self):
     387        # This is 'foo', deflated
     388        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
     389        # For the record
     390        self.assertEqual(zlib.decompress(x), 'foo')
     391        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
     392        # Omitting the stream end works with decompressor objects
     393        # (see issue #8672).
     394        dco = zlib.decompressobj()
     395        y = dco.decompress(x[:-5])
     396        y += dco.flush()
     397        self.assertEqual(y, 'foo')
     398
     399    def test_flush_with_freed_input(self):
     400        # Issue #16411: decompressor accesses input to last decompress() call
     401        # in flush(), even if this object has been freed in the meanwhile.
     402        input1 = 'abcdefghijklmnopqrstuvwxyz'
     403        input2 = 'QWERTYUIOPASDFGHJKLZXCVBNM'
     404        data = zlib.compress(input1)
     405        dco = zlib.decompressobj()
     406        dco.decompress(data, 1)
     407        del data
     408        data = zlib.compress(input2)
     409        self.assertEqual(dco.flush(), input1[1:])
    321410
    322411    if hasattr(zlib.compressobj(), "copy"):
     
    350439            self.assertRaises(ValueError, c.copy)
    351440
     441    def test_decompress_unused_data(self):
     442        # Repeated calls to decompress() after EOF should accumulate data in
     443        # dco.unused_data, instead of just storing the arg to the last call.
     444        source = b'abcdefghijklmnopqrstuvwxyz'
     445        remainder = b'0123456789'
     446        y = zlib.compress(source)
     447        x = y + remainder
     448        for maxlen in 0, 1000:
     449            for step in 1, 2, len(y), len(x):
     450                dco = zlib.decompressobj()
     451                data = b''
     452                for i in range(0, len(x), step):
     453                    if i < len(y):
     454                        self.assertEqual(dco.unused_data, b'')
     455                    if maxlen == 0:
     456                        data += dco.decompress(x[i : i + step])
     457                        self.assertEqual(dco.unconsumed_tail, b'')
     458                    else:
     459                        data += dco.decompress(
     460                                dco.unconsumed_tail + x[i : i + step], maxlen)
     461                data += dco.flush()
     462                self.assertEqual(data, source)
     463                self.assertEqual(dco.unconsumed_tail, b'')
     464                self.assertEqual(dco.unused_data, remainder)
     465
    352466    if hasattr(zlib.decompressobj(), "copy"):
    353467        def test_decompresscopy(self):
     
    379493            d.flush()
    380494            self.assertRaises(ValueError, d.copy)
     495
     496    # Memory use of the following functions takes into account overallocation
     497
     498    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
     499    def test_big_compress_buffer(self, size):
     500        c = zlib.compressobj(1)
     501        compress = lambda s: c.compress(s) + c.flush()
     502        self.check_big_compress_buffer(size, compress)
     503
     504    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
     505    def test_big_decompress_buffer(self, size):
     506        d = zlib.decompressobj()
     507        decompress = lambda s: d.decompress(s) + d.flush()
     508        self.check_big_decompress_buffer(size, decompress)
     509
    381510
    382511def genblock(seed, length, step=1024, generator=random):
     
    470599
    471600def test_main():
    472     test_support.run_unittest(
     601    run_unittest(
    473602        ChecksumTestCase,
    474603        ExceptionTestCase,
Note: See TracChangeset for help on using the changeset viewer.