source: python/trunk/Lib/test/test_xmlrpc.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 38.1 KB
Line 
1import base64
2import datetime
3import sys
4import time
5import unittest
6import xmlrpclib
7import SimpleXMLRPCServer
8import mimetools
9import httplib
10import socket
11import StringIO
12import os
13import re
14from test import test_support
15
16try:
17 import threading
18except ImportError:
19 threading = None
20
21try:
22 unicode
23except NameError:
24 have_unicode = False
25else:
26 have_unicode = True
27
28alist = [{'astring': 'foo@bar.baz.spam',
29 'afloat': 7283.43,
30 'anint': 2**20,
31 'ashortlong': 2L,
32 'anotherlist': ['.zyx.41'],
33 'abase64': xmlrpclib.Binary("my dog has fleas"),
34 'boolean': xmlrpclib.False,
35 'unicode': u'\u4000\u6000\u8000',
36 u'ukey\u4000': 'regular value',
37 'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
38 'datetime2': xmlrpclib.DateTime(
39 (2005, 02, 10, 11, 41, 23, 0, 1, -1)),
40 'datetime3': xmlrpclib.DateTime(
41 datetime.datetime(2005, 02, 10, 11, 41, 23)),
42 }]
43
44class XMLRPCTestCase(unittest.TestCase):
45
46 def test_dump_load(self):
47 self.assertEqual(alist,
48 xmlrpclib.loads(xmlrpclib.dumps((alist,)))[0][0])
49
50 def test_dump_bare_datetime(self):
51 # This checks that an unwrapped datetime.date object can be handled
52 # by the marshalling code. This can't be done via test_dump_load()
53 # since with use_datetime set to 1 the unmarshaller would create
54 # datetime objects for the 'datetime[123]' keys as well
55 dt = datetime.datetime(2005, 02, 10, 11, 41, 23)
56 s = xmlrpclib.dumps((dt,))
57 (newdt,), m = xmlrpclib.loads(s, use_datetime=1)
58 self.assertEqual(newdt, dt)
59 self.assertEqual(m, None)
60
61 (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
62 self.assertEqual(newdt, xmlrpclib.DateTime('20050210T11:41:23'))
63
64 def test_datetime_before_1900(self):
65 # same as before but with a date before 1900
66 dt = datetime.datetime(1, 02, 10, 11, 41, 23)
67 s = xmlrpclib.dumps((dt,))
68 (newdt,), m = xmlrpclib.loads(s, use_datetime=1)
69 self.assertEqual(newdt, dt)
70 self.assertEqual(m, None)
71
72 (newdt,), m = xmlrpclib.loads(s, use_datetime=0)
73 self.assertEqual(newdt, xmlrpclib.DateTime('00010210T11:41:23'))
74
75 def test_cmp_datetime_DateTime(self):
76 now = datetime.datetime.now()
77 dt = xmlrpclib.DateTime(now.timetuple())
78 self.assertTrue(dt == now)
79 self.assertTrue(now == dt)
80 then = now + datetime.timedelta(seconds=4)
81 self.assertTrue(then >= dt)
82 self.assertTrue(dt < then)
83
84 def test_bug_1164912 (self):
85 d = xmlrpclib.DateTime()
86 ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
87 methodresponse=True))
88 self.assertIsInstance(new_d.value, str)
89
90 # Check that the output of dumps() is still an 8-bit string
91 s = xmlrpclib.dumps((new_d,), methodresponse=True)
92 self.assertIsInstance(s, str)
93
94 def test_newstyle_class(self):
95 class T(object):
96 pass
97 t = T()
98 t.x = 100
99 t.y = "Hello"
100 ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
101 self.assertEqual(t2, t.__dict__)
102
103 def test_dump_big_long(self):
104 self.assertRaises(OverflowError, xmlrpclib.dumps, (2L**99,))
105
106 def test_dump_bad_dict(self):
107 self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
108
109 def test_dump_recursive_seq(self):
110 l = [1,2,3]
111 t = [3,4,5,l]
112 l.append(t)
113 self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
114
115 def test_dump_recursive_dict(self):
116 d = {'1':1, '2':1}
117 t = {'3':3, 'd':d}
118 d['t'] = t
119 self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
120
121 def test_dump_big_int(self):
122 if sys.maxint > 2L**31-1:
123 self.assertRaises(OverflowError, xmlrpclib.dumps,
124 (int(2L**34),))
125
126 xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
127 self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MAXINT+1,))
128 self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MININT-1,))
129
130 def dummy_write(s):
131 pass
132
133 m = xmlrpclib.Marshaller()
134 m.dump_int(xmlrpclib.MAXINT, dummy_write)
135 m.dump_int(xmlrpclib.MININT, dummy_write)
136 self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MAXINT+1, dummy_write)
137 self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MININT-1, dummy_write)
138
139
140 def test_dump_none(self):
141 value = alist + [None]
142 arg1 = (alist + [None],)
143 strg = xmlrpclib.dumps(arg1, allow_none=True)
144 self.assertEqual(value,
145 xmlrpclib.loads(strg)[0][0])
146 self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
147
148 def test_default_encoding_issues(self):
149 # SF bug #1115989: wrong decoding in '_stringify'
150 utf8 = """<?xml version='1.0' encoding='iso-8859-1'?>
151 <params>
152 <param><value>
153 <string>abc \x95</string>
154 </value></param>
155 <param><value>
156 <struct>
157 <member>
158 <name>def \x96</name>
159 <value><string>ghi \x97</string></value>
160 </member>
161 </struct>
162 </value></param>
163 </params>
164 """
165
166 # sys.setdefaultencoding() normally doesn't exist after site.py is
167 # loaded. Import a temporary fresh copy to get access to it
168 # but then restore the original copy to avoid messing with
169 # other potentially modified sys module attributes
170 old_encoding = sys.getdefaultencoding()
171 with test_support.CleanImport('sys'):
172 import sys as temp_sys
173 temp_sys.setdefaultencoding("iso-8859-1")
174 try:
175 (s, d), m = xmlrpclib.loads(utf8)
176 finally:
177 temp_sys.setdefaultencoding(old_encoding)
178
179 items = d.items()
180 if have_unicode:
181 self.assertEqual(s, u"abc \x95")
182 self.assertIsInstance(s, unicode)
183 self.assertEqual(items, [(u"def \x96", u"ghi \x97")])
184 self.assertIsInstance(items[0][0], unicode)
185 self.assertIsInstance(items[0][1], unicode)
186 else:
187 self.assertEqual(s, "abc \xc2\x95")
188 self.assertEqual(items, [("def \xc2\x96", "ghi \xc2\x97")])
189
190
191class HelperTestCase(unittest.TestCase):
192 def test_escape(self):
193 self.assertEqual(xmlrpclib.escape("a&b"), "a&amp;b")
194 self.assertEqual(xmlrpclib.escape("a<b"), "a&lt;b")
195 self.assertEqual(xmlrpclib.escape("a>b"), "a&gt;b")
196
197class FaultTestCase(unittest.TestCase):
198 def test_repr(self):
199 f = xmlrpclib.Fault(42, 'Test Fault')
200 self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>")
201 self.assertEqual(repr(f), str(f))
202
203 def test_dump_fault(self):
204 f = xmlrpclib.Fault(42, 'Test Fault')
205 s = xmlrpclib.dumps((f,))
206 (newf,), m = xmlrpclib.loads(s)
207 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'})
208 self.assertEqual(m, None)
209
210 s = xmlrpclib.Marshaller().dumps(f)
211 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)
212
213
214class DateTimeTestCase(unittest.TestCase):
215 def test_default(self):
216 t = xmlrpclib.DateTime()
217
218 def test_time(self):
219 d = 1181399930.036952
220 t = xmlrpclib.DateTime(d)
221 self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
222
223 def test_time_tuple(self):
224 d = (2007,6,9,10,38,50,5,160,0)
225 t = xmlrpclib.DateTime(d)
226 self.assertEqual(str(t), '20070609T10:38:50')
227
228 def test_time_struct(self):
229 d = time.localtime(1181399930.036952)
230 t = xmlrpclib.DateTime(d)
231 self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))
232
233 def test_datetime_datetime(self):
234 d = datetime.datetime(2007,1,2,3,4,5)
235 t = xmlrpclib.DateTime(d)
236 self.assertEqual(str(t), '20070102T03:04:05')
237
238 def test_repr(self):
239 d = datetime.datetime(2007,1,2,3,4,5)
240 t = xmlrpclib.DateTime(d)
241 val ="<DateTime '20070102T03:04:05' at %x>" % id(t)
242 self.assertEqual(repr(t), val)
243
244 def test_decode(self):
245 d = ' 20070908T07:11:13 '
246 t1 = xmlrpclib.DateTime()
247 t1.decode(d)
248 tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
249 self.assertEqual(t1, tref)
250
251 t2 = xmlrpclib._datetime(d)
252 self.assertEqual(t1, tref)
253
254class BinaryTestCase(unittest.TestCase):
255 def test_default(self):
256 t = xmlrpclib.Binary()
257 self.assertEqual(str(t), '')
258
259 def test_string(self):
260 d = '\x01\x02\x03abc123\xff\xfe'
261 t = xmlrpclib.Binary(d)
262 self.assertEqual(str(t), d)
263
264 def test_decode(self):
265 d = '\x01\x02\x03abc123\xff\xfe'
266 de = base64.encodestring(d)
267 t1 = xmlrpclib.Binary()
268 t1.decode(de)
269 self.assertEqual(str(t1), d)
270
271 t2 = xmlrpclib._binary(de)
272 self.assertEqual(str(t2), d)
273
274
275ADDR = PORT = URL = None
276
277# The evt is set twice. First when the server is ready to serve.
278# Second when the server has been shutdown. The user must clear
279# the event after it has been set the first time to catch the second set.
280def http_server(evt, numrequests, requestHandler=None):
281 class TestInstanceClass:
282 def div(self, x, y):
283 return x // y
284
285 def _methodHelp(self, name):
286 if name == 'div':
287 return 'This is the div function'
288
289 def my_function():
290 '''This is my function'''
291 return True
292
293 class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
294 def get_request(self):
295 # Ensure the socket is always non-blocking. On Linux, socket
296 # attributes are not inherited like they are on *BSD and Windows.
297 s, port = self.socket.accept()
298 s.setblocking(True)
299 return s, port
300
301 if not requestHandler:
302 requestHandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
303 serv = MyXMLRPCServer(("localhost", 0), requestHandler,
304 logRequests=False, bind_and_activate=False)
305 try:
306 serv.socket.settimeout(3)
307 serv.server_bind()
308 global ADDR, PORT, URL
309 ADDR, PORT = serv.socket.getsockname()
310 #connect to IP address directly. This avoids socket.create_connection()
311 #trying to connect to "localhost" using all address families, which
312 #causes slowdown e.g. on vista which supports AF_INET6. The server listens
313 #on AF_INET only.
314 URL = "http://%s:%d"%(ADDR, PORT)
315 serv.server_activate()
316 serv.register_introspection_functions()
317 serv.register_multicall_functions()
318 serv.register_function(pow)
319 serv.register_function(lambda x,y: x+y, 'add')
320 serv.register_function(my_function)
321 serv.register_instance(TestInstanceClass())
322 evt.set()
323
324 # handle up to 'numrequests' requests
325 while numrequests > 0:
326 serv.handle_request()
327 numrequests -= 1
328
329 except socket.timeout:
330 pass
331 finally:
332 serv.socket.close()
333 PORT = None
334 evt.set()
335
336def http_multi_server(evt, numrequests, requestHandler=None):
337 class TestInstanceClass:
338 def div(self, x, y):
339 return x // y
340
341 def _methodHelp(self, name):
342 if name == 'div':
343 return 'This is the div function'
344
345 def my_function():
346 '''This is my function'''
347 return True
348
349 class MyXMLRPCServer(SimpleXMLRPCServer.MultiPathXMLRPCServer):
350 def get_request(self):
351 # Ensure the socket is always non-blocking. On Linux, socket
352 # attributes are not inherited like they are on *BSD and Windows.
353 s, port = self.socket.accept()
354 s.setblocking(True)
355 return s, port
356
357 if not requestHandler:
358 requestHandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
359 class MyRequestHandler(requestHandler):
360 rpc_paths = []
361
362 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
363 logRequests=False, bind_and_activate=False)
364 serv.socket.settimeout(3)
365 serv.server_bind()
366 try:
367 global ADDR, PORT, URL
368 ADDR, PORT = serv.socket.getsockname()
369 #connect to IP address directly. This avoids socket.create_connection()
370 #trying to connect to "localhost" using all address families, which
371 #causes slowdown e.g. on vista which supports AF_INET6. The server listens
372 #on AF_INET only.
373 URL = "http://%s:%d"%(ADDR, PORT)
374 serv.server_activate()
375 paths = ["/foo", "/foo/bar"]
376 for path in paths:
377 d = serv.add_dispatcher(path, SimpleXMLRPCServer.SimpleXMLRPCDispatcher())
378 d.register_introspection_functions()
379 d.register_multicall_functions()
380 serv.get_dispatcher(paths[0]).register_function(pow)
381 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
382 evt.set()
383
384 # handle up to 'numrequests' requests
385 while numrequests > 0:
386 serv.handle_request()
387 numrequests -= 1
388
389 except socket.timeout:
390 pass
391 finally:
392 serv.socket.close()
393 PORT = None
394 evt.set()
395
396# This function prevents errors like:
397# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
398def is_unavailable_exception(e):
399 '''Returns True if the given ProtocolError is the product of a server-side
400 exception caused by the 'temporarily unavailable' response sometimes
401 given by operations on non-blocking sockets.'''
402
403 # sometimes we get a -1 error code and/or empty headers
404 try:
405 if e.errcode == -1 or e.headers is None:
406 return True
407 exc_mess = e.headers.get('X-exception')
408 except AttributeError:
409 # Ignore socket.errors here.
410 exc_mess = str(e)
411
412 if exc_mess and 'temporarily unavailable' in exc_mess.lower():
413 return True
414
415 return False
416
417@unittest.skipUnless(threading, 'Threading required for this test.')
418class BaseServerTestCase(unittest.TestCase):
419 requestHandler = None
420 request_count = 1
421 threadFunc = staticmethod(http_server)
422
423 def setUp(self):
424 # enable traceback reporting
425 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
426
427 self.evt = threading.Event()
428 # start server thread to handle requests
429 serv_args = (self.evt, self.request_count, self.requestHandler)
430 threading.Thread(target=self.threadFunc, args=serv_args).start()
431
432 # wait for the server to be ready
433 self.evt.wait(10)
434 self.evt.clear()
435
436 def tearDown(self):
437 # wait on the server thread to terminate
438 self.evt.wait(10)
439
440 # disable traceback reporting
441 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
442
443# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
444# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer. This
445# condition occurs infrequently on some platforms, frequently on others, and
446# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket
447# If the server class is updated at some point in the future to handle this
448# situation more gracefully, these tests should be modified appropriately.
449
450class SimpleServerTestCase(BaseServerTestCase):
451 def test_simple1(self):
452 try:
453 p = xmlrpclib.ServerProxy(URL)
454 self.assertEqual(p.pow(6,8), 6**8)
455 except (xmlrpclib.ProtocolError, socket.error), e:
456 # ignore failures due to non-blocking socket 'unavailable' errors
457 if not is_unavailable_exception(e):
458 # protocol error; provide additional information in test output
459 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
460
461 def test_nonascii(self):
462 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
463 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
464
465 try:
466 p = xmlrpclib.ServerProxy(URL)
467 self.assertEqual(p.add(start_string, end_string),
468 start_string + end_string)
469 except (xmlrpclib.ProtocolError, socket.error) as e:
470 # ignore failures due to non-blocking socket unavailable errors.
471 if not is_unavailable_exception(e):
472 # protocol error; provide additional information in test output
473 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
474
475 def test_unicode_host(self):
476 server = xmlrpclib.ServerProxy(u"http://%s:%d/RPC2"%(ADDR, PORT))
477 self.assertEqual(server.add("a", u"\xe9"), u"a\xe9")
478
479 # [ch] The test 404 is causing lots of false alarms.
480 def XXXtest_404(self):
481 # send POST with httplib, it should return 404 header and
482 # 'Not Found' message.
483 conn = httplib.HTTPConnection(ADDR, PORT)
484 conn.request('POST', '/this-is-not-valid')
485 response = conn.getresponse()
486 conn.close()
487
488 self.assertEqual(response.status, 404)
489 self.assertEqual(response.reason, 'Not Found')
490
491 def test_introspection1(self):
492 try:
493 p = xmlrpclib.ServerProxy(URL)
494 meth = p.system.listMethods()
495 expected_methods = set(['pow', 'div', 'my_function', 'add',
496 'system.listMethods', 'system.methodHelp',
497 'system.methodSignature', 'system.multicall'])
498 self.assertEqual(set(meth), expected_methods)
499 except (xmlrpclib.ProtocolError, socket.error), e:
500 # ignore failures due to non-blocking socket 'unavailable' errors
501 if not is_unavailable_exception(e):
502 # protocol error; provide additional information in test output
503 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
504
505 def test_introspection2(self):
506 try:
507 # test _methodHelp()
508 p = xmlrpclib.ServerProxy(URL)
509 divhelp = p.system.methodHelp('div')
510 self.assertEqual(divhelp, 'This is the div function')
511 except (xmlrpclib.ProtocolError, socket.error), e:
512 # ignore failures due to non-blocking socket 'unavailable' errors
513 if not is_unavailable_exception(e):
514 # protocol error; provide additional information in test output
515 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
516
517 @unittest.skipIf(sys.flags.optimize >= 2,
518 "Docstrings are omitted with -O2 and above")
519 def test_introspection3(self):
520 try:
521 # test native doc
522 p = xmlrpclib.ServerProxy(URL)
523 myfunction = p.system.methodHelp('my_function')
524 self.assertEqual(myfunction, 'This is my function')
525 except (xmlrpclib.ProtocolError, socket.error), e:
526 # ignore failures due to non-blocking socket 'unavailable' errors
527 if not is_unavailable_exception(e):
528 # protocol error; provide additional information in test output
529 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
530
531 def test_introspection4(self):
532 # the SimpleXMLRPCServer doesn't support signatures, but
533 # at least check that we can try making the call
534 try:
535 p = xmlrpclib.ServerProxy(URL)
536 divsig = p.system.methodSignature('div')
537 self.assertEqual(divsig, 'signatures not supported')
538 except (xmlrpclib.ProtocolError, socket.error), e:
539 # ignore failures due to non-blocking socket 'unavailable' errors
540 if not is_unavailable_exception(e):
541 # protocol error; provide additional information in test output
542 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
543
544 def test_multicall(self):
545 try:
546 p = xmlrpclib.ServerProxy(URL)
547 multicall = xmlrpclib.MultiCall(p)
548 multicall.add(2,3)
549 multicall.pow(6,8)
550 multicall.div(127,42)
551 add_result, pow_result, div_result = multicall()
552 self.assertEqual(add_result, 2+3)
553 self.assertEqual(pow_result, 6**8)
554 self.assertEqual(div_result, 127//42)
555 except (xmlrpclib.ProtocolError, socket.error), e:
556 # ignore failures due to non-blocking socket 'unavailable' errors
557 if not is_unavailable_exception(e):
558 # protocol error; provide additional information in test output
559 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
560
561 def test_non_existing_multicall(self):
562 try:
563 p = xmlrpclib.ServerProxy(URL)
564 multicall = xmlrpclib.MultiCall(p)
565 multicall.this_is_not_exists()
566 result = multicall()
567
568 # result.results contains;
569 # [{'faultCode': 1, 'faultString': '<type \'exceptions.Exception\'>:'
570 # 'method "this_is_not_exists" is not supported'>}]
571
572 self.assertEqual(result.results[0]['faultCode'], 1)
573 self.assertEqual(result.results[0]['faultString'],
574 '<type \'exceptions.Exception\'>:method "this_is_not_exists" '
575 'is not supported')
576 except (xmlrpclib.ProtocolError, socket.error), e:
577 # ignore failures due to non-blocking socket 'unavailable' errors
578 if not is_unavailable_exception(e):
579 # protocol error; provide additional information in test output
580 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
581
582 def test_dotted_attribute(self):
583 # Raises an AttributeError because private methods are not allowed.
584 self.assertRaises(AttributeError,
585 SimpleXMLRPCServer.resolve_dotted_attribute, str, '__add')
586
587 self.assertTrue(SimpleXMLRPCServer.resolve_dotted_attribute(str, 'title'))
588 # Get the test to run faster by sending a request with test_simple1.
589 # This avoids waiting for the socket timeout.
590 self.test_simple1()
591
592 def test_partial_post(self):
593 # Check that a partial POST doesn't make the server loop: issue #14001.
594 conn = httplib.HTTPConnection(ADDR, PORT)
595 conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye')
596 conn.close()
597
598class MultiPathServerTestCase(BaseServerTestCase):
599 threadFunc = staticmethod(http_multi_server)
600 request_count = 2
601 def test_path1(self):
602 p = xmlrpclib.ServerProxy(URL+"/foo")
603 self.assertEqual(p.pow(6,8), 6**8)
604 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
605 def test_path2(self):
606 p = xmlrpclib.ServerProxy(URL+"/foo/bar")
607 self.assertEqual(p.add(6,8), 6+8)
608 self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)
609
610#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
611#does indeed serve subsequent requests on the same connection
612class BaseKeepaliveServerTestCase(BaseServerTestCase):
613 #a request handler that supports keep-alive and logs requests into a
614 #class variable
615 class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
616 parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
617 protocol_version = 'HTTP/1.1'
618 myRequests = []
619 def handle(self):
620 self.myRequests.append([])
621 self.reqidx = len(self.myRequests)-1
622 return self.parentClass.handle(self)
623 def handle_one_request(self):
624 result = self.parentClass.handle_one_request(self)
625 self.myRequests[self.reqidx].append(self.raw_requestline)
626 return result
627
628 requestHandler = RequestHandler
629 def setUp(self):
630 #clear request log
631 self.RequestHandler.myRequests = []
632 return BaseServerTestCase.setUp(self)
633
634#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
635#does indeed serve subsequent requests on the same connection
636class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
637 def test_two(self):
638 p = xmlrpclib.ServerProxy(URL)
639 #do three requests.
640 self.assertEqual(p.pow(6,8), 6**8)
641 self.assertEqual(p.pow(6,8), 6**8)
642 self.assertEqual(p.pow(6,8), 6**8)
643
644 #they should have all been handled by a single request handler
645 self.assertEqual(len(self.RequestHandler.myRequests), 1)
646
647 #check that we did at least two (the third may be pending append
648 #due to thread scheduling)
649 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
650
651#test special attribute access on the serverproxy, through the __call__
652#function.
653class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
654 #ask for two keepalive requests to be handled.
655 request_count=2
656
657 def test_close(self):
658 p = xmlrpclib.ServerProxy(URL)
659 #do some requests with close.
660 self.assertEqual(p.pow(6,8), 6**8)
661 self.assertEqual(p.pow(6,8), 6**8)
662 self.assertEqual(p.pow(6,8), 6**8)
663 p("close")() #this should trigger a new keep-alive request
664 self.assertEqual(p.pow(6,8), 6**8)
665 self.assertEqual(p.pow(6,8), 6**8)
666 self.assertEqual(p.pow(6,8), 6**8)
667
668 #they should have all been two request handlers, each having logged at least
669 #two complete requests
670 self.assertEqual(len(self.RequestHandler.myRequests), 2)
671 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
672 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
673
674 def test_transport(self):
675 p = xmlrpclib.ServerProxy(URL)
676 #do some requests with close.
677 self.assertEqual(p.pow(6,8), 6**8)
678 p("transport").close() #same as above, really.
679 self.assertEqual(p.pow(6,8), 6**8)
680 self.assertEqual(len(self.RequestHandler.myRequests), 2)
681
682#A test case that verifies that gzip encoding works in both directions
683#(for a request and the response)
684class GzipServerTestCase(BaseServerTestCase):
685 #a request handler that supports keep-alive and logs requests into a
686 #class variable
687 class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
688 parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
689 protocol_version = 'HTTP/1.1'
690
691 def do_POST(self):
692 #store content of last request in class
693 self.__class__.content_length = int(self.headers["content-length"])
694 return self.parentClass.do_POST(self)
695 requestHandler = RequestHandler
696
697 class Transport(xmlrpclib.Transport):
698 #custom transport, stores the response length for our perusal
699 fake_gzip = False
700 def parse_response(self, response):
701 self.response_length=int(response.getheader("content-length", 0))
702 return xmlrpclib.Transport.parse_response(self, response)
703
704 def send_content(self, connection, body):
705 if self.fake_gzip:
706 #add a lone gzip header to induce decode error remotely
707 connection.putheader("Content-Encoding", "gzip")
708 return xmlrpclib.Transport.send_content(self, connection, body)
709
710 def setUp(self):
711 BaseServerTestCase.setUp(self)
712
713 def test_gzip_request(self):
714 t = self.Transport()
715 t.encode_threshold = None
716 p = xmlrpclib.ServerProxy(URL, transport=t)
717 self.assertEqual(p.pow(6,8), 6**8)
718 a = self.RequestHandler.content_length
719 t.encode_threshold = 0 #turn on request encoding
720 self.assertEqual(p.pow(6,8), 6**8)
721 b = self.RequestHandler.content_length
722 self.assertTrue(a>b)
723
724 def test_bad_gzip_request(self):
725 t = self.Transport()
726 t.encode_threshold = None
727 t.fake_gzip = True
728 p = xmlrpclib.ServerProxy(URL, transport=t)
729 cm = self.assertRaisesRegexp(xmlrpclib.ProtocolError,
730 re.compile(r"\b400\b"))
731 with cm:
732 p.pow(6, 8)
733
734 def test_gsip_response(self):
735 t = self.Transport()
736 p = xmlrpclib.ServerProxy(URL, transport=t)
737 old = self.requestHandler.encode_threshold
738 self.requestHandler.encode_threshold = None #no encoding
739 self.assertEqual(p.pow(6,8), 6**8)
740 a = t.response_length
741 self.requestHandler.encode_threshold = 0 #always encode
742 self.assertEqual(p.pow(6,8), 6**8)
743 b = t.response_length
744 self.requestHandler.encode_threshold = old
745 self.assertTrue(a>b)
746
747#Test special attributes of the ServerProxy object
748class ServerProxyTestCase(unittest.TestCase):
749 def setUp(self):
750 unittest.TestCase.setUp(self)
751 if threading:
752 self.url = URL
753 else:
754 # Without threading, http_server() and http_multi_server() will not
755 # be executed and URL is still equal to None. 'http://' is a just
756 # enough to choose the scheme (HTTP)
757 self.url = 'http://'
758
759 def test_close(self):
760 p = xmlrpclib.ServerProxy(self.url)
761 self.assertEqual(p('close')(), None)
762
763 def test_transport(self):
764 t = xmlrpclib.Transport()
765 p = xmlrpclib.ServerProxy(self.url, transport=t)
766 self.assertEqual(p('transport'), t)
767
768# This is a contrived way to make a failure occur on the server side
769# in order to test the _send_traceback_header flag on the server
770class FailingMessageClass(mimetools.Message):
771 def __getitem__(self, key):
772 key = key.lower()
773 if key == 'content-length':
774 return 'I am broken'
775 return mimetools.Message.__getitem__(self, key)
776
777
778@unittest.skipUnless(threading, 'Threading required for this test.')
779class FailingServerTestCase(unittest.TestCase):
780 def setUp(self):
781 self.evt = threading.Event()
782 # start server thread to handle requests
783 serv_args = (self.evt, 1)
784 threading.Thread(target=http_server, args=serv_args).start()
785
786 # wait for the server to be ready
787 self.evt.wait()
788 self.evt.clear()
789
790 def tearDown(self):
791 # wait on the server thread to terminate
792 self.evt.wait()
793 # reset flag
794 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
795 # reset message class
796 SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = mimetools.Message
797
798 def test_basic(self):
799 # check that flag is false by default
800 flagval = SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header
801 self.assertEqual(flagval, False)
802
803 # enable traceback reporting
804 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
805
806 # test a call that shouldn't fail just as a smoke test
807 try:
808 p = xmlrpclib.ServerProxy(URL)
809 self.assertEqual(p.pow(6,8), 6**8)
810 except (xmlrpclib.ProtocolError, socket.error), e:
811 # ignore failures due to non-blocking socket 'unavailable' errors
812 if not is_unavailable_exception(e):
813 # protocol error; provide additional information in test output
814 self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
815
816 def test_fail_no_info(self):
817 # use the broken message class
818 SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
819
820 try:
821 p = xmlrpclib.ServerProxy(URL)
822 p.pow(6,8)
823 except (xmlrpclib.ProtocolError, socket.error), e:
824 # ignore failures due to non-blocking socket 'unavailable' errors
825 if not is_unavailable_exception(e) and hasattr(e, "headers"):
826 # The two server-side error headers shouldn't be sent back in this case
827 self.assertTrue(e.headers.get("X-exception") is None)
828 self.assertTrue(e.headers.get("X-traceback") is None)
829 else:
830 self.fail('ProtocolError not raised')
831
832 def test_fail_with_info(self):
833 # use the broken message class
834 SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
835
836 # Check that errors in the server send back exception/traceback
837 # info when flag is set
838 SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
839
840 try:
841 p = xmlrpclib.ServerProxy(URL)
842 p.pow(6,8)
843 except (xmlrpclib.ProtocolError, socket.error), e:
844 # ignore failures due to non-blocking socket 'unavailable' errors
845 if not is_unavailable_exception(e) and hasattr(e, "headers"):
846 # We should get error info in the response
847 expected_err = "invalid literal for int() with base 10: 'I am broken'"
848 self.assertEqual(e.headers.get("x-exception"), expected_err)
849 self.assertTrue(e.headers.get("x-traceback") is not None)
850 else:
851 self.fail('ProtocolError not raised')
852
853class CGIHandlerTestCase(unittest.TestCase):
854 def setUp(self):
855 self.cgi = SimpleXMLRPCServer.CGIXMLRPCRequestHandler()
856
857 def tearDown(self):
858 self.cgi = None
859
860 def test_cgi_get(self):
861 with test_support.EnvironmentVarGuard() as env:
862 env['REQUEST_METHOD'] = 'GET'
863 # if the method is GET and no request_text is given, it runs handle_get
864 # get sysout output
865 with test_support.captured_stdout() as data_out:
866 self.cgi.handle_request()
867
868 # parse Status header
869 data_out.seek(0)
870 handle = data_out.read()
871 status = handle.split()[1]
872 message = ' '.join(handle.split()[2:4])
873
874 self.assertEqual(status, '400')
875 self.assertEqual(message, 'Bad Request')
876
877
878 def test_cgi_xmlrpc_response(self):
879 data = """<?xml version='1.0'?>
880 <methodCall>
881 <methodName>test_method</methodName>
882 <params>
883 <param>
884 <value><string>foo</string></value>
885 </param>
886 <param>
887 <value><string>bar</string></value>
888 </param>
889 </params>
890 </methodCall>
891 """
892
893 with test_support.EnvironmentVarGuard() as env, \
894 test_support.captured_stdout() as data_out, \
895 test_support.captured_stdin() as data_in:
896 data_in.write(data)
897 data_in.seek(0)
898 env['CONTENT_LENGTH'] = str(len(data))
899 self.cgi.handle_request()
900 data_out.seek(0)
901
902 # will respond exception, if so, our goal is achieved ;)
903 handle = data_out.read()
904
905 # start with 44th char so as not to get http header, we just need only xml
906 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])
907
908 # Also test the content-length returned by handle_request
909 # Using the same test method inorder to avoid all the datapassing
910 # boilerplate code.
911 # Test for bug: http://bugs.python.org/issue5040
912
913 content = handle[handle.find("<?xml"):]
914
915 self.assertEqual(
916 int(re.search('Content-Length: (\d+)', handle).group(1)),
917 len(content))
918
919
920class FakeSocket:
921
922 def __init__(self):
923 self.data = StringIO.StringIO()
924
925 def send(self, buf):
926 self.data.write(buf)
927 return len(buf)
928
929 def sendall(self, buf):
930 self.data.write(buf)
931
932 def getvalue(self):
933 return self.data.getvalue()
934
935 def makefile(self, x='r', y=-1):
936 raise RuntimeError
937
938 def close(self):
939 pass
940
941class FakeTransport(xmlrpclib.Transport):
942 """A Transport instance that records instead of sending a request.
943
944 This class replaces the actual socket used by httplib with a
945 FakeSocket object that records the request. It doesn't provide a
946 response.
947 """
948
949 def make_connection(self, host):
950 conn = xmlrpclib.Transport.make_connection(self, host)
951 conn.sock = self.fake_socket = FakeSocket()
952 return conn
953
954class TransportSubclassTestCase(unittest.TestCase):
955
956 def issue_request(self, transport_class):
957 """Return an HTTP request made via transport_class."""
958 transport = transport_class()
959 proxy = xmlrpclib.ServerProxy("http://example.com/",
960 transport=transport)
961 try:
962 proxy.pow(6, 8)
963 except RuntimeError:
964 return transport.fake_socket.getvalue()
965 return None
966
967 def test_custom_user_agent(self):
968 class TestTransport(FakeTransport):
969
970 def send_user_agent(self, conn):
971 xmlrpclib.Transport.send_user_agent(self, conn)
972 conn.putheader("X-Test", "test_custom_user_agent")
973
974 req = self.issue_request(TestTransport)
975 self.assertIn("X-Test: test_custom_user_agent\r\n", req)
976
977 def test_send_host(self):
978 class TestTransport(FakeTransport):
979
980 def send_host(self, conn, host):
981 xmlrpclib.Transport.send_host(self, conn, host)
982 conn.putheader("X-Test", "test_send_host")
983
984 req = self.issue_request(TestTransport)
985 self.assertIn("X-Test: test_send_host\r\n", req)
986
987 def test_send_request(self):
988 class TestTransport(FakeTransport):
989
990 def send_request(self, conn, url, body):
991 xmlrpclib.Transport.send_request(self, conn, url, body)
992 conn.putheader("X-Test", "test_send_request")
993
994 req = self.issue_request(TestTransport)
995 self.assertIn("X-Test: test_send_request\r\n", req)
996
997 def test_send_content(self):
998 class TestTransport(FakeTransport):
999
1000 def send_content(self, conn, body):
1001 conn.putheader("X-Test", "test_send_content")
1002 xmlrpclib.Transport.send_content(self, conn, body)
1003
1004 req = self.issue_request(TestTransport)
1005 self.assertIn("X-Test: test_send_content\r\n", req)
1006
1007@test_support.reap_threads
1008def test_main():
1009 xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
1010 BinaryTestCase, FaultTestCase, TransportSubclassTestCase]
1011 xmlrpc_tests.append(SimpleServerTestCase)
1012 xmlrpc_tests.append(KeepaliveServerTestCase1)
1013 xmlrpc_tests.append(KeepaliveServerTestCase2)
1014 try:
1015 import gzip
1016 xmlrpc_tests.append(GzipServerTestCase)
1017 except ImportError:
1018 pass #gzip not supported in this build
1019 xmlrpc_tests.append(MultiPathServerTestCase)
1020 xmlrpc_tests.append(ServerProxyTestCase)
1021 xmlrpc_tests.append(FailingServerTestCase)
1022 xmlrpc_tests.append(CGIHandlerTestCase)
1023
1024 test_support.run_unittest(*xmlrpc_tests)
1025
1026if __name__ == "__main__":
1027 test_main()
Note: See TracBrowser for help on using the repository browser.