source: python/trunk/Lib/test/test_support.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 48.0 KB
Line 
1"""Supporting definitions for the Python regression tests."""
2
# Refuse to load unless this module is being imported under the exact name
# 'test.test_support'.
if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')
5
6import contextlib
7import errno
8import functools
9import gc
10import socket
11import sys
12import os
13import platform
14import shutil
15import warnings
16import unittest
17import importlib
18import UserDict
19import re
20import time
21import struct
22import _testcapi
23import sysconfig
24try:
25 import thread
26except ImportError:
27 thread = None
28
# Names exported by "from test.test_support import *".  Each name is listed
# exactly once.
__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
           "verbose", "use_resources", "max_memuse", "record_original_stdout",
           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
           "open_urlresource", "check_warnings", "check_py3k_warnings",
           "CleanImport", "EnvironmentVarGuard", "captured_output",
           "captured_stdout", "TransientResource", "transient_internet",
           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
           "threading_cleanup", "reap_children", "cpython_only",
           "check_impl_detail", "get_attribute", "py3k_bytes",
           "import_fresh_module", "strip_python_stderr"]
44
class Error(Exception):
    """Base class for regression test exceptions."""

class TestFailed(Error):
    """Raised to report that a test failed."""

class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled.  It is used to distinguish between expected
    and unexpected skips.  Being a unittest.SkipTest subclass, it is
    also caught by anything that handles SkipTest.
    """
58
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Silence module/package DeprecationWarnings for the enclosed block.

    When *ignore* is false the body runs with the warning filters untouched.
    Otherwise an "ignore" filter for DeprecationWarning messages matching
    ".+ (module|package)" is installed inside warnings.catch_warnings(), so
    the previous filters are restored when the block ends."""
    if not ignore:
        yield
        return
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", ".+ (module|package)",
                                DeprecationWarning)
        yield
72
73
def import_module(name, deprecated=False):
    """Import and return the module to be tested, raising SkipTest if
    it is not available.

    name       -- dotted module name passed to importlib.import_module().
    deprecated -- if true, module or package deprecation messages emitted
                  during the import are suppressed.

    Raises unittest.SkipTest (carrying the ImportError text) when the
    import fails."""
    with _ignore_deprecated_imports(deprecated):
        try:
            return importlib.import_module(name)
        # "except ... as" is the form already used elsewhere in this file
        # and is the only spelling accepted by newer interpreters.
        except ImportError as msg:
            raise unittest.SkipTest(str(msg))
85
86
def _save_and_remove_module(name, orig_modules):
    """Stash *name* and all of its submodules, then drop them from sys.modules.

    The removed entries are recorded in the *orig_modules* dict so that the
    caller can put them back later.

    Raise ImportError if the module can't be imported."""
    # Importing first proves the module is importable at all; the entry the
    # import creates is discarded again straight away.
    if name not in sys.modules:
        __import__(name)
        del sys.modules[name]
    prefix = name + '.'
    doomed = [loaded for loaded in list(sys.modules)
              if loaded == name or loaded.startswith(prefix)]
    for loaded in doomed:
        orig_modules[loaded] = sys.modules.pop(loaded)
99
def _save_and_block_module(name, orig_modules):
    """Stash *name* (if loaded) and replace its sys.modules entry with None.

    A previously loaded module is recorded in the *orig_modules* dict.

    Return True if the module was in sys.modules, False otherwise."""
    was_loaded = name in sys.modules
    if was_loaded:
        orig_modules[name] = sys.modules[name]
    sys.modules[name] = None
    return was_loaded
111
112
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Import *name* anew, bypassing the sys.modules cache, and return it.

    The sys.modules entries touched here are put back once the import
    attempt has finished.

    fresh      -- names of modules that are also removed from sys.modules
                  first, so they are imported anew if the import needs them.
    blocked    -- names of modules whose import is prevented (their
                  sys.modules entry is set to None) during the fresh import.
    deprecated -- if true, module or package deprecation messages are
                  suppressed.

    Returns None if an ImportError occurs while preparing the *fresh* or
    *blocked* modules or while importing *name* itself."""
    # NOTE: test_heapq, test_json, and test_warnings include extra sanity
    # checks to make sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        # saved: entries to restore afterwards; placeholders: blocking
        # entries that did not exist before and must simply be deleted.
        saved = {}
        placeholders = []
        _save_and_remove_module(name, saved)
        try:
            for fresh_name in fresh:
                _save_and_remove_module(fresh_name, saved)
            for blocked_name in blocked:
                was_loaded = _save_and_block_module(blocked_name, saved)
                if not was_loaded:
                    placeholders.append(blocked_name)
            result = importlib.import_module(name)
        except ImportError:
            result = None
        finally:
            sys.modules.update(saved)
            for placeholder in placeholders:
                del sys.modules[placeholder]
        return result
149
150
def get_attribute(obj, name):
    """Return getattr(obj, name), turning AttributeError into SkipTest.

    The SkipTest message is built from obj.__name__ and *name*."""
    try:
        return getattr(obj, name)
    except AttributeError:
        details = (obj.__name__, name)
        raise unittest.SkipTest("module %s has no attribute %s" % details)
160
161
# Module-level settings.  The inline notes say which ones regrtest.py
# overrides.
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0      # NOTE(review): not assigned anywhere in the code
                         # visible here -- confirm who sets it.
167
# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None

def record_original_stdout(stdout):
    """Remember *stdout* as the stream get_original_stdout() should return."""
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    """Return the recorded stdout, or sys.stdout if none (or a false value)
    was recorded."""
    if _original_stdout:
        return _original_stdout
    return sys.stdout
178
def unload(name):
    """Drop *name* from sys.modules; do nothing if it is not there."""
    sys.modules.pop(name, None)
184
# Low-level removal helpers.  On Windows each removal is followed by a wait
# for the entry to disappear from its directory listing; elsewhere the
# plain os/shutil functions are used directly.
if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then wait for the removal to become visible.

        With waitall false, wait until the last component of *pathname* is
        gone from its parent directory's listing; with waitall true, wait
        until the directory *pathname* itself lists no entries.  A
        RuntimeWarning is issued if that has not happened once the
        back-off is exhausted."""
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on a i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """os.unlink() followed by a wait for the file to disappear."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """os.rmdir() followed by a wait for the directory to disappear."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Remove the tree rooted at *path*, waiting after each directory."""
        def _rmtree_inner(path):
            # Empty out one directory level: recurse into subdirectories
            # (waiting until each is empty before removing it) and unlink
            # everything else.
            for name in os.listdir(path):
                fullname = os.path.join(path, name)
                if os.path.isdir(fullname):
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    os.rmdir(fullname)
                else:
                    os.unlink(fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(os.rmdir, path)
else:
    _unlink = os.unlink
    _rmdir = os.rmdir
    _rmtree = shutil.rmtree
240
def unlink(filename):
    """Best-effort removal of *filename*: any OSError is silently ignored."""
    try:
        _unlink(filename)
    except OSError:
        # Deliberately swallowed; callers use this for cleanup only.
        return
246
def rmdir(dirname):
    """Remove the directory *dirname*, tolerating its absence.

    An OSError whose errno is ENOENT is ignored; any other OSError is
    re-raised."""
    try:
        _rmdir(dirname)
    except OSError as exc:
        # The directory need not exist.
        if exc.errno == errno.ENOENT:
            return
        raise
254
def rmtree(path):
    """Remove the directory tree rooted at *path*, tolerating its absence.

    An OSError whose errno is ENOENT or ESRCH is ignored; any other OSError
    is re-raised."""
    try:
        _rmtree(path)
    # Same "except ... as" spelling as rmdir() uses.
    except OSError as e:
        # Unix returns ENOENT, Windows returns ESRCH.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise
262
def forget(modname):
    '''"Forget" that a module was ever imported: remove it from sys.modules
    and delete its .pyc and .pyo files from every sys.path directory.'''
    unload(modname)
    for dirname in sys.path:
        # Each file is removed independently, so a missing .pyc never
        # prevents an existing .pyo from being deleted.
        for suffix in ('pyc', 'pyo'):
            unlink(os.path.join(dirname, modname + os.extsep + suffix))
273
def is_resource_enabled(resource):
    """Return True if *resource* is listed in use_resources, else False.

    While use_resources is still None, no resource counts as enabled.  Known
    resources are set by regrtest.py."""
    if use_resources is None:
        return False
    return resource in use_resources
278
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ the resource is treated as enabled
    and the function simply returns.  Otherwise ResourceDenied is raised,
    with *msg* (or a default message naming the resource), when
    is_resource_enabled(resource) is false.  Nothing is returned."""
    # Inspect the module name of the immediate caller.
    caller_module = sys._getframe(1).f_globals.get("__name__")
    if caller_module != "__main__" and not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the `%s' resource not enabled" % resource
        raise ResourceDenied(msg)
292
293
# Don't use "localhost", since resolving it uses the DNS under recent
# Windows versions (see issue #18792).
HOST = "127.0.0.1"   # IPv4 loopback; default host for bind_port()
HOSTv6 = "::1"       # IPv6 loopback
298
299
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Returns an unused port that should be suitable for binding.  This is
    achieved by creating a temporary socket with the given *family* and
    *socktype* (default is AF_INET, SOCK_STREAM), and letting bind_port()
    bind it to its default host with the port set to 0, eliciting an unused
    ephemeral port from the OS.  The temporary socket is then closed (even
    if binding fails), and the ephemeral port is returned.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test.  Which one to use depends on whether the calling code is creating
    a python socket, or if an unused port needs to be provided in a constructor
    or passed to an external program (i.e. the -accept argument to openssl's
    s_server mode).  Always prefer bind_port() over find_unused_port() where
    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    socket is bound to a hard coded port, the ability to run multiple instances
    of the test simultaneously on the same host is compromised, which makes the
    test a ticking time bomb in a buildbot environment.  On Unix buildbots, this
    may simply manifest as a failed test, which can be recovered from without
    intervention in most cases, but on Windows, the entire python process can
    completely and utterly wedge, requiring someone to log in to the buildbot
    and manually kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced to
    the SO_REUSEADDR socket option having different semantics on Windows versus
    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    listen and then accept connections on identical host/ports.  An EADDRINUSE
    socket.error will be raised at some point (depending on the platform and
    the order bind and listen were called on each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports.  When
    accept() is called on each socket, the second caller's process will steal
    the port from the first caller, leaving them both in an awkwardly wedged
    state where they'll no longer respond to any signals or graceful kills, and
    must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
    Source world compared to Windows ones, this is a common mistake.  A quick
    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    openssl.exe is called with the 's_server' option, for example.  See
    http://bugs.python.org/issue2550 for more info.  The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the ephemeral
    port returned to us by the OS won't immediately be dished back out to some
    other process when we close and delete our temporary socket but before our
    calling code has a chance to bind the returned port.  We can deal with this
    issue if/when we come across it."""
    tempsock = socket.socket(family, socktype)
    try:
        # bind_port() can raise (TestFailed or a socket error); the
        # temporary socket must not be leaked in that case.
        return bind_port(tempsock)
    finally:
        tempsock.close()
359
def bind_port(sock, host=HOST):
    """Bind *sock* to an OS-chosen free port on *host*; return the port.

    Binding to port 0 relies on ephemeral ports to guarantee an unbound
    port, which matters because many tests may run simultaneously,
    especially in a buildbot environment.

    Raises TestFailed if sock.family is AF_INET and sock.type is
    SOCK_STREAM *and* the socket has SO_REUSEADDR or SO_REUSEPORT set on it.
    Tests should *never* set these socket options for TCP/IP sockets.  The
    only case for setting these options is testing multicasting via multiple
    UDP sockets.

    Additionally, for such sockets, if the SO_EXCLUSIVEADDRUSE socket option
    is available (i.e. on Windows), it will be set on the socket.  This will
    prevent anyone else from bind()'ing to our host/port for the duration of
    the test.
    """
    is_tcp_ipv4 = (sock.family == socket.AF_INET and
                   sock.type == socket.SOCK_STREAM)
    if is_tcp_ipv4:
        forbidden = (
            ('SO_REUSEADDR', "tests should never set the SO_REUSEADDR "
                             "socket option on TCP/IP sockets!"),
            ('SO_REUSEPORT', "tests should never set the SO_REUSEPORT "
                             "socket option on TCP/IP sockets!"),
        )
        for option_name, complaint in forbidden:
            # Only probe options this platform's socket module defines.
            if not hasattr(socket, option_name):
                continue
            option = getattr(socket, option_name)
            if sock.getsockopt(socket.SOL_SOCKET, option) == 1:
                raise TestFailed(complaint)
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
389
# Relative tolerance used by fcmp() when comparing floats.
FUZZ = 1e-6

def fcmp(x, y): # fuzzy comparison function
    """Compare *x* and *y* cmp()-style, treating nearly equal floats as equal.

    Returns 0, a negative number or a positive number.

    * If either argument is a float, the two compare equal when
      abs(x - y) <= (abs(x) + abs(y)) * FUZZ.  If that arithmetic fails
      (e.g. one operand is not a number) the ordinary comparison is used.
    * Tuples or lists of the same type are compared element-wise with
      fcmp(); if all common elements are equal, the shorter sequence
      sorts first.
    * Anything else falls back to the ordinary ordering of x and y.
    """
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x-y) <= fuzz:
                return 0
        except Exception:
            # Non-numeric operand: fall through to the plain comparison.
            # (Not a bare except, so KeyboardInterrupt/SystemExit propagate.)
            pass
    elif type(x) == type(y) and isinstance(x, (tuple, list)):
        for left, right in zip(x, y):
            outcome = fcmp(left, right)
            if outcome != 0:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)
407
408
# A constant likely larger than the underlying OS pipe buffer size, to
# make writes blocking.
# Windows limit seems to be around 512 B, and many Unix kernels have a
# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure
# (see issue #17835 for a discussion of this number).
PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1

# A constant likely larger than the underlying OS socket buffer size, to make
# writes blocking.
# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).  See issue #18643
# for a discussion of this number.
SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
422
# True when the interpreter provides a builtin named "unicode".
try:
    unicode
except NameError:
    have_unicode = False
else:
    have_unicode = True

# True when sys.platform starts with 'java'.
is_jython = sys.platform.startswith('java')
430
# Filename used for testing.  The base name depends on os.name; in the
# default branch, when unicode is available, this also defines
# TESTFN_UNICODE, TESTFN_ENCODING and TESTFN_UNENCODABLE.
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    # Unicode name only used if TESTFN_ENCODING exists for the platform.
    if have_unicode:
        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
        # TESTFN_UNICODE is a filename that can be encoded using the
        # file system encoding, but *not* with the default (ascii) encoding
        if isinstance('', unicode):
            # python -U
            # XXX perhaps unicode() should accept Unicode strings?
            TESTFN_UNICODE = "@test-\xe0\xf2"
        else:
            # 2 latin characters.
            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
        TESTFN_ENCODING = sys.getfilesystemencoding()
        # TESTFN_UNENCODABLE is a filename that should *not* be
        # able to be encoded by *either* the default or filesystem encoding.
        # This test really only makes sense on Windows NT platforms
        # which have special Unicode support in posixmodule.
        if (not hasattr(sys, "getwindowsversion") or
                sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME
            TESTFN_UNENCODABLE = None
        else:
            # Japanese characters (I think - from bug 846133)
            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
            try:
                # XXX - Note - should be using TESTFN_ENCODING here - but for
                # Windows, "mbcs" currently always operates as if in
                # 'errors=ignore' mode - hence we get '?' characters rather
                # than the exception.  'Latin1' operates as we expect - ie,
                # fails.
                # See [ 850997 ] mbcs encoding ignores errors
                TESTFN_UNENCODABLE.encode("Latin1")
            except UnicodeEncodeError:
                pass
            else:
                # The name turned out to be encodable after all, so warn.
                print \
                'WARNING: The filename %r CAN be encoded by the filesystem. ' \
                'Unicode filename tests may not be effective' \
                % TESTFN_UNENCODABLE
476
477
# Make TESTFN unique per process (the pid is appended) so that parallel test
# runs do not collide, while keeping it usable as a module name.
TESTFN = "%s_%s_tmp" % (TESTFN, os.getpid())

# Working directory at the time this module was imported.
SAVEDCWD = os.getcwd()
484
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that creates a temporary directory and makes it the CWD.

    The directory is created inside the current directory under the name
    *name*; the value yielded is os.getcwd() after the switch.  On exit the
    previous CWD is restored and, if the directory was created and entered
    successfully, it is removed again.

    If *quiet* is False (default) and it's not possible to create or change
    the CWD, an error is raised.  If it's True, only a warning is raised
    and the original CWD is used.
    """
    if have_unicode and isinstance(name, unicode):
        fs_encoding = sys.getfilesystemencoding() or 'ascii'
        try:
            name = name.encode(fs_encoding)
        except UnicodeEncodeError:
            if not quiet:
                raise unittest.SkipTest('unable to encode the cwd name with '
                                        'the filesystem encoding.')
    previous_cwd = os.getcwd()
    entered = False
    try:
        os.mkdir(name)
        os.chdir(name)
    except OSError:
        if not quiet:
            raise
        warnings.warn('tests may fail, unable to change the CWD to ' + name,
                      RuntimeWarning, stacklevel=3)
    else:
        entered = True
    try:
        yield os.getcwd()
    finally:
        os.chdir(previous_cwd)
        if entered:
            rmtree(name)
519
520
def findfile(file, here=__file__, subdir=None):
    """Locate *file* and return the first existing path found.

    An absolute *file* is returned unchanged.  Otherwise *subdir* (if given)
    is prepended, and the result is looked up first in the directory
    containing *here* and then in each sys.path entry.  If nothing exists,
    the (possibly subdir-prefixed) relative name is returned; this does not
    necessarily signal failure, as it could still be a legitimate path."""
    if os.path.isabs(file):
        return file
    if subdir is not None:
        file = os.path.join(subdir, file)
    search_dirs = [os.path.dirname(here)]
    search_dirs.extend(sys.path)
    for directory in search_dirs:
        candidate = os.path.join(directory, file)
        if os.path.exists(candidate):
            return candidate
    return file
535
def sortdict(dict):
    """Like repr(dict), but with the items in sorted order.

    Returns a string of the form "{k1: v1, k2: v2}" built from the repr()
    of each key and value, ordered by sorting the (key, value) pairs.
    """
    # sorted() accepts whatever items() returns (list, view or iterator),
    # so this does not depend on items() producing a mutable list.
    reprpairs = ["%r: %r" % pair for pair in sorted(dict.items())]
    return "{%s}" % ", ".join(reprpairs)
543
def make_bad_fd():
    """
    Return a file descriptor number that is no longer valid.

    The number is taken from a file opened on TESTFN; the file is closed and
    TESTFN is unlinked before the number is returned.
    """
    handle = open(TESTFN, "wb")
    try:
        stale_fd = handle.fileno()
    finally:
        handle.close()
        unlink(TESTFN)
    return stale_fd
555
def check_syntax_error(testcase, statement):
    """Assert, via testcase.assertRaises, that compiling *statement* in
    'exec' mode raises SyntaxError."""
    compile_args = (statement, '<test string>', 'exec')
    testcase.assertRaises(SyntaxError, compile, *compile_args)
559
def open_urlresource(url, check=None):
    """Return an open file for the resource at *url*, downloading if needed.

    The file is cached in the "data" directory next to this module, under
    the last path component of the URL.  *check*, if given, is called with
    the open file and must return a true value for the file to be accepted;
    a cached copy that fails the check is deleted and fetched again.

    Downloading requires the 'urlfetch' resource.  Raises TestFailed if the
    downloaded file still does not pass *check*.
    """
    import urlparse
    import urllib2

    url_path = urlparse.urlparse(url)[2]
    filename = url_path.split('/')[-1] # '/': it's URL!
    local_path = os.path.join(os.path.dirname(__file__), "data", filename)

    def open_if_valid(path):
        # Returns the open file when acceptable, otherwise closes it and
        # returns None.
        candidate = open(path)
        if check is None:
            return candidate
        if check(candidate):
            candidate.seek(0)
            return candidate
        candidate.close()
        return None

    if os.path.exists(local_path):
        cached = open_if_valid(local_path)
        if cached is not None:
            return cached
        unlink(local_path)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    print >> get_original_stdout(), '\tfetching %s ...' % url
    response = urllib2.urlopen(url, timeout=15)
    try:
        with open(local_path, "wb") as out:
            chunk = response.read()
            while chunk:
                out.write(chunk)
                chunk = response.read()
    finally:
        response.close()

    downloaded = open_if_valid(local_path)
    if downloaded is None:
        raise TestFailed('invalid resource "%s"' % local_path)
    return downloaded
600
601
class WarningsRecorder(object):
    """Thin view over the list produced by
    warnings.catch_warnings(record=True).

    Unknown attributes are looked up on the most recent warning recorded
    since the last reset(); when there is none, the standard warning detail
    attributes read as None.
    """
    def __init__(self, warnings_list):
        self._warnings = warnings_list
        # Index of the first warning not yet hidden by reset().
        self._last = 0

    def __getattr__(self, attr):
        recorded = self._warnings
        if len(recorded) > self._last:
            return getattr(recorded[-1], attr)
        if attr not in warnings.WarningMessage._WARNING_DETAILS:
            raise AttributeError("%r has no attribute %r" % (self, attr))
        return None

    @property
    def warnings(self):
        """The warnings recorded since the last reset()."""
        start = self._last
        return self._warnings[start:]

    def reset(self):
        """Hide every warning recorded so far."""
        self._last = len(self._warnings)
623
624
def _filterwarnings(filters, quiet=False):
    """Catch the warnings, then check if all the expected
    warnings have been raised and re-raise unexpected warnings.
    If 'quiet' is True, only re-raise the unexpected warnings.

    This is a generator: it yields a WarningsRecorder once, and performs the
    checks after being resumed.  *filters* is an iterable of
    (message regexp, warning category) pairs.  AssertionError is raised for
    the first recorded warning no filter matched or, failing that (and
    unless *quiet*), for the first filter that matched nothing.
    """
    # Clear the warning registry of the calling module
    # in order to re-raise the warnings.
    # NOTE(review): the hard-coded depth 2 presumably skips this generator's
    # frame and the contextlib frame driving it -- confirm before changing
    # how this function is called.
    frame = sys._getframe(2)
    registry = frame.f_globals.get('__warningregistry__')
    if registry:
        registry.clear()
    with warnings.catch_warnings(record=True) as w:
        # Set filter "always" to record all warnings.  Because
        # test_warnings swap the module, we need to look up in
        # the sys.modules dictionary.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(w)
    # Filter the recorded warnings
    reraise = [warning.message for warning in w]
    missing = []
    for msg, cat in filters:
        seen = False
        # Iterate over a copy because matches are removed from reraise.
        for exc in reraise[:]:
            message = str(exc)
            # Filter out the matching messages
            if (re.match(msg, message, re.I) and
                issubclass(exc.__class__, cat)):
                seen = True
                reraise.remove(exc)
        if not seen and not quiet:
            # This filter caught nothing
            missing.append((msg, cat.__name__))
    # Unexpected warnings take precedence over unmatched filters.
    if reraise:
        raise AssertionError("unhandled warning %r" % reraise[0])
    if missing:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             missing[0])
662
663
@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager to silence warnings.

    Positional arguments are 2-tuples:
        ("message regexp", WarningCategory)

    Optional keyword argument:
     - 'quiet': if True, a filter that catches nothing is not an error
       (defaults to True when no filter is given, and to a false value
       when some filters are defined)

    Called without arguments, it behaves like:
        check_warnings(("", Warning), quiet=True)
    """
    quiet = kwargs.get('quiet')
    # Preserve backward compatibility: the bare form is quiet unless the
    # caller said otherwise.
    if not filters and quiet is None:
        quiet = True
    expected = filters or (("", Warning),)
    return _filterwarnings(expected, quiet)
686
687
@contextlib.contextmanager
def check_py3k_warnings(*filters, **kwargs):
    """Context manager to silence py3k warnings.

    Positional arguments are 2-tuples:
        ("message regexp", WarningCategory)

    Optional keyword argument:
     - 'quiet': if True, a filter that catches nothing is not an error
       (default False)

    Called without arguments, it behaves like:
        check_py3k_warnings(("", DeprecationWarning), quiet=False)

    When sys.py3kwarning is false, the given filters are ignored and any
    warning raised in the block is reported as unhandled.
    """
    if not sys.py3kwarning:
        # It should not raise any py3k warning
        expected = ()
    elif filters:
        expected = filters
    else:
        expected = (("", DeprecationWarning),)
    return _filterwarnings(expected, kwargs.get('quiet'))
709
710
class CleanImport(object):
    """Context manager that makes the next import of the named modules
    produce a new module object.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    The modules are dropped from sys.modules when the object is created;
    on exit the snapshot of sys.modules taken at creation is merged back.

    Use like this:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for alias in module_names:
            try:
                module = sys.modules[alias]
            except KeyError:
                continue
            # It is possible that the requested name is just an alias for
            # another module (e.g. stub for modules renamed in 3.x).
            # In that case, we also need delete the real module to clear
            # the import cache.
            real_name = module.__name__
            if real_name != alias:
                del sys.modules[real_name]
            del sys.modules[alias]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)
741
742
743class EnvironmentVarGuard(UserDict.DictMixin):
744
745 """Class to help protect the environment variable properly. Can be used as
746 a context manager."""
747
748 def __init__(self):
749 self._environ = os.environ
750 self._changed = {}
751
752 def __getitem__(self, envvar):
753 return self._environ[envvar]
754
755 def __setitem__(self, envvar, value):
756 # Remember the initial value on the first access
757 if envvar not in self._changed:
758 self._changed[envvar] = self._environ.get(envvar)
759 self._environ[envvar] = value
760
761 def __delitem__(self, envvar):
762 # Remember the initial value on the first access
763 if envvar not in self._changed:
764 self._changed[envvar] = self._environ.get(envvar)
765 if envvar in self._environ:
766 del self._environ[envvar]
767
768 def keys(self):
769 return self._environ.keys()
770
771 def set(self, envvar, value):
772 self[envvar] = value
773
774 def unset(self, envvar):
775 del self[envvar]
776
777 def __enter__(self):
778 return self
779
780 def __exit__(self, *ignore_exc):
781 for (k, v) in self._changed.items():
782 if v is None:
783 if k in self._environ:
784 del self._environ[k]
785 else:
786 self._environ[k] = v
787 os.environ = self._environ
788
789
class DirsOnSysPath(object):
    """Context manager that temporarily appends directories to sys.path.

    On creation a copy of sys.path's contents is taken and the directories
    given as positional arguments are appended.  On exit sys.path is rebound
    to the original list object and its contents are reset to the copy.

    Note that *all* sys.path modifications in the body of the
    context manager, including replacement of the object,
    will be reverted at the end of the block.
    """

    def __init__(self, *paths):
        self.original_object = sys.path
        self.original_value = list(sys.path)
        sys.path.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        restored = self.original_object
        restored[:] = self.original_value
        sys.path = restored
813
814
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes.

    exc      -- exception class to match; subclasses of it match as well.
    **kwargs -- attribute name/value pairs the exception instance must have
                for the match to succeed.
    """

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The raised type must be self.exc or a subclass of it.  (The
        # arguments used to be the other way round, which matched
        # *superclasses* of self.exc instead.)
        if type_ is None or not issubclass(type_, self.exc):
            return
        for attr, attr_value in self.attrs.items():
            if not hasattr(value, attr):
                break
            if getattr(value, attr) != attr_value:
                break
        else:
            # Every requested attribute matched (or none was requested).
            raise ResourceDenied("an optional resource is not available")
839
840
@contextlib.contextmanager
def transient_internet(resource_name, timeout=30.0, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions.

    resource_name -- name used in the ResourceDenied message.
    timeout       -- default socket timeout installed for the duration of
                     the block (left untouched if None); the previous
                     default is restored afterwards.
    errnos        -- errno values to treat as transient; when empty, a
                     built-in set of errno and getaddrinfo error codes is
                     used instead.
    """
    # (name, fallback value) pairs: the fallback is used when the errno /
    # socket module does not define the name.
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
    ]
    default_gai_errnos = [
        ('EAI_AGAIN', -3),
        ('EAI_FAIL', -4),
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
        # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
        # implementation actually returns WSANO_DATA i.e. 11004.
        ('WSANO_DATA', 11004),
    ]

    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        captured_errnos = [getattr(errno, symbol, fallback)
                           for (symbol, fallback) in default_errnos]
        gai_errnos = [getattr(socket, symbol, fallback)
                      for (symbol, fallback) in default_gai_errnos]

    def deny_if_transient(error):
        # Raise the prepared ResourceDenied when *error* is a timeout or
        # carries one of the transient error numbers.
        code = getattr(error, 'errno', None)
        is_timeout = isinstance(error, socket.timeout)
        is_gai = isinstance(error, socket.gaierror) and code in gai_errnos
        if is_timeout or is_gai or code in captured_errnos:
            if not verbose:
                sys.stderr.write(denied.args[0] + "\n")
            raise denied

    previous_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except IOError as err:
        # urllib can wrap original socket errors multiple times (!), we must
        # unwrap to get at the original error.
        innermost = err
        while True:
            wrapped = innermost.args
            if len(wrapped) >= 1 and isinstance(wrapped[0], IOError):
                innermost = wrapped[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
            elif len(wrapped) >= 2 and isinstance(wrapped[1], IOError):
                innermost = wrapped[1]
            else:
                break
        deny_if_transient(innermost)
        # Not a transient failure: let the original exception propagate.
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(previous_timeout)
905
906
907@contextlib.contextmanager
908def captured_output(stream_name):
909 """Return a context manager used by captured_stdout and captured_stdin
910 that temporarily replaces the sys stream *stream_name* with a StringIO."""
911 import StringIO
912 orig_stdout = getattr(sys, stream_name)
913 setattr(sys, stream_name, StringIO.StringIO())
914 try:
915 yield getattr(sys, stream_name)
916 finally:
917 setattr(sys, stream_name, orig_stdout)
918
def captured_stdout():
    """Capture the output of sys.stdout:

       with captured_stdout() as s:
           print "hello"
       self.assertEqual(s.getvalue(), "hello\n")

    Returns the context manager produced by captured_output("stdout").
    """
    return captured_output("stdout")
927
def captured_stderr():
    """Capture the output of sys.stderr.

    Returns the context manager produced by captured_output("stderr").
    """
    return captured_output("stderr")
930
def captured_stdin():
    """Temporarily replace sys.stdin with a StringIO.

    Returns the context manager produced by captured_output("stdin").
    """
    return captured_output("stdin")
933
def gc_collect():
    """Run the garbage collector repeatedly to flush out pending garbage.

    Implementations other than CPython do not promise prompt
    deallocation (and CPython itself delays it for reference cycles), so
    __del__ methods and weakref callbacks can fire later than a test
    expects.  This makes three collection passes, with a short pause
    after the first one on Jython, as a best effort to get rid of all
    garbage objects.
    """
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    # Two more passes after the (optional) pause.
    for _ in range(2):
        gc.collect()
949
950
# struct format of the fixed object header used by calcobjsize().  When
# sys.gettotalrefcount exists the header carries two additional pointers.
_header = '2P2P' if hasattr(sys, "gettotalrefcount") else '2P'
# Header used by calcvobjsize(): the plain header plus one more 'P' field.
_vheader = _header + 'P'
955
def calcobjsize(fmt):
    """Return struct.calcsize() of the object header followed by *fmt*.

    The trailing '0P' adds no field; it only pads the total to pointer
    alignment.
    """
    return struct.calcsize(_header + fmt + '0P')
958
def calcvobjsize(fmt):
    """Return struct.calcsize() of the _vheader format followed by *fmt*.

    Same as calcobjsize() but based on _vheader, which has one extra 'P'
    field.  The trailing '0P' only pads the total to pointer alignment.
    """
    return struct.calcsize(_vheader + fmt + '0P')
961
962
# Bit masks tested against a type's __flags__ in check_sizeof().
# NOTE(review): values presumably mirror Py_TPFLAGS_HAVE_GC and
# Py_TPFLAGS_HEAPTYPE from CPython's object.h -- confirm when updating.
_TPFLAGS_HAVE_GC = 1<<14
_TPFLAGS_HEAPTYPE = 1<<9
965
def check_sizeof(test, o, size):
    """Assert via *test* that sys.getsizeof(o) equals *size*.

    *size* is the expected size without the GC header; the header size
    (_testcapi.SIZEOF_PYGC_HEAD) is added here when *o* is a heap type,
    or an instance of a type that has the HAVE_GC flag set.
    """
    actual = sys.getsizeof(o)
    kind = type(o)
    if kind == type:
        # o is itself a type: only heap types get the GC header added.
        needs_gc_head = o.__flags__ & _TPFLAGS_HEAPTYPE
    else:
        needs_gc_head = kind.__flags__ & _TPFLAGS_HAVE_GC
    if needs_gc_head:
        size += _testcapi.SIZEOF_PYGC_HEAD
    test.assertEqual(
        actual, size,
        'wrong size for %s: got %d, expected %d' % (kind, actual, size))
975
976
977#=======================================================================
978# Decorator for running a function in a different locale, correctly resetting
979# it afterwards.
980
def run_with_locale(catstr, *locales):
    """Decorator factory: run a function under a different locale.

    *catstr* names a category constant of the locale module (for example
    'LC_ALL').  Each call of the decorated function tries the candidate
    *locales* in order and keeps the first one that can be set; if none
    can be set the function simply runs under the current locale.  The
    original locale is restored afterwards, also when the function raises.

    Raises AttributeError (at call time) when *catstr* is not an
    attribute of the locale module.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # The test author gave us an invalid category string.
                raise
            except Exception:
                # Cannot retrieve the original locale, so do nothing.
                # (Not a bare except: KeyboardInterrupt must propagate.)
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # This candidate is unusable here; try the next.
                        pass

            # Now run the function, resetting the locale on exceptions.
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        # __name__ is the same attribute as the Python-2-only func_name.
        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator
1012
1013#=======================================================================
1014# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
1015
1016# Some handy shorthands. Note that these are used for byte-limits as well
1017# as size-limits, in the various bigmem tests
# Sizes in bytes: one mebibyte, and one, two and four gibibytes.
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

# Upper bound applied to the memory limit in set_memlimit().
MAX_Py_ssize_t = sys.maxsize
1024
def set_memlimit(limit):
    """Parse *limit* and store it as the memory budget for bigmem tests.

    *limit* is a string such as '4G' or '2.5Gb': a decimal number followed
    by a K, M, G or T unit (case-insensitive) and an optional 'b'.

    On success two module globals are updated together:
      real_max_memuse -- the requested number of bytes
      max_memuse      -- the same value capped at MAX_Py_ssize_t

    Raises ValueError when *limit* cannot be parsed, or when the capped
    value is below 2G - 1 bytes.  Neither global is modified in that case.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the literal space in the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    requested = int(float(m.group(1)) * sizes[m.group(3).lower()])
    usable = min(requested, MAX_Py_ssize_t)
    if usable < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    # Only commit once the limit has been accepted, so a rejected limit
    # cannot leave real_max_memuse and max_memuse out of step.
    real_max_memuse = requested
    max_memuse = usable
1045
def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the smallest size (in units the test itself interprets)
    at which the test is useful.  'memuse' is the number of bytes needed
    per unit of size, or a good estimate of it.  'overhead' is a fixed
    cost that does not depend on the size; it defaults to 5Mb.

    The decorated test receives a 'size' argument chosen here.  With no
    memory limit configured (max_memuse is 0) a small fixed size is used
    so the test still gets exercised.  Otherwise the size is derived from
    max_memuse; if that size is below minsize the test is skipped.
    """
    def decorator(f):
        def wrapper(self):
            if max_memuse:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    # Really ought to print 'test skipped' or something
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return
                # Leave some breathing room, but never go below minsize.
                maxsize = max(maxsize - 50 * _1M, minsize)
            else:
                # No limit set (the default): still run the test with a
                # size of a few kb to make sure it works, and complain
                # loudly if even that would need too much memory.
                maxsize = 5147
                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
            return f(self, maxsize)
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
1084
def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
    """Decorator for bigmem tests that must run at one exact size.

    With a memory limit configured (real_max_memuse non-zero) the test is
    called with exactly 'size'; without one it is called with a small
    token size of 5147.  The test is skipped (a note goes to stderr in
    verbose mode) when real_max_memuse is smaller than size * memuse and
    either a limit is configured or 'dry_run' is false.

    'overhead' is only recorded on the wrapper; it does not take part in
    the computation here.
    """
    def decorator(f):
        def wrapper(self):
            maxsize = size if real_max_memuse else 5147

            may_skip = real_max_memuse or not dry_run
            if may_skip and real_max_memuse < maxsize * memuse:
                if verbose:
                    sys.stderr.write("Skipping %s because of memory "
                                     "constraint\n" % (f.__name__,))
                return

            return f(self, maxsize)
        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
1106
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test only runs when max_memuse has reached MAX_Py_ssize_t;
    otherwise it is skipped, with a note on stderr in verbose mode.
    """
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if verbose:
            sys.stderr.write("Skipping %s because of memory "
                             "constraint\n" % (f.__name__,))
    return wrapper
1117
1118#=======================================================================
1119# unittest integration.
1120
class BasicTestRunner:
    """Minimal test runner that collects results without printing."""

    def run(self, test):
        """Run *test* against a fresh unittest.TestResult and return it."""
        outcome = unittest.TestResult()
        test(outcome)
        return outcome
1126
def _id(obj):
    """Identity function; used as a decorator that leaves a test as is."""
    return obj
1129
def requires_resource(resource):
    """Return a decorator that skips the test unless *resource* is enabled.

    When the resource is enabled the identity decorator is returned, so
    the test runs unchanged.
    """
    if not is_resource_enabled(resource):
        reason = "resource {0!r} is not enabled".format(resource)
        return unittest.skip(reason)
    return _id
1135
def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.

    Shorthand for impl_detail(cpython=True)(test).
    """
    return impl_detail(cpython=True)(test)
1141
def impl_detail(msg=None, **guards):
    """Return a decorator for tests that rely on implementation details.

    *guards* has the same meaning as for check_impl_detail().  If the
    check passes, the identity decorator is returned; otherwise a
    unittest.skip decorator is returned.  *msg* is the skip reason; when
    it is None a reason naming the guarded implementations is generated.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        names, default = _parse_guards(guards)
        template = ("implementation detail not available on {0}" if default
                    else "implementation detail specific to {0}")
        msg = template.format(' or '.join(sorted(names.keys())))
    return unittest.skip(msg)
1154
def _parse_guards(guards):
    """Normalise the keyword guards given to check_impl_detail().

    Returns a tuple ({platform_name: run_me}, default_value).  With no
    guards at all the result is ({'cpython': True}, False).  Otherwise
    the guards must be all true or all false, and default_value is the
    opposite of that common value.
    """
    if not guards:
        return ({'cpython': True}, False)
    # Materialise the values once: this also works when dict.values()
    # returns a view, which cannot be indexed or compared to a list.
    values = list(guards.values())
    is_true = values[0]
    assert values == [is_true] * len(values)   # all True or all False
    return (guards, not is_true)
1162
1163# Use the following check to guard CPython's implementation-specific tests --
1164# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """Tell whether the running Python implementation matches *guards*.

    The result is True or False depending on the host implementation.
    Examples:
       if check_impl_detail():               # only on CPython (default)
       if check_impl_detail(jython=True):    # only on Jython
       if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    table, fallback = _parse_guards(guards)
    implementation = platform.python_implementation().lower()
    return table.get(implementation, fallback)
1174
1175
1176
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    A verbose text runner writing to sys.stdout is used in verbose mode,
    a silent BasicTestRunner otherwise.  Raises TestFailed if the run
    was not successful; when exactly one error or failure occurred its
    traceback text becomes the exception message.
    """
    runner = (unittest.TextTestRunner(sys.stdout, verbosity=2) if verbose
              else BasicTestRunner())

    result = runner.run(suite)
    if result.wasSuccessful():
        return

    errors, failures = result.errors, result.failures
    if len(errors) == 1 and not failures:
        err = errors[0][1]
    elif len(failures) == 1 and not errors:
        err = failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)
1195
1196
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a TestCase class, a TestSuite or TestCase
    instance, or the name of a module already present in sys.modules
    (all test cases found in that module are run).  Raises ValueError
    for a string that is not a key of sys.modules.
    """
    suite = unittest.TestSuite()
    for item in classes:
        if isinstance(item, str):
            if item not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            tests = unittest.findTestCases(sys.modules[item])
        elif isinstance(item, (unittest.TestSuite, unittest.TestCase)):
            # Already a runnable test object; add it unchanged.
            tests = item
        else:
            tests = unittest.makeSuite(item)
        suite.addTest(tests)
    _run_suite(suite)
1212
1213#=======================================================================
1214# Check for the presence of docstrings.
1215
# Truthy when docstrings are expected to be present: on any implementation
# other than CPython, on win32, or when the WITH_DOC_STRINGS config
# variable is set.  Note this is the value of the `or` chain, so it is
# not necessarily a bool.
HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
                   sys.platform == 'win32' or
                   sysconfig.get_config_var('WITH_DOC_STRINGS'))

# Decorator that skips a test when HAVE_DOCSTRINGS is false.
requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
                                          "test requires docstrings")
1222
1223
1224#=======================================================================
1225# doctest driver.
1226
def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    When verbosity is left at None, test_support's own 'verbose' setting
    is handed to doctest.  When any other value is given, doctest falls
    back to its usual behavior (it searches sys.argv for -v).

    Raises TestFailed if at least one doctest failed.
    """

    import doctest

    # An explicit verbosity deliberately results in None being passed on.
    verbosity = verbose if verbosity is None else None

    # Send doctest output (normally just errors) to the real stdout;
    # regrtest must not compare it against expected output.
    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        f, t = doctest.testmod(module, verbose=verbosity)
        if f:
            raise TestFailed("%d of %d doctests failed" % (f, t))
    finally:
        sys.stdout = save_stdout
    if verbose:
        print('doctest (%s) ... %d tests with zero failures'
              % (module.__name__, t))
    return f, t
1255
1256#=======================================================================
1257# Threading support to prevent reporting refleaks when running regrtest.py -R
1258
1259# NOTE: we use thread._count() rather than threading.enumerate() (or the
1260# moral equivalent thereof) because a threading.Thread object is still alive
1261# until its __bootstrap() method has returned, even after it has been
1262# unregistered from the threading module.
1263# thread._count(), on the other hand, only gets decremented *after* the
1264# __bootstrap() method has returned, which gives us reliable reference counts
1265# at the end of a test run.
1266
def threading_setup():
    """Return a 1-tuple with the current thread count.

    The count comes from thread._count(); when the thread module is
    unavailable the tuple is (1,).  Pass the tuple's contents to
    threading_cleanup() afterwards.
    """
    running = thread._count() if thread else 1
    return (running,)
1272
def threading_cleanup(nb_threads):
    """Wait briefly for the thread count to return to *nb_threads*.

    Polls thread._count() up to 10 times, sleeping 0.1s after each
    mismatch.  Gives up silently if the count never matches, and does
    nothing at all when the thread module is unavailable.
    """
    if thread:
        attempts_left = 10
        while attempts_left and thread._count() != nb_threads:
            time.sleep(0.1)
            attempts_left -= 1
        # XXX print a warning in case of failure?
1284
def reap_threads(func):
    """Decorator for tests that start threads.

    The thread count is recorded before *func* runs and
    threading_cleanup() is called afterwards, even when the test fails,
    so stray threads get a chance to finish.  Positional and keyword
    arguments are both passed through to *func*.

    If threading is unavailable *func* is returned unchanged.
    """
    if not thread:
        return func

    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = threading_setup()
        try:
            return func(*args, **kwargs)
        finally:
            threading_cleanup(*key)
    return decorator
1301
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.

    This is best effort: any ordinary error from os.waitpid() simply ends
    the reaping.  Does nothing when os.waitpid is unavailable.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if not hasattr(os, 'waitpid'):
        return
    any_process = -1
    while True:
        try:
            # This will raise an exception on Windows.  That's ok.
            pid, status = os.waitpid(any_process, os.WNOHANG)
        except Exception:
            # Not a bare except: KeyboardInterrupt and SystemExit must
            # still propagate.
            break
        if pid == 0:
            # Children exist, but none of them has exited yet.
            break
1321
@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporarily swap out an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        Inside the with: block obj.attr is 5; the previous value is put
        back when the block ends.  If `attr` did not exist on `obj`
        beforehand, it is created for the block and deleted afterwards.
    """
    existed = hasattr(obj, attr)
    if existed:
        saved = getattr(obj, attr)
    setattr(obj, attr, new_val)
    try:
        yield
    finally:
        if existed:
            setattr(obj, attr, saved)
        else:
            delattr(obj, attr)
1348
def py3k_bytes(b):
    """Emulate the py3k bytes() constructor.

    Tries, in order: b.tobytes(), joining chr() of every item of b, and
    finally bytes(b).

    NOTE: This is only a best effort function.
    """
    # memoryview?
    try:
        return b.tobytes()
    except AttributeError:
        pass
    # iterable of ints?
    try:
        return b"".join(chr(item) for item in b)
    except TypeError:
        return bytes(b)
1363
def args_from_interpreter_flags():
    """Return a list of command-line arguments that reproduce the
    current settings in sys.flags (delegates to the subprocess module)."""
    from subprocess import _args_from_interpreter_flags as build_args
    return build_args()
1369
def strip_python_stderr(stderr):
    """Remove interpreter debug output from a Python process's stderr.

    A trailing "[NNN refs]" line is dropped and surrounding whitespace is
    stripped.  *stderr* is a bytes string, typically taken from the
    result of the communicate() method of a subprocess.Popen object.
    """
    refs_trailer = re.compile(br"\[\d+ refs\]\r?\n?$")
    return refs_trailer.sub(b"", stderr).strip()
Note: See TracBrowser for help on using the repository browser.