1 | from test.test_support import (TESTFN, run_unittest, import_module, unlink,
|
---|
2 | requires, _2G, _4G)
|
---|
3 | import unittest
|
---|
4 | import os, re, itertools, socket, sys
|
---|
5 |
|
---|
6 | mmap = import_module('mmap')
|
---|
7 |
|
---|
8 | PAGESIZE = mmap.PAGESIZE
|
---|
9 |
|
---|
10 | class MmapTests(unittest.TestCase):
|
---|
11 |
|
---|
12 | def setUp(self):
|
---|
13 | if os.path.exists(TESTFN):
|
---|
14 | os.unlink(TESTFN)
|
---|
15 |
|
---|
16 | def tearDown(self):
|
---|
17 | try:
|
---|
18 | os.unlink(TESTFN)
|
---|
19 | except OSError:
|
---|
20 | pass
|
---|
21 |
|
---|
22 | def test_basic(self):
|
---|
23 | # Test mmap module on Unix systems and Windows
|
---|
24 |
|
---|
25 | # Create a file to be mmap'ed.
|
---|
26 | f = open(TESTFN, 'w+')
|
---|
27 | try:
|
---|
28 | # Write 2 pages worth of data to the file
|
---|
29 | f.write('\0'* PAGESIZE)
|
---|
30 | f.write('foo')
|
---|
31 | f.write('\0'* (PAGESIZE-3) )
|
---|
32 | f.flush()
|
---|
33 | m = mmap.mmap(f.fileno(), 2 * PAGESIZE)
|
---|
34 | f.close()
|
---|
35 |
|
---|
36 | # Simple sanity checks
|
---|
37 |
|
---|
38 | tp = str(type(m)) # SF bug 128713: segfaulted on Linux
|
---|
39 | self.assertEqual(m.find('foo'), PAGESIZE)
|
---|
40 |
|
---|
41 | self.assertEqual(len(m), 2*PAGESIZE)
|
---|
42 |
|
---|
43 | self.assertEqual(m[0], '\0')
|
---|
44 | self.assertEqual(m[0:3], '\0\0\0')
|
---|
45 |
|
---|
46 | # Shouldn't crash on boundary (Issue #5292)
|
---|
47 | self.assertRaises(IndexError, m.__getitem__, len(m))
|
---|
48 | self.assertRaises(IndexError, m.__setitem__, len(m), '\0')
|
---|
49 |
|
---|
50 | # Modify the file's content
|
---|
51 | m[0] = '3'
|
---|
52 | m[PAGESIZE +3: PAGESIZE +3+3] = 'bar'
|
---|
53 |
|
---|
54 | # Check that the modification worked
|
---|
55 | self.assertEqual(m[0], '3')
|
---|
56 | self.assertEqual(m[0:3], '3\0\0')
|
---|
57 | self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], '\0foobar\0')
|
---|
58 |
|
---|
59 | m.flush()
|
---|
60 |
|
---|
61 | # Test doing a regular expression match in an mmap'ed file
|
---|
62 | match = re.search('[A-Za-z]+', m)
|
---|
63 | if match is None:
|
---|
64 | self.fail('regex match on mmap failed!')
|
---|
65 | else:
|
---|
66 | start, end = match.span(0)
|
---|
67 | length = end - start
|
---|
68 |
|
---|
69 | self.assertEqual(start, PAGESIZE)
|
---|
70 | self.assertEqual(end, PAGESIZE + 6)
|
---|
71 |
|
---|
72 | # test seeking around (try to overflow the seek implementation)
|
---|
73 | m.seek(0,0)
|
---|
74 | self.assertEqual(m.tell(), 0)
|
---|
75 | m.seek(42,1)
|
---|
76 | self.assertEqual(m.tell(), 42)
|
---|
77 | m.seek(0,2)
|
---|
78 | self.assertEqual(m.tell(), len(m))
|
---|
79 |
|
---|
80 | # Try to seek to negative position...
|
---|
81 | self.assertRaises(ValueError, m.seek, -1)
|
---|
82 |
|
---|
83 | # Try to seek beyond end of mmap...
|
---|
84 | self.assertRaises(ValueError, m.seek, 1, 2)
|
---|
85 |
|
---|
86 | # Try to seek to negative position...
|
---|
87 | self.assertRaises(ValueError, m.seek, -len(m)-1, 2)
|
---|
88 |
|
---|
89 | # Try resizing map
|
---|
90 | try:
|
---|
91 | m.resize(512)
|
---|
92 | except SystemError:
|
---|
93 | # resize() not supported
|
---|
94 | # No messages are printed, since the output of this test suite
|
---|
95 | # would then be different across platforms.
|
---|
96 | pass
|
---|
97 | else:
|
---|
98 | # resize() is supported
|
---|
99 | self.assertEqual(len(m), 512)
|
---|
100 | # Check that we can no longer seek beyond the new size.
|
---|
101 | self.assertRaises(ValueError, m.seek, 513, 0)
|
---|
102 |
|
---|
103 | # Check that the underlying file is truncated too
|
---|
104 | # (bug #728515)
|
---|
105 | f = open(TESTFN)
|
---|
106 | f.seek(0, 2)
|
---|
107 | self.assertEqual(f.tell(), 512)
|
---|
108 | f.close()
|
---|
109 | self.assertEqual(m.size(), 512)
|
---|
110 |
|
---|
111 | m.close()
|
---|
112 |
|
---|
113 | finally:
|
---|
114 | try:
|
---|
115 | f.close()
|
---|
116 | except OSError:
|
---|
117 | pass
|
---|
118 |
|
---|
119 | def test_access_parameter(self):
|
---|
120 | # Test for "access" keyword parameter
|
---|
121 | mapsize = 10
|
---|
122 | open(TESTFN, "wb").write("a"*mapsize)
|
---|
123 | f = open(TESTFN, "rb")
|
---|
124 | m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ)
|
---|
125 | self.assertEqual(m[:], 'a'*mapsize, "Readonly memory map data incorrect.")
|
---|
126 |
|
---|
127 | # Ensuring that readonly mmap can't be slice assigned
|
---|
128 | try:
|
---|
129 | m[:] = 'b'*mapsize
|
---|
130 | except TypeError:
|
---|
131 | pass
|
---|
132 | else:
|
---|
133 | self.fail("Able to write to readonly memory map")
|
---|
134 |
|
---|
135 | # Ensuring that readonly mmap can't be item assigned
|
---|
136 | try:
|
---|
137 | m[0] = 'b'
|
---|
138 | except TypeError:
|
---|
139 | pass
|
---|
140 | else:
|
---|
141 | self.fail("Able to write to readonly memory map")
|
---|
142 |
|
---|
143 | # Ensuring that readonly mmap can't be write() to
|
---|
144 | try:
|
---|
145 | m.seek(0,0)
|
---|
146 | m.write('abc')
|
---|
147 | except TypeError:
|
---|
148 | pass
|
---|
149 | else:
|
---|
150 | self.fail("Able to write to readonly memory map")
|
---|
151 |
|
---|
152 | # Ensuring that readonly mmap can't be write_byte() to
|
---|
153 | try:
|
---|
154 | m.seek(0,0)
|
---|
155 | m.write_byte('d')
|
---|
156 | except TypeError:
|
---|
157 | pass
|
---|
158 | else:
|
---|
159 | self.fail("Able to write to readonly memory map")
|
---|
160 |
|
---|
161 | # Ensuring that readonly mmap can't be resized
|
---|
162 | try:
|
---|
163 | m.resize(2*mapsize)
|
---|
164 | except SystemError: # resize is not universally supported
|
---|
165 | pass
|
---|
166 | except TypeError:
|
---|
167 | pass
|
---|
168 | else:
|
---|
169 | self.fail("Able to resize readonly memory map")
|
---|
170 | f.close()
|
---|
171 | del m, f
|
---|
172 | self.assertEqual(open(TESTFN, "rb").read(), 'a'*mapsize,
|
---|
173 | "Readonly memory map data file was modified")
|
---|
174 |
|
---|
175 | # Opening mmap with size too big
|
---|
176 | import sys
|
---|
177 | f = open(TESTFN, "r+b")
|
---|
178 | try:
|
---|
179 | m = mmap.mmap(f.fileno(), mapsize+1)
|
---|
180 | except ValueError:
|
---|
181 | # we do not expect a ValueError on Windows
|
---|
182 | # CAUTION: This also changes the size of the file on disk, and
|
---|
183 | # later tests assume that the length hasn't changed. We need to
|
---|
184 | # repair that.
|
---|
185 | if sys.platform.startswith('win'):
|
---|
186 | self.fail("Opening mmap with size+1 should work on Windows.")
|
---|
187 | else:
|
---|
188 | # we expect a ValueError on Unix, but not on Windows
|
---|
189 | if not sys.platform.startswith('win'):
|
---|
190 | self.fail("Opening mmap with size+1 should raise ValueError.")
|
---|
191 | m.close()
|
---|
192 | f.close()
|
---|
193 | if sys.platform.startswith('win'):
|
---|
194 | # Repair damage from the resizing test.
|
---|
195 | f = open(TESTFN, 'r+b')
|
---|
196 | f.truncate(mapsize)
|
---|
197 | f.close()
|
---|
198 |
|
---|
199 | # Opening mmap with access=ACCESS_WRITE
|
---|
200 | f = open(TESTFN, "r+b")
|
---|
201 | m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE)
|
---|
202 | # Modifying write-through memory map
|
---|
203 | m[:] = 'c'*mapsize
|
---|
204 | self.assertEqual(m[:], 'c'*mapsize,
|
---|
205 | "Write-through memory map memory not updated properly.")
|
---|
206 | m.flush()
|
---|
207 | m.close()
|
---|
208 | f.close()
|
---|
209 | f = open(TESTFN, 'rb')
|
---|
210 | stuff = f.read()
|
---|
211 | f.close()
|
---|
212 | self.assertEqual(stuff, 'c'*mapsize,
|
---|
213 | "Write-through memory map data file not updated properly.")
|
---|
214 |
|
---|
215 | # Opening mmap with access=ACCESS_COPY
|
---|
216 | f = open(TESTFN, "r+b")
|
---|
217 | m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY)
|
---|
218 | # Modifying copy-on-write memory map
|
---|
219 | m[:] = 'd'*mapsize
|
---|
220 | self.assertEqual(m[:], 'd' * mapsize,
|
---|
221 | "Copy-on-write memory map data not written correctly.")
|
---|
222 | m.flush()
|
---|
223 | self.assertEqual(open(TESTFN, "rb").read(), 'c'*mapsize,
|
---|
224 | "Copy-on-write test data file should not be modified.")
|
---|
225 | # Ensuring copy-on-write maps cannot be resized
|
---|
226 | self.assertRaises(TypeError, m.resize, 2*mapsize)
|
---|
227 | f.close()
|
---|
228 | del m, f
|
---|
229 |
|
---|
230 | # Ensuring invalid access parameter raises exception
|
---|
231 | f = open(TESTFN, "r+b")
|
---|
232 | self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)
|
---|
233 | f.close()
|
---|
234 |
|
---|
235 | if os.name == "posix":
|
---|
236 | # Try incompatible flags, prot and access parameters.
|
---|
237 | f = open(TESTFN, "r+b")
|
---|
238 | self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
|
---|
239 | flags=mmap.MAP_PRIVATE,
|
---|
240 | prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)
|
---|
241 | f.close()
|
---|
242 |
|
---|
243 | # Try writing with PROT_EXEC and without PROT_WRITE
|
---|
244 | prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0)
|
---|
245 | with open(TESTFN, "r+b") as f:
|
---|
246 | m = mmap.mmap(f.fileno(), mapsize, prot=prot)
|
---|
247 | self.assertRaises(TypeError, m.write, b"abcdef")
|
---|
248 | self.assertRaises(TypeError, m.write_byte, 0)
|
---|
249 | m.close()
|
---|
250 |
|
---|
251 | def test_bad_file_desc(self):
|
---|
252 | # Try opening a bad file descriptor...
|
---|
253 | self.assertRaises(mmap.error, mmap.mmap, -2, 4096)
|
---|
254 |
|
---|
255 | def test_tougher_find(self):
|
---|
256 | # Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2,
|
---|
257 | # searching for data with embedded \0 bytes didn't work.
|
---|
258 | f = open(TESTFN, 'w+')
|
---|
259 |
|
---|
260 | data = 'aabaac\x00deef\x00\x00aa\x00'
|
---|
261 | n = len(data)
|
---|
262 | f.write(data)
|
---|
263 | f.flush()
|
---|
264 | m = mmap.mmap(f.fileno(), n)
|
---|
265 | f.close()
|
---|
266 |
|
---|
267 | for start in range(n+1):
|
---|
268 | for finish in range(start, n+1):
|
---|
269 | slice = data[start : finish]
|
---|
270 | self.assertEqual(m.find(slice), data.find(slice))
|
---|
271 | self.assertEqual(m.find(slice + 'x'), -1)
|
---|
272 | m.close()
|
---|
273 |
|
---|
274 | def test_find_end(self):
|
---|
275 | # test the new 'end' parameter works as expected
|
---|
276 | f = open(TESTFN, 'w+')
|
---|
277 | data = 'one two ones'
|
---|
278 | n = len(data)
|
---|
279 | f.write(data)
|
---|
280 | f.flush()
|
---|
281 | m = mmap.mmap(f.fileno(), n)
|
---|
282 | f.close()
|
---|
283 |
|
---|
284 | self.assertEqual(m.find('one'), 0)
|
---|
285 | self.assertEqual(m.find('ones'), 8)
|
---|
286 | self.assertEqual(m.find('one', 0, -1), 0)
|
---|
287 | self.assertEqual(m.find('one', 1), 8)
|
---|
288 | self.assertEqual(m.find('one', 1, -1), 8)
|
---|
289 | self.assertEqual(m.find('one', 1, -2), -1)
|
---|
290 |
|
---|
291 |
|
---|
292 | def test_rfind(self):
|
---|
293 | # test the new 'end' parameter works as expected
|
---|
294 | f = open(TESTFN, 'w+')
|
---|
295 | data = 'one two ones'
|
---|
296 | n = len(data)
|
---|
297 | f.write(data)
|
---|
298 | f.flush()
|
---|
299 | m = mmap.mmap(f.fileno(), n)
|
---|
300 | f.close()
|
---|
301 |
|
---|
302 | self.assertEqual(m.rfind('one'), 8)
|
---|
303 | self.assertEqual(m.rfind('one '), 0)
|
---|
304 | self.assertEqual(m.rfind('one', 0, -1), 8)
|
---|
305 | self.assertEqual(m.rfind('one', 0, -2), 0)
|
---|
306 | self.assertEqual(m.rfind('one', 1, -1), 8)
|
---|
307 | self.assertEqual(m.rfind('one', 1, -2), -1)
|
---|
308 |
|
---|
309 |
|
---|
310 | def test_double_close(self):
|
---|
311 | # make sure a double close doesn't crash on Solaris (Bug# 665913)
|
---|
312 | f = open(TESTFN, 'w+')
|
---|
313 |
|
---|
314 | f.write(2**16 * 'a') # Arbitrary character
|
---|
315 | f.close()
|
---|
316 |
|
---|
317 | f = open(TESTFN)
|
---|
318 | mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ)
|
---|
319 | mf.close()
|
---|
320 | mf.close()
|
---|
321 | f.close()
|
---|
322 |
|
---|
323 | def test_entire_file(self):
|
---|
324 | # test mapping of entire file by passing 0 for map length
|
---|
325 | if hasattr(os, "stat"):
|
---|
326 | f = open(TESTFN, "w+")
|
---|
327 |
|
---|
328 | f.write(2**16 * 'm') # Arbitrary character
|
---|
329 | f.close()
|
---|
330 |
|
---|
331 | f = open(TESTFN, "rb+")
|
---|
332 | mf = mmap.mmap(f.fileno(), 0)
|
---|
333 | self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
|
---|
334 | self.assertEqual(mf.read(2**16), 2**16 * "m")
|
---|
335 | mf.close()
|
---|
336 | f.close()
|
---|
337 |
|
---|
338 | def test_length_0_offset(self):
|
---|
339 | # Issue #10916: test mapping of remainder of file by passing 0 for
|
---|
340 | # map length with an offset doesn't cause a segfault.
|
---|
341 | if not hasattr(os, "stat"):
|
---|
342 | self.skipTest("needs os.stat")
|
---|
343 | # NOTE: allocation granularity is currently 65536 under Win64,
|
---|
344 | # and therefore the minimum offset alignment.
|
---|
345 | with open(TESTFN, "wb") as f:
|
---|
346 | f.write((65536 * 2) * b'm') # Arbitrary character
|
---|
347 |
|
---|
348 | with open(TESTFN, "rb") as f:
|
---|
349 | mf = mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ)
|
---|
350 | try:
|
---|
351 | self.assertRaises(IndexError, mf.__getitem__, 80000)
|
---|
352 | finally:
|
---|
353 | mf.close()
|
---|
354 |
|
---|
355 | def test_length_0_large_offset(self):
|
---|
356 | # Issue #10959: test mapping of a file by passing 0 for
|
---|
357 | # map length with a large offset doesn't cause a segfault.
|
---|
358 | if not hasattr(os, "stat"):
|
---|
359 | self.skipTest("needs os.stat")
|
---|
360 |
|
---|
361 | with open(TESTFN, "wb") as f:
|
---|
362 | f.write(115699 * b'm') # Arbitrary character
|
---|
363 |
|
---|
364 | with open(TESTFN, "w+b") as f:
|
---|
365 | self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0,
|
---|
366 | offset=2147418112)
|
---|
367 |
|
---|
368 | def test_move(self):
|
---|
369 | # make move works everywhere (64-bit format problem earlier)
|
---|
370 | f = open(TESTFN, 'w+')
|
---|
371 |
|
---|
372 | f.write("ABCDEabcde") # Arbitrary character
|
---|
373 | f.flush()
|
---|
374 |
|
---|
375 | mf = mmap.mmap(f.fileno(), 10)
|
---|
376 | mf.move(5, 0, 5)
|
---|
377 | self.assertEqual(mf[:], "ABCDEABCDE", "Map move should have duplicated front 5")
|
---|
378 | mf.close()
|
---|
379 | f.close()
|
---|
380 |
|
---|
381 | # more excessive test
|
---|
382 | data = "0123456789"
|
---|
383 | for dest in range(len(data)):
|
---|
384 | for src in range(len(data)):
|
---|
385 | for count in range(len(data) - max(dest, src)):
|
---|
386 | expected = data[:dest] + data[src:src+count] + data[dest+count:]
|
---|
387 | m = mmap.mmap(-1, len(data))
|
---|
388 | m[:] = data
|
---|
389 | m.move(dest, src, count)
|
---|
390 | self.assertEqual(m[:], expected)
|
---|
391 | m.close()
|
---|
392 |
|
---|
393 | # segfault test (Issue 5387)
|
---|
394 | m = mmap.mmap(-1, 100)
|
---|
395 | offsets = [-100, -1, 0, 1, 100]
|
---|
396 | for source, dest, size in itertools.product(offsets, offsets, offsets):
|
---|
397 | try:
|
---|
398 | m.move(source, dest, size)
|
---|
399 | except ValueError:
|
---|
400 | pass
|
---|
401 |
|
---|
402 | offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1),
|
---|
403 | (-1, 0, 0), (0, -1, 0), (0, 0, -1)]
|
---|
404 | for source, dest, size in offsets:
|
---|
405 | self.assertRaises(ValueError, m.move, source, dest, size)
|
---|
406 |
|
---|
407 | m.close()
|
---|
408 |
|
---|
409 | m = mmap.mmap(-1, 1) # single byte
|
---|
410 | self.assertRaises(ValueError, m.move, 0, 0, 2)
|
---|
411 | self.assertRaises(ValueError, m.move, 1, 0, 1)
|
---|
412 | self.assertRaises(ValueError, m.move, 0, 1, 1)
|
---|
413 | m.move(0, 0, 1)
|
---|
414 | m.move(0, 0, 0)
|
---|
415 |
|
---|
416 |
|
---|
417 | def test_anonymous(self):
|
---|
418 | # anonymous mmap.mmap(-1, PAGE)
|
---|
419 | m = mmap.mmap(-1, PAGESIZE)
|
---|
420 | for x in xrange(PAGESIZE):
|
---|
421 | self.assertEqual(m[x], '\0', "anonymously mmap'ed contents should be zero")
|
---|
422 |
|
---|
423 | for x in xrange(PAGESIZE):
|
---|
424 | m[x] = ch = chr(x & 255)
|
---|
425 | self.assertEqual(m[x], ch)
|
---|
426 |
|
---|
427 | def test_extended_getslice(self):
|
---|
428 | # Test extended slicing by comparing with list slicing.
|
---|
429 | s = "".join(chr(c) for c in reversed(range(256)))
|
---|
430 | m = mmap.mmap(-1, len(s))
|
---|
431 | m[:] = s
|
---|
432 | self.assertEqual(m[:], s)
|
---|
433 | indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
|
---|
434 | for start in indices:
|
---|
435 | for stop in indices:
|
---|
436 | # Skip step 0 (invalid)
|
---|
437 | for step in indices[1:]:
|
---|
438 | self.assertEqual(m[start:stop:step],
|
---|
439 | s[start:stop:step])
|
---|
440 |
|
---|
441 | def test_extended_set_del_slice(self):
|
---|
442 | # Test extended slicing by comparing with list slicing.
|
---|
443 | s = "".join(chr(c) for c in reversed(range(256)))
|
---|
444 | m = mmap.mmap(-1, len(s))
|
---|
445 | indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
|
---|
446 | for start in indices:
|
---|
447 | for stop in indices:
|
---|
448 | # Skip invalid step 0
|
---|
449 | for step in indices[1:]:
|
---|
450 | m[:] = s
|
---|
451 | self.assertEqual(m[:], s)
|
---|
452 | L = list(s)
|
---|
453 | # Make sure we have a slice of exactly the right length,
|
---|
454 | # but with different data.
|
---|
455 | data = L[start:stop:step]
|
---|
456 | data = "".join(reversed(data))
|
---|
457 | L[start:stop:step] = data
|
---|
458 | m[start:stop:step] = data
|
---|
459 | self.assertEqual(m[:], "".join(L))
|
---|
460 |
|
---|
461 | def make_mmap_file (self, f, halfsize):
|
---|
462 | # Write 2 pages worth of data to the file
|
---|
463 | f.write ('\0' * halfsize)
|
---|
464 | f.write ('foo')
|
---|
465 | f.write ('\0' * (halfsize - 3))
|
---|
466 | f.flush ()
|
---|
467 | return mmap.mmap (f.fileno(), 0)
|
---|
468 |
|
---|
469 | def test_empty_file (self):
|
---|
470 | f = open (TESTFN, 'w+b')
|
---|
471 | f.close()
|
---|
472 | with open(TESTFN, "rb") as f :
|
---|
473 | self.assertRaisesRegexp(ValueError,
|
---|
474 | "cannot mmap an empty file",
|
---|
475 | mmap.mmap, f.fileno(), 0,
|
---|
476 | access=mmap.ACCESS_READ)
|
---|
477 |
|
---|
478 | def test_offset (self):
|
---|
479 | f = open (TESTFN, 'w+b')
|
---|
480 |
|
---|
481 | try: # unlink TESTFN no matter what
|
---|
482 | halfsize = mmap.ALLOCATIONGRANULARITY
|
---|
483 | m = self.make_mmap_file (f, halfsize)
|
---|
484 | m.close ()
|
---|
485 | f.close ()
|
---|
486 |
|
---|
487 | mapsize = halfsize * 2
|
---|
488 | # Try invalid offset
|
---|
489 | f = open(TESTFN, "r+b")
|
---|
490 | for offset in [-2, -1, None]:
|
---|
491 | try:
|
---|
492 | m = mmap.mmap(f.fileno(), mapsize, offset=offset)
|
---|
493 | self.assertEqual(0, 1)
|
---|
494 | except (ValueError, TypeError, OverflowError):
|
---|
495 | pass
|
---|
496 | else:
|
---|
497 | self.assertEqual(0, 0)
|
---|
498 | f.close()
|
---|
499 |
|
---|
500 | # Try valid offset, hopefully 8192 works on all OSes
|
---|
501 | f = open(TESTFN, "r+b")
|
---|
502 | m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize)
|
---|
503 | self.assertEqual(m[0:3], 'foo')
|
---|
504 | f.close()
|
---|
505 |
|
---|
506 | # Try resizing map
|
---|
507 | try:
|
---|
508 | m.resize(512)
|
---|
509 | except SystemError:
|
---|
510 | pass
|
---|
511 | else:
|
---|
512 | # resize() is supported
|
---|
513 | self.assertEqual(len(m), 512)
|
---|
514 | # Check that we can no longer seek beyond the new size.
|
---|
515 | self.assertRaises(ValueError, m.seek, 513, 0)
|
---|
516 | # Check that the content is not changed
|
---|
517 | self.assertEqual(m[0:3], 'foo')
|
---|
518 |
|
---|
519 | # Check that the underlying file is truncated too
|
---|
520 | f = open(TESTFN)
|
---|
521 | f.seek(0, 2)
|
---|
522 | self.assertEqual(f.tell(), halfsize + 512)
|
---|
523 | f.close()
|
---|
524 | self.assertEqual(m.size(), halfsize + 512)
|
---|
525 |
|
---|
526 | m.close()
|
---|
527 |
|
---|
528 | finally:
|
---|
529 | f.close()
|
---|
530 | try:
|
---|
531 | os.unlink(TESTFN)
|
---|
532 | except OSError:
|
---|
533 | pass
|
---|
534 |
|
---|
535 | def test_subclass(self):
|
---|
536 | class anon_mmap(mmap.mmap):
|
---|
537 | def __new__(klass, *args, **kwargs):
|
---|
538 | return mmap.mmap.__new__(klass, -1, *args, **kwargs)
|
---|
539 | anon_mmap(PAGESIZE)
|
---|
540 |
|
---|
541 | def test_prot_readonly(self):
|
---|
542 | if not hasattr(mmap, 'PROT_READ'):
|
---|
543 | return
|
---|
544 | mapsize = 10
|
---|
545 | open(TESTFN, "wb").write("a"*mapsize)
|
---|
546 | f = open(TESTFN, "rb")
|
---|
547 | m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ)
|
---|
548 | self.assertRaises(TypeError, m.write, "foo")
|
---|
549 | f.close()
|
---|
550 |
|
---|
551 | def test_error(self):
|
---|
552 | self.assertTrue(issubclass(mmap.error, EnvironmentError))
|
---|
553 | self.assertIn("mmap.error", str(mmap.error))
|
---|
554 |
|
---|
555 | def test_io_methods(self):
|
---|
556 | data = "0123456789"
|
---|
557 | open(TESTFN, "wb").write("x"*len(data))
|
---|
558 | f = open(TESTFN, "r+b")
|
---|
559 | m = mmap.mmap(f.fileno(), len(data))
|
---|
560 | f.close()
|
---|
561 | # Test write_byte()
|
---|
562 | for i in xrange(len(data)):
|
---|
563 | self.assertEqual(m.tell(), i)
|
---|
564 | m.write_byte(data[i])
|
---|
565 | self.assertEqual(m.tell(), i+1)
|
---|
566 | self.assertRaises(ValueError, m.write_byte, "x")
|
---|
567 | self.assertEqual(m[:], data)
|
---|
568 | # Test read_byte()
|
---|
569 | m.seek(0)
|
---|
570 | for i in xrange(len(data)):
|
---|
571 | self.assertEqual(m.tell(), i)
|
---|
572 | self.assertEqual(m.read_byte(), data[i])
|
---|
573 | self.assertEqual(m.tell(), i+1)
|
---|
574 | self.assertRaises(ValueError, m.read_byte)
|
---|
575 | # Test read()
|
---|
576 | m.seek(3)
|
---|
577 | self.assertEqual(m.read(3), "345")
|
---|
578 | self.assertEqual(m.tell(), 6)
|
---|
579 | # Test write()
|
---|
580 | m.seek(3)
|
---|
581 | m.write("bar")
|
---|
582 | self.assertEqual(m.tell(), 6)
|
---|
583 | self.assertEqual(m[:], "012bar6789")
|
---|
584 | m.seek(8)
|
---|
585 | self.assertRaises(ValueError, m.write, "bar")
|
---|
586 |
|
---|
587 | if os.name == 'nt':
|
---|
588 | def test_tagname(self):
|
---|
589 | data1 = "0123456789"
|
---|
590 | data2 = "abcdefghij"
|
---|
591 | assert len(data1) == len(data2)
|
---|
592 |
|
---|
593 | # Test same tag
|
---|
594 | m1 = mmap.mmap(-1, len(data1), tagname="foo")
|
---|
595 | m1[:] = data1
|
---|
596 | m2 = mmap.mmap(-1, len(data2), tagname="foo")
|
---|
597 | m2[:] = data2
|
---|
598 | self.assertEqual(m1[:], data2)
|
---|
599 | self.assertEqual(m2[:], data2)
|
---|
600 | m2.close()
|
---|
601 | m1.close()
|
---|
602 |
|
---|
603 | # Test different tag
|
---|
604 | m1 = mmap.mmap(-1, len(data1), tagname="foo")
|
---|
605 | m1[:] = data1
|
---|
606 | m2 = mmap.mmap(-1, len(data2), tagname="boo")
|
---|
607 | m2[:] = data2
|
---|
608 | self.assertEqual(m1[:], data1)
|
---|
609 | self.assertEqual(m2[:], data2)
|
---|
610 | m2.close()
|
---|
611 | m1.close()
|
---|
612 |
|
---|
613 | def test_crasher_on_windows(self):
|
---|
614 | # Should not crash (Issue 1733986)
|
---|
615 | m = mmap.mmap(-1, 1000, tagname="foo")
|
---|
616 | try:
|
---|
617 | mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
|
---|
618 | except:
|
---|
619 | pass
|
---|
620 | m.close()
|
---|
621 |
|
---|
622 | # Should not crash (Issue 5385)
|
---|
623 | open(TESTFN, "wb").write("x"*10)
|
---|
624 | f = open(TESTFN, "r+b")
|
---|
625 | m = mmap.mmap(f.fileno(), 0)
|
---|
626 | f.close()
|
---|
627 | try:
|
---|
628 | m.resize(0) # will raise WindowsError
|
---|
629 | except:
|
---|
630 | pass
|
---|
631 | try:
|
---|
632 | m[:]
|
---|
633 | except:
|
---|
634 | pass
|
---|
635 | m.close()
|
---|
636 |
|
---|
637 | def test_invalid_descriptor(self):
|
---|
638 | # socket file descriptors are valid, but out of range
|
---|
639 | # for _get_osfhandle, causing a crash when validating the
|
---|
640 | # parameters to _get_osfhandle.
|
---|
641 | s = socket.socket()
|
---|
642 | try:
|
---|
643 | with self.assertRaises(mmap.error):
|
---|
644 | m = mmap.mmap(s.fileno(), 10)
|
---|
645 | finally:
|
---|
646 | s.close()
|
---|
647 |
|
---|
648 |
|
---|
649 | class LargeMmapTests(unittest.TestCase):
|
---|
650 |
|
---|
651 | def setUp(self):
|
---|
652 | unlink(TESTFN)
|
---|
653 |
|
---|
654 | def tearDown(self):
|
---|
655 | unlink(TESTFN)
|
---|
656 |
|
---|
657 | def _make_test_file(self, num_zeroes, tail):
|
---|
658 | if sys.platform[:3] == 'win' or sys.platform == 'darwin':
|
---|
659 | requires('largefile',
|
---|
660 | 'test requires %s bytes and a long time to run' % str(0x180000000))
|
---|
661 | f = open(TESTFN, 'w+b')
|
---|
662 | try:
|
---|
663 | f.seek(num_zeroes)
|
---|
664 | f.write(tail)
|
---|
665 | f.flush()
|
---|
666 | except (IOError, OverflowError):
|
---|
667 | f.close()
|
---|
668 | raise unittest.SkipTest("filesystem does not have largefile support")
|
---|
669 | return f
|
---|
670 |
|
---|
671 | def test_large_offset(self):
|
---|
672 | with self._make_test_file(0x14FFFFFFF, b" ") as f:
|
---|
673 | m = mmap.mmap(f.fileno(), 0, offset=0x140000000, access=mmap.ACCESS_READ)
|
---|
674 | try:
|
---|
675 | self.assertEqual(m[0xFFFFFFF], b" ")
|
---|
676 | finally:
|
---|
677 | m.close()
|
---|
678 |
|
---|
679 | def test_large_filesize(self):
|
---|
680 | with self._make_test_file(0x17FFFFFFF, b" ") as f:
|
---|
681 | if sys.maxsize < 0x180000000:
|
---|
682 | # On 32 bit platforms the file is larger than sys.maxsize so
|
---|
683 | # mapping the whole file should fail -- Issue #16743
|
---|
684 | with self.assertRaises(OverflowError):
|
---|
685 | mmap.mmap(f.fileno(), 0x180000000, access=mmap.ACCESS_READ)
|
---|
686 | with self.assertRaises(ValueError):
|
---|
687 | mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
|
---|
688 | m = mmap.mmap(f.fileno(), 0x10000, access=mmap.ACCESS_READ)
|
---|
689 | try:
|
---|
690 | self.assertEqual(m.size(), 0x180000000)
|
---|
691 | finally:
|
---|
692 | m.close()
|
---|
693 |
|
---|
694 | # Issue 11277: mmap() with large (~4GB) sparse files crashes on OS X.
|
---|
695 |
|
---|
696 | def _test_around_boundary(self, boundary):
|
---|
697 | tail = b' DEARdear '
|
---|
698 | start = boundary - len(tail) // 2
|
---|
699 | end = start + len(tail)
|
---|
700 | with self._make_test_file(start, tail) as f:
|
---|
701 | m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
|
---|
702 | try:
|
---|
703 | self.assertEqual(m[start:end], tail)
|
---|
704 | finally:
|
---|
705 | m.close()
|
---|
706 |
|
---|
707 | @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
|
---|
708 | def test_around_2GB(self):
|
---|
709 | self._test_around_boundary(_2G)
|
---|
710 |
|
---|
711 | @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
|
---|
712 | def test_around_4GB(self):
|
---|
713 | self._test_around_boundary(_4G)
|
---|
714 |
|
---|
715 |
|
---|
716 | def test_main():
|
---|
717 | run_unittest(MmapTests, LargeMmapTests)
|
---|
718 |
|
---|
719 | if __name__ == '__main__':
|
---|
720 | test_main()
|
---|