source: trunk/server/source3/stf/comfychair.py@ 1009

Last change on this file since 1009 was 414, checked in by Herwig Bauernfeind, 16 years ago

Samba 3.5.0: Initial import

File size: 13.2 KB
Line 
1#! /usr/bin/env python
2
3# Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
4# Copyright (C) 2003 by Tim Potter <tpot@samba.org>
5#
6# This program is free software; you can redistribute it and/or
7# modify it under the terms of the GNU General Public License as
8# published by the Free Software Foundation; either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, see <http://www.gnu.org/licenses/>.
18
19"""comfychair: a Python-based instrument of software torture.
20
21Copyright (C) 2002, 2003 by Martin Pool <mbp@samba.org>
22Copyright (C) 2003 by Tim Potter <tpot@samba.org>
23
24This is a test framework designed for testing programs written in
25Python, or (through a fork/exec interface) any other language.
26
27For more information, see the file README.comfychair.
28
29To run a test suite based on ComfyChair, just run it as a program.
30"""
31
32import sys, re
33
34
35class TestCase:
36 """A base class for tests. This class defines required functions which
37 can optionally be overridden by subclasses. It also provides some
38 utility functions for"""
39
40 def __init__(self):
41 self.test_log = ""
42 self.background_pids = []
43 self._cleanups = []
44 self._enter_rundir()
45 self._save_environment()
46 self.add_cleanup(self.teardown)
47
48
49 # --------------------------------------------------
50 # Save and restore directory
51 def _enter_rundir(self):
52 import os
53 self.basedir = os.getcwd()
54 self.add_cleanup(self._restore_directory)
55 self.rundir = os.path.join(self.basedir,
56 'testtmp',
57 self.__class__.__name__)
58 self.tmpdir = os.path.join(self.rundir, 'tmp')
59 os.system("rm -fr %s" % self.rundir)
60 os.makedirs(self.tmpdir)
61 os.system("mkdir -p %s" % self.rundir)
62 os.chdir(self.rundir)
63
64 def _restore_directory(self):
65 import os
66 os.chdir(self.basedir)
67
68 # --------------------------------------------------
69 # Save and restore environment
70 def _save_environment(self):
71 import os
72 self._saved_environ = os.environ.copy()
73 self.add_cleanup(self._restore_environment)
74
75 def _restore_environment(self):
76 import os
77 os.environ.clear()
78 os.environ.update(self._saved_environ)
79
80
81 def setup(self):
82 """Set up test fixture."""
83 pass
84
85 def teardown(self):
86 """Tear down test fixture."""
87 pass
88
89 def runtest(self):
90 """Run the test."""
91 pass
92
93
94 def add_cleanup(self, c):
95 """Queue a cleanup to be run when the test is complete."""
96 self._cleanups.append(c)
97
98
99 def fail(self, reason = ""):
100 """Say the test failed."""
101 raise AssertionError(reason)
102
103
104 #############################################################
105 # Requisition methods
106
107 def require(self, predicate, message):
108 """Check a predicate for running this test.
109
110If the predicate value is not true, the test is skipped with a message explaining
111why."""
112 if not predicate:
113 raise NotRunError, message
114
115 def require_root(self):
116 """Skip this test unless run by root."""
117 import os
118 self.require(os.getuid() == 0,
119 "must be root to run this test")
120
121 #############################################################
122 # Assertion methods
123
124 def assert_(self, expr, reason = ""):
125 if not expr:
126 raise AssertionError(reason)
127
128 def assert_equal(self, a, b):
129 if not a == b:
130 raise AssertionError("assertEquals failed: %s" % `(a, b)`)
131
132 def assert_notequal(self, a, b):
133 if a == b:
134 raise AssertionError("assertNotEqual failed: %s" % `(a, b)`)
135
136 def assert_re_match(self, pattern, s):
137 """Assert that a string matches a particular pattern
138
139 Inputs:
140 pattern string: regular expression
141 s string: to be matched
142
143 Raises:
144 AssertionError if not matched
145 """
146 if not re.match(pattern, s):
147 raise AssertionError("string does not match regexp\n"
148 " string: %s\n"
149 " re: %s" % (`s`, `pattern`))
150
151 def assert_re_search(self, pattern, s):
152 """Assert that a string *contains* a particular pattern
153
154 Inputs:
155 pattern string: regular expression
156 s string: to be searched
157
158 Raises:
159 AssertionError if not matched
160 """
161 if not re.search(pattern, s):
162 raise AssertionError("string does not contain regexp\n"
163 " string: %s\n"
164 " re: %s" % (`s`, `pattern`))
165
166
167 def assert_no_file(self, filename):
168 import os.path
169 assert not os.path.exists(filename), ("file exists but should not: %s" % filename)
170
171
172 #############################################################
173 # Methods for running programs
174
175 def runcmd_background(self, cmd):
176 import os
177 self.test_log = self.test_log + "Run in background:\n" + `cmd` + "\n"
178 pid = os.fork()
179 if pid == 0:
180 # child
181 try:
182 os.execvp("/bin/sh", ["/bin/sh", "-c", cmd])
183 finally:
184 os._exit(127)
185 self.test_log = self.test_log + "pid: %d\n" % pid
186 return pid
187
188
189 def runcmd(self, cmd, expectedResult = 0):
190 """Run a command, fail if the command returns an unexpected exit
191 code. Return the output produced."""
192 rc, output, stderr = self.runcmd_unchecked(cmd)
193 if rc != expectedResult:
194 raise AssertionError("""command returned %d; expected %s: \"%s\"
195stdout:
196%s
197stderr:
198%s""" % (rc, expectedResult, cmd, output, stderr))
199
200 return output, stderr
201
202
203 def run_captured(self, cmd):
204 """Run a command, capturing stdout and stderr.
205
206 Based in part on popen2.py
207
208 Returns (waitstatus, stdout, stderr)."""
209 import os, types
210 pid = os.fork()
211 if pid == 0:
212 # child
213 try:
214 pid = os.getpid()
215 openmode = os.O_WRONLY|os.O_CREAT|os.O_TRUNC
216
217 outfd = os.open('%d.out' % pid, openmode, 0666)
218 os.dup2(outfd, 1)
219 os.close(outfd)
220
221 errfd = os.open('%d.err' % pid, openmode, 0666)
222 os.dup2(errfd, 2)
223 os.close(errfd)
224
225 if isinstance(cmd, types.StringType):
226 cmd = ['/bin/sh', '-c', cmd]
227
228 os.execvp(cmd[0], cmd)
229 finally:
230 os._exit(127)
231 else:
232 # parent
233 exited_pid, waitstatus = os.waitpid(pid, 0)
234 stdout = open('%d.out' % pid).read()
235 stderr = open('%d.err' % pid).read()
236 return waitstatus, stdout, stderr
237
238
239 def runcmd_unchecked(self, cmd, skip_on_noexec = 0):
240 """Invoke a command; return (exitcode, stdout, stderr)"""
241 import os
242 waitstatus, stdout, stderr = self.run_captured(cmd)
243 assert not os.WIFSIGNALED(waitstatus), \
244 ("%s terminated with signal %d" % (`cmd`, os.WTERMSIG(waitstatus)))
245 rc = os.WEXITSTATUS(waitstatus)
246 self.test_log = self.test_log + ("""Run command: %s
247Wait status: %#x (exit code %d, signal %d)
248stdout:
249%s
250stderr:
251%s""" % (cmd, waitstatus, os.WEXITSTATUS(waitstatus), os.WTERMSIG(waitstatus),
252 stdout, stderr))
253 if skip_on_noexec and rc == 127:
254 # Either we could not execute the command or the command
255 # returned exit code 127. According to system(3) we can't
256 # tell the difference.
257 raise NotRunError, "could not execute %s" % `cmd`
258 return rc, stdout, stderr
259
260
261 def explain_failure(self, exc_info = None):
262 print "test_log:"
263 print self.test_log
264
265
266 def log(self, msg):
267 """Log a message to the test log. This message is displayed if
268 the test fails, or when the runtests function is invoked with
269 the verbose option."""
270 self.test_log = self.test_log + msg + "\n"
271
272
273class NotRunError(Exception):
274 """Raised if a test must be skipped because of missing resources"""
275 def __init__(self, value = None):
276 self.value = value
277
278
279def _report_error(case, debugger):
280 """Ask the test case to explain failure, and optionally run a debugger
281
282 Input:
283 case TestCase instance
284 debugger if true, a debugger function to be applied to the traceback
285"""
286 import sys
287 ex = sys.exc_info()
288 print "-----------------------------------------------------------------"
289 if ex:
290 import traceback
291 traceback.print_exc(file=sys.stdout)
292 case.explain_failure()
293 print "-----------------------------------------------------------------"
294
295 if debugger:
296 tb = ex[2]
297 debugger(tb)
298
299
300def runtests(test_list, verbose = 0, debugger = None):
301 """Run a series of tests.
302
303 Inputs:
304 test_list sequence of TestCase classes
305 verbose print more information as testing proceeds
306 debugger debugger object to be applied to errors
307
308 Returns:
309 unix return code: 0 for success, 1 for failures, 2 for test failure
310 """
311 import traceback
312 ret = 0
313 for test_class in test_list:
314 print "%-30s" % _test_name(test_class),
315 # flush now so that long running tests are easier to follow
316 sys.stdout.flush()
317
318 obj = None
319 try:
320 try: # run test and show result
321 obj = test_class()
322 obj.setup()
323 obj.runtest()
324 print "OK"
325 except KeyboardInterrupt:
326 print "INTERRUPT"
327 _report_error(obj, debugger)
328 ret = 2
329 break
330 except NotRunError, msg:
331 print "NOTRUN, %s" % msg.value
332 except:
333 print "FAIL"
334 _report_error(obj, debugger)
335 ret = 1
336 finally:
337 while obj and obj._cleanups:
338 try:
339 apply(obj._cleanups.pop())
340 except KeyboardInterrupt:
341 print "interrupted during teardown"
342 _report_error(obj, debugger)
343 ret = 2
344 break
345 except:
346 print "error during teardown"
347 _report_error(obj, debugger)
348 ret = 1
349 # Display log file if we're verbose
350 if ret == 0 and verbose:
351 obj.explain_failure()
352
353 return ret
354
355
356def _test_name(test_class):
357 """Return a human-readable name for a test class.
358 """
359 try:
360 return test_class.__name__
361 except:
362 return `test_class`
363
364
365def print_help():
366 """Help for people running tests"""
367 import sys
368 print """%s: software test suite based on ComfyChair
369
370usage:
371 To run all tests, just run this program. To run particular tests,
372 list them on the command line.
373
374options:
375 --help show usage message
376 --list list available tests
377 --verbose, -v show more information while running tests
378 --post-mortem, -p enter Python debugger on error
379""" % sys.argv[0]
380
381
382def print_list(test_list):
383 """Show list of available tests"""
384 for test_class in test_list:
385 print " %s" % _test_name(test_class)
386
387
388def main(tests, extra_tests=[]):
389 """Main entry point for test suites based on ComfyChair.
390
391 inputs:
392 tests Sequence of TestCase subclasses to be run by default.
393 extra_tests Sequence of TestCase subclasses that are available but
394 not run by default.
395
396Test suites should contain this boilerplate:
397
398 if __name__ == '__main__':
399 comfychair.main(tests)
400
401This function handles standard options such as --help and --list, and
402by default runs all tests in the suggested order.
403
404Calls sys.exit() on completion.
405"""
406 from sys import argv
407 import getopt, sys
408
409 opt_verbose = 0
410 debugger = None
411
412 opts, args = getopt.getopt(argv[1:], 'pv',
413 ['help', 'list', 'verbose', 'post-mortem'])
414 for opt, opt_arg in opts:
415 if opt == '--help':
416 print_help()
417 return
418 elif opt == '--list':
419 print_list(tests + extra_tests)
420 return
421 elif opt == '--verbose' or opt == '-v':
422 opt_verbose = 1
423 elif opt == '--post-mortem' or opt == '-p':
424 import pdb
425 debugger = pdb.post_mortem
426
427 if args:
428 all_tests = tests + extra_tests
429 by_name = {}
430 for t in all_tests:
431 by_name[_test_name(t)] = t
432 which_tests = []
433 for name in args:
434 which_tests.append(by_name[name])
435 else:
436 which_tests = tests
437
438 sys.exit(runtests(which_tests, verbose=opt_verbose,
439 debugger=debugger))
440
441
442if __name__ == '__main__':
443 print __doc__
Note: See TracBrowser for help on using the repository browser.