Changeset 391 for python/trunk/Lib/test/test_xmlrpc.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_xmlrpc.py
r2 r391 6 6 import xmlrpclib 7 7 import SimpleXMLRPCServer 8 import threading9 8 import mimetools 10 9 import httplib … … 12 11 import StringIO 13 12 import os 13 import re 14 14 from test import test_support 15 16 try: 17 import threading 18 except ImportError: 19 threading = None 15 20 16 21 try: … … 40 45 41 46 def test_dump_load(self): 42 self.assertEqual s(alist,43 47 self.assertEqual(alist, 48 xmlrpclib.loads(xmlrpclib.dumps((alist,)))[0][0]) 44 49 45 50 def test_dump_bare_datetime(self): … … 51 56 s = xmlrpclib.dumps((dt,)) 52 57 (newdt,), m = xmlrpclib.loads(s, use_datetime=1) 53 self.assertEqual s(newdt, dt)54 self.assertEqual s(m, None)58 self.assertEqual(newdt, dt) 59 self.assertEqual(m, None) 55 60 56 61 (newdt,), m = xmlrpclib.loads(s, use_datetime=0) 57 self.assertEqual s(newdt, xmlrpclib.DateTime('20050210T11:41:23'))62 self.assertEqual(newdt, xmlrpclib.DateTime('20050210T11:41:23')) 58 63 59 64 def test_datetime_before_1900(self): … … 62 67 s = xmlrpclib.dumps((dt,)) 63 68 (newdt,), m = xmlrpclib.loads(s, use_datetime=1) 64 self.assertEqual s(newdt, dt)65 self.assertEqual s(m, None)69 self.assertEqual(newdt, dt) 70 self.assertEqual(m, None) 66 71 67 72 (newdt,), m = xmlrpclib.loads(s, use_datetime=0) 68 self.assertEqual s(newdt, xmlrpclib.DateTime('00010210T11:41:23'))73 self.assertEqual(newdt, xmlrpclib.DateTime('00010210T11:41:23')) 69 74 70 75 def test_cmp_datetime_DateTime(self): 71 76 now = datetime.datetime.now() 72 77 dt = xmlrpclib.DateTime(now.timetuple()) 73 self.assert _(dt == now)74 self.assert _(now == dt)78 self.assertTrue(dt == now) 79 self.assertTrue(now == dt) 75 80 then = now + datetime.timedelta(seconds=4) 76 self.assert _(then >= dt)77 self.assert _(dt < then)81 self.assertTrue(then >= dt) 82 self.assertTrue(dt < then) 78 83 79 84 def test_bug_1164912 (self): … … 81 86 ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,), 82 87 methodresponse=True)) 83 self.assert _(isinstance(new_d.value, str))88 self.assertIsInstance(new_d.value, str) 84 89 85 90 # Check that the output of dumps() is still an 8-bit string 86 91 s = xmlrpclib.dumps((new_d,), methodresponse=True) 87 self.assert _(isinstance(s, str))92 self.assertIsInstance(s, str) 88 93 89 94 def test_newstyle_class(self): … … 94 99 t.y = "Hello" 95 100 ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,))) 96 self.assertEqual s(t2, t.__dict__)101 self.assertEqual(t2, t.__dict__) 97 102 98 103 def test_dump_big_long(self): … … 137 142 arg1 = (alist + [None],) 138 143 strg = xmlrpclib.dumps(arg1, allow_none=True) 139 self.assertEqual s(value,140 144 self.assertEqual(value, 145 xmlrpclib.loads(strg)[0][0]) 141 146 self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,)) 142 147 … … 160 165 161 166 # sys.setdefaultencoding() normally doesn't exist after site.py is 162 # loaded. reload(sys) is the way to get it back. 167 # loaded. Import a temporary fresh copy to get access to it 168 # but then restore the original copy to avoid messing with 169 # other potentially modified sys module attributes 163 170 old_encoding = sys.getdefaultencoding() 164 setdefaultencoding_existed = hasattr(sys, "setdefaultencoding") 165 reload(sys) # ugh! 166 sys.setdefaultencoding("iso-8859-1") 167 try: 168 (s, d), m = xmlrpclib.loads(utf8) 169 finally: 170 sys.setdefaultencoding(old_encoding) 171 if not setdefaultencoding_existed: 172 del sys.setdefaultencoding 171 with test_support.CleanImport('sys'): 172 import sys as temp_sys 173 temp_sys.setdefaultencoding("iso-8859-1") 174 try: 175 (s, d), m = xmlrpclib.loads(utf8) 176 finally: 177 temp_sys.setdefaultencoding(old_encoding) 173 178 174 179 items = d.items() 175 180 if have_unicode: 176 self.assertEqual s(s, u"abc \x95")177 self.assert _(isinstance(s, unicode))178 self.assertEqual s(items, [(u"def \x96", u"ghi \x97")])179 self.assert _(isinstance(items[0][0], unicode))180 self.assert _(isinstance(items[0][1], unicode))181 self.assertEqual(s, u"abc \x95") 182 self.assertIsInstance(s, unicode) 183 self.assertEqual(items, [(u"def \x96", u"ghi \x97")]) 184 self.assertIsInstance(items[0][0], unicode) 185 self.assertIsInstance(items[0][1], unicode) 181 186 else: 182 self.assertEqual s(s, "abc \xc2\x95")183 self.assertEqual s(items, [("def \xc2\x96", "ghi \xc2\x97")])187 self.assertEqual(s, "abc \xc2\x95") 188 self.assertEqual(items, [("def \xc2\x96", "ghi \xc2\x97")]) 184 189 185 190 … … 200 205 s = xmlrpclib.dumps((f,)) 201 206 (newf,), m = xmlrpclib.loads(s) 202 self.assertEqual s(newf, {'faultCode': 42, 'faultString': 'Test Fault'})203 self.assertEqual s(m, None)207 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) 208 self.assertEqual(m, None) 204 209 205 210 s = xmlrpclib.Marshaller().dumps(f) … … 268 273 269 274 270 PORT= None275 ADDR = PORT = URL = None 271 276 272 277 # The evt is set twice. First when the server is ready to serve. 273 278 # Second when the server has been shutdown. The user must clear 274 279 # the event after it has been set the first time to catch the second set. 275 def http_server(evt, numrequests ):280 def http_server(evt, numrequests, requestHandler=None): 276 281 class TestInstanceClass: 277 282 def div(self, x, y): … … 294 299 return s, port 295 300 301 if not requestHandler: 302 requestHandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler 303 serv = MyXMLRPCServer(("localhost", 0), requestHandler, 304 logRequests=False, bind_and_activate=False) 296 305 try: 297 serv = MyXMLRPCServer(("localhost", 0),298 logRequests=False, bind_and_activate=False)299 306 serv.socket.settimeout(3) 300 307 serv.server_bind() 301 global PORT 302 PORT = serv.socket.getsockname()[1] 308 global ADDR, PORT, URL 309 ADDR, PORT = serv.socket.getsockname() 310 #connect to IP address directly. This avoids socket.create_connection() 311 #trying to connect to "localhost" using all address families, which 312 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 313 #on AF_INET only. 314 URL = "http://%s:%d"%(ADDR, PORT) 303 315 serv.server_activate() 304 316 serv.register_introspection_functions() … … 322 334 evt.set() 323 335 336 def http_multi_server(evt, numrequests, requestHandler=None): 337 class TestInstanceClass: 338 def div(self, x, y): 339 return x // y 340 341 def _methodHelp(self, name): 342 if name == 'div': 343 return 'This is the div function' 344 345 def my_function(): 346 '''This is my function''' 347 return True 348 349 class MyXMLRPCServer(SimpleXMLRPCServer.MultiPathXMLRPCServer): 350 def get_request(self): 351 # Ensure the socket is always non-blocking. On Linux, socket 352 # attributes are not inherited like they are on *BSD and Windows. 353 s, port = self.socket.accept() 354 s.setblocking(True) 355 return s, port 356 357 if not requestHandler: 358 requestHandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler 359 class MyRequestHandler(requestHandler): 360 rpc_paths = [] 361 362 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, 363 logRequests=False, bind_and_activate=False) 364 serv.socket.settimeout(3) 365 serv.server_bind() 366 try: 367 global ADDR, PORT, URL 368 ADDR, PORT = serv.socket.getsockname() 369 #connect to IP address directly. This avoids socket.create_connection() 370 #trying to connect to "localhost" using all address families, which 371 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 372 #on AF_INET only. 373 URL = "http://%s:%d"%(ADDR, PORT) 374 serv.server_activate() 375 paths = ["/foo", "/foo/bar"] 376 for path in paths: 377 d = serv.add_dispatcher(path, SimpleXMLRPCServer.SimpleXMLRPCDispatcher()) 378 d.register_introspection_functions() 379 d.register_multicall_functions() 380 serv.get_dispatcher(paths[0]).register_function(pow) 381 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') 382 evt.set() 383 384 # handle up to 'numrequests' requests 385 while numrequests > 0: 386 serv.handle_request() 387 numrequests -= 1 388 389 except socket.timeout: 390 pass 391 finally: 392 serv.socket.close() 393 PORT = None 394 evt.set() 395 324 396 # This function prevents errors like: 325 397 # <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error> … … 343 415 return False 344 416 417 @unittest.skipUnless(threading, 'Threading required for this test.') 418 class BaseServerTestCase(unittest.TestCase): 419 requestHandler = None 420 request_count = 1 421 threadFunc = staticmethod(http_server) 422 423 def setUp(self): 424 # enable traceback reporting 425 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True 426 427 self.evt = threading.Event() 428 # start server thread to handle requests 429 serv_args = (self.evt, self.request_count, self.requestHandler) 430 threading.Thread(target=self.threadFunc, args=serv_args).start() 431 432 # wait for the server to be ready 433 self.evt.wait(10) 434 self.evt.clear() 435 436 def tearDown(self): 437 # wait on the server thread to terminate 438 self.evt.wait(10) 439 440 # disable traceback reporting 441 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False 442 345 443 # NOTE: The tests in SimpleServerTestCase will ignore failures caused by 346 444 # "temporarily unavailable" exceptions raised in SimpleXMLRPCServer. This 347 445 # condition occurs infrequently on some platforms, frequently on others, and 348 # is apparently caused by using SimpleXMLRPCServer with a non-blocking socket .446 # is apparently caused by using SimpleXMLRPCServer with a non-blocking socket 349 447 # If the server class is updated at some point in the future to handle this 350 448 # situation more gracefully, these tests should be modified appropriately. 351 449 352 class SimpleServerTestCase(unittest.TestCase): 353 def setUp(self): 354 # enable traceback reporting 355 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True 356 357 self.evt = threading.Event() 358 # start server thread to handle requests 359 serv_args = (self.evt, 1) 360 threading.Thread(target=http_server, args=serv_args).start() 361 362 # wait for the server to be ready 363 self.evt.wait() 364 self.evt.clear() 365 366 def tearDown(self): 367 # wait on the server thread to terminate 368 self.evt.wait() 369 370 # disable traceback reporting 371 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False 372 450 class SimpleServerTestCase(BaseServerTestCase): 373 451 def test_simple1(self): 374 452 try: 375 p = xmlrpclib.ServerProxy( 'http://localhost:%d' % PORT)453 p = xmlrpclib.ServerProxy(URL) 376 454 self.assertEqual(p.pow(6,8), 6**8) 377 455 except (xmlrpclib.ProtocolError, socket.error), e: … … 381 459 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 382 460 461 def test_nonascii(self): 462 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t' 463 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n' 464 465 try: 466 p = xmlrpclib.ServerProxy(URL) 467 self.assertEqual(p.add(start_string, end_string), 468 start_string + end_string) 469 except (xmlrpclib.ProtocolError, socket.error) as e: 470 # ignore failures due to non-blocking socket unavailable errors. 471 if not is_unavailable_exception(e): 472 # protocol error; provide additional information in test output 473 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 474 475 def test_unicode_host(self): 476 server = xmlrpclib.ServerProxy(u"http://%s:%d/RPC2"%(ADDR, PORT)) 477 self.assertEqual(server.add("a", u"\xe9"), u"a\xe9") 478 383 479 # [ch] The test 404 is causing lots of false alarms. 384 480 def XXXtest_404(self): 385 481 # send POST with httplib, it should return 404 header and 386 482 # 'Not Found' message. 387 conn = httplib.HTTPConnection( 'localhost', PORT)483 conn = httplib.HTTPConnection(ADDR, PORT) 388 484 conn.request('POST', '/this-is-not-valid') 389 485 response = conn.getresponse() … … 395 491 def test_introspection1(self): 396 492 try: 397 p = xmlrpclib.ServerProxy( 'http://localhost:%d' % PORT)493 p = xmlrpclib.ServerProxy(URL) 398 494 meth = p.system.listMethods() 399 495 expected_methods = set(['pow', 'div', 'my_function', 'add', … … 410 506 try: 411 507 # test _methodHelp() 412 p = xmlrpclib.ServerProxy( 'http://localhost:%d' % PORT)508 p = xmlrpclib.ServerProxy(URL) 413 509 divhelp = p.system.methodHelp('div') 414 510 self.assertEqual(divhelp, 'This is the div function') … … 419 515 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 420 516 517 @unittest.skipIf(sys.flags.optimize >= 2, 518 "Docstrings are omitted with -O2 and above") 421 519 def test_introspection3(self): 422 520 try: 423 521 # test native doc 424 p = xmlrpclib.ServerProxy( 'http://localhost:%d' % PORT)522 p = xmlrpclib.ServerProxy(URL) 425 523 myfunction = p.system.methodHelp('my_function') 426 524 self.assertEqual(myfunction, 'This is my function') … … 435 533 # at least check that we can try making the call 436 534 try: 437 p = xmlrpclib.ServerProxy( 'http://localhost:%d' % PORT)535 p = xmlrpclib.ServerProxy(URL) 438 536 divsig = p.system.methodSignature('div') 439 537 self.assertEqual(divsig, 'signatures not supported') … … 446 544 def test_multicall(self): 447 545 try: 448 p = xmlrpclib.ServerProxy( 'http://localhost:%d' % PORT)546 p = xmlrpclib.ServerProxy(URL) 449 547 multicall = xmlrpclib.MultiCall(p) 450 548 multicall.add(2,3) … … 463 561 def test_non_existing_multicall(self): 464 562 try: 465 p = xmlrpclib.ServerProxy( 'http://localhost:%d' % PORT)563 p = xmlrpclib.ServerProxy(URL) 466 564 multicall = xmlrpclib.MultiCall(p) 467 565 multicall.this_is_not_exists() … … 487 585 SimpleXMLRPCServer.resolve_dotted_attribute, str, '__add') 488 586 489 self.assert _(SimpleXMLRPCServer.resolve_dotted_attribute(str, 'title'))587 self.assertTrue(SimpleXMLRPCServer.resolve_dotted_attribute(str, 'title')) 490 588 # Get the test to run faster by sending a request with test_simple1. 491 589 # This avoids waiting for the socket timeout. 492 590 self.test_simple1() 591 592 def test_partial_post(self): 593 # Check that a partial POST doesn't make the server loop: issue #14001. 594 conn = httplib.HTTPConnection(ADDR, PORT) 595 conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye') 596 conn.close() 597 598 class MultiPathServerTestCase(BaseServerTestCase): 599 threadFunc = staticmethod(http_multi_server) 600 request_count = 2 601 def test_path1(self): 602 p = xmlrpclib.ServerProxy(URL+"/foo") 603 self.assertEqual(p.pow(6,8), 6**8) 604 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 605 def test_path2(self): 606 p = xmlrpclib.ServerProxy(URL+"/foo/bar") 607 self.assertEqual(p.add(6,8), 6+8) 608 self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) 609 610 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 611 #does indeed serve subsequent requests on the same connection 612 class BaseKeepaliveServerTestCase(BaseServerTestCase): 613 #a request handler that supports keep-alive and logs requests into a 614 #class variable 615 class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): 616 parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler 617 protocol_version = 'HTTP/1.1' 618 myRequests = [] 619 def handle(self): 620 self.myRequests.append([]) 621 self.reqidx = len(self.myRequests)-1 622 return self.parentClass.handle(self) 623 def handle_one_request(self): 624 result = self.parentClass.handle_one_request(self) 625 self.myRequests[self.reqidx].append(self.raw_requestline) 626 return result 627 628 requestHandler = RequestHandler 629 def setUp(self): 630 #clear request log 631 self.RequestHandler.myRequests = [] 632 return BaseServerTestCase.setUp(self) 633 634 #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 635 #does indeed serve subsequent requests on the same connection 636 class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase): 637 def test_two(self): 638 p = xmlrpclib.ServerProxy(URL) 639 #do three requests. 640 self.assertEqual(p.pow(6,8), 6**8) 641 self.assertEqual(p.pow(6,8), 6**8) 642 self.assertEqual(p.pow(6,8), 6**8) 643 644 #they should have all been handled by a single request handler 645 self.assertEqual(len(self.RequestHandler.myRequests), 1) 646 647 #check that we did at least two (the third may be pending append 648 #due to thread scheduling) 649 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 650 651 #test special attribute access on the serverproxy, through the __call__ 652 #function. 653 class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): 654 #ask for two keepalive requests to be handled. 655 request_count=2 656 657 def test_close(self): 658 p = xmlrpclib.ServerProxy(URL) 659 #do some requests with close. 660 self.assertEqual(p.pow(6,8), 6**8) 661 self.assertEqual(p.pow(6,8), 6**8) 662 self.assertEqual(p.pow(6,8), 6**8) 663 p("close")() #this should trigger a new keep-alive request 664 self.assertEqual(p.pow(6,8), 6**8) 665 self.assertEqual(p.pow(6,8), 6**8) 666 self.assertEqual(p.pow(6,8), 6**8) 667 668 #they should have all been two request handlers, each having logged at least 669 #two complete requests 670 self.assertEqual(len(self.RequestHandler.myRequests), 2) 671 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 672 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2) 673 674 def test_transport(self): 675 p = xmlrpclib.ServerProxy(URL) 676 #do some requests with close. 677 self.assertEqual(p.pow(6,8), 6**8) 678 p("transport").close() #same as above, really. 679 self.assertEqual(p.pow(6,8), 6**8) 680 self.assertEqual(len(self.RequestHandler.myRequests), 2) 681 682 #A test case that verifies that gzip encoding works in both directions 683 #(for a request and the response) 684 class GzipServerTestCase(BaseServerTestCase): 685 #a request handler that supports keep-alive and logs requests into a 686 #class variable 687 class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): 688 parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler 689 protocol_version = 'HTTP/1.1' 690 691 def do_POST(self): 692 #store content of last request in class 693 self.__class__.content_length = int(self.headers["content-length"]) 694 return self.parentClass.do_POST(self) 695 requestHandler = RequestHandler 696 697 class Transport(xmlrpclib.Transport): 698 #custom transport, stores the response length for our perusal 699 fake_gzip = False 700 def parse_response(self, response): 701 self.response_length=int(response.getheader("content-length", 0)) 702 return xmlrpclib.Transport.parse_response(self, response) 703 704 def send_content(self, connection, body): 705 if self.fake_gzip: 706 #add a lone gzip header to induce decode error remotely 707 connection.putheader("Content-Encoding", "gzip") 708 return xmlrpclib.Transport.send_content(self, connection, body) 709 710 def setUp(self): 711 BaseServerTestCase.setUp(self) 712 713 def test_gzip_request(self): 714 t = self.Transport() 715 t.encode_threshold = None 716 p = xmlrpclib.ServerProxy(URL, transport=t) 717 self.assertEqual(p.pow(6,8), 6**8) 718 a = self.RequestHandler.content_length 719 t.encode_threshold = 0 #turn on request encoding 720 self.assertEqual(p.pow(6,8), 6**8) 721 b = self.RequestHandler.content_length 722 self.assertTrue(a>b) 723 724 def test_bad_gzip_request(self): 725 t = self.Transport() 726 t.encode_threshold = None 727 t.fake_gzip = True 728 p = xmlrpclib.ServerProxy(URL, transport=t) 729 cm = self.assertRaisesRegexp(xmlrpclib.ProtocolError, 730 re.compile(r"\b400\b")) 731 with cm: 732 p.pow(6, 8) 733 734 def test_gsip_response(self): 735 t = self.Transport() 736 p = xmlrpclib.ServerProxy(URL, transport=t) 737 old = self.requestHandler.encode_threshold 738 self.requestHandler.encode_threshold = None #no encoding 739 self.assertEqual(p.pow(6,8), 6**8) 740 a = t.response_length 741 self.requestHandler.encode_threshold = 0 #always encode 742 self.assertEqual(p.pow(6,8), 6**8) 743 b = t.response_length 744 self.requestHandler.encode_threshold = old 745 self.assertTrue(a>b) 746 747 #Test special attributes of the ServerProxy object 748 class ServerProxyTestCase(unittest.TestCase): 749 def setUp(self): 750 unittest.TestCase.setUp(self) 751 if threading: 752 self.url = URL 753 else: 754 # Without threading, http_server() and http_multi_server() will not 755 # be executed and URL is still equal to None. 'http://' is a just 756 # enough to choose the scheme (HTTP) 757 self.url = 'http://' 758 759 def test_close(self): 760 p = xmlrpclib.ServerProxy(self.url) 761 self.assertEqual(p('close')(), None) 762 763 def test_transport(self): 764 t = xmlrpclib.Transport() 765 p = xmlrpclib.ServerProxy(self.url, transport=t) 766 self.assertEqual(p('transport'), t) 493 767 494 768 # This is a contrived way to make a failure occur on the server side … … 502 776 503 777 778 @unittest.skipUnless(threading, 'Threading required for this test.') 504 779 class FailingServerTestCase(unittest.TestCase): 505 780 def setUp(self): … … 531 806 # test a call that shouldn't fail just as a smoke test 532 807 try: 533 p = xmlrpclib.ServerProxy( 'http://localhost:%d' % PORT)808 p = xmlrpclib.ServerProxy(URL) 534 809 self.assertEqual(p.pow(6,8), 6**8) 535 810 except (xmlrpclib.ProtocolError, socket.error), e: … … 544 819 545 820 try: 546 p = xmlrpclib.ServerProxy( 'http://localhost:%d' % PORT)821 p = xmlrpclib.ServerProxy(URL) 547 822 p.pow(6,8) 548 823 except (xmlrpclib.ProtocolError, socket.error), e: … … 564 839 565 840 try: 566 p = xmlrpclib.ServerProxy( 'http://localhost:%d' % PORT)841 p = xmlrpclib.ServerProxy(URL) 567 842 p.pow(6,8) 568 843 except (xmlrpclib.ProtocolError, socket.error), e: … … 585 860 def test_cgi_get(self): 586 861 with test_support.EnvironmentVarGuard() as env: 587 env .set('REQUEST_METHOD', 'GET')862 env['REQUEST_METHOD'] = 'GET' 588 863 # if the method is GET and no request_text is given, it runs handle_get 589 864 # get sysout output 590 tmp = sys.stdout 591 sys.stdout = open(test_support.TESTFN, "w") 592 self.cgi.handle_request() 593 sys.stdout.close() 594 sys.stdout = tmp 865 with test_support.captured_stdout() as data_out: 866 self.cgi.handle_request() 595 867 596 868 # parse Status header 597 handle = open(test_support.TESTFN, "r").read() 869 data_out.seek(0) 870 handle = data_out.read() 598 871 status = handle.split()[1] 599 872 message = ' '.join(handle.split()[2:4]) … … 602 875 self.assertEqual(message, 'Bad Request') 603 876 604 os.remove(test_support.TESTFN)605 877 606 878 def test_cgi_xmlrpc_response(self): 607 879 data = """<?xml version='1.0'?> 608 <methodCall> 609 <methodName>test_method</methodName> 610 <params> 611 <param> 612 <value><string>foo</string></value> 613 </param> 614 <param> 615 <value><string>bar</string></value> 616 </param> 617 </params> 618 </methodCall> 619 """ 620 open("xmldata.txt", "w").write(data) 621 tmp1 = sys.stdin 622 tmp2 = sys.stdout 623 624 sys.stdin = open("xmldata.txt", "r") 625 sys.stdout = open(test_support.TESTFN, "w") 626 627 with test_support.EnvironmentVarGuard() as env: 628 env.set('CONTENT_LENGTH', str(len(data))) 880 <methodCall> 881 <methodName>test_method</methodName> 882 <params> 883 <param> 884 <value><string>foo</string></value> 885 </param> 886 <param> 887 <value><string>bar</string></value> 888 </param> 889 </params> 890 </methodCall> 891 """ 892 893 with test_support.EnvironmentVarGuard() as env, \ 894 test_support.captured_stdout() as data_out, \ 895 test_support.captured_stdin() as data_in: 896 data_in.write(data) 897 data_in.seek(0) 898 env['CONTENT_LENGTH'] = str(len(data)) 629 899 self.cgi.handle_request() 630 631 sys.stdin.close() 632 sys.stdout.close() 633 sys.stdin = tmp1 634 sys.stdout = tmp2 900 data_out.seek(0) 635 901 636 902 # will respond exception, if so, our goal is achieved ;) 637 handle = open(test_support.TESTFN, "r").read()903 handle = data_out.read() 638 904 639 905 # start with 44th char so as not to get http header, we just need only xml 640 906 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:]) 641 907 642 os.remove("xmldata.txt") 643 os.remove(test_support.TESTFN) 908 # Also test the content-length returned by handle_request 909 # Using the same test method inorder to avoid all the datapassing 910 # boilerplate code. 911 # Test for bug: http://bugs.python.org/issue5040 912 913 content = handle[handle.find("<?xml"):] 914 915 self.assertEqual( 916 int(re.search('Content-Length: (\d+)', handle).group(1)), 917 len(content)) 918 644 919 645 920 class FakeSocket: … … 658 933 return self.data.getvalue() 659 934 660 def makefile(self, x , y):935 def makefile(self, x='r', y=-1): 661 936 raise RuntimeError 937 938 def close(self): 939 pass 662 940 663 941 class FakeTransport(xmlrpclib.Transport): … … 671 949 def make_connection(self, host): 672 950 conn = xmlrpclib.Transport.make_connection(self, host) 673 conn. _conn.sock = self.fake_socket = FakeSocket()951 conn.sock = self.fake_socket = FakeSocket() 674 952 return conn 675 953 … … 695 973 696 974 req = self.issue_request(TestTransport) 697 self.assert _("X-Test: test_custom_user_agent\r\n" inreq)975 self.assertIn("X-Test: test_custom_user_agent\r\n", req) 698 976 699 977 def test_send_host(self): … … 705 983 706 984 req = self.issue_request(TestTransport) 707 self.assert _("X-Test: test_send_host\r\n" inreq)985 self.assertIn("X-Test: test_send_host\r\n", req) 708 986 709 987 def test_send_request(self): … … 715 993 716 994 req = self.issue_request(TestTransport) 717 self.assert _("X-Test: test_send_request\r\n" inreq)995 self.assertIn("X-Test: test_send_request\r\n", req) 718 996 719 997 def test_send_content(self): … … 725 1003 726 1004 req = self.issue_request(TestTransport) 727 self.assert_("X-Test: test_send_content\r\n" in req) 728 1005 self.assertIn("X-Test: test_send_content\r\n", req) 1006 1007 @test_support.reap_threads 729 1008 def test_main(): 730 1009 xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase, 731 1010 BinaryTestCase, FaultTestCase, TransportSubclassTestCase] 732 733 # The test cases against a SimpleXMLRPCServer raise a socket error 734 # 10035 (WSAEWOULDBLOCK) in the server thread handle_request call when 735 # run on Windows. This only happens on the first test to run, but it 736 # fails every time and so these tests are skipped on win32 platforms. 737 if sys.platform != 'win32': 738 xmlrpc_tests.append(SimpleServerTestCase) 739 xmlrpc_tests.append(FailingServerTestCase) 740 xmlrpc_tests.append(CGIHandlerTestCase) 1011 xmlrpc_tests.append(SimpleServerTestCase) 1012 xmlrpc_tests.append(KeepaliveServerTestCase1) 1013 xmlrpc_tests.append(KeepaliveServerTestCase2) 1014 try: 1015 import gzip 1016 xmlrpc_tests.append(GzipServerTestCase) 1017 except ImportError: 1018 pass #gzip not supported in this build 1019 xmlrpc_tests.append(MultiPathServerTestCase) 1020 xmlrpc_tests.append(ServerProxyTestCase) 1021 xmlrpc_tests.append(FailingServerTestCase) 1022 xmlrpc_tests.append(CGIHandlerTestCase) 741 1023 742 1024 test_support.run_unittest(*xmlrpc_tests)
Note:
See TracChangeset
for help on using the changeset viewer.