Changeset 391 for python/trunk/Lib/test/test_telnetlib.py
- Timestamp:
- Mar 19, 2014, 11:31:01 PM (11 years ago)
- Location:
- python/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
python/trunk
-
Property svn:mergeinfo
set to
/python/vendor/Python-2.7.6 merged eligible /python/vendor/current merged eligible
-
Property svn:mergeinfo
set to
-
python/trunk/Lib/test/test_telnetlib.py
r2 r391 1 1 import socket 2 import threading3 2 import telnetlib 4 3 import time 5 4 import Queue 5 6 import unittest 6 7 from unittest import TestCase 7 8 from test import test_support 9 threading = test_support.import_module('threading') 8 10 9 11 HOST = test_support.HOST 10 11 def server(evt, serv): 12 EOF_sigil = object() 13 14 def server(evt, serv, dataq=None): 15 """ Open a tcp server in three steps 16 1) set evt to true to let the parent know we are ready 17 2) [optional] if is not False, write the list of data from dataq.get() 18 to the socket. 19 """ 12 20 serv.listen(5) 13 21 evt.set() 14 22 try: 15 23 conn, addr = serv.accept() 24 if dataq: 25 data = '' 26 new_data = dataq.get(True, 0.5) 27 dataq.task_done() 28 for item in new_data: 29 if item == EOF_sigil: 30 break 31 if type(item) in [int, float]: 32 time.sleep(item) 33 else: 34 data += item 35 written = conn.send(data) 36 data = data[written:] 37 conn.close() 16 38 except socket.timeout: 17 39 pass 18 40 finally: 19 41 serv.close() 20 evt.set()21 42 22 43 class GeneralTests(TestCase): … … 25 46 self.evt = threading.Event() 26 47 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 27 self.sock.settimeout( 3)48 self.sock.settimeout(60) # Safety net. Look issue 11812 28 49 self.port = test_support.bind_port(self.sock) 29 threading.Thread(target=server, args=(self.evt,self.sock)).start() 50 self.thread = threading.Thread(target=server, args=(self.evt,self.sock)) 51 self.thread.setDaemon(True) 52 self.thread.start() 30 53 self.evt.wait() 31 self.evt.clear()32 time.sleep(.1)33 54 34 55 def tearDown(self): 35 self. evt.wait()56 self.thread.join() 36 57 37 58 def testBasic(self): … … 44 65 socket.setdefaulttimeout(30) 45 66 try: 46 telnet = telnetlib.Telnet( "localhost", self.port)67 telnet = telnetlib.Telnet(HOST, self.port) 47 68 finally: 48 69 socket.setdefaulttimeout(None) … … 62 83 63 84 def testTimeoutValue(self): 64 telnet = telnetlib.Telnet( "localhost", self.port, timeout=30)85 telnet = telnetlib.Telnet(HOST, self.port, timeout=30) 65 86 self.assertEqual(telnet.sock.gettimeout(), 30) 66 87 telnet.sock.close() … … 68 89 def testTimeoutOpen(self): 69 90 telnet = telnetlib.Telnet() 70 telnet.open( "localhost", self.port, timeout=30)91 telnet.open(HOST, self.port, timeout=30) 71 92 self.assertEqual(telnet.sock.gettimeout(), 30) 72 93 telnet.sock.close() 73 94 95 def testGetters(self): 96 # Test telnet getter methods 97 telnet = telnetlib.Telnet(HOST, self.port, timeout=30) 98 t_sock = telnet.sock 99 self.assertEqual(telnet.get_socket(), t_sock) 100 self.assertEqual(telnet.fileno(), t_sock.fileno()) 101 telnet.sock.close() 102 103 def _read_setUp(self): 104 self.evt = threading.Event() 105 self.dataq = Queue.Queue() 106 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 107 self.sock.settimeout(10) 108 self.port = test_support.bind_port(self.sock) 109 self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq)) 110 self.thread.start() 111 self.evt.wait() 112 113 def _read_tearDown(self): 114 self.thread.join() 115 116 class ReadTests(TestCase): 117 setUp = _read_setUp 118 tearDown = _read_tearDown 119 120 # use a similar approach to testing timeouts as test_timeout.py 121 # these will never pass 100% but make the fuzz big enough that it is rare 122 block_long = 0.6 123 block_short = 0.3 124 def test_read_until_A(self): 125 """ 126 read_until(expected, [timeout]) 127 Read until the expected string has been seen, or a timeout is 128 hit (default is no timeout); may block. 129 """ 130 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] 131 self.dataq.put(want) 132 telnet = telnetlib.Telnet(HOST, self.port) 133 self.dataq.join() 134 data = telnet.read_until('match') 135 self.assertEqual(data, ''.join(want[:-2])) 136 137 def test_read_until_B(self): 138 # test the timeout - it does NOT raise socket.timeout 139 want = ['hello', self.block_long, 'not seen', EOF_sigil] 140 self.dataq.put(want) 141 telnet = telnetlib.Telnet(HOST, self.port) 142 self.dataq.join() 143 data = telnet.read_until('not seen', self.block_short) 144 self.assertEqual(data, want[0]) 145 self.assertEqual(telnet.read_all(), 'not seen') 146 147 def test_read_until_with_poll(self): 148 """Use select.poll() to implement telnet.read_until().""" 149 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] 150 self.dataq.put(want) 151 telnet = telnetlib.Telnet(HOST, self.port) 152 if not telnet._has_poll: 153 raise unittest.SkipTest('select.poll() is required') 154 telnet._has_poll = True 155 self.dataq.join() 156 data = telnet.read_until('match') 157 self.assertEqual(data, ''.join(want[:-2])) 158 159 def test_read_until_with_select(self): 160 """Use select.select() to implement telnet.read_until().""" 161 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] 162 self.dataq.put(want) 163 telnet = telnetlib.Telnet(HOST, self.port) 164 telnet._has_poll = False 165 self.dataq.join() 166 data = telnet.read_until('match') 167 self.assertEqual(data, ''.join(want[:-2])) 168 169 def test_read_all_A(self): 170 """ 171 read_all() 172 Read all data until EOF; may block. 173 """ 174 want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil] 175 self.dataq.put(want) 176 telnet = telnetlib.Telnet(HOST, self.port) 177 self.dataq.join() 178 data = telnet.read_all() 179 self.assertEqual(data, ''.join(want[:-1])) 180 return 181 182 def _test_blocking(self, func): 183 self.dataq.put([self.block_long, EOF_sigil]) 184 self.dataq.join() 185 start = time.time() 186 data = func() 187 self.assertTrue(self.block_short <= time.time() - start) 188 189 def test_read_all_B(self): 190 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all) 191 192 def test_read_all_C(self): 193 self.dataq.put([EOF_sigil]) 194 telnet = telnetlib.Telnet(HOST, self.port) 195 self.dataq.join() 196 telnet.read_all() 197 telnet.read_all() # shouldn't raise 198 199 def test_read_some_A(self): 200 """ 201 read_some() 202 Read at least one byte or EOF; may block. 203 """ 204 # test 'at least one byte' 205 want = ['x' * 500, EOF_sigil] 206 self.dataq.put(want) 207 telnet = telnetlib.Telnet(HOST, self.port) 208 self.dataq.join() 209 data = telnet.read_all() 210 self.assertTrue(len(data) >= 1) 211 212 def test_read_some_B(self): 213 # test EOF 214 self.dataq.put([EOF_sigil]) 215 telnet = telnetlib.Telnet(HOST, self.port) 216 self.dataq.join() 217 self.assertEqual('', telnet.read_some()) 218 219 def test_read_some_C(self): 220 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some) 221 222 def _test_read_any_eager_A(self, func_name): 223 """ 224 read_very_eager() 225 Read all data available already queued or on the socket, 226 without blocking. 227 """ 228 want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil] 229 expects = want[1] + want[2] 230 self.dataq.put(want) 231 telnet = telnetlib.Telnet(HOST, self.port) 232 self.dataq.join() 233 func = getattr(telnet, func_name) 234 data = '' 235 while True: 236 try: 237 data += func() 238 self.assertTrue(expects.startswith(data)) 239 except EOFError: 240 break 241 self.assertEqual(expects, data) 242 243 def _test_read_any_eager_B(self, func_name): 244 # test EOF 245 self.dataq.put([EOF_sigil]) 246 telnet = telnetlib.Telnet(HOST, self.port) 247 self.dataq.join() 248 time.sleep(self.block_short) 249 func = getattr(telnet, func_name) 250 self.assertRaises(EOFError, func) 251 252 # read_eager and read_very_eager make the same gaurantees 253 # (they behave differently but we only test the gaurantees) 254 def test_read_very_eager_A(self): 255 self._test_read_any_eager_A('read_very_eager') 256 def test_read_very_eager_B(self): 257 self._test_read_any_eager_B('read_very_eager') 258 def test_read_eager_A(self): 259 self._test_read_any_eager_A('read_eager') 260 def test_read_eager_B(self): 261 self._test_read_any_eager_B('read_eager') 262 # NB -- we need to test the IAC block which is mentioned in the docstring 263 # but not in the module docs 264 265 def _test_read_any_lazy_B(self, func_name): 266 self.dataq.put([EOF_sigil]) 267 telnet = telnetlib.Telnet(HOST, self.port) 268 self.dataq.join() 269 func = getattr(telnet, func_name) 270 telnet.fill_rawq() 271 self.assertRaises(EOFError, func) 272 273 def test_read_lazy_A(self): 274 want = ['x' * 100, EOF_sigil] 275 self.dataq.put(want) 276 telnet = telnetlib.Telnet(HOST, self.port) 277 self.dataq.join() 278 time.sleep(self.block_short) 279 self.assertEqual('', telnet.read_lazy()) 280 data = '' 281 while True: 282 try: 283 read_data = telnet.read_lazy() 284 data += read_data 285 if not read_data: 286 telnet.fill_rawq() 287 except EOFError: 288 break 289 self.assertTrue(want[0].startswith(data)) 290 self.assertEqual(data, want[0]) 291 292 def test_read_lazy_B(self): 293 self._test_read_any_lazy_B('read_lazy') 294 295 def test_read_very_lazy_A(self): 296 want = ['x' * 100, EOF_sigil] 297 self.dataq.put(want) 298 telnet = telnetlib.Telnet(HOST, self.port) 299 self.dataq.join() 300 time.sleep(self.block_short) 301 self.assertEqual('', telnet.read_very_lazy()) 302 data = '' 303 while True: 304 try: 305 read_data = telnet.read_very_lazy() 306 except EOFError: 307 break 308 data += read_data 309 if not read_data: 310 telnet.fill_rawq() 311 self.assertEqual('', telnet.cookedq) 312 telnet.process_rawq() 313 self.assertTrue(want[0].startswith(data)) 314 self.assertEqual(data, want[0]) 315 316 def test_read_very_lazy_B(self): 317 self._test_read_any_lazy_B('read_very_lazy') 318 319 class nego_collector(object): 320 def __init__(self, sb_getter=None): 321 self.seen = '' 322 self.sb_getter = sb_getter 323 self.sb_seen = '' 324 325 def do_nego(self, sock, cmd, opt): 326 self.seen += cmd + opt 327 if cmd == tl.SE and self.sb_getter: 328 sb_data = self.sb_getter() 329 self.sb_seen += sb_data 330 331 tl = telnetlib 332 class OptionTests(TestCase): 333 setUp = _read_setUp 334 tearDown = _read_tearDown 335 # RFC 854 commands 336 cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP] 337 338 def _test_command(self, data): 339 """ helper for testing IAC + cmd """ 340 self.setUp() 341 self.dataq.put(data) 342 telnet = telnetlib.Telnet(HOST, self.port) 343 self.dataq.join() 344 nego = nego_collector() 345 telnet.set_option_negotiation_callback(nego.do_nego) 346 txt = telnet.read_all() 347 cmd = nego.seen 348 self.assertTrue(len(cmd) > 0) # we expect at least one command 349 self.assertIn(cmd[0], self.cmds) 350 self.assertEqual(cmd[1], tl.NOOPT) 351 self.assertEqual(len(''.join(data[:-1])), len(txt + cmd)) 352 nego.sb_getter = None # break the nego => telnet cycle 353 self.tearDown() 354 355 def test_IAC_commands(self): 356 # reset our setup 357 self.dataq.put([EOF_sigil]) 358 telnet = telnetlib.Telnet(HOST, self.port) 359 self.dataq.join() 360 self.tearDown() 361 362 for cmd in self.cmds: 363 self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil]) 364 self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil]) 365 self._test_command([tl.IAC + cmd, EOF_sigil]) 366 # all at once 367 self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil]) 368 self.assertEqual('', telnet.read_sb_data()) 369 370 def test_SB_commands(self): 371 # RFC 855, subnegotiations portion 372 send = [tl.IAC + tl.SB + tl.IAC + tl.SE, 373 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE, 374 tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE, 375 tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE, 376 tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE, 377 EOF_sigil, 378 ] 379 self.dataq.put(send) 380 telnet = telnetlib.Telnet(HOST, self.port) 381 self.dataq.join() 382 nego = nego_collector(telnet.read_sb_data) 383 telnet.set_option_negotiation_callback(nego.do_nego) 384 txt = telnet.read_all() 385 self.assertEqual(txt, '') 386 want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd' 387 self.assertEqual(nego.sb_seen, want_sb_data) 388 self.assertEqual('', telnet.read_sb_data()) 389 nego.sb_getter = None # break the nego => telnet cycle 390 391 392 class ExpectTests(TestCase): 393 def setUp(self): 394 self.evt = threading.Event() 395 self.dataq = Queue.Queue() 396 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 397 self.sock.settimeout(10) 398 self.port = test_support.bind_port(self.sock) 399 self.thread = threading.Thread(target=server, args=(self.evt,self.sock, 400 self.dataq)) 401 self.thread.start() 402 self.evt.wait() 403 404 def tearDown(self): 405 self.thread.join() 406 407 # use a similar approach to testing timeouts as test_timeout.py 408 # these will never pass 100% but make the fuzz big enough that it is rare 409 block_long = 0.6 410 block_short = 0.3 411 def test_expect_A(self): 412 """ 413 expect(expected, [timeout]) 414 Read until the expected string has been seen, or a timeout is 415 hit (default is no timeout); may block. 416 """ 417 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] 418 self.dataq.put(want) 419 telnet = telnetlib.Telnet(HOST, self.port) 420 self.dataq.join() 421 (_,_,data) = telnet.expect(['match']) 422 self.assertEqual(data, ''.join(want[:-2])) 423 424 def test_expect_B(self): 425 # test the timeout - it does NOT raise socket.timeout 426 want = ['hello', self.block_long, 'not seen', EOF_sigil] 427 self.dataq.put(want) 428 telnet = telnetlib.Telnet(HOST, self.port) 429 self.dataq.join() 430 (_,_,data) = telnet.expect(['not seen'], self.block_short) 431 self.assertEqual(data, want[0]) 432 self.assertEqual(telnet.read_all(), 'not seen') 433 434 def test_expect_with_poll(self): 435 """Use select.poll() to implement telnet.expect().""" 436 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] 437 self.dataq.put(want) 438 telnet = telnetlib.Telnet(HOST, self.port) 439 if not telnet._has_poll: 440 raise unittest.SkipTest('select.poll() is required') 441 telnet._has_poll = True 442 self.dataq.join() 443 (_,_,data) = telnet.expect(['match']) 444 self.assertEqual(data, ''.join(want[:-2])) 445 446 def test_expect_with_select(self): 447 """Use select.select() to implement telnet.expect().""" 448 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] 449 self.dataq.put(want) 450 telnet = telnetlib.Telnet(HOST, self.port) 451 telnet._has_poll = False 452 self.dataq.join() 453 (_,_,data) = telnet.expect(['match']) 454 self.assertEqual(data, ''.join(want[:-2])) 455 74 456 75 457 def test_main(verbose=None): 76 test_support.run_unittest(GeneralTests) 458 test_support.run_unittest(GeneralTests, ReadTests, OptionTests, 459 ExpectTests) 77 460 78 461 if __name__ == '__main__':
Note:
See TracChangeset
for help on using the changeset viewer.