Changeset 740 for vendor/current/lib/subunit
- Timestamp:
- Nov 14, 2012, 12:59:34 PM (13 years ago)
- Location:
- vendor/current/lib/subunit
- Files:
-
- 71 added
- 1 deleted
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
vendor/current/lib/subunit/python/subunit/__init__.py
r414 r740 1 1 # 2 # subunit: extensions to python unittest to get test results from subprocesses.2 # subunit: extensions to Python unittest to get test results from subprocesses. 3 3 # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> 4 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>5 4 # 6 # This program is free software; you can redistribute it and/or modify 7 # it under the terms of the GNU General Public License as published by 8 # the Free Software Foundation; either version 3 of the License, or 9 # (at your option) any later version. 5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause 6 # license at the users choice. A copy of both licenses are available in the 7 # project source as Apache-2.0 and BSD. You may not use this file except in 8 # compliance with one of these two licences. 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT 12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 # license you chose for the specific language governing permissions and 14 # limitations under that license. 10 15 # 11 # This program is distributed in the hope that it will be useful, 12 # but WITHOUT ANY WARRANTY; without even the implied warranty of 13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 # GNU General Public License for more details. 15 # 16 # You should have received a copy of the GNU General Public License 17 # along with this program; if not, write to the Free Software 18 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 19 # 20 16 17 """Subunit - a streaming test protocol 18 19 Overview 20 ++++++++ 21 22 The ``subunit`` Python package provides a number of ``unittest`` extensions 23 which can be used to cause tests to output Subunit, to parse Subunit streams 24 into test activity, perform seamless test isolation within a regular test 25 case and variously sort, filter and report on test runs. 26 27 28 Key Classes 29 ----------- 30 31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult`` 32 extension which will translate a test run into a Subunit stream. 33 34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire 35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate 36 a stream into a test run, which regular ``unittest.TestResult`` objects can 37 process and report/inspect. 38 39 Subunit has support for non-blocking usage too, for use with asyncore or 40 Twisted. See the ``TestProtocolServer`` parser class for more details. 41 42 Subunit includes extensions to the Python ``TestResult`` protocol. These are 43 all done in a compatible manner: ``TestResult`` objects that do not implement 44 the extension methods will not cause errors to be raised, instead the extension 45 will either lose fidelity (for instance, folding expected failures to success 46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra 47 details, tags, timestamping and progress markers). 48 49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``, 50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details`` 51 which can be used instead of the usual python unittest parameter. 52 When used the value of details should be a dict from ``string`` to 53 ``testtools.content.Content`` objects. This is a draft API being worked on with 54 the Python Testing In Python mail list, with the goal of permitting a common 55 way to provide additional data beyond a traceback, such as captured data from 56 disk, logging messages etc. The reference for this API is in testtools (0.9.0 57 and newer). 58 59 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or 60 remove tags in the test run that is currently executing. If called when no 61 test is in progress (that is, if called outside of the ``startTest``, 62 ``stopTest`` pair), the the tags apply to all sebsequent tests. If called 63 when a test is in progress, then the tags only apply to that test. 64 65 The ``time(a_datetime)`` method is called (if present) when a ``time:`` 66 directive is encountered in a Subunit stream. This is used to tell a TestResult 67 about the time that events in the stream occured at, to allow reconstructing 68 test timing from a stream. 69 70 The ``progress(offset, whence)`` method controls progress data for a stream. 71 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR, 72 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations 73 ignore the offset parameter. 74 75 76 Python test support 77 ------------------- 78 79 ``subunit.run`` is a convenience wrapper to run a Python test suite via 80 the command line, reporting via Subunit:: 81 82 $ python -m subunit.run mylib.tests.test_suite 83 84 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its 85 tests, allowing isolation between the test runner and some tests. 86 87 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get 88 tests that will fork() before that individual test is run. 89 90 `ExecTestCase`` is a convenience wrapper for running an external 91 program to get a Subunit stream and then report that back to an arbitrary 92 result object:: 93 94 class AggregateTests(subunit.ExecTestCase): 95 96 def test_script_one(self): 97 './bin/script_one' 98 99 def test_script_two(self): 100 './bin/script_two' 101 102 # Normally your normal test loading would take of this automatically, 103 # It is only spelt out in detail here for clarity. 104 suite = unittest.TestSuite([AggregateTests("test_script_one"), 105 AggregateTests("test_script_two")]) 106 # Create any TestResult class you like. 107 result = unittest._TextTestResult(sys.stdout) 108 # And run your suite as normal, Subunit will exec each external script as 109 # needed and report to your result object. 110 suite.run(result) 111 112 Utility modules 113 --------------- 114 115 * subunit.chunked contains HTTP chunked encoding/decoding logic. 116 * subunit.test_results contains TestResult helper classes. 117 """ 118 119 import datetime 21 120 import os 121 import re 22 122 from StringIO import StringIO 123 import subprocess 23 124 import sys 24 125 import unittest 126 127 import iso8601 128 from testtools import content, content_type, ExtendedToOriginalDecorator 129 try: 130 from testtools.testresult.real import _StringException 131 RemoteException = _StringException 132 _remote_exception_str = '_StringException' # For testing. 133 except ImportError: 134 raise ImportError ("testtools.testresult.real does not contain " 135 "_StringException, check your version.") 136 from testtools import testresult 137 138 import chunked, details, test_results 139 140 141 PROGRESS_SET = 0 142 PROGRESS_CUR = 1 143 PROGRESS_PUSH = 2 144 PROGRESS_POP = 3 145 25 146 26 147 def test_suite(): … … 43 164 44 165 166 def tags_to_new_gone(tags): 167 """Split a list of tags into a new_set and a gone_set.""" 168 new_tags = set() 169 gone_tags = set() 170 for tag in tags: 171 if tag[0] == '-': 172 gone_tags.add(tag[1:]) 173 else: 174 new_tags.add(tag) 175 return new_tags, gone_tags 176 177 178 class DiscardStream(object): 179 """A filelike object which discards what is written to it.""" 180 181 def write(self, bytes): 182 pass 183 184 185 class _ParserState(object): 186 """State for the subunit parser.""" 187 188 def __init__(self, parser): 189 self.parser = parser 190 191 def addError(self, offset, line): 192 """An 'error:' directive has been read.""" 193 self.parser.stdOutLineReceived(line) 194 195 def addExpectedFail(self, offset, line): 196 """An 'xfail:' directive has been read.""" 197 self.parser.stdOutLineReceived(line) 198 199 def addFailure(self, offset, line): 200 """A 'failure:' directive has been read.""" 201 self.parser.stdOutLineReceived(line) 202 203 def addSkip(self, offset, line): 204 """A 'skip:' directive has been read.""" 205 self.parser.stdOutLineReceived(line) 206 207 def addSuccess(self, offset, line): 208 """A 'success:' directive has been read.""" 209 self.parser.stdOutLineReceived(line) 210 211 def lineReceived(self, line): 212 """a line has been received.""" 213 parts = line.split(None, 1) 214 if len(parts) == 2 and line.startswith(parts[0]): 215 cmd, rest = parts 216 offset = len(cmd) + 1 217 cmd = cmd.rstrip(':') 218 if cmd in ('test', 'testing'): 219 self.startTest(offset, line) 220 elif cmd == 'error': 221 self.addError(offset, line) 222 elif cmd == 'failure': 223 self.addFailure(offset, line) 224 elif cmd == 'progress': 225 self.parser._handleProgress(offset, line) 226 elif cmd == 'skip': 227 self.addSkip(offset, line) 228 elif cmd in ('success', 'successful'): 229 self.addSuccess(offset, line) 230 elif cmd in ('tags',): 231 self.parser._handleTags(offset, line) 232 self.parser.subunitLineReceived(line) 233 elif cmd in ('time',): 234 self.parser._handleTime(offset, line) 235 self.parser.subunitLineReceived(line) 236 elif cmd == 'xfail': 237 self.addExpectedFail(offset, line) 238 else: 239 self.parser.stdOutLineReceived(line) 240 else: 241 self.parser.stdOutLineReceived(line) 242 243 def lostConnection(self): 244 """Connection lost.""" 245 self.parser._lostConnectionInTest(u'unknown state of ') 246 247 def startTest(self, offset, line): 248 """A test start command received.""" 249 self.parser.stdOutLineReceived(line) 250 251 252 class _InTest(_ParserState): 253 """State for the subunit parser after reading a test: directive.""" 254 255 def _outcome(self, offset, line, no_details, details_state): 256 """An outcome directive has been read. 257 258 :param no_details: Callable to call when no details are presented. 259 :param details_state: The state to switch to for details 260 processing of this outcome. 261 """ 262 if self.parser.current_test_description == line[offset:-1]: 263 self.parser._state = self.parser._outside_test 264 self.parser.current_test_description = None 265 no_details() 266 self.parser.client.stopTest(self.parser._current_test) 267 self.parser._current_test = None 268 self.parser.subunitLineReceived(line) 269 elif self.parser.current_test_description + " [" == line[offset:-1]: 270 self.parser._state = details_state 271 details_state.set_simple() 272 self.parser.subunitLineReceived(line) 273 elif self.parser.current_test_description + " [ multipart" == \ 274 line[offset:-1]: 275 self.parser._state = details_state 276 details_state.set_multipart() 277 self.parser.subunitLineReceived(line) 278 else: 279 self.parser.stdOutLineReceived(line) 280 281 def _error(self): 282 self.parser.client.addError(self.parser._current_test, 283 details={}) 284 285 def addError(self, offset, line): 286 """An 'error:' directive has been read.""" 287 self._outcome(offset, line, self._error, 288 self.parser._reading_error_details) 289 290 def _xfail(self): 291 self.parser.client.addExpectedFailure(self.parser._current_test, 292 details={}) 293 294 def addExpectedFail(self, offset, line): 295 """An 'xfail:' directive has been read.""" 296 self._outcome(offset, line, self._xfail, 297 self.parser._reading_xfail_details) 298 299 def _failure(self): 300 self.parser.client.addFailure(self.parser._current_test, details={}) 301 302 def addFailure(self, offset, line): 303 """A 'failure:' directive has been read.""" 304 self._outcome(offset, line, self._failure, 305 self.parser._reading_failure_details) 306 307 def _skip(self): 308 self.parser.client.addSkip(self.parser._current_test, details={}) 309 310 def addSkip(self, offset, line): 311 """A 'skip:' directive has been read.""" 312 self._outcome(offset, line, self._skip, 313 self.parser._reading_skip_details) 314 315 def _succeed(self): 316 self.parser.client.addSuccess(self.parser._current_test, details={}) 317 318 def addSuccess(self, offset, line): 319 """A 'success:' directive has been read.""" 320 self._outcome(offset, line, self._succeed, 321 self.parser._reading_success_details) 322 323 def lostConnection(self): 324 """Connection lost.""" 325 self.parser._lostConnectionInTest(u'') 326 327 328 class _OutSideTest(_ParserState): 329 """State for the subunit parser outside of a test context.""" 330 331 def lostConnection(self): 332 """Connection lost.""" 333 334 def startTest(self, offset, line): 335 """A test start command received.""" 336 self.parser._state = self.parser._in_test 337 self.parser._current_test = RemotedTestCase(line[offset:-1]) 338 self.parser.current_test_description = line[offset:-1] 339 self.parser.client.startTest(self.parser._current_test) 340 self.parser.subunitLineReceived(line) 341 342 343 class _ReadingDetails(_ParserState): 344 """Common logic for readin state details.""" 345 346 def endDetails(self): 347 """The end of a details section has been reached.""" 348 self.parser._state = self.parser._outside_test 349 self.parser.current_test_description = None 350 self._report_outcome() 351 self.parser.client.stopTest(self.parser._current_test) 352 353 def lineReceived(self, line): 354 """a line has been received.""" 355 self.details_parser.lineReceived(line) 356 self.parser.subunitLineReceived(line) 357 358 def lostConnection(self): 359 """Connection lost.""" 360 self.parser._lostConnectionInTest(u'%s report of ' % 361 self._outcome_label()) 362 363 def _outcome_label(self): 364 """The label to describe this outcome.""" 365 raise NotImplementedError(self._outcome_label) 366 367 def set_simple(self): 368 """Start a simple details parser.""" 369 self.details_parser = details.SimpleDetailsParser(self) 370 371 def set_multipart(self): 372 """Start a multipart details parser.""" 373 self.details_parser = details.MultipartDetailsParser(self) 374 375 376 class _ReadingFailureDetails(_ReadingDetails): 377 """State for the subunit parser when reading failure details.""" 378 379 def _report_outcome(self): 380 self.parser.client.addFailure(self.parser._current_test, 381 details=self.details_parser.get_details()) 382 383 def _outcome_label(self): 384 return "failure" 385 386 387 class _ReadingErrorDetails(_ReadingDetails): 388 """State for the subunit parser when reading error details.""" 389 390 def _report_outcome(self): 391 self.parser.client.addError(self.parser._current_test, 392 details=self.details_parser.get_details()) 393 394 def _outcome_label(self): 395 return "error" 396 397 398 class _ReadingExpectedFailureDetails(_ReadingDetails): 399 """State for the subunit parser when reading xfail details.""" 400 401 def _report_outcome(self): 402 self.parser.client.addExpectedFailure(self.parser._current_test, 403 details=self.details_parser.get_details()) 404 405 def _outcome_label(self): 406 return "xfail" 407 408 409 class _ReadingSkipDetails(_ReadingDetails): 410 """State for the subunit parser when reading skip details.""" 411 412 def _report_outcome(self): 413 self.parser.client.addSkip(self.parser._current_test, 414 details=self.details_parser.get_details("skip")) 415 416 def _outcome_label(self): 417 return "skip" 418 419 420 class _ReadingSuccessDetails(_ReadingDetails): 421 """State for the subunit parser when reading success details.""" 422 423 def _report_outcome(self): 424 self.parser.client.addSuccess(self.parser._current_test, 425 details=self.details_parser.get_details("success")) 426 427 def _outcome_label(self): 428 return "success" 429 430 45 431 class TestProtocolServer(object): 46 """A class for receiving results from a TestProtocol client."""47 48 OUTSIDE_TEST = 049 TEST_STARTED = 150 READING_FAILURE = 2 51 READING_ERROR = 352 53 def __init__(self, client, stream=sys.stdout): 54 """Create a TestProtocol server instance.55 56 client should be an object that provides57 - startTest58 - addSuccess59 - addFailure60 - addError61 - stopTest62 methods, i.e. a TestResult.432 """A parser for subunit. 433 434 :ivar tags: The current tags associated with the protocol stream. 435 """ 436 437 def __init__(self, client, stream=None, forward_stream=None): 438 """Create a TestProtocolServer instance. 439 440 :param client: An object meeting the unittest.TestResult protocol. 441 :param stream: The stream that lines received which are not part of the 442 subunit protocol should be written to. This allows custom handling 443 of mixed protocols. By default, sys.stdout will be used for 444 convenience. 445 :param forward_stream: A stream to forward subunit lines to. This 446 allows a filter to forward the entire stream while still parsing 447 and acting on it. By default forward_stream is set to 448 DiscardStream() and no forwarding happens. 63 449 """ 64 self.state = TestProtocolServer.OUTSIDE_TEST 65 self.client = client 450 self.client = ExtendedToOriginalDecorator(client) 451 if stream is None: 452 stream = sys.stdout 66 453 self._stream = stream 67 68 def _addError(self, offset, line): 69 if (self.state == TestProtocolServer.TEST_STARTED and 70 self.current_test_description == line[offset:-1]): 71 self.state = TestProtocolServer.OUTSIDE_TEST 72 self.current_test_description = None 73 self.client.addError(self._current_test, RemoteError("")) 74 self.client.stopTest(self._current_test) 75 self._current_test = None 76 elif (self.state == TestProtocolServer.TEST_STARTED and 77 self.current_test_description + " [" == line[offset:-1]): 78 self.state = TestProtocolServer.READING_ERROR 79 self._message = "" 454 self._forward_stream = forward_stream or DiscardStream() 455 # state objects we can switch too 456 self._in_test = _InTest(self) 457 self._outside_test = _OutSideTest(self) 458 self._reading_error_details = _ReadingErrorDetails(self) 459 self._reading_failure_details = _ReadingFailureDetails(self) 460 self._reading_skip_details = _ReadingSkipDetails(self) 461 self._reading_success_details = _ReadingSuccessDetails(self) 462 self._reading_xfail_details = _ReadingExpectedFailureDetails(self) 463 # start with outside test. 464 self._state = self._outside_test 465 466 def _handleProgress(self, offset, line): 467 """Process a progress directive.""" 468 line = line[offset:].strip() 469 if line[0] in '+-': 470 whence = PROGRESS_CUR 471 delta = int(line) 472 elif line == "push": 473 whence = PROGRESS_PUSH 474 delta = None 475 elif line == "pop": 476 whence = PROGRESS_POP 477 delta = None 80 478 else: 81 self.stdOutLineReceived(line) 82 83 def _addFailure(self, offset, line): 84 if (self.state == TestProtocolServer.TEST_STARTED and 85 self.current_test_description == line[offset:-1]): 86 self.state = TestProtocolServer.OUTSIDE_TEST 87 self.current_test_description = None 88 self.client.addFailure(self._current_test, RemoteError()) 89 self.client.stopTest(self._current_test) 90 elif (self.state == TestProtocolServer.TEST_STARTED and 91 self.current_test_description + " [" == line[offset:-1]): 92 self.state = TestProtocolServer.READING_FAILURE 93 self._message = "" 94 else: 95 self.stdOutLineReceived(line) 96 97 def _addSuccess(self, offset, line): 98 if (self.state == TestProtocolServer.TEST_STARTED and 99 self.current_test_description == line[offset:-1]): 100 self.client.addSuccess(self._current_test) 101 self.client.stopTest(self._current_test) 102 self.current_test_description = None 103 self._current_test = None 104 self.state = TestProtocolServer.OUTSIDE_TEST 105 else: 106 self.stdOutLineReceived(line) 107 108 def _appendMessage(self, line): 109 if line[0:2] == " ]": 110 # quoted ] start 111 self._message += line[1:] 112 else: 113 self._message += line 114 115 def endQuote(self, line): 116 if self.state == TestProtocolServer.READING_FAILURE: 117 self.state = TestProtocolServer.OUTSIDE_TEST 118 self.current_test_description = None 119 self.client.addFailure(self._current_test, 120 RemoteError(self._message)) 121 self.client.stopTest(self._current_test) 122 elif self.state == TestProtocolServer.READING_ERROR: 123 self.state = TestProtocolServer.OUTSIDE_TEST 124 self.current_test_description = None 125 self.client.addError(self._current_test, 126 RemoteError(self._message)) 127 self.client.stopTest(self._current_test) 128 else: 129 self.stdOutLineReceived(line) 479 whence = PROGRESS_SET 480 delta = int(line) 481 self.client.progress(delta, whence) 482 483 def _handleTags(self, offset, line): 484 """Process a tags command.""" 485 tags = line[offset:].split() 486 new_tags, gone_tags = tags_to_new_gone(tags) 487 self.client.tags(new_tags, gone_tags) 488 489 def _handleTime(self, offset, line): 490 # Accept it, but do not do anything with it yet. 491 try: 492 event_time = iso8601.parse_date(line[offset:-1]) 493 except TypeError, e: 494 raise TypeError("Failed to parse %r, got %r" % (line, e)) 495 self.client.time(event_time) 130 496 131 497 def lineReceived(self, line): 132 498 """Call the appropriate local method for the received line.""" 133 if line == "]\n": 134 self.endQuote(line) 135 elif (self.state == TestProtocolServer.READING_FAILURE or 136 self.state == TestProtocolServer.READING_ERROR): 137 self._appendMessage(line) 138 else: 139 parts = line.split(None, 1) 140 if len(parts) == 2: 141 cmd, rest = parts 142 offset = len(cmd) + 1 143 cmd = cmd.strip(':') 144 if cmd in ('test', 'testing'): 145 self._startTest(offset, line) 146 elif cmd == 'error': 147 self._addError(offset, line) 148 elif cmd == 'failure': 149 self._addFailure(offset, line) 150 elif cmd in ('success', 'successful'): 151 self._addSuccess(offset, line) 152 else: 153 self.stdOutLineReceived(line) 154 else: 155 self.stdOutLineReceived(line) 499 self._state.lineReceived(line) 500 501 def _lostConnectionInTest(self, state_string): 502 error_string = u"lost connection during %stest '%s'" % ( 503 state_string, self.current_test_description) 504 self.client.addError(self._current_test, RemoteError(error_string)) 505 self.client.stopTest(self._current_test) 156 506 157 507 def lostConnection(self): 158 508 """The input connection has finished.""" 159 if self.state == TestProtocolServer.TEST_STARTED: 160 self.client.addError(self._current_test, 161 RemoteError("lost connection during test '%s'" 162 % self.current_test_description)) 163 self.client.stopTest(self._current_test) 164 elif self.state == TestProtocolServer.READING_ERROR: 165 self.client.addError(self._current_test, 166 RemoteError("lost connection during " 167 "error report of test " 168 "'%s'" % 169 self.current_test_description)) 170 self.client.stopTest(self._current_test) 171 elif self.state == TestProtocolServer.READING_FAILURE: 172 self.client.addError(self._current_test, 173 RemoteError("lost connection during " 174 "failure report of test " 175 "'%s'" % 176 self.current_test_description)) 177 self.client.stopTest(self._current_test) 509 self._state.lostConnection() 178 510 179 511 def readFrom(self, pipe): 512 """Blocking convenience API to parse an entire stream. 513 514 :param pipe: A file-like object supporting readlines(). 515 :return: None. 516 """ 180 517 for line in pipe.readlines(): 181 518 self.lineReceived(line) … … 184 521 def _startTest(self, offset, line): 185 522 """Internal call to change state machine. Override startTest().""" 186 if self.state == TestProtocolServer.OUTSIDE_TEST: 187 self.state = TestProtocolServer.TEST_STARTED 188 self._current_test = RemotedTestCase(line[offset:-1]) 189 self.current_test_description = line[offset:-1] 190 self.client.startTest(self._current_test) 191 else: 192 self.stdOutLineReceived(line) 523 self._state.startTest(offset, line) 524 525 def subunitLineReceived(self, line): 526 self._forward_stream.write(line) 193 527 194 528 def stdOutLineReceived(self, line): … … 196 530 197 531 198 class RemoteException(Exception): 199 """An exception that occured remotely to python.""" 200 201 def __eq__(self, other): 202 try: 203 return self.args == other.args 204 except AttributeError: 205 return False 206 207 208 class TestProtocolClient(unittest.TestResult): 209 """A class that looks like a TestResult and informs a TestProtocolServer.""" 532 class TestProtocolClient(testresult.TestResult): 533 """A TestResult which generates a subunit stream for a test run. 534 535 # Get a TestSuite or TestCase to run 536 suite = make_suite() 537 # Create a stream (any object with a 'write' method) 538 stream = file('tests.log', 'wb') 539 # Create a subunit result object which will output to the stream 540 result = subunit.TestProtocolClient(stream) 541 # Optionally, to get timing data for performance analysis, wrap the 542 # serialiser with a timing decorator 543 result = subunit.test_results.AutoTimingTestResultDecorator(result) 544 # Run the test suite reporting to the subunit result object 545 suite.run(result) 546 # Close the stream. 547 stream.close() 548 """ 210 549 211 550 def __init__(self, stream): 212 super(TestProtocolClient, self).__init__()551 testresult.TestResult.__init__(self) 213 552 self._stream = stream 214 215 def addError(self, test, error): 216 """Report an error in test test.""" 217 self._stream.write("error: %s [\n" % (test.shortDescription() or str(test))) 218 for line in self._exc_info_to_string(error, test).splitlines(): 219 self._stream.write("%s\n" % line) 553 _make_stream_binary(stream) 554 555 def addError(self, test, error=None, details=None): 556 """Report an error in test test. 557 558 Only one of error and details should be provided: conceptually there 559 are two separate methods: 560 addError(self, test, error) 561 addError(self, test, details) 562 563 :param error: Standard unittest positional argument form - an 564 exc_info tuple. 565 :param details: New Testing-in-python drafted API; a dict from string 566 to subunit.Content objects. 567 """ 568 self._addOutcome("error", test, error=error, details=details) 569 570 def addExpectedFailure(self, test, error=None, details=None): 571 """Report an expected failure in test test. 572 573 Only one of error and details should be provided: conceptually there 574 are two separate methods: 575 addError(self, test, error) 576 addError(self, test, details) 577 578 :param error: Standard unittest positional argument form - an 579 exc_info tuple. 580 :param details: New Testing-in-python drafted API; a dict from string 581 to subunit.Content objects. 582 """ 583 self._addOutcome("xfail", test, error=error, details=details) 584 585 def addFailure(self, test, error=None, details=None): 586 """Report a failure in test test. 587 588 Only one of error and details should be provided: conceptually there 589 are two separate methods: 590 addFailure(self, test, error) 591 addFailure(self, test, details) 592 593 :param error: Standard unittest positional argument form - an 594 exc_info tuple. 595 :param details: New Testing-in-python drafted API; a dict from string 596 to subunit.Content objects. 597 """ 598 self._addOutcome("failure", test, error=error, details=details) 599 600 def _addOutcome(self, outcome, test, error=None, details=None): 601 """Report a failure in test test. 602 603 Only one of error and details should be provided: conceptually there 604 are two separate methods: 605 addOutcome(self, test, error) 606 addOutcome(self, test, details) 607 608 :param outcome: A string describing the outcome - used as the 609 event name in the subunit stream. 610 :param error: Standard unittest positional argument form - an 611 exc_info tuple. 612 :param details: New Testing-in-python drafted API; a dict from string 613 to subunit.Content objects. 614 """ 615 self._stream.write("%s: %s" % (outcome, test.id())) 616 if error is None and details is None: 617 raise ValueError 618 if error is not None: 619 self._stream.write(" [\n") 620 # XXX: this needs to be made much stricter, along the lines of 621 # Martin[gz]'s work in testtools. Perhaps subunit can use that? 622 for line in self._exc_info_to_unicode(error, test).splitlines(): 623 self._stream.write(("%s\n" % line).encode('utf8')) 624 else: 625 self._write_details(details) 220 626 self._stream.write("]\n") 221 super(TestProtocolClient, self).addError(test, error) 222 223 def addFailure(self, test, error):224 """Report a failure in test test."""225 self._stream.write("failure: %s [\n" % (test.shortDescription() or str(test)))226 for line in self._exc_info_to_string(error, test).splitlines():227 self._stream.write(" %s\n" % line)228 self._stream.write("]\n")229 super(TestProtocolClient, self).addFailure(test, error)230 231 def addSuccess(self, test ):627 628 def addSkip(self, test, reason=None, details=None): 629 """Report a skipped test.""" 630 if reason is None: 631 self._addOutcome("skip", test, error=None, details=details) 632 else: 633 self._stream.write("skip: %s [\n" % test.id()) 634 self._stream.write("%s\n" % reason) 635 self._stream.write("]\n") 636 637 def addSuccess(self, test, details=None): 232 638 """Report a success in a test.""" 233 self._stream.write("successful: %s\n" % (test.shortDescription() or str(test))) 234 super(TestProtocolClient, self).addSuccess(test) 639 self._stream.write("successful: %s" % test.id()) 640 if not details: 641 self._stream.write("\n") 642 else: 643 self._write_details(details) 644 self._stream.write("]\n") 645 addUnexpectedSuccess = addSuccess 235 646 236 647 def startTest(self, test): 237 648 """Mark a test as starting its test run.""" 238 self._stream.write("test: %s\n" % (test.shortDescription() or str(test)))239 649 super(TestProtocolClient, self).startTest(test) 240 241 242 def RemoteError(description=""): 243 if description == "": 244 description = "\n" 245 return (RemoteException, RemoteException(description), None) 650 self._stream.write("test: %s\n" % test.id()) 651 self._stream.flush() 652 653 def stopTest(self, test): 654 super(TestProtocolClient, self).stopTest(test) 655 self._stream.flush() 656 657 def progress(self, offset, whence): 658 """Provide indication about the progress/length of the test run. 659 660 :param offset: Information about the number of tests remaining. If 661 whence is PROGRESS_CUR, then offset increases/decreases the 662 remaining test count. If whence is PROGRESS_SET, then offset 663 specifies exactly the remaining test count. 664 :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH, 665 PROGRESS_POP. 666 """ 667 if whence == PROGRESS_CUR and offset > -1: 668 prefix = "+" 669 elif whence == PROGRESS_PUSH: 670 prefix = "" 671 offset = "push" 672 elif whence == PROGRESS_POP: 673 prefix = "" 674 offset = "pop" 675 else: 676 prefix = "" 677 self._stream.write("progress: %s%s\n" % (prefix, offset)) 678 679 def time(self, a_datetime): 680 """Inform the client of the time. 681 682 ":param datetime: A datetime.datetime object. 683 """ 684 time = a_datetime.astimezone(iso8601.Utc()) 685 self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % ( 686 time.year, time.month, time.day, time.hour, time.minute, 687 time.second, time.microsecond)) 688 689 def _write_details(self, details): 690 """Output details to the stream. 691 692 :param details: An extended details dict for a test outcome. 693 """ 694 self._stream.write(" [ multipart\n") 695 for name, content in sorted(details.iteritems()): 696 self._stream.write("Content-Type: %s/%s" % 697 (content.content_type.type, content.content_type.subtype)) 698 parameters = content.content_type.parameters 699 if parameters: 700 self._stream.write(";") 701 param_strs = [] 702 for param, value in parameters.iteritems(): 703 param_strs.append("%s=%s" % (param, value)) 704 self._stream.write(",".join(param_strs)) 705 self._stream.write("\n%s\n" % name) 706 encoder = chunked.Encoder(self._stream) 707 map(encoder.write, content.iter_bytes()) 708 encoder.close() 709 710 def done(self): 711 """Obey the testtools result.done() interface.""" 712 713 714 def RemoteError(description=u""): 715 return (_StringException, _StringException(description), None) 246 716 247 717 248 718 class RemotedTestCase(unittest.TestCase): 249 """A class to represent test cases run in child processes.""" 719 """A class to represent test cases run in child processes. 720 721 Instances of this class are used to provide the Python test API a TestCase 722 that can be printed to the screen, introspected for metadata and so on. 723 However, as they are a simply a memoisation of a test that was actually 724 run in the past by a separate process, they cannot perform any interactive 725 actions. 726 """ 250 727 251 728 def __eq__ (self, other): … … 273 750 274 751 def id(self): 275 return "%s .%s" % (self._strclass(), self.__description)752 return "%s" % (self.__description,) 276 753 277 754 def __str__(self): … … 285 762 if result is None: result = self.defaultTestResult() 286 763 result.startTest(self) 287 result.addError(self, RemoteError( "Cannot run RemotedTestCases.\n"))764 result.addError(self, RemoteError(u"Cannot run RemotedTestCases.\n")) 288 765 result.stopTest(self) 289 766 … … 315 792 def debug(self): 316 793 """Run the test without collecting errors in a TestResult""" 317 self._run( unittest.TestResult())794 self._run(testresult.TestResult()) 318 795 319 796 def _run(self, result): 320 797 protocol = TestProtocolServer(result) 321 output = os.popen(self.script, mode='r') 322 protocol.readFrom(output) 798 output = subprocess.Popen(self.script, shell=True, 799 stdout=subprocess.PIPE).communicate()[0] 800 protocol.readFrom(StringIO(output)) 323 801 324 802 325 803 class IsolatedTestCase(unittest.TestCase): 326 """A TestCase which runs its tests in a forked process.""" 804 """A TestCase which executes in a forked process. 805 806 Each test gets its own process, which has a performance overhead but will 807 provide excellent isolation from global state (such as django configs, 808 zope utilities and so on). 809 """ 327 810 328 811 def run(self, result=None): … … 332 815 333 816 class IsolatedTestSuite(unittest.TestSuite): 334 """A TestCase which runs its tests in a forked process.""" 817 """A TestSuite which runs its tests in a forked process. 818 819 This decorator that will fork() before running the tests and report the 820 results from the child process using a Subunit stream. This is useful for 821 handling tests that mutate global state, or are testing C extensions that 822 could crash the VM. 823 """ 335 824 336 825 def run(self, result=None): 337 if result is None: result = unittest.TestResult()826 if result is None: result = testresult.TestResult() 338 827 run_isolated(unittest.TestSuite, self, result) 339 828 … … 377 866 378 867 379 class SubunitTestRunner(object): 380 def __init__(self, stream=sys.stdout): 381 self.stream = stream 382 383 def run(self, test): 384 "Run the given test case or test suite." 385 result = TestProtocolClient(self.stream) 386 test(result) 387 return result 388 868 def TAP2SubUnit(tap, subunit): 869 """Filter a TAP pipe into a subunit pipe. 870 871 :param tap: A tap pipe/stream/file object. 872 :param subunit: A pipe/stream/file object to write subunit results to. 873 :return: The exit code to exit with. 874 """ 875 BEFORE_PLAN = 0 876 AFTER_PLAN = 1 877 SKIP_STREAM = 2 878 client = TestProtocolClient(subunit) 879 state = BEFORE_PLAN 880 plan_start = 1 881 plan_stop = 0 882 def _skipped_test(subunit, plan_start): 883 # Some tests were skipped. 884 subunit.write('test test %d\n' % plan_start) 885 subunit.write('error test %d [\n' % plan_start) 886 subunit.write('test missing from TAP output\n') 887 subunit.write(']\n') 888 return plan_start + 1 889 # Test data for the next test to emit 890 test_name = None 891 log = [] 892 result = None 893 def _emit_test(): 894 "write out a test" 895 if test_name is None: 896 return 897 subunit.write("test %s\n" % test_name) 898 if not log: 899 subunit.write("%s %s\n" % (result, test_name)) 900 else: 901 subunit.write("%s %s [\n" % (result, test_name)) 902 if log: 903 for line in log: 904 subunit.write("%s\n" % line) 905 subunit.write("]\n") 906 del log[:] 907 for line in tap: 908 if state == BEFORE_PLAN: 909 match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line) 910 if match: 911 state = AFTER_PLAN 912 _, plan_stop, comment = match.groups() 913 plan_stop = int(plan_stop) 914 if plan_start > plan_stop and plan_stop == 0: 915 # skipped file 916 state = SKIP_STREAM 917 subunit.write("test file skip\n") 918 subunit.write("skip file skip [\n") 919 subunit.write("%s\n" % comment) 920 subunit.write("]\n") 921 continue 922 # not a plan line, or have seen one before 923 match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n", line) 924 if match: 925 # new test, emit current one. 926 _emit_test() 927 status, number, description, directive, directive_comment = match.groups() 928 if status == 'ok': 929 result = 'success' 930 else: 931 result = "failure" 932 if description is None: 933 description = '' 934 else: 935 description = ' ' + description 936 if directive is not None: 937 if directive.upper() == 'TODO': 938 result = 'xfail' 939 elif directive.upper() == 'SKIP': 940 result = 'skip' 941 if directive_comment is not None: 942 log.append(directive_comment) 943 if number is not None: 944 number = int(number) 945 while plan_start < number: 946 plan_start = _skipped_test(subunit, plan_start) 947 test_name = "test %d%s" % (plan_start, description) 948 plan_start += 1 949 continue 950 match = re.match("Bail out\!(?:\s*(.*))?\n", line) 951 if match: 952 reason, = match.groups() 953 if reason is None: 954 extra = '' 955 else: 956 extra = ' %s' % reason 957 _emit_test() 958 test_name = "Bail out!%s" % extra 959 result = "error" 960 state = SKIP_STREAM 961 continue 962 match = re.match("\#.*\n", line) 963 if match: 964 log.append(line[:-1]) 965 continue 966 subunit.write(line) 967 _emit_test() 968 while plan_start <= plan_stop: 969 # record missed tests 970 plan_start = _skipped_test(subunit, plan_start) 971 return 0 972 973 974 def tag_stream(original, filtered, tags): 975 """Alter tags on a stream. 976 977 :param original: The input stream. 978 :param filtered: The output stream. 979 :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or 980 '-TAG' commands. 981 982 A 'TAG' command will add the tag to the output stream, 983 and override any existing '-TAG' command in that stream. 984 Specifically: 985 * A global 'tags: TAG' will be added to the start of the stream. 986 * Any tags commands with -TAG will have the -TAG removed. 987 988 A '-TAG' command will remove the TAG command from the stream. 989 Specifically: 990 * A 'tags: -TAG' command will be added to the start of the stream. 991 * Any 'tags: TAG' command will have 'TAG' removed from it. 992 Additionally, any redundant tagging commands (adding a tag globally 993 present, or removing a tag globally removed) are stripped as a 994 by-product of the filtering. 995 :return: 0 996 """ 997 new_tags, gone_tags = tags_to_new_gone(tags) 998 def write_tags(new_tags, gone_tags): 999 if new_tags or gone_tags: 1000 filtered.write("tags: " + ' '.join(new_tags)) 1001 if gone_tags: 1002 for tag in gone_tags: 1003 filtered.write("-" + tag) 1004 filtered.write("\n") 1005 write_tags(new_tags, gone_tags) 1006 # TODO: use the protocol parser and thus don't mangle test comments. 1007 for line in original: 1008 if line.startswith("tags:"): 1009 line_tags = line[5:].split() 1010 line_new, line_gone = tags_to_new_gone(line_tags) 1011 line_new = line_new - gone_tags 1012 line_gone = line_gone - new_tags 1013 write_tags(line_new, line_gone) 1014 else: 1015 filtered.write(line) 1016 return 0 1017 1018 1019 class ProtocolTestCase(object): 1020 """Subunit wire protocol to unittest.TestCase adapter. 1021 1022 ProtocolTestCase honours the core of ``unittest.TestCase`` protocol - 1023 calling a ProtocolTestCase or invoking the run() method will make a 'test 1024 run' happen. The 'test run' will simply be a replay of the test activity 1025 that has been encoded into the stream. The ``unittest.TestCase`` ``debug`` 1026 and ``countTestCases`` methods are not supported because there isn't a 1027 sensible mapping for those methods. 1028 1029 # Get a stream (any object with a readline() method), in this case the 1030 # stream output by the example from ``subunit.TestProtocolClient``. 1031 stream = file('tests.log', 'rb') 1032 # Create a parser which will read from the stream and emit 1033 # activity to a unittest.TestResult when run() is called. 1034 suite = subunit.ProtocolTestCase(stream) 1035 # Create a result object to accept the contents of that stream. 1036 result = unittest._TextTestResult(sys.stdout) 1037 # 'run' the tests - process the stream and feed its contents to result. 1038 suite.run(result) 1039 stream.close() 1040 1041 :seealso: TestProtocolServer (the subunit wire protocol parser). 1042 """ 1043 1044 def __init__(self, stream, passthrough=None, forward=False): 1045 """Create a ProtocolTestCase reading from stream. 1046 1047 :param stream: A filelike object which a subunit stream can be read 1048 from. 1049 :param passthrough: A stream pass non subunit input on to. If not 1050 supplied, the TestProtocolServer default is used. 1051 :param forward: A stream to pass subunit input on to. If not supplied 1052 subunit input is not forwarded. 1053 """ 1054 self._stream = stream 1055 _make_stream_binary(stream) 1056 self._passthrough = passthrough 1057 self._forward = forward 1058 _make_stream_binary(forward) 1059 1060 def __call__(self, result=None): 1061 return self.run(result) 1062 1063 def run(self, result=None): 1064 if result is None: 1065 result = self.defaultTestResult() 1066 protocol = TestProtocolServer(result, self._passthrough, self._forward) 1067 line = self._stream.readline() 1068 while line: 1069 protocol.lineReceived(line) 1070 line = self._stream.readline() 1071 protocol.lostConnection() 1072 1073 1074 class TestResultStats(testresult.TestResult): 1075 """A pyunit TestResult interface implementation for making statistics. 1076 1077 :ivar total_tests: The total tests seen. 1078 :ivar passed_tests: The tests that passed. 1079 :ivar failed_tests: The tests that failed. 1080 :ivar seen_tags: The tags seen across all tests. 1081 """ 1082 1083 def __init__(self, stream): 1084 """Create a TestResultStats which outputs to stream.""" 1085 testresult.TestResult.__init__(self) 1086 self._stream = stream 1087 self.failed_tests = 0 1088 self.skipped_tests = 0 1089 self.seen_tags = set() 1090 1091 @property 1092 def total_tests(self): 1093 return self.testsRun 1094 1095 def addError(self, test, err, details=None): 1096 self.failed_tests += 1 1097 1098 def addFailure(self, test, err, details=None): 1099 self.failed_tests += 1 1100 1101 def addSkip(self, test, reason, details=None): 1102 self.skipped_tests += 1 1103 1104 def formatStats(self): 1105 self._stream.write("Total tests: %5d\n" % self.total_tests) 1106 self._stream.write("Passed tests: %5d\n" % self.passed_tests) 1107 self._stream.write("Failed tests: %5d\n" % self.failed_tests) 1108 self._stream.write("Skipped tests: %5d\n" % self.skipped_tests) 1109 tags = sorted(self.seen_tags) 1110 self._stream.write("Seen tags: %s\n" % (", ".join(tags))) 1111 1112 @property 1113 def passed_tests(self): 1114 return self.total_tests - self.failed_tests - self.skipped_tests 1115 1116 def tags(self, new_tags, gone_tags): 1117 """Accumulate the seen tags.""" 1118 self.seen_tags.update(new_tags) 1119 1120 def wasSuccessful(self): 1121 """Tells whether or not this result was a success""" 1122 return self.failed_tests == 0 1123 1124 1125 def get_default_formatter(): 1126 """Obtain the default formatter to write to. 1127 1128 :return: A file-like object. 1129 """ 1130 formatter = os.getenv("SUBUNIT_FORMATTER") 1131 if formatter: 1132 return os.popen(formatter, "w") 1133 else: 1134 return sys.stdout 1135 1136 1137 def _make_stream_binary(stream): 1138 """Ensure that a stream will be binary safe. See _make_binary_on_windows.""" 1139 if getattr(stream, 'fileno', None) is not None: 1140 _make_binary_on_windows(stream.fileno()) 1141 1142 def _make_binary_on_windows(fileno): 1143 """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078.""" 1144 if sys.platform == "win32": 1145 import msvcrt 1146 msvcrt.setmode(fileno, os.O_BINARY) -
vendor/current/lib/subunit/python/subunit/tests/__init__.py
r414 r740 3 3 # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> 4 4 # 5 # This program is free software; you can redistribute it and/or modify 6 # it under the terms of the GNU General Public License as published by 7 # the Free Software Foundation; either version 2 of the License, or 8 # (at your option) any later version. 9 # 10 # This program is distributed in the hope that it will be useful, 11 # but WITHOUT ANY WARRANTY; without even the implied warranty of 12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 # GNU General Public License for more details. 14 # 15 # You should have received a copy of the GNU General Public License 16 # along with this program; if not, write to the Free Software 17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause 6 # license at the users choice. A copy of both licenses are available in the 7 # project source as Apache-2.0 and BSD. You may not use this file except in 8 # compliance with one of these two licences. 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT 12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 # license you chose for the specific language governing permissions and 14 # limitations under that license. 18 15 # 19 16 20 from subunit.tests import TestUtil, test_test_protocol 17 from subunit.tests import ( 18 TestUtil, 19 test_chunked, 20 test_details, 21 test_progress_model, 22 test_subunit_filter, 23 test_subunit_stats, 24 test_subunit_tags, 25 test_tap2subunit, 26 test_test_protocol, 27 test_test_results, 28 ) 21 29 22 30 def test_suite(): 23 31 result = TestUtil.TestSuite() 32 result.addTest(test_chunked.test_suite()) 33 result.addTest(test_details.test_suite()) 34 result.addTest(test_progress_model.test_suite()) 35 result.addTest(test_test_results.test_suite()) 24 36 result.addTest(test_test_protocol.test_suite()) 37 result.addTest(test_tap2subunit.test_suite()) 38 result.addTest(test_subunit_filter.test_suite()) 39 result.addTest(test_subunit_tags.test_suite()) 40 result.addTest(test_subunit_stats.test_suite()) 25 41 return result -
vendor/current/lib/subunit/python/subunit/tests/sample-script.py
r414 r740 1 1 #!/usr/bin/env python 2 2 import sys 3 if len(sys.argv) == 2: 4 # subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args 5 # uses this code path to be sure that the arguments were passed to 6 # sample-script.py 7 print "test fail" 8 print "error fail" 9 sys.exit(0) 3 10 print "test old mcdonald" 4 11 print "success old mcdonald" -
vendor/current/lib/subunit/python/subunit/tests/test_test_protocol.py
r414 r740 1 1 # 2 # subunit: extensions to python unittest to get test results from subprocesses.2 # subunit: extensions to Python unittest to get test results from subprocesses. 3 3 # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> 4 4 # 5 # This program is free software; you can redistribute it and/or modify 6 # it under the terms of the GNU General Public License as published by 7 # the Free Software Foundation; either version 2 of the License, or 8 # (at your option) any later version. 5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause 6 # license at the users choice. A copy of both licenses are available in the 7 # project source as Apache-2.0 and BSD. You may not use this file except in 8 # compliance with one of these two licences. 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT 12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 # license you chose for the specific language governing permissions and 14 # limitations under that license. 9 15 # 10 # This program is distributed in the hope that it will be useful, 11 # but WITHOUT ANY WARRANTY; without even the implied warranty of 12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 # GNU General Public License for more details. 14 # 15 # You should have received a copy of the GNU General Public License 16 # along with this program; if not, write to the Free Software 17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 18 # 19 16 17 import datetime 20 18 import unittest 21 19 from StringIO import StringIO 22 20 import os 21 import sys 22 23 from testtools.content import Content, TracebackContent 24 from testtools.content_type import ContentType 25 from testtools.tests.helpers import ( 26 Python26TestResult, 27 Python27TestResult, 28 ExtendedTestResult, 29 ) 30 23 31 import subunit 24 import sys 25 26 try: 27 class MockTestProtocolServerClient(object): 28 """A mock protocol server client to test callbacks.""" 29 30 def __init__(self): 31 self.end_calls = [] 32 self.error_calls = [] 33 self.failure_calls = [] 34 self.start_calls = [] 35 self.success_calls = [] 36 super(MockTestProtocolServerClient, self).__init__() 37 38 def addError(self, test, error): 39 self.error_calls.append((test, error)) 40 41 def addFailure(self, test, error): 42 self.failure_calls.append((test, error)) 43 44 def addSuccess(self, test): 45 self.success_calls.append(test) 46 47 def stopTest(self, test): 48 self.end_calls.append(test) 49 50 def startTest(self, test): 51 self.start_calls.append(test) 52 53 except AttributeError: 54 MockTestProtocolServer = None 55 56 57 class TestMockTestProtocolServer(unittest.TestCase): 58 59 def test_start_test(self): 60 protocol = MockTestProtocolServerClient() 61 protocol.startTest(subunit.RemotedTestCase("test old mcdonald")) 62 self.assertEqual(protocol.start_calls, 63 [subunit.RemotedTestCase("test old mcdonald")]) 64 self.assertEqual(protocol.end_calls, []) 65 self.assertEqual(protocol.error_calls, []) 66 self.assertEqual(protocol.failure_calls, []) 67 self.assertEqual(protocol.success_calls, []) 68 69 def test_add_error(self): 70 protocol = MockTestProtocolServerClient() 71 protocol.addError(subunit.RemotedTestCase("old mcdonald"), 72 subunit.RemoteError("omg it works")) 73 self.assertEqual(protocol.start_calls, []) 74 self.assertEqual(protocol.end_calls, []) 75 self.assertEqual(protocol.error_calls, [( 76 subunit.RemotedTestCase("old mcdonald"), 77 subunit.RemoteError("omg it works"))]) 78 self.assertEqual(protocol.failure_calls, []) 79 self.assertEqual(protocol.success_calls, []) 80 81 def test_add_failure(self): 82 protocol = MockTestProtocolServerClient() 83 protocol.addFailure(subunit.RemotedTestCase("old mcdonald"), 84 subunit.RemoteError("omg it works")) 85 self.assertEqual(protocol.start_calls, []) 86 self.assertEqual(protocol.end_calls, []) 87 self.assertEqual(protocol.error_calls, []) 88 self.assertEqual(protocol.failure_calls, [ 89 (subunit.RemotedTestCase("old mcdonald"), 90 subunit.RemoteError("omg it works"))]) 91 self.assertEqual(protocol.success_calls, []) 92 93 def test_add_success(self): 94 protocol = MockTestProtocolServerClient() 95 protocol.addSuccess(subunit.RemotedTestCase("test old mcdonald")) 96 self.assertEqual(protocol.start_calls, []) 97 self.assertEqual(protocol.end_calls, []) 98 self.assertEqual(protocol.error_calls, []) 99 self.assertEqual(protocol.failure_calls, []) 100 self.assertEqual(protocol.success_calls, 101 [subunit.RemotedTestCase("test old mcdonald")]) 102 103 def test_end_test(self): 104 protocol = MockTestProtocolServerClient() 105 protocol.stopTest(subunit.RemotedTestCase("test old mcdonald")) 106 self.assertEqual(protocol.end_calls, 107 [subunit.RemotedTestCase("test old mcdonald")]) 108 self.assertEqual(protocol.error_calls, []) 109 self.assertEqual(protocol.failure_calls, []) 110 self.assertEqual(protocol.success_calls, []) 111 self.assertEqual(protocol.start_calls, []) 32 from subunit import _remote_exception_str 33 import subunit.iso8601 as iso8601 112 34 113 35 … … 115 37 116 38 def test_imports(self): 39 from subunit import DiscardStream 117 40 from subunit import TestProtocolServer 118 41 from subunit import RemotedTestCase … … 121 44 from subunit import IsolatedTestCase 122 45 from subunit import TestProtocolClient 123 46 from subunit import ProtocolTestCase 47 48 49 class TestDiscardStream(unittest.TestCase): 50 51 def test_write(self): 52 subunit.DiscardStream().write("content") 53 54 55 class TestProtocolServerForward(unittest.TestCase): 56 57 def test_story(self): 58 client = unittest.TestResult() 59 out = StringIO() 60 protocol = subunit.TestProtocolServer(client, forward_stream=out) 61 pipe = StringIO("test old mcdonald\n" 62 "success old mcdonald\n") 63 protocol.readFrom(pipe) 64 mcdonald = subunit.RemotedTestCase("old mcdonald") 65 self.assertEqual(client.testsRun, 1) 66 self.assertEqual(pipe.getvalue(), out.getvalue()) 67 68 def test_not_command(self): 69 client = unittest.TestResult() 70 out = StringIO() 71 protocol = subunit.TestProtocolServer(client, 72 stream=subunit.DiscardStream(), forward_stream=out) 73 pipe = StringIO("success old mcdonald\n") 74 protocol.readFrom(pipe) 75 self.assertEqual(client.testsRun, 0) 76 self.assertEqual("", out.getvalue()) 77 124 78 125 79 class TestTestProtocolServerPipe(unittest.TestCase): … … 141 95 an_error = subunit.RemotedTestCase("an error") 142 96 self.assertEqual(client.errors, 143 [(an_error, 'RemoteException: \n\n')])97 [(an_error, _remote_exception_str + '\n')]) 144 98 self.assertEqual( 145 99 client.failures, 146 [(bing, "RemoteException: foo.c:53:ERROR invalid state\n\n")]) 100 [(bing, _remote_exception_str + ": Text attachment: traceback\n" 101 "------------\nfoo.c:53:ERROR invalid state\n" 102 "------------\n\n")]) 147 103 self.assertEqual(client.testsRun, 3) 148 104 105 def test_non_test_characters_forwarded_immediately(self): 106 pass 107 149 108 150 109 class TestTestProtocolServerStartTest(unittest.TestCase): 151 110 152 111 def setUp(self): 153 self.client = MockTestProtocolServerClient()112 self.client = Python26TestResult() 154 113 self.protocol = subunit.TestProtocolServer(self.client) 155 114 156 115 def test_start_test(self): 157 116 self.protocol.lineReceived("test old mcdonald\n") 158 self.assertEqual(self.client. start_calls,159 [subunit.RemotedTestCase("old mcdonald")])117 self.assertEqual(self.client._events, 118 [('startTest', subunit.RemotedTestCase("old mcdonald"))]) 160 119 161 120 def test_start_testing(self): 162 121 self.protocol.lineReceived("testing old mcdonald\n") 163 self.assertEqual(self.client. start_calls,164 [subunit.RemotedTestCase("old mcdonald")])122 self.assertEqual(self.client._events, 123 [('startTest', subunit.RemotedTestCase("old mcdonald"))]) 165 124 166 125 def test_start_test_colon(self): 167 126 self.protocol.lineReceived("test: old mcdonald\n") 168 self.assertEqual(self.client.start_calls, 169 [subunit.RemotedTestCase("old mcdonald")]) 127 self.assertEqual(self.client._events, 128 [('startTest', subunit.RemotedTestCase("old mcdonald"))]) 129 130 def test_indented_test_colon_ignored(self): 131 self.protocol.lineReceived(" test: old mcdonald\n") 132 self.assertEqual([], self.client._events) 170 133 171 134 def test_start_testing_colon(self): 172 135 self.protocol.lineReceived("testing: old mcdonald\n") 173 self.assertEqual(self.client. start_calls,174 [subunit.RemotedTestCase("old mcdonald")])136 self.assertEqual(self.client._events, 137 [('startTest', subunit.RemotedTestCase("old mcdonald"))]) 175 138 176 139 … … 178 141 179 142 def setUp(self): 180 from StringIO import StringIO181 143 self.stdout = StringIO() 182 144 self.test = subunit.RemotedTestCase("old mcdonald") 183 self.client = MockTestProtocolServerClient()145 self.client = ExtendedTestResult() 184 146 self.protocol = subunit.TestProtocolServer(self.client, self.stdout) 185 147 … … 206 168 def test_keywords_before_test(self): 207 169 self.keywords_before_test() 208 self.assertEqual(self.client.start_calls, []) 209 self.assertEqual(self.client.error_calls, []) 210 self.assertEqual(self.client.failure_calls, []) 211 self.assertEqual(self.client.success_calls, []) 170 self.assertEqual(self.client._events, []) 212 171 213 172 def test_keywords_after_error(self): … … 215 174 self.protocol.lineReceived("error old mcdonald\n") 216 175 self.keywords_before_test() 217 self.assertEqual(self.client.start_calls, [self.test]) 218 self.assertEqual(self.client.end_calls, [self.test]) 219 self.assertEqual(self.client.error_calls, 220 [(self.test, subunit.RemoteError(""))]) 221 self.assertEqual(self.client.failure_calls, []) 222 self.assertEqual(self.client.success_calls, []) 176 self.assertEqual([ 177 ('startTest', self.test), 178 ('addError', self.test, {}), 179 ('stopTest', self.test), 180 ], self.client._events) 223 181 224 182 def test_keywords_after_failure(self): … … 226 184 self.protocol.lineReceived("failure old mcdonald\n") 227 185 self.keywords_before_test() 228 self.assertEqual(self.client.start_calls, [self.test]) 229 self.assertEqual(self.client.end_calls, [self.test]) 230 self.assertEqual(self.client.error_calls, []) 231 self.assertEqual(self.client.failure_calls, 232 [(self.test, subunit.RemoteError())]) 233 self.assertEqual(self.client.success_calls, []) 186 self.assertEqual(self.client._events, [ 187 ('startTest', self.test), 188 ('addFailure', self.test, {}), 189 ('stopTest', self.test), 190 ]) 234 191 235 192 def test_keywords_after_success(self): … … 237 194 self.protocol.lineReceived("success old mcdonald\n") 238 195 self.keywords_before_test() 239 self.assertEqual( self.client.start_calls, [self.test])240 self.assertEqual(self.client.end_calls, [self.test])241 self.assertEqual(self.client.error_calls, [])242 self.assertEqual(self.client.failure_calls, [])243 self.assertEqual(self.client.success_calls, [self.test])196 self.assertEqual([ 197 ('startTest', self.test), 198 ('addSuccess', self.test), 199 ('stopTest', self.test), 200 ], self.client._events) 244 201 245 202 def test_keywords_after_test(self): … … 266 223 "successful: a\n" 267 224 "]\n") 268 self.assertEqual(self.client.start_calls, [self.test]) 269 self.assertEqual(self.client.end_calls, [self.test]) 270 self.assertEqual(self.client.failure_calls, 271 [(self.test, subunit.RemoteError())]) 272 self.assertEqual(self.client.error_calls, []) 273 self.assertEqual(self.client.success_calls, []) 225 self.assertEqual(self.client._events, [ 226 ('startTest', self.test), 227 ('addFailure', self.test, {}), 228 ('stopTest', self.test), 229 ]) 274 230 275 231 def test_keywords_during_failure(self): 232 # A smoke test to make sure that the details parsers have control 233 # appropriately. 276 234 self.protocol.lineReceived("test old mcdonald\n") 277 235 self.protocol.lineReceived("failure: old mcdonald [\n") … … 288 246 self.protocol.lineReceived("]\n") 289 247 self.assertEqual(self.stdout.getvalue(), "") 290 self.assertEqual(self.client.start_calls, [self.test]) 291 self.assertEqual(self.client.failure_calls, 292 [(self.test, subunit.RemoteError("test old mcdonald\n" 293 "failure a\n" 294 "failure: a\n" 295 "error a\n" 296 "error: a\n" 297 "success a\n" 298 "success: a\n" 299 "successful a\n" 300 "successful: a\n" 301 "]\n"))]) 302 self.assertEqual(self.client.end_calls, [self.test]) 303 self.assertEqual(self.client.error_calls, []) 304 self.assertEqual(self.client.success_calls, []) 248 details = {} 249 details['traceback'] = Content(ContentType("text", "x-traceback", 250 {'charset': 'utf8'}), 251 lambda:[ 252 "test old mcdonald\n" 253 "failure a\n" 254 "failure: a\n" 255 "error a\n" 256 "error: a\n" 257 "success a\n" 258 "success: a\n" 259 "successful a\n" 260 "successful: a\n" 261 "]\n"]) 262 self.assertEqual(self.client._events, [ 263 ('startTest', self.test), 264 ('addFailure', self.test, details), 265 ('stopTest', self.test), 266 ]) 305 267 306 268 def test_stdout_passthrough(self): … … 316 278 317 279 def setUp(self): 318 self.client = MockTestProtocolServerClient()280 self.client = Python26TestResult() 319 281 self.protocol = subunit.TestProtocolServer(self.client) 320 282 self.test = subunit.RemotedTestCase("old mcdonald") … … 322 284 def test_lost_connection_no_input(self): 323 285 self.protocol.lostConnection() 324 self.assertEqual(self.client.start_calls, []) 325 self.assertEqual(self.client.error_calls, []) 326 self.assertEqual(self.client.failure_calls, []) 327 self.assertEqual(self.client.success_calls, []) 286 self.assertEqual([], self.client._events) 328 287 329 288 def test_lost_connection_after_start(self): 330 289 self.protocol.lineReceived("test old mcdonald\n") 331 290 self.protocol.lostConnection() 332 self.assertEqual(self.client.start_calls, [self.test])333 self.assertEqual(self.client.end_calls, [self.test])334 self.assertEqual( self.client.error_calls,[335 ( self.test, subunit.RemoteError("lost connection during "336 "test 'old mcdonald'"))])337 self.assertEqual(self.client.failure_calls, [])338 self.assertEqual(self.client.success_calls, [])291 failure = subunit.RemoteError( 292 u"lost connection during test 'old mcdonald'") 293 self.assertEqual([ 294 ('startTest', self.test), 295 ('addError', self.test, failure), 296 ('stopTest', self.test), 297 ], self.client._events) 339 298 340 299 def test_lost_connected_after_error(self): … … 342 301 self.protocol.lineReceived("error old mcdonald\n") 343 302 self.protocol.lostConnection() 344 self.assertEqual(self.client.start_calls, [self.test]) 345 self.assertEqual(self.client.failure_calls, []) 346 self.assertEqual(self.client.end_calls, [self.test]) 347 self.assertEqual(self.client.error_calls, [ 348 (self.test, subunit.RemoteError(""))]) 349 self.assertEqual(self.client.success_calls, []) 303 self.assertEqual([ 304 ('startTest', self.test), 305 ('addError', self.test, subunit.RemoteError(u"")), 306 ('stopTest', self.test), 307 ], self.client._events) 308 309 def do_connection_lost(self, outcome, opening): 310 self.protocol.lineReceived("test old mcdonald\n") 311 self.protocol.lineReceived("%s old mcdonald %s" % (outcome, opening)) 312 self.protocol.lostConnection() 313 failure = subunit.RemoteError( 314 u"lost connection during %s report of test 'old mcdonald'" % 315 outcome) 316 self.assertEqual([ 317 ('startTest', self.test), 318 ('addError', self.test, failure), 319 ('stopTest', self.test), 320 ], self.client._events) 350 321 351 322 def test_lost_connection_during_error(self): 352 self.protocol.lineReceived("test old mcdonald\n") 353 self.protocol.lineReceived("error old mcdonald [\n") 354 self.protocol.lostConnection() 355 self.assertEqual(self.client.start_calls, [self.test]) 356 self.assertEqual(self.client.end_calls, [self.test]) 357 self.assertEqual(self.client.error_calls, [ 358 (self.test, subunit.RemoteError("lost connection during error " 359 "report of test 'old mcdonald'"))]) 360 self.assertEqual(self.client.failure_calls, []) 361 self.assertEqual(self.client.success_calls, []) 323 self.do_connection_lost("error", "[\n") 324 325 def test_lost_connection_during_error_details(self): 326 self.do_connection_lost("error", "[ multipart\n") 362 327 363 328 def test_lost_connected_after_failure(self): … … 365 330 self.protocol.lineReceived("failure old mcdonald\n") 366 331 self.protocol.lostConnection() 367 test = subunit.RemotedTestCase("old mcdonald") 368 self.assertEqual(self.client.start_calls, [self.test]) 369 self.assertEqual(self.client.end_calls, [self.test]) 370 self.assertEqual(self.client.error_calls, []) 371 self.assertEqual(self.client.failure_calls, 372 [(self.test, subunit.RemoteError())]) 373 self.assertEqual(self.client.success_calls, []) 332 self.assertEqual([ 333 ('startTest', self.test), 334 ('addFailure', self.test, subunit.RemoteError(u"")), 335 ('stopTest', self.test), 336 ], self.client._events) 374 337 375 338 def test_lost_connection_during_failure(self): 376 self.protocol.lineReceived("test old mcdonald\n") 377 self.protocol.lineReceived("failure old mcdonald [\n") 378 self.protocol.lostConnection() 379 self.assertEqual(self.client.start_calls, [self.test]) 380 self.assertEqual(self.client.end_calls, [self.test]) 381 self.assertEqual(self.client.error_calls, 382 [(self.test, 383 subunit.RemoteError("lost connection during " 384 "failure report" 385 " of test 'old mcdonald'"))]) 386 self.assertEqual(self.client.failure_calls, []) 387 self.assertEqual(self.client.success_calls, []) 339 self.do_connection_lost("failure", "[\n") 340 341 def test_lost_connection_during_failure_details(self): 342 self.do_connection_lost("failure", "[ multipart\n") 388 343 389 344 def test_lost_connection_after_success(self): … … 391 346 self.protocol.lineReceived("success old mcdonald\n") 392 347 self.protocol.lostConnection() 393 self.assertEqual(self.client.start_calls, [self.test]) 394 self.assertEqual(self.client.end_calls, [self.test]) 395 self.assertEqual(self.client.error_calls, []) 396 self.assertEqual(self.client.failure_calls, []) 397 self.assertEqual(self.client.success_calls, [self.test]) 398 399 400 class TestTestProtocolServerAddError(unittest.TestCase): 348 self.assertEqual([ 349 ('startTest', self.test), 350 ('addSuccess', self.test), 351 ('stopTest', self.test), 352 ], self.client._events) 353 354 def test_lost_connection_during_success(self): 355 self.do_connection_lost("success", "[\n") 356 357 def test_lost_connection_during_success_details(self): 358 self.do_connection_lost("success", "[ multipart\n") 359 360 def test_lost_connection_during_skip(self): 361 self.do_connection_lost("skip", "[\n") 362 363 def test_lost_connection_during_skip_details(self): 364 self.do_connection_lost("skip", "[ multipart\n") 365 366 def test_lost_connection_during_xfail(self): 367 self.do_connection_lost("xfail", "[\n") 368 369 def test_lost_connection_during_xfail_details(self): 370 self.do_connection_lost("xfail", "[ multipart\n") 371 372 373 class TestInTestMultipart(unittest.TestCase): 401 374 402 375 def setUp(self): 403 self.client = MockTestProtocolServerClient()376 self.client = ExtendedTestResult() 404 377 self.protocol = subunit.TestProtocolServer(self.client) 405 378 self.protocol.lineReceived("test mcdonalds farm\n") 406 379 self.test = subunit.RemotedTestCase("mcdonalds farm") 407 380 381 def test__outcome_sets_details_parser(self): 382 self.protocol._reading_success_details.details_parser = None 383 self.protocol._state._outcome(0, "mcdonalds farm [ multipart\n", 384 None, self.protocol._reading_success_details) 385 parser = self.protocol._reading_success_details.details_parser 386 self.assertNotEqual(None, parser) 387 self.assertTrue(isinstance(parser, 388 subunit.details.MultipartDetailsParser)) 389 390 391 class TestTestProtocolServerAddError(unittest.TestCase): 392 393 def setUp(self): 394 self.client = ExtendedTestResult() 395 self.protocol = subunit.TestProtocolServer(self.client) 396 self.protocol.lineReceived("test mcdonalds farm\n") 397 self.test = subunit.RemotedTestCase("mcdonalds farm") 398 408 399 def simple_error_keyword(self, keyword): 409 400 self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) 410 self.assertEqual(self.client.start_calls, [self.test]) 411 self.assertEqual(self.client.end_calls, [self.test]) 412 self.assertEqual(self.client.error_calls, [ 413 (self.test, subunit.RemoteError(""))]) 414 self.assertEqual(self.client.failure_calls, []) 401 details = {} 402 self.assertEqual([ 403 ('startTest', self.test), 404 ('addError', self.test, details), 405 ('stopTest', self.test), 406 ], self.client._events) 415 407 416 408 def test_simple_error(self): … … 423 415 self.protocol.lineReceived("error mcdonalds farm [\n") 424 416 self.protocol.lineReceived("]\n") 425 self.assertEqual(self.client.start_calls, [self.test]) 426 self.assertEqual(self.client.end_calls, [self.test]) 427 self.assertEqual(self.client.error_calls, [ 428 (self.test, subunit.RemoteError(""))]) 429 self.assertEqual(self.client.failure_calls, []) 417 details = {} 418 details['traceback'] = Content(ContentType("text", "x-traceback", 419 {'charset': 'utf8'}), lambda:[""]) 420 self.assertEqual([ 421 ('startTest', self.test), 422 ('addError', self.test, details), 423 ('stopTest', self.test), 424 ], self.client._events) 430 425 431 426 def error_quoted_bracket(self, keyword): … … 433 428 self.protocol.lineReceived(" ]\n") 434 429 self.protocol.lineReceived("]\n") 435 self.assertEqual(self.client.start_calls, [self.test]) 436 self.assertEqual(self.client.end_calls, [self.test]) 437 self.assertEqual(self.client.error_calls, [ 438 (self.test, subunit.RemoteError("]\n"))]) 439 self.assertEqual(self.client.failure_calls, []) 430 details = {} 431 details['traceback'] = Content(ContentType("text", "x-traceback", 432 {'charset': 'utf8'}), lambda:["]\n"]) 433 self.assertEqual([ 434 ('startTest', self.test), 435 ('addError', self.test, details), 436 ('stopTest', self.test), 437 ], self.client._events) 440 438 441 439 def test_error_quoted_bracket(self): … … 449 447 450 448 def setUp(self): 451 self.client = MockTestProtocolServerClient()449 self.client = ExtendedTestResult() 452 450 self.protocol = subunit.TestProtocolServer(self.client) 453 451 self.protocol.lineReceived("test mcdonalds farm\n") 454 452 self.test = subunit.RemotedTestCase("mcdonalds farm") 455 453 454 def assertFailure(self, details): 455 self.assertEqual([ 456 ('startTest', self.test), 457 ('addFailure', self.test, details), 458 ('stopTest', self.test), 459 ], self.client._events) 460 456 461 def simple_failure_keyword(self, keyword): 457 462 self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) 458 self.assertEqual(self.client.start_calls, [self.test]) 459 self.assertEqual(self.client.end_calls, [self.test]) 460 self.assertEqual(self.client.error_calls, []) 461 self.assertEqual(self.client.failure_calls, 462 [(self.test, subunit.RemoteError())]) 463 details = {} 464 self.assertFailure(details) 463 465 464 466 def test_simple_failure(self): … … 471 473 self.protocol.lineReceived("failure mcdonalds farm [\n") 472 474 self.protocol.lineReceived("]\n") 473 self.assertEqual(self.client.start_calls, [self.test]) 474 self.assertEqual(self.client.end_calls, [self.test]) 475 self.assertEqual(self.client.error_calls, []) 476 self.assertEqual(self.client.failure_calls, 477 [(self.test, subunit.RemoteError())]) 475 details = {} 476 details['traceback'] = Content(ContentType("text", "x-traceback", 477 {'charset': 'utf8'}), lambda:[""]) 478 self.assertFailure(details) 478 479 479 480 def failure_quoted_bracket(self, keyword): … … 481 482 self.protocol.lineReceived(" ]\n") 482 483 self.protocol.lineReceived("]\n") 483 self.assertEqual(self.client.start_calls, [self.test]) 484 self.assertEqual(self.client.end_calls, [self.test]) 485 self.assertEqual(self.client.error_calls, []) 486 self.assertEqual(self.client.failure_calls, 487 [(self.test, subunit.RemoteError("]\n"))]) 484 details = {} 485 details['traceback'] = Content(ContentType("text", "x-traceback", 486 {'charset': 'utf8'}), lambda:["]\n"]) 487 self.assertFailure(details) 488 488 489 489 def test_failure_quoted_bracket(self): … … 494 494 495 495 496 class TestTestProtocolServerAddxFail(unittest.TestCase): 497 """Tests for the xfail keyword. 498 499 In Python this can thunk through to Success due to stdlib limitations (see 500 README). 501 """ 502 503 def capture_expected_failure(self, test, err): 504 self._events.append((test, err)) 505 506 def setup_python26(self): 507 """Setup a test object ready to be xfailed and thunk to success.""" 508 self.client = Python26TestResult() 509 self.setup_protocol() 510 511 def setup_python27(self): 512 """Setup a test object ready to be xfailed.""" 513 self.client = Python27TestResult() 514 self.setup_protocol() 515 516 def setup_python_ex(self): 517 """Setup a test object ready to be xfailed with details.""" 518 self.client = ExtendedTestResult() 519 self.setup_protocol() 520 521 def setup_protocol(self): 522 """Setup the protocol based on self.client.""" 523 self.protocol = subunit.TestProtocolServer(self.client) 524 self.protocol.lineReceived("test mcdonalds farm\n") 525 self.test = self.client._events[-1][-1] 526 527 def simple_xfail_keyword(self, keyword, as_success): 528 self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) 529 self.check_success_or_xfail(as_success) 530 531 def check_success_or_xfail(self, as_success, error_message=None): 532 if as_success: 533 self.assertEqual([ 534 ('startTest', self.test), 535 ('addSuccess', self.test), 536 ('stopTest', self.test), 537 ], self.client._events) 538 else: 539 details = {} 540 if error_message is not None: 541 details['traceback'] = Content( 542 ContentType("text", "x-traceback", {'charset': 'utf8'}), 543 lambda:[error_message]) 544 if isinstance(self.client, ExtendedTestResult): 545 value = details 546 else: 547 if error_message is not None: 548 value = subunit.RemoteError(u'Text attachment: traceback\n' 549 '------------\n' + error_message + '------------\n') 550 else: 551 value = subunit.RemoteError() 552 self.assertEqual([ 553 ('startTest', self.test), 554 ('addExpectedFailure', self.test, value), 555 ('stopTest', self.test), 556 ], self.client._events) 557 558 def test_simple_xfail(self): 559 self.setup_python26() 560 self.simple_xfail_keyword("xfail", True) 561 self.setup_python27() 562 self.simple_xfail_keyword("xfail", False) 563 self.setup_python_ex() 564 self.simple_xfail_keyword("xfail", False) 565 566 def test_simple_xfail_colon(self): 567 self.setup_python26() 568 self.simple_xfail_keyword("xfail:", True) 569 self.setup_python27() 570 self.simple_xfail_keyword("xfail:", False) 571 self.setup_python_ex() 572 self.simple_xfail_keyword("xfail:", False) 573 574 def test_xfail_empty_message(self): 575 self.setup_python26() 576 self.empty_message(True) 577 self.setup_python27() 578 self.empty_message(False) 579 self.setup_python_ex() 580 self.empty_message(False, error_message="") 581 582 def empty_message(self, as_success, error_message="\n"): 583 self.protocol.lineReceived("xfail mcdonalds farm [\n") 584 self.protocol.lineReceived("]\n") 585 self.check_success_or_xfail(as_success, error_message) 586 587 def xfail_quoted_bracket(self, keyword, as_success): 588 # This tests it is accepted, but cannot test it is used today, because 589 # of not having a way to expose it in Python so far. 590 self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword) 591 self.protocol.lineReceived(" ]\n") 592 self.protocol.lineReceived("]\n") 593 self.check_success_or_xfail(as_success, "]\n") 594 595 def test_xfail_quoted_bracket(self): 596 self.setup_python26() 597 self.xfail_quoted_bracket("xfail", True) 598 self.setup_python27() 599 self.xfail_quoted_bracket("xfail", False) 600 self.setup_python_ex() 601 self.xfail_quoted_bracket("xfail", False) 602 603 def test_xfail_colon_quoted_bracket(self): 604 self.setup_python26() 605 self.xfail_quoted_bracket("xfail:", True) 606 self.setup_python27() 607 self.xfail_quoted_bracket("xfail:", False) 608 self.setup_python_ex() 609 self.xfail_quoted_bracket("xfail:", False) 610 611 612 class TestTestProtocolServerAddSkip(unittest.TestCase): 613 """Tests for the skip keyword. 614 615 In Python this meets the testtools extended TestResult contract. 616 (See https://launchpad.net/testtools). 617 """ 618 619 def setUp(self): 620 """Setup a test object ready to be skipped.""" 621 self.client = ExtendedTestResult() 622 self.protocol = subunit.TestProtocolServer(self.client) 623 self.protocol.lineReceived("test mcdonalds farm\n") 624 self.test = self.client._events[-1][-1] 625 626 def assertSkip(self, reason): 627 details = {} 628 if reason is not None: 629 details['reason'] = Content( 630 ContentType("text", "plain"), lambda:[reason]) 631 self.assertEqual([ 632 ('startTest', self.test), 633 ('addSkip', self.test, details), 634 ('stopTest', self.test), 635 ], self.client._events) 636 637 def simple_skip_keyword(self, keyword): 638 self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) 639 self.assertSkip(None) 640 641 def test_simple_skip(self): 642 self.simple_skip_keyword("skip") 643 644 def test_simple_skip_colon(self): 645 self.simple_skip_keyword("skip:") 646 647 def test_skip_empty_message(self): 648 self.protocol.lineReceived("skip mcdonalds farm [\n") 649 self.protocol.lineReceived("]\n") 650 self.assertSkip("") 651 652 def skip_quoted_bracket(self, keyword): 653 # This tests it is accepted, but cannot test it is used today, because 654 # of not having a way to expose it in Python so far. 655 self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword) 656 self.protocol.lineReceived(" ]\n") 657 self.protocol.lineReceived("]\n") 658 self.assertSkip("]\n") 659 660 def test_skip_quoted_bracket(self): 661 self.skip_quoted_bracket("skip") 662 663 def test_skip_colon_quoted_bracket(self): 664 self.skip_quoted_bracket("skip:") 665 666 496 667 class TestTestProtocolServerAddSuccess(unittest.TestCase): 497 668 498 669 def setUp(self): 499 self.client = MockTestProtocolServerClient()670 self.client = ExtendedTestResult() 500 671 self.protocol = subunit.TestProtocolServer(self.client) 501 672 self.protocol.lineReceived("test mcdonalds farm\n") … … 504 675 def simple_success_keyword(self, keyword): 505 676 self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) 506 self.assertEqual(self.client.start_calls, [self.test]) 507 self.assertEqual(self.client.end_calls, [self.test]) 508 self.assertEqual(self.client.error_calls, []) 509 self.assertEqual(self.client.success_calls, [self.test]) 677 self.assertEqual([ 678 ('startTest', self.test), 679 ('addSuccess', self.test), 680 ('stopTest', self.test), 681 ], self.client._events) 510 682 511 683 def test_simple_success(self): … … 520 692 def test_simple_success_colon(self): 521 693 self.simple_success_keyword("successful:") 694 695 def assertSuccess(self, details): 696 self.assertEqual([ 697 ('startTest', self.test), 698 ('addSuccess', self.test, details), 699 ('stopTest', self.test), 700 ], self.client._events) 701 702 def test_success_empty_message(self): 703 self.protocol.lineReceived("success mcdonalds farm [\n") 704 self.protocol.lineReceived("]\n") 705 details = {} 706 details['message'] = Content(ContentType("text", "plain"), 707 lambda:[""]) 708 self.assertSuccess(details) 709 710 def success_quoted_bracket(self, keyword): 711 # This tests it is accepted, but cannot test it is used today, because 712 # of not having a way to expose it in Python so far. 713 self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword) 714 self.protocol.lineReceived(" ]\n") 715 self.protocol.lineReceived("]\n") 716 details = {} 717 details['message'] = Content(ContentType("text", "plain"), 718 lambda:["]\n"]) 719 self.assertSuccess(details) 720 721 def test_success_quoted_bracket(self): 722 self.success_quoted_bracket("success") 723 724 def test_success_colon_quoted_bracket(self): 725 self.success_quoted_bracket("success:") 726 727 728 class TestTestProtocolServerProgress(unittest.TestCase): 729 """Test receipt of progress: directives.""" 730 731 def test_progress_accepted_stdlib(self): 732 self.result = Python26TestResult() 733 self.stream = StringIO() 734 self.protocol = subunit.TestProtocolServer(self.result, 735 stream=self.stream) 736 self.protocol.lineReceived("progress: 23") 737 self.protocol.lineReceived("progress: -2") 738 self.protocol.lineReceived("progress: +4") 739 self.assertEqual("", self.stream.getvalue()) 740 741 def test_progress_accepted_extended(self): 742 # With a progress capable TestResult, progress events are emitted. 743 self.result = ExtendedTestResult() 744 self.stream = StringIO() 745 self.protocol = subunit.TestProtocolServer(self.result, 746 stream=self.stream) 747 self.protocol.lineReceived("progress: 23") 748 self.protocol.lineReceived("progress: push") 749 self.protocol.lineReceived("progress: -2") 750 self.protocol.lineReceived("progress: pop") 751 self.protocol.lineReceived("progress: +4") 752 self.assertEqual("", self.stream.getvalue()) 753 self.assertEqual([ 754 ('progress', 23, subunit.PROGRESS_SET), 755 ('progress', None, subunit.PROGRESS_PUSH), 756 ('progress', -2, subunit.PROGRESS_CUR), 757 ('progress', None, subunit.PROGRESS_POP), 758 ('progress', 4, subunit.PROGRESS_CUR), 759 ], self.result._events) 760 761 762 class TestTestProtocolServerStreamTags(unittest.TestCase): 763 """Test managing tags on the protocol level.""" 764 765 def setUp(self): 766 self.client = ExtendedTestResult() 767 self.protocol = subunit.TestProtocolServer(self.client) 768 769 def test_initial_tags(self): 770 self.protocol.lineReceived("tags: foo bar:baz quux\n") 771 self.assertEqual([ 772 ('tags', set(["foo", "bar:baz", "quux"]), set()), 773 ], self.client._events) 774 775 def test_minus_removes_tags(self): 776 self.protocol.lineReceived("tags: -bar quux\n") 777 self.assertEqual([ 778 ('tags', set(["quux"]), set(["bar"])), 779 ], self.client._events) 780 781 def test_tags_do_not_get_set_on_test(self): 782 self.protocol.lineReceived("test mcdonalds farm\n") 783 test = self.client._events[0][-1] 784 self.assertEqual(None, getattr(test, 'tags', None)) 785 786 def test_tags_do_not_get_set_on_global_tags(self): 787 self.protocol.lineReceived("tags: foo bar\n") 788 self.protocol.lineReceived("test mcdonalds farm\n") 789 test = self.client._events[-1][-1] 790 self.assertEqual(None, getattr(test, 'tags', None)) 791 792 def test_tags_get_set_on_test_tags(self): 793 self.protocol.lineReceived("test mcdonalds farm\n") 794 test = self.client._events[-1][-1] 795 self.protocol.lineReceived("tags: foo bar\n") 796 self.protocol.lineReceived("success mcdonalds farm\n") 797 self.assertEqual(None, getattr(test, 'tags', None)) 798 799 800 class TestTestProtocolServerStreamTime(unittest.TestCase): 801 """Test managing time information at the protocol level.""" 802 803 def test_time_accepted_stdlib(self): 804 self.result = Python26TestResult() 805 self.stream = StringIO() 806 self.protocol = subunit.TestProtocolServer(self.result, 807 stream=self.stream) 808 self.protocol.lineReceived("time: 2001-12-12 12:59:59Z\n") 809 self.assertEqual("", self.stream.getvalue()) 810 811 def test_time_accepted_extended(self): 812 self.result = ExtendedTestResult() 813 self.stream = StringIO() 814 self.protocol = subunit.TestProtocolServer(self.result, 815 stream=self.stream) 816 self.protocol.lineReceived("time: 2001-12-12 12:59:59Z\n") 817 self.assertEqual("", self.stream.getvalue()) 818 self.assertEqual([ 819 ('time', datetime.datetime(2001, 12, 12, 12, 59, 59, 0, 820 iso8601.Utc())) 821 ], self.result._events) 522 822 523 823 … … 530 830 self.assertEqual("A test description", 531 831 test.shortDescription()) 532 self.assertEqual(" subunit.RemotedTestCase.A test description",832 self.assertEqual("A test description", 533 833 test.id()) 534 834 self.assertEqual("A test description (subunit.RemotedTestCase)", "%s" % test) … … 537 837 result = unittest.TestResult() 538 838 test.run(result) 539 self.assertEqual([(test, "RemoteException: "839 self.assertEqual([(test, _remote_exception_str + ": " 540 840 "Cannot run RemotedTestCases.\n\n")], 541 841 result.errors) … … 551 851 552 852 def test_eq(self): 553 error = subunit.RemoteError( "Something went wrong")554 another_error = subunit.RemoteError( "Something went wrong")555 different_error = subunit.RemoteError( "boo!")853 error = subunit.RemoteError(u"Something went wrong") 854 another_error = subunit.RemoteError(u"Something went wrong") 855 different_error = subunit.RemoteError(u"boo!") 556 856 self.assertEqual(error, another_error) 557 857 self.assertNotEqual(error, different_error) … … 559 859 560 860 def test_empty_constructor(self): 561 self.assertEqual(subunit.RemoteError(), subunit.RemoteError( ""))861 self.assertEqual(subunit.RemoteError(), subunit.RemoteError(u"")) 562 862 563 863 … … 571 871 # that fails, errors and succeeds 572 872 873 def test_sample_method_args(self): 874 """sample-script.py foo""" 875 # sample that will run just one test. 573 876 574 877 def test_construct(self): … … 577 880 subunit.join_dir(__file__, 'sample-script.py')) 578 881 882 def test_args(self): 883 result = unittest.TestResult() 884 test = self.SampleExecTestCase("test_sample_method_args") 885 test.run(result) 886 self.assertEqual(1, result.testsRun) 887 579 888 def test_run(self): 580 r unner = MockTestProtocolServerClient()889 result = ExtendedTestResult() 581 890 test = self.SampleExecTestCase("test_sample_method") 582 test.run(r unner)891 test.run(result) 583 892 mcdonald = subunit.RemotedTestCase("old mcdonald") 584 893 bing = subunit.RemotedTestCase("bing crosby") 894 bing_details = {} 895 bing_details['traceback'] = Content(ContentType("text", "x-traceback", 896 {'charset': 'utf8'}), lambda:["foo.c:53:ERROR invalid state\n"]) 585 897 an_error = subunit.RemotedTestCase("an error") 586 self.assertEqual(runner.error_calls, 587 [(an_error, subunit.RemoteError())]) 588 self.assertEqual(runner.failure_calls, 589 [(bing, 590 subunit.RemoteError( 591 "foo.c:53:ERROR invalid state\n"))]) 592 self.assertEqual(runner.start_calls, [mcdonald, bing, an_error]) 593 self.assertEqual(runner.end_calls, [mcdonald, bing, an_error]) 898 error_details = {} 899 self.assertEqual([ 900 ('startTest', mcdonald), 901 ('addSuccess', mcdonald), 902 ('stopTest', mcdonald), 903 ('startTest', bing), 904 ('addFailure', bing, bing_details), 905 ('stopTest', bing), 906 ('startTest', an_error), 907 ('addError', an_error, error_details), 908 ('stopTest', an_error), 909 ], result._events) 594 910 595 911 def test_debug(self): … … 690 1006 self.protocol = subunit.TestProtocolClient(self.io) 691 1007 self.test = TestTestProtocolClient("test_start_test") 692 1008 self.sample_details = {'something':Content( 1009 ContentType('text', 'plain'), lambda:['serialised\nform'])} 1010 self.sample_tb_details = dict(self.sample_details) 1011 self.sample_tb_details['traceback'] = TracebackContent( 1012 subunit.RemoteError(u"boo qux"), self.test) 693 1013 694 1014 def test_start_test(self): … … 698 1018 699 1019 def test_stop_test(self): 700 """Test stopTest on a TestProtocolClient."""1020 # stopTest doesn't output anything. 701 1021 self.protocol.stopTest(self.test) 702 1022 self.assertEqual(self.io.getvalue(), "") … … 708 1028 self.io.getvalue(), "successful: %s\n" % self.test.id()) 709 1029 1030 def test_add_success_details(self): 1031 """Test addSuccess on a TestProtocolClient with details.""" 1032 self.protocol.addSuccess(self.test, details=self.sample_details) 1033 self.assertEqual( 1034 self.io.getvalue(), "successful: %s [ multipart\n" 1035 "Content-Type: text/plain\n" 1036 "something\n" 1037 "F\r\nserialised\nform0\r\n]\n" % self.test.id()) 1038 710 1039 def test_add_failure(self): 711 1040 """Test addFailure on a TestProtocolClient.""" 712 self.protocol.addFailure(self.test, subunit.RemoteError("boo")) 1041 self.protocol.addFailure( 1042 self.test, subunit.RemoteError(u"boo qux")) 713 1043 self.assertEqual( 714 1044 self.io.getvalue(), 715 'failure: %s [\nRemoteException: boo\n]\n' % self.test.id()) 1045 ('failure: %s [\n' + _remote_exception_str + ': boo qux\n]\n') 1046 % self.test.id()) 1047 1048 def test_add_failure_details(self): 1049 """Test addFailure on a TestProtocolClient with details.""" 1050 self.protocol.addFailure( 1051 self.test, details=self.sample_tb_details) 1052 self.assertEqual( 1053 self.io.getvalue(), 1054 ("failure: %s [ multipart\n" 1055 "Content-Type: text/plain\n" 1056 "something\n" 1057 "F\r\nserialised\nform0\r\n" 1058 "Content-Type: text/x-traceback;charset=utf8,language=python\n" 1059 "traceback\n" 1060 "1A\r\n" + _remote_exception_str + ": boo qux\n0\r\n" 1061 "]\n") % self.test.id()) 716 1062 717 1063 def test_add_error(self): 718 1064 """Test stopTest on a TestProtocolClient.""" 719 self.protocol.addError(self.test, subunit.RemoteError("phwoar")) 1065 self.protocol.addError( 1066 self.test, subunit.RemoteError(u"phwoar crikey")) 720 1067 self.assertEqual( 721 1068 self.io.getvalue(), 722 'error: %s [\n' 723 "RemoteException: phwoar\n" 1069 ('error: %s [\n' + 1070 _remote_exception_str + ": phwoar crikey\n" 1071 "]\n") % self.test.id()) 1072 1073 def test_add_error_details(self): 1074 """Test stopTest on a TestProtocolClient with details.""" 1075 self.protocol.addError( 1076 self.test, details=self.sample_tb_details) 1077 self.assertEqual( 1078 self.io.getvalue(), 1079 ("error: %s [ multipart\n" 1080 "Content-Type: text/plain\n" 1081 "something\n" 1082 "F\r\nserialised\nform0\r\n" 1083 "Content-Type: text/x-traceback;charset=utf8,language=python\n" 1084 "traceback\n" 1085 "1A\r\n" + _remote_exception_str + ": boo qux\n0\r\n" 1086 "]\n") % self.test.id()) 1087 1088 def test_add_expected_failure(self): 1089 """Test addExpectedFailure on a TestProtocolClient.""" 1090 self.protocol.addExpectedFailure( 1091 self.test, subunit.RemoteError(u"phwoar crikey")) 1092 self.assertEqual( 1093 self.io.getvalue(), 1094 ('xfail: %s [\n' + 1095 _remote_exception_str + ": phwoar crikey\n" 1096 "]\n") % self.test.id()) 1097 1098 def test_add_expected_failure_details(self): 1099 """Test addExpectedFailure on a TestProtocolClient with details.""" 1100 self.protocol.addExpectedFailure( 1101 self.test, details=self.sample_tb_details) 1102 self.assertEqual( 1103 self.io.getvalue(), 1104 ("xfail: %s [ multipart\n" 1105 "Content-Type: text/plain\n" 1106 "something\n" 1107 "F\r\nserialised\nform0\r\n" 1108 "Content-Type: text/x-traceback;charset=utf8,language=python\n" 1109 "traceback\n" 1110 "1A\r\n"+ _remote_exception_str + ": boo qux\n0\r\n" 1111 "]\n") % self.test.id()) 1112 1113 def test_add_skip(self): 1114 """Test addSkip on a TestProtocolClient.""" 1115 self.protocol.addSkip( 1116 self.test, "Has it really?") 1117 self.assertEqual( 1118 self.io.getvalue(), 1119 'skip: %s [\nHas it really?\n]\n' % self.test.id()) 1120 1121 def test_add_skip_details(self): 1122 """Test addSkip on a TestProtocolClient with details.""" 1123 details = {'reason':Content( 1124 ContentType('text', 'plain'), lambda:['Has it really?'])} 1125 self.protocol.addSkip( 1126 self.test, details=details) 1127 self.assertEqual( 1128 self.io.getvalue(), 1129 "skip: %s [ multipart\n" 1130 "Content-Type: text/plain\n" 1131 "reason\n" 1132 "E\r\nHas it really?0\r\n" 724 1133 "]\n" % self.test.id()) 1134 1135 def test_progress_set(self): 1136 self.protocol.progress(23, subunit.PROGRESS_SET) 1137 self.assertEqual(self.io.getvalue(), 'progress: 23\n') 1138 1139 def test_progress_neg_cur(self): 1140 self.protocol.progress(-23, subunit.PROGRESS_CUR) 1141 self.assertEqual(self.io.getvalue(), 'progress: -23\n') 1142 1143 def test_progress_pos_cur(self): 1144 self.protocol.progress(23, subunit.PROGRESS_CUR) 1145 self.assertEqual(self.io.getvalue(), 'progress: +23\n') 1146 1147 def test_progress_pop(self): 1148 self.protocol.progress(1234, subunit.PROGRESS_POP) 1149 self.assertEqual(self.io.getvalue(), 'progress: pop\n') 1150 1151 def test_progress_push(self): 1152 self.protocol.progress(1234, subunit.PROGRESS_PUSH) 1153 self.assertEqual(self.io.getvalue(), 'progress: push\n') 1154 1155 def test_time(self): 1156 # Calling time() outputs a time signal immediately. 1157 self.protocol.time( 1158 datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc())) 1159 self.assertEqual( 1160 "time: 2009-10-11 12:13:14.000015Z\n", 1161 self.io.getvalue()) 1162 1163 def test_add_unexpected_success(self): 1164 """Test addUnexpectedSuccess on a TestProtocolClient.""" 1165 self.protocol.addUnexpectedSuccess(self.test) 1166 self.assertEqual( 1167 self.io.getvalue(), "successful: %s\n" % self.test.id()) 1168 1169 def test_add_unexpected_success_details(self): 1170 """Test addUnexpectedSuccess on a TestProtocolClient with details.""" 1171 self.protocol.addUnexpectedSuccess(self.test, details=self.sample_details) 1172 self.assertEqual( 1173 self.io.getvalue(), "successful: %s [ multipart\n" 1174 "Content-Type: text/plain\n" 1175 "something\n" 1176 "F\r\nserialised\nform0\r\n]\n" % self.test.id()) 725 1177 726 1178
Note:
See TracChangeset
for help on using the changeset viewer.