source: python/trunk/Lib/test/test_telnetlib.py

Last change on this file was 391, checked in by dmik, 11 years ago

python: Merge vendor 2.7.6 to trunk.

  • Property svn:eol-style set to native
File size: 15.7 KB
Line 
1import socket
2import telnetlib
3import time
4import Queue
5
6import unittest
7from unittest import TestCase
8from test import test_support
9threading = test_support.import_module('threading')
10
11HOST = test_support.HOST
12EOF_sigil = object()
13
14def server(evt, serv, dataq=None):
15 """ Open a tcp server in three steps
16 1) set evt to true to let the parent know we are ready
17 2) [optional] if is not False, write the list of data from dataq.get()
18 to the socket.
19 """
20 serv.listen(5)
21 evt.set()
22 try:
23 conn, addr = serv.accept()
24 if dataq:
25 data = ''
26 new_data = dataq.get(True, 0.5)
27 dataq.task_done()
28 for item in new_data:
29 if item == EOF_sigil:
30 break
31 if type(item) in [int, float]:
32 time.sleep(item)
33 else:
34 data += item
35 written = conn.send(data)
36 data = data[written:]
37 conn.close()
38 except socket.timeout:
39 pass
40 finally:
41 serv.close()
42
43class GeneralTests(TestCase):
44
45 def setUp(self):
46 self.evt = threading.Event()
47 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
48 self.sock.settimeout(60) # Safety net. Look issue 11812
49 self.port = test_support.bind_port(self.sock)
50 self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
51 self.thread.setDaemon(True)
52 self.thread.start()
53 self.evt.wait()
54
55 def tearDown(self):
56 self.thread.join()
57
58 def testBasic(self):
59 # connects
60 telnet = telnetlib.Telnet(HOST, self.port)
61 telnet.sock.close()
62
63 def testTimeoutDefault(self):
64 self.assertTrue(socket.getdefaulttimeout() is None)
65 socket.setdefaulttimeout(30)
66 try:
67 telnet = telnetlib.Telnet(HOST, self.port)
68 finally:
69 socket.setdefaulttimeout(None)
70 self.assertEqual(telnet.sock.gettimeout(), 30)
71 telnet.sock.close()
72
73 def testTimeoutNone(self):
74 # None, having other default
75 self.assertTrue(socket.getdefaulttimeout() is None)
76 socket.setdefaulttimeout(30)
77 try:
78 telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
79 finally:
80 socket.setdefaulttimeout(None)
81 self.assertTrue(telnet.sock.gettimeout() is None)
82 telnet.sock.close()
83
84 def testTimeoutValue(self):
85 telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
86 self.assertEqual(telnet.sock.gettimeout(), 30)
87 telnet.sock.close()
88
89 def testTimeoutOpen(self):
90 telnet = telnetlib.Telnet()
91 telnet.open(HOST, self.port, timeout=30)
92 self.assertEqual(telnet.sock.gettimeout(), 30)
93 telnet.sock.close()
94
95 def testGetters(self):
96 # Test telnet getter methods
97 telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
98 t_sock = telnet.sock
99 self.assertEqual(telnet.get_socket(), t_sock)
100 self.assertEqual(telnet.fileno(), t_sock.fileno())
101 telnet.sock.close()
102
103def _read_setUp(self):
104 self.evt = threading.Event()
105 self.dataq = Queue.Queue()
106 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
107 self.sock.settimeout(10)
108 self.port = test_support.bind_port(self.sock)
109 self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
110 self.thread.start()
111 self.evt.wait()
112
113def _read_tearDown(self):
114 self.thread.join()
115
116class ReadTests(TestCase):
117 setUp = _read_setUp
118 tearDown = _read_tearDown
119
120 # use a similar approach to testing timeouts as test_timeout.py
121 # these will never pass 100% but make the fuzz big enough that it is rare
122 block_long = 0.6
123 block_short = 0.3
124 def test_read_until_A(self):
125 """
126 read_until(expected, [timeout])
127 Read until the expected string has been seen, or a timeout is
128 hit (default is no timeout); may block.
129 """
130 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
131 self.dataq.put(want)
132 telnet = telnetlib.Telnet(HOST, self.port)
133 self.dataq.join()
134 data = telnet.read_until('match')
135 self.assertEqual(data, ''.join(want[:-2]))
136
137 def test_read_until_B(self):
138 # test the timeout - it does NOT raise socket.timeout
139 want = ['hello', self.block_long, 'not seen', EOF_sigil]
140 self.dataq.put(want)
141 telnet = telnetlib.Telnet(HOST, self.port)
142 self.dataq.join()
143 data = telnet.read_until('not seen', self.block_short)
144 self.assertEqual(data, want[0])
145 self.assertEqual(telnet.read_all(), 'not seen')
146
147 def test_read_until_with_poll(self):
148 """Use select.poll() to implement telnet.read_until()."""
149 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
150 self.dataq.put(want)
151 telnet = telnetlib.Telnet(HOST, self.port)
152 if not telnet._has_poll:
153 raise unittest.SkipTest('select.poll() is required')
154 telnet._has_poll = True
155 self.dataq.join()
156 data = telnet.read_until('match')
157 self.assertEqual(data, ''.join(want[:-2]))
158
159 def test_read_until_with_select(self):
160 """Use select.select() to implement telnet.read_until()."""
161 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
162 self.dataq.put(want)
163 telnet = telnetlib.Telnet(HOST, self.port)
164 telnet._has_poll = False
165 self.dataq.join()
166 data = telnet.read_until('match')
167 self.assertEqual(data, ''.join(want[:-2]))
168
169 def test_read_all_A(self):
170 """
171 read_all()
172 Read all data until EOF; may block.
173 """
174 want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
175 self.dataq.put(want)
176 telnet = telnetlib.Telnet(HOST, self.port)
177 self.dataq.join()
178 data = telnet.read_all()
179 self.assertEqual(data, ''.join(want[:-1]))
180 return
181
182 def _test_blocking(self, func):
183 self.dataq.put([self.block_long, EOF_sigil])
184 self.dataq.join()
185 start = time.time()
186 data = func()
187 self.assertTrue(self.block_short <= time.time() - start)
188
189 def test_read_all_B(self):
190 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
191
192 def test_read_all_C(self):
193 self.dataq.put([EOF_sigil])
194 telnet = telnetlib.Telnet(HOST, self.port)
195 self.dataq.join()
196 telnet.read_all()
197 telnet.read_all() # shouldn't raise
198
199 def test_read_some_A(self):
200 """
201 read_some()
202 Read at least one byte or EOF; may block.
203 """
204 # test 'at least one byte'
205 want = ['x' * 500, EOF_sigil]
206 self.dataq.put(want)
207 telnet = telnetlib.Telnet(HOST, self.port)
208 self.dataq.join()
209 data = telnet.read_all()
210 self.assertTrue(len(data) >= 1)
211
212 def test_read_some_B(self):
213 # test EOF
214 self.dataq.put([EOF_sigil])
215 telnet = telnetlib.Telnet(HOST, self.port)
216 self.dataq.join()
217 self.assertEqual('', telnet.read_some())
218
219 def test_read_some_C(self):
220 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
221
222 def _test_read_any_eager_A(self, func_name):
223 """
224 read_very_eager()
225 Read all data available already queued or on the socket,
226 without blocking.
227 """
228 want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
229 expects = want[1] + want[2]
230 self.dataq.put(want)
231 telnet = telnetlib.Telnet(HOST, self.port)
232 self.dataq.join()
233 func = getattr(telnet, func_name)
234 data = ''
235 while True:
236 try:
237 data += func()
238 self.assertTrue(expects.startswith(data))
239 except EOFError:
240 break
241 self.assertEqual(expects, data)
242
243 def _test_read_any_eager_B(self, func_name):
244 # test EOF
245 self.dataq.put([EOF_sigil])
246 telnet = telnetlib.Telnet(HOST, self.port)
247 self.dataq.join()
248 time.sleep(self.block_short)
249 func = getattr(telnet, func_name)
250 self.assertRaises(EOFError, func)
251
252 # read_eager and read_very_eager make the same gaurantees
253 # (they behave differently but we only test the gaurantees)
254 def test_read_very_eager_A(self):
255 self._test_read_any_eager_A('read_very_eager')
256 def test_read_very_eager_B(self):
257 self._test_read_any_eager_B('read_very_eager')
258 def test_read_eager_A(self):
259 self._test_read_any_eager_A('read_eager')
260 def test_read_eager_B(self):
261 self._test_read_any_eager_B('read_eager')
262 # NB -- we need to test the IAC block which is mentioned in the docstring
263 # but not in the module docs
264
265 def _test_read_any_lazy_B(self, func_name):
266 self.dataq.put([EOF_sigil])
267 telnet = telnetlib.Telnet(HOST, self.port)
268 self.dataq.join()
269 func = getattr(telnet, func_name)
270 telnet.fill_rawq()
271 self.assertRaises(EOFError, func)
272
273 def test_read_lazy_A(self):
274 want = ['x' * 100, EOF_sigil]
275 self.dataq.put(want)
276 telnet = telnetlib.Telnet(HOST, self.port)
277 self.dataq.join()
278 time.sleep(self.block_short)
279 self.assertEqual('', telnet.read_lazy())
280 data = ''
281 while True:
282 try:
283 read_data = telnet.read_lazy()
284 data += read_data
285 if not read_data:
286 telnet.fill_rawq()
287 except EOFError:
288 break
289 self.assertTrue(want[0].startswith(data))
290 self.assertEqual(data, want[0])
291
292 def test_read_lazy_B(self):
293 self._test_read_any_lazy_B('read_lazy')
294
295 def test_read_very_lazy_A(self):
296 want = ['x' * 100, EOF_sigil]
297 self.dataq.put(want)
298 telnet = telnetlib.Telnet(HOST, self.port)
299 self.dataq.join()
300 time.sleep(self.block_short)
301 self.assertEqual('', telnet.read_very_lazy())
302 data = ''
303 while True:
304 try:
305 read_data = telnet.read_very_lazy()
306 except EOFError:
307 break
308 data += read_data
309 if not read_data:
310 telnet.fill_rawq()
311 self.assertEqual('', telnet.cookedq)
312 telnet.process_rawq()
313 self.assertTrue(want[0].startswith(data))
314 self.assertEqual(data, want[0])
315
316 def test_read_very_lazy_B(self):
317 self._test_read_any_lazy_B('read_very_lazy')
318
319class nego_collector(object):
320 def __init__(self, sb_getter=None):
321 self.seen = ''
322 self.sb_getter = sb_getter
323 self.sb_seen = ''
324
325 def do_nego(self, sock, cmd, opt):
326 self.seen += cmd + opt
327 if cmd == tl.SE and self.sb_getter:
328 sb_data = self.sb_getter()
329 self.sb_seen += sb_data
330
331tl = telnetlib
332class OptionTests(TestCase):
333 setUp = _read_setUp
334 tearDown = _read_tearDown
335 # RFC 854 commands
336 cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
337
338 def _test_command(self, data):
339 """ helper for testing IAC + cmd """
340 self.setUp()
341 self.dataq.put(data)
342 telnet = telnetlib.Telnet(HOST, self.port)
343 self.dataq.join()
344 nego = nego_collector()
345 telnet.set_option_negotiation_callback(nego.do_nego)
346 txt = telnet.read_all()
347 cmd = nego.seen
348 self.assertTrue(len(cmd) > 0) # we expect at least one command
349 self.assertIn(cmd[0], self.cmds)
350 self.assertEqual(cmd[1], tl.NOOPT)
351 self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
352 nego.sb_getter = None # break the nego => telnet cycle
353 self.tearDown()
354
355 def test_IAC_commands(self):
356 # reset our setup
357 self.dataq.put([EOF_sigil])
358 telnet = telnetlib.Telnet(HOST, self.port)
359 self.dataq.join()
360 self.tearDown()
361
362 for cmd in self.cmds:
363 self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
364 self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
365 self._test_command([tl.IAC + cmd, EOF_sigil])
366 # all at once
367 self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
368 self.assertEqual('', telnet.read_sb_data())
369
370 def test_SB_commands(self):
371 # RFC 855, subnegotiations portion
372 send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
373 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
374 tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
375 tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
376 tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
377 EOF_sigil,
378 ]
379 self.dataq.put(send)
380 telnet = telnetlib.Telnet(HOST, self.port)
381 self.dataq.join()
382 nego = nego_collector(telnet.read_sb_data)
383 telnet.set_option_negotiation_callback(nego.do_nego)
384 txt = telnet.read_all()
385 self.assertEqual(txt, '')
386 want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
387 self.assertEqual(nego.sb_seen, want_sb_data)
388 self.assertEqual('', telnet.read_sb_data())
389 nego.sb_getter = None # break the nego => telnet cycle
390
391
392class ExpectTests(TestCase):
393 def setUp(self):
394 self.evt = threading.Event()
395 self.dataq = Queue.Queue()
396 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
397 self.sock.settimeout(10)
398 self.port = test_support.bind_port(self.sock)
399 self.thread = threading.Thread(target=server, args=(self.evt,self.sock,
400 self.dataq))
401 self.thread.start()
402 self.evt.wait()
403
404 def tearDown(self):
405 self.thread.join()
406
407 # use a similar approach to testing timeouts as test_timeout.py
408 # these will never pass 100% but make the fuzz big enough that it is rare
409 block_long = 0.6
410 block_short = 0.3
411 def test_expect_A(self):
412 """
413 expect(expected, [timeout])
414 Read until the expected string has been seen, or a timeout is
415 hit (default is no timeout); may block.
416 """
417 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
418 self.dataq.put(want)
419 telnet = telnetlib.Telnet(HOST, self.port)
420 self.dataq.join()
421 (_,_,data) = telnet.expect(['match'])
422 self.assertEqual(data, ''.join(want[:-2]))
423
424 def test_expect_B(self):
425 # test the timeout - it does NOT raise socket.timeout
426 want = ['hello', self.block_long, 'not seen', EOF_sigil]
427 self.dataq.put(want)
428 telnet = telnetlib.Telnet(HOST, self.port)
429 self.dataq.join()
430 (_,_,data) = telnet.expect(['not seen'], self.block_short)
431 self.assertEqual(data, want[0])
432 self.assertEqual(telnet.read_all(), 'not seen')
433
434 def test_expect_with_poll(self):
435 """Use select.poll() to implement telnet.expect()."""
436 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
437 self.dataq.put(want)
438 telnet = telnetlib.Telnet(HOST, self.port)
439 if not telnet._has_poll:
440 raise unittest.SkipTest('select.poll() is required')
441 telnet._has_poll = True
442 self.dataq.join()
443 (_,_,data) = telnet.expect(['match'])
444 self.assertEqual(data, ''.join(want[:-2]))
445
446 def test_expect_with_select(self):
447 """Use select.select() to implement telnet.expect()."""
448 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
449 self.dataq.put(want)
450 telnet = telnetlib.Telnet(HOST, self.port)
451 telnet._has_poll = False
452 self.dataq.join()
453 (_,_,data) = telnet.expect(['match'])
454 self.assertEqual(data, ''.join(want[:-2]))
455
456
457def test_main(verbose=None):
458 test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
459 ExpectTests)
460
461if __name__ == '__main__':
462 test_main()
Note: See TracBrowser for help on using the repository browser.