# NOTE: this file tests the new `io` library backported from Python 3.x.
# Similar tests for the builtin file object can be found in test_file2k.py.
3 |
|
---|
4 | from __future__ import print_function
|
---|
5 |
|
---|
6 | import sys
|
---|
7 | import os
|
---|
8 | import unittest
|
---|
9 | from array import array
|
---|
10 | from weakref import proxy
|
---|
11 |
|
---|
12 | import io
|
---|
13 | import _pyio as pyio
|
---|
14 |
|
---|
15 | from test.test_support import TESTFN, run_unittest
|
---|
16 | from UserList import UserList
|
---|
17 |
|
---|
class AutoFileTests(unittest.TestCase):
    """File tests for which a test file is automatically set up.

    This class never defines ``open`` itself: concrete subclasses must
    provide an ``open`` attribute, which every test reaches through
    ``self.open(name, mode)``.
    """

    def setUp(self):
        # Every test starts with TESTFN freshly opened for binary writing.
        self.f = self.open(TESTFN, 'wb')

    def tearDown(self):
        # Tests that must drop their reference set self.f to None, so only
        # close when a file object is still attached.
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write(b'teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Dropping the test's own reference is expected to invalidate the
        # proxy, which must then raise on attribute access.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

    def testReadinto(self):
        # verify readinto
        self.f.write(b'12')
        self.f.close()
        a = array('b', b'x' * 10)
        self.f = self.open(TESTFN, 'rb')
        n = self.f.readinto(a)
        # Only the first n items of the array were filled by readinto().
        self.assertEqual(b'12', a.tostring()[:n])

    def testReadinto_text(self):
        # verify readinto refuses text files
        a = array('b', b'x' * 10)
        self.f.close()
        self.f = self.open(TESTFN, 'r')
        # The check only applies when the text file object exposes readinto.
        if hasattr(self.f, "readinto"):
            self.assertRaises(TypeError, self.f.readinto, a)

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList([b'1', b'2'])
        self.f.writelines(l)
        self.f.close()
        self.f = self.open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, b'12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1, 2, 3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testErrors(self):
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        if hasattr(f, "readinto"):
            self.assertRaises((IOError, TypeError), f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # (method name, positional arguments) pairs that must all raise
        # ValueError once the file has been closed.
        methods = [('fileno', ()),
                   ('flush', ()),
                   ('isatty', ()),
                   ('next', ()),
                   ('read', ()),
                   ('write', (b"",)),
                   ('readline', ()),
                   ('readlines', ()),
                   ('seek', (0,)),
                   ('tell', ()),
                   ('writelines', ([],)),
                   ('__iter__', ()),
                   ]
        # truncate is left out of the check when sys.platform starts with
        # 'atheos'.
        if not sys.platform.startswith('atheos'):
            methods.append(('truncate', ()))

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname, args in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method, *args)

        # file is closed, __exit__ shouldn't do anything
        self.assertEqual(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            # Only the deliberately provoked error is caught, so any other
            # exception still propagates instead of being swallowed.
            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        # The file from setUp is opened 'wb', so reading must be refused.
        self.assertRaises(IOError, self.f.read)
|
---|
136 |
|
---|
class CAutoFileTests(AutoFileTests):
    """Run the AutoFileTests suite with ``io.open`` as the file factory."""

    # NOTE(review): stored without staticmethod(), unlike the _pyio variant;
    # presumably io.open is not bound as a method on attribute access --
    # confirm for the interpreters this test must run on.
    open = io.open
|
---|
139 |
|
---|
class PyAutoFileTests(AutoFileTests):
    """Run the AutoFileTests suite with ``_pyio.open`` as the file factory."""

    # staticmethod() keeps self.open(...) from passing the test instance as
    # the first argument to pyio.open.
    open = staticmethod(pyio.open)
|
---|
142 |
|
---|
143 |
|
---|
class OtherFileTests(unittest.TestCase):
    """File tests that open (and, where noted, remove) TESTFN themselves.

    This class never defines ``open`` itself: concrete subclasses must
    provide an ``open`` attribute, which every test reaches through
    ``self.open(...)``.
    """

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = self.open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = self.open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                # The message must mention the offending mode and must not
                # mention the file name.
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may be
            # no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = self.open(TESTFN, 'wb', s)
                f.write(str(s).encode("ascii"))
                # The second close() on each handle is deliberate: repeated
                # close() calls are part of what is being tested.
                f.close()
                f.close()
                f = self.open(TESTFN, 'rb', s)
                d = int(f.read().decode("ascii"))
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # SF bug <http://www.python.org/sf/801631>
        # "file.truncate fault on windows"

        # Start from a clean slate, but do not require that an earlier test
        # left TESTFN behind: an unconditional unlink raises OSError when
        # this test runs first or in isolation.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)
        f = self.open(TESTFN, 'wb')

        try:
            f.write(b'12345678901')   # 11 bytes
            f.close()

            f = self.open(TESTFN, 'rb+')
            data = f.read(5)
            if data != b'12345':
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)
        finally:
            f.close()
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods.
        dataoffset = 16384
        filler = b"ham\n"
        # Checked through unittest rather than a bare assert so the check
        # is not stripped when Python runs with -O.
        self.assertEqual(dataoffset % len(filler), 0,
                         "dataoffset must be multiple of len(filler)")
        nchunks = dataoffset // len(filler)
        testlines = [
            b"spam, spam and eggs\n",
            b"eggs, spam, ham and spam\n",
            b"saussages, spam, spam and eggs\n",
            b"spam, ham, spam and eggs\n",
            b"spam, spam, spam, spam, spam, ham, spam\n",
            b"wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("b", b" " * 100),))]

        try:
            # Prepare the testfile
            bag = self.open(TESTFN, "wb")
            try:
                bag.write(filler * nchunks)
                bag.writelines(testlines)
            finally:
                bag.close()

            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = self.open(TESTFN, 'rb')
                try:
                    if next(f) != filler:
                        self.fail("Broken testfile")
                    meth = getattr(f, methodname)
                    meth(*args)  # This simply shouldn't fail
                finally:
                    # Close even when the call above raises, so the handle
                    # is not still open when TESTFN is unlinked below.
                    f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = self.open(TESTFN, 'rb')
            try:
                for _ in range(nchunks):
                    next(f)

                testline = testlines.pop(0)
                try:
                    line = f.readline()
                except ValueError:
                    self.fail("readline() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("readline() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                testline = testlines.pop(0)
                buf = array("b", b"\x00" * len(testline))
                try:
                    f.readinto(buf)
                except ValueError:
                    self.fail("readinto() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                line = buf.tostring()
                if line != testline:
                    self.fail("readinto() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                testline = testlines.pop(0)
                try:
                    line = f.read(len(testline))
                except ValueError:
                    self.fail("read() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("read() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                try:
                    lines = f.readlines()
                except ValueError:
                    self.fail("readlines() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if lines != testlines:
                    # Report the lists that were actually compared.
                    self.fail("readlines() after next() with empty buffer "
                              "failed. Got %r, expected %r"
                              % (lines, testlines))
            finally:
                f.close()

            # Reading after iteration hit EOF shouldn't hurt either
            f = self.open(TESTFN, 'rb')
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)
|
---|
318 |
|
---|
class COtherFileTests(OtherFileTests):
    """Run the OtherFileTests suite with ``io.open`` as the file factory."""

    # NOTE(review): stored without staticmethod(), unlike the _pyio variant;
    # presumably io.open is not bound as a method on attribute access --
    # confirm for the interpreters this test must run on.
    open = io.open
|
---|
321 |
|
---|
class PyOtherFileTests(OtherFileTests):
    """Run the OtherFileTests suite with ``_pyio.open`` as the file factory."""

    # staticmethod() keeps self.open(...) from passing the test instance as
    # the first argument to pyio.open.
    open = staticmethod(pyio.open)
|
---|
324 |
|
---|
325 |
|
---|
def test_main():
    """Run all four test classes, then remove TESTFN if it still exists."""
    suites = (CAutoFileTests, PyAutoFileTests,
              COtherFileTests, PyOtherFileTests)
    # Historically, these tests have been sloppy about removing TESTFN.
    # So get rid of it no matter what.
    try:
        run_unittest(*suites)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)
|
---|
335 |
|
---|
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()
|
---|