1 | #! /usr/bin/env python
|
---|
2 | """Test script for the gzip module.
|
---|
3 | """
|
---|
4 |
|
---|
5 | import unittest
|
---|
6 | from test import test_support
|
---|
7 | import os
|
---|
8 | import io
|
---|
9 | import struct
|
---|
10 | gzip = test_support.import_module('gzip')
|
---|
11 |
|
---|
12 | data1 = """ int length=DEFAULTALLOC, err = Z_OK;
|
---|
13 | PyObject *RetVal;
|
---|
14 | int flushmode = Z_FINISH;
|
---|
15 | unsigned long start_total_out;
|
---|
16 |
|
---|
17 | """
|
---|
18 |
|
---|
19 | data2 = """/* zlibmodule.c -- gzip-compatible data compression */
|
---|
20 | /* See http://www.gzip.org/zlib/
|
---|
21 | /* See http://www.winimage.com/zLibDll for Windows */
|
---|
22 | """
|
---|
23 |
|
---|
24 |
|
---|
25 | class TestGzip(unittest.TestCase):
|
---|
26 | filename = test_support.TESTFN
|
---|
27 |
|
---|
28 | def setUp(self):
|
---|
29 | test_support.unlink(self.filename)
|
---|
30 |
|
---|
31 | def tearDown(self):
|
---|
32 | test_support.unlink(self.filename)
|
---|
33 |
|
---|
34 |
|
---|
35 | def test_write(self):
|
---|
36 | with gzip.GzipFile(self.filename, 'wb') as f:
|
---|
37 | f.write(data1 * 50)
|
---|
38 |
|
---|
39 | # Try flush and fileno.
|
---|
40 | f.flush()
|
---|
41 | f.fileno()
|
---|
42 | if hasattr(os, 'fsync'):
|
---|
43 | os.fsync(f.fileno())
|
---|
44 | f.close()
|
---|
45 |
|
---|
46 | # Test multiple close() calls.
|
---|
47 | f.close()
|
---|
48 |
|
---|
49 | def test_read(self):
|
---|
50 | self.test_write()
|
---|
51 | # Try reading.
|
---|
52 | with gzip.GzipFile(self.filename, 'r') as f:
|
---|
53 | d = f.read()
|
---|
54 | self.assertEqual(d, data1*50)
|
---|
55 |
|
---|
56 | def test_read_universal_newlines(self):
|
---|
57 | # Issue #5148: Reading breaks when mode contains 'U'.
|
---|
58 | self.test_write()
|
---|
59 | with gzip.GzipFile(self.filename, 'rU') as f:
|
---|
60 | d = f.read()
|
---|
61 | self.assertEqual(d, data1*50)
|
---|
62 |
|
---|
63 | def test_io_on_closed_object(self):
|
---|
64 | # Test that I/O operations on closed GzipFile objects raise a
|
---|
65 | # ValueError, just like the corresponding functions on file objects.
|
---|
66 |
|
---|
67 | # Write to a file, open it for reading, then close it.
|
---|
68 | self.test_write()
|
---|
69 | f = gzip.GzipFile(self.filename, 'r')
|
---|
70 | f.close()
|
---|
71 | with self.assertRaises(ValueError):
|
---|
72 | f.read(1)
|
---|
73 | with self.assertRaises(ValueError):
|
---|
74 | f.seek(0)
|
---|
75 | with self.assertRaises(ValueError):
|
---|
76 | f.tell()
|
---|
77 | # Open the file for writing, then close it.
|
---|
78 | f = gzip.GzipFile(self.filename, 'w')
|
---|
79 | f.close()
|
---|
80 | with self.assertRaises(ValueError):
|
---|
81 | f.write('')
|
---|
82 | with self.assertRaises(ValueError):
|
---|
83 | f.flush()
|
---|
84 |
|
---|
85 | def test_append(self):
|
---|
86 | self.test_write()
|
---|
87 | # Append to the previous file
|
---|
88 | with gzip.GzipFile(self.filename, 'ab') as f:
|
---|
89 | f.write(data2 * 15)
|
---|
90 |
|
---|
91 | with gzip.GzipFile(self.filename, 'rb') as f:
|
---|
92 | d = f.read()
|
---|
93 | self.assertEqual(d, (data1*50) + (data2*15))
|
---|
94 |
|
---|
95 | def test_many_append(self):
|
---|
96 | # Bug #1074261 was triggered when reading a file that contained
|
---|
97 | # many, many members. Create such a file and verify that reading it
|
---|
98 | # works.
|
---|
99 | with gzip.open(self.filename, 'wb', 9) as f:
|
---|
100 | f.write('a')
|
---|
101 | for i in range(0, 200):
|
---|
102 | with gzip.open(self.filename, "ab", 9) as f: # append
|
---|
103 | f.write('a')
|
---|
104 |
|
---|
105 | # Try reading the file
|
---|
106 | with gzip.open(self.filename, "rb") as zgfile:
|
---|
107 | contents = ""
|
---|
108 | while 1:
|
---|
109 | ztxt = zgfile.read(8192)
|
---|
110 | contents += ztxt
|
---|
111 | if not ztxt: break
|
---|
112 | self.assertEqual(contents, 'a'*201)
|
---|
113 |
|
---|
114 | def test_buffered_reader(self):
|
---|
115 | # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
|
---|
116 | # performance.
|
---|
117 | self.test_write()
|
---|
118 |
|
---|
119 | with gzip.GzipFile(self.filename, 'rb') as f:
|
---|
120 | with io.BufferedReader(f) as r:
|
---|
121 | lines = [line for line in r]
|
---|
122 |
|
---|
123 | self.assertEqual(lines, 50 * data1.splitlines(True))
|
---|
124 |
|
---|
125 | def test_readline(self):
|
---|
126 | self.test_write()
|
---|
127 | # Try .readline() with varying line lengths
|
---|
128 |
|
---|
129 | with gzip.GzipFile(self.filename, 'rb') as f:
|
---|
130 | line_length = 0
|
---|
131 | while 1:
|
---|
132 | L = f.readline(line_length)
|
---|
133 | if not L and line_length != 0: break
|
---|
134 | self.assertTrue(len(L) <= line_length)
|
---|
135 | line_length = (line_length + 1) % 50
|
---|
136 |
|
---|
137 | def test_readlines(self):
|
---|
138 | self.test_write()
|
---|
139 | # Try .readlines()
|
---|
140 |
|
---|
141 | with gzip.GzipFile(self.filename, 'rb') as f:
|
---|
142 | L = f.readlines()
|
---|
143 |
|
---|
144 | with gzip.GzipFile(self.filename, 'rb') as f:
|
---|
145 | while 1:
|
---|
146 | L = f.readlines(150)
|
---|
147 | if L == []: break
|
---|
148 |
|
---|
149 | def test_seek_read(self):
|
---|
150 | self.test_write()
|
---|
151 | # Try seek, read test
|
---|
152 |
|
---|
153 | with gzip.GzipFile(self.filename) as f:
|
---|
154 | while 1:
|
---|
155 | oldpos = f.tell()
|
---|
156 | line1 = f.readline()
|
---|
157 | if not line1: break
|
---|
158 | newpos = f.tell()
|
---|
159 | f.seek(oldpos) # negative seek
|
---|
160 | if len(line1)>10:
|
---|
161 | amount = 10
|
---|
162 | else:
|
---|
163 | amount = len(line1)
|
---|
164 | line2 = f.read(amount)
|
---|
165 | self.assertEqual(line1[:amount], line2)
|
---|
166 | f.seek(newpos) # positive seek
|
---|
167 |
|
---|
168 | def test_seek_whence(self):
|
---|
169 | self.test_write()
|
---|
170 | # Try seek(whence=1), read test
|
---|
171 |
|
---|
172 | with gzip.GzipFile(self.filename) as f:
|
---|
173 | f.read(10)
|
---|
174 | f.seek(10, whence=1)
|
---|
175 | y = f.read(10)
|
---|
176 | self.assertEqual(y, data1[20:30])
|
---|
177 |
|
---|
178 | def test_seek_write(self):
|
---|
179 | # Try seek, write test
|
---|
180 | with gzip.GzipFile(self.filename, 'w') as f:
|
---|
181 | for pos in range(0, 256, 16):
|
---|
182 | f.seek(pos)
|
---|
183 | f.write('GZ\n')
|
---|
184 |
|
---|
185 | def test_mode(self):
|
---|
186 | self.test_write()
|
---|
187 | with gzip.GzipFile(self.filename, 'r') as f:
|
---|
188 | self.assertEqual(f.myfileobj.mode, 'rb')
|
---|
189 |
|
---|
190 | def test_1647484(self):
|
---|
191 | for mode in ('wb', 'rb'):
|
---|
192 | with gzip.GzipFile(self.filename, mode) as f:
|
---|
193 | self.assertTrue(hasattr(f, "name"))
|
---|
194 | self.assertEqual(f.name, self.filename)
|
---|
195 |
|
---|
196 | def test_mtime(self):
|
---|
197 | mtime = 123456789
|
---|
198 | with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
|
---|
199 | fWrite.write(data1)
|
---|
200 | with gzip.GzipFile(self.filename) as fRead:
|
---|
201 | dataRead = fRead.read()
|
---|
202 | self.assertEqual(dataRead, data1)
|
---|
203 | self.assertTrue(hasattr(fRead, 'mtime'))
|
---|
204 | self.assertEqual(fRead.mtime, mtime)
|
---|
205 |
|
---|
206 | def test_metadata(self):
|
---|
207 | mtime = 123456789
|
---|
208 |
|
---|
209 | with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
|
---|
210 | fWrite.write(data1)
|
---|
211 |
|
---|
212 | with open(self.filename, 'rb') as fRead:
|
---|
213 | # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
|
---|
214 |
|
---|
215 | idBytes = fRead.read(2)
|
---|
216 | self.assertEqual(idBytes, '\x1f\x8b') # gzip ID
|
---|
217 |
|
---|
218 | cmByte = fRead.read(1)
|
---|
219 | self.assertEqual(cmByte, '\x08') # deflate
|
---|
220 |
|
---|
221 | flagsByte = fRead.read(1)
|
---|
222 | self.assertEqual(flagsByte, '\x08') # only the FNAME flag is set
|
---|
223 |
|
---|
224 | mtimeBytes = fRead.read(4)
|
---|
225 | self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian
|
---|
226 |
|
---|
227 | xflByte = fRead.read(1)
|
---|
228 | self.assertEqual(xflByte, '\x02') # maximum compression
|
---|
229 |
|
---|
230 | osByte = fRead.read(1)
|
---|
231 | self.assertEqual(osByte, '\xff') # OS "unknown" (OS-independent)
|
---|
232 |
|
---|
233 | # Since the FNAME flag is set, the zero-terminated filename follows.
|
---|
234 | # RFC 1952 specifies that this is the name of the input file, if any.
|
---|
235 | # However, the gzip module defaults to storing the name of the output
|
---|
236 | # file in this field.
|
---|
237 | expected = self.filename.encode('Latin-1') + '\x00'
|
---|
238 | nameBytes = fRead.read(len(expected))
|
---|
239 | self.assertEqual(nameBytes, expected)
|
---|
240 |
|
---|
241 | # Since no other flags were set, the header ends here.
|
---|
242 | # Rather than process the compressed data, let's seek to the trailer.
|
---|
243 | fRead.seek(os.stat(self.filename).st_size - 8)
|
---|
244 |
|
---|
245 | crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
|
---|
246 | self.assertEqual(crc32Bytes, '\xaf\xd7d\x83')
|
---|
247 |
|
---|
248 | isizeBytes = fRead.read(4)
|
---|
249 | self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))
|
---|
250 |
|
---|
251 | def test_with_open(self):
|
---|
252 | # GzipFile supports the context management protocol
|
---|
253 | with gzip.GzipFile(self.filename, "wb") as f:
|
---|
254 | f.write(b"xxx")
|
---|
255 | f = gzip.GzipFile(self.filename, "rb")
|
---|
256 | f.close()
|
---|
257 | try:
|
---|
258 | with f:
|
---|
259 | pass
|
---|
260 | except ValueError:
|
---|
261 | pass
|
---|
262 | else:
|
---|
263 | self.fail("__enter__ on a closed file didn't raise an exception")
|
---|
264 | try:
|
---|
265 | with gzip.GzipFile(self.filename, "wb") as f:
|
---|
266 | 1 // 0
|
---|
267 | except ZeroDivisionError:
|
---|
268 | pass
|
---|
269 | else:
|
---|
270 | self.fail("1 // 0 didn't raise an exception")
|
---|
271 |
|
---|
272 | def test_zero_padded_file(self):
|
---|
273 | with gzip.GzipFile(self.filename, "wb") as f:
|
---|
274 | f.write(data1 * 50)
|
---|
275 |
|
---|
276 | # Pad the file with zeroes
|
---|
277 | with open(self.filename, "ab") as f:
|
---|
278 | f.write("\x00" * 50)
|
---|
279 |
|
---|
280 | with gzip.GzipFile(self.filename, "rb") as f:
|
---|
281 | d = f.read()
|
---|
282 | self.assertEqual(d, data1 * 50, "Incorrect data in file")
|
---|
283 |
|
---|
284 | def test_fileobj_from_fdopen(self):
|
---|
285 | # Issue #13781: Creating a GzipFile using a fileobj from os.fdopen()
|
---|
286 | # should not embed the fake filename "<fdopen>" in the output file.
|
---|
287 | fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
|
---|
288 | with os.fdopen(fd, "wb") as f:
|
---|
289 | with gzip.GzipFile(fileobj=f, mode="w") as g:
|
---|
290 | self.assertEqual(g.name, "")
|
---|
291 |
|
---|
292 | def test_read_with_extra(self):
|
---|
293 | # Gzip data with an extra field
|
---|
294 | gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
|
---|
295 | b'\x05\x00Extra'
|
---|
296 | b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
|
---|
297 | with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
|
---|
298 | self.assertEqual(f.read(), b'Test')
|
---|
299 |
|
---|
300 | def test_main(verbose=None):
|
---|
301 | test_support.run_unittest(TestGzip)
|
---|
302 |
|
---|
303 | if __name__ == "__main__":
|
---|
304 | test_main(verbose=True)
|
---|