1 | import socket
|
---|
2 | import telnetlib
|
---|
3 | import time
|
---|
4 | import Queue
|
---|
5 |
|
---|
6 | import unittest
|
---|
7 | from unittest import TestCase
|
---|
8 | from test import test_support
|
---|
9 | threading = test_support.import_module('threading')
|
---|
10 |
|
---|
11 | HOST = test_support.HOST
|
---|
12 | EOF_sigil = object()
|
---|
13 |
|
---|
14 | def server(evt, serv, dataq=None):
|
---|
15 | """ Open a tcp server in three steps
|
---|
16 | 1) set evt to true to let the parent know we are ready
|
---|
17 | 2) [optional] if is not False, write the list of data from dataq.get()
|
---|
18 | to the socket.
|
---|
19 | """
|
---|
20 | serv.listen(5)
|
---|
21 | evt.set()
|
---|
22 | try:
|
---|
23 | conn, addr = serv.accept()
|
---|
24 | if dataq:
|
---|
25 | data = ''
|
---|
26 | new_data = dataq.get(True, 0.5)
|
---|
27 | dataq.task_done()
|
---|
28 | for item in new_data:
|
---|
29 | if item == EOF_sigil:
|
---|
30 | break
|
---|
31 | if type(item) in [int, float]:
|
---|
32 | time.sleep(item)
|
---|
33 | else:
|
---|
34 | data += item
|
---|
35 | written = conn.send(data)
|
---|
36 | data = data[written:]
|
---|
37 | conn.close()
|
---|
38 | except socket.timeout:
|
---|
39 | pass
|
---|
40 | finally:
|
---|
41 | serv.close()
|
---|
42 |
|
---|
43 | class GeneralTests(TestCase):
|
---|
44 |
|
---|
45 | def setUp(self):
|
---|
46 | self.evt = threading.Event()
|
---|
47 | self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
---|
48 | self.sock.settimeout(60) # Safety net. Look issue 11812
|
---|
49 | self.port = test_support.bind_port(self.sock)
|
---|
50 | self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
|
---|
51 | self.thread.setDaemon(True)
|
---|
52 | self.thread.start()
|
---|
53 | self.evt.wait()
|
---|
54 |
|
---|
55 | def tearDown(self):
|
---|
56 | self.thread.join()
|
---|
57 |
|
---|
58 | def testBasic(self):
|
---|
59 | # connects
|
---|
60 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
61 | telnet.sock.close()
|
---|
62 |
|
---|
63 | def testTimeoutDefault(self):
|
---|
64 | self.assertTrue(socket.getdefaulttimeout() is None)
|
---|
65 | socket.setdefaulttimeout(30)
|
---|
66 | try:
|
---|
67 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
68 | finally:
|
---|
69 | socket.setdefaulttimeout(None)
|
---|
70 | self.assertEqual(telnet.sock.gettimeout(), 30)
|
---|
71 | telnet.sock.close()
|
---|
72 |
|
---|
73 | def testTimeoutNone(self):
|
---|
74 | # None, having other default
|
---|
75 | self.assertTrue(socket.getdefaulttimeout() is None)
|
---|
76 | socket.setdefaulttimeout(30)
|
---|
77 | try:
|
---|
78 | telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
|
---|
79 | finally:
|
---|
80 | socket.setdefaulttimeout(None)
|
---|
81 | self.assertTrue(telnet.sock.gettimeout() is None)
|
---|
82 | telnet.sock.close()
|
---|
83 |
|
---|
84 | def testTimeoutValue(self):
|
---|
85 | telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
|
---|
86 | self.assertEqual(telnet.sock.gettimeout(), 30)
|
---|
87 | telnet.sock.close()
|
---|
88 |
|
---|
89 | def testTimeoutOpen(self):
|
---|
90 | telnet = telnetlib.Telnet()
|
---|
91 | telnet.open(HOST, self.port, timeout=30)
|
---|
92 | self.assertEqual(telnet.sock.gettimeout(), 30)
|
---|
93 | telnet.sock.close()
|
---|
94 |
|
---|
95 | def testGetters(self):
|
---|
96 | # Test telnet getter methods
|
---|
97 | telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
|
---|
98 | t_sock = telnet.sock
|
---|
99 | self.assertEqual(telnet.get_socket(), t_sock)
|
---|
100 | self.assertEqual(telnet.fileno(), t_sock.fileno())
|
---|
101 | telnet.sock.close()
|
---|
102 |
|
---|
103 | def _read_setUp(self):
|
---|
104 | self.evt = threading.Event()
|
---|
105 | self.dataq = Queue.Queue()
|
---|
106 | self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
---|
107 | self.sock.settimeout(10)
|
---|
108 | self.port = test_support.bind_port(self.sock)
|
---|
109 | self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
|
---|
110 | self.thread.start()
|
---|
111 | self.evt.wait()
|
---|
112 |
|
---|
113 | def _read_tearDown(self):
|
---|
114 | self.thread.join()
|
---|
115 |
|
---|
116 | class ReadTests(TestCase):
|
---|
117 | setUp = _read_setUp
|
---|
118 | tearDown = _read_tearDown
|
---|
119 |
|
---|
120 | # use a similar approach to testing timeouts as test_timeout.py
|
---|
121 | # these will never pass 100% but make the fuzz big enough that it is rare
|
---|
122 | block_long = 0.6
|
---|
123 | block_short = 0.3
|
---|
124 | def test_read_until_A(self):
|
---|
125 | """
|
---|
126 | read_until(expected, [timeout])
|
---|
127 | Read until the expected string has been seen, or a timeout is
|
---|
128 | hit (default is no timeout); may block.
|
---|
129 | """
|
---|
130 | want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
|
---|
131 | self.dataq.put(want)
|
---|
132 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
133 | self.dataq.join()
|
---|
134 | data = telnet.read_until('match')
|
---|
135 | self.assertEqual(data, ''.join(want[:-2]))
|
---|
136 |
|
---|
137 | def test_read_until_B(self):
|
---|
138 | # test the timeout - it does NOT raise socket.timeout
|
---|
139 | want = ['hello', self.block_long, 'not seen', EOF_sigil]
|
---|
140 | self.dataq.put(want)
|
---|
141 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
142 | self.dataq.join()
|
---|
143 | data = telnet.read_until('not seen', self.block_short)
|
---|
144 | self.assertEqual(data, want[0])
|
---|
145 | self.assertEqual(telnet.read_all(), 'not seen')
|
---|
146 |
|
---|
147 | def test_read_until_with_poll(self):
|
---|
148 | """Use select.poll() to implement telnet.read_until()."""
|
---|
149 | want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
|
---|
150 | self.dataq.put(want)
|
---|
151 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
152 | if not telnet._has_poll:
|
---|
153 | raise unittest.SkipTest('select.poll() is required')
|
---|
154 | telnet._has_poll = True
|
---|
155 | self.dataq.join()
|
---|
156 | data = telnet.read_until('match')
|
---|
157 | self.assertEqual(data, ''.join(want[:-2]))
|
---|
158 |
|
---|
159 | def test_read_until_with_select(self):
|
---|
160 | """Use select.select() to implement telnet.read_until()."""
|
---|
161 | want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
|
---|
162 | self.dataq.put(want)
|
---|
163 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
164 | telnet._has_poll = False
|
---|
165 | self.dataq.join()
|
---|
166 | data = telnet.read_until('match')
|
---|
167 | self.assertEqual(data, ''.join(want[:-2]))
|
---|
168 |
|
---|
169 | def test_read_all_A(self):
|
---|
170 | """
|
---|
171 | read_all()
|
---|
172 | Read all data until EOF; may block.
|
---|
173 | """
|
---|
174 | want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
|
---|
175 | self.dataq.put(want)
|
---|
176 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
177 | self.dataq.join()
|
---|
178 | data = telnet.read_all()
|
---|
179 | self.assertEqual(data, ''.join(want[:-1]))
|
---|
180 | return
|
---|
181 |
|
---|
182 | def _test_blocking(self, func):
|
---|
183 | self.dataq.put([self.block_long, EOF_sigil])
|
---|
184 | self.dataq.join()
|
---|
185 | start = time.time()
|
---|
186 | data = func()
|
---|
187 | self.assertTrue(self.block_short <= time.time() - start)
|
---|
188 |
|
---|
189 | def test_read_all_B(self):
|
---|
190 | self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
|
---|
191 |
|
---|
192 | def test_read_all_C(self):
|
---|
193 | self.dataq.put([EOF_sigil])
|
---|
194 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
195 | self.dataq.join()
|
---|
196 | telnet.read_all()
|
---|
197 | telnet.read_all() # shouldn't raise
|
---|
198 |
|
---|
199 | def test_read_some_A(self):
|
---|
200 | """
|
---|
201 | read_some()
|
---|
202 | Read at least one byte or EOF; may block.
|
---|
203 | """
|
---|
204 | # test 'at least one byte'
|
---|
205 | want = ['x' * 500, EOF_sigil]
|
---|
206 | self.dataq.put(want)
|
---|
207 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
208 | self.dataq.join()
|
---|
209 | data = telnet.read_all()
|
---|
210 | self.assertTrue(len(data) >= 1)
|
---|
211 |
|
---|
212 | def test_read_some_B(self):
|
---|
213 | # test EOF
|
---|
214 | self.dataq.put([EOF_sigil])
|
---|
215 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
216 | self.dataq.join()
|
---|
217 | self.assertEqual('', telnet.read_some())
|
---|
218 |
|
---|
219 | def test_read_some_C(self):
|
---|
220 | self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
|
---|
221 |
|
---|
222 | def _test_read_any_eager_A(self, func_name):
|
---|
223 | """
|
---|
224 | read_very_eager()
|
---|
225 | Read all data available already queued or on the socket,
|
---|
226 | without blocking.
|
---|
227 | """
|
---|
228 | want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
|
---|
229 | expects = want[1] + want[2]
|
---|
230 | self.dataq.put(want)
|
---|
231 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
232 | self.dataq.join()
|
---|
233 | func = getattr(telnet, func_name)
|
---|
234 | data = ''
|
---|
235 | while True:
|
---|
236 | try:
|
---|
237 | data += func()
|
---|
238 | self.assertTrue(expects.startswith(data))
|
---|
239 | except EOFError:
|
---|
240 | break
|
---|
241 | self.assertEqual(expects, data)
|
---|
242 |
|
---|
243 | def _test_read_any_eager_B(self, func_name):
|
---|
244 | # test EOF
|
---|
245 | self.dataq.put([EOF_sigil])
|
---|
246 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
247 | self.dataq.join()
|
---|
248 | time.sleep(self.block_short)
|
---|
249 | func = getattr(telnet, func_name)
|
---|
250 | self.assertRaises(EOFError, func)
|
---|
251 |
|
---|
252 | # read_eager and read_very_eager make the same gaurantees
|
---|
253 | # (they behave differently but we only test the gaurantees)
|
---|
254 | def test_read_very_eager_A(self):
|
---|
255 | self._test_read_any_eager_A('read_very_eager')
|
---|
256 | def test_read_very_eager_B(self):
|
---|
257 | self._test_read_any_eager_B('read_very_eager')
|
---|
258 | def test_read_eager_A(self):
|
---|
259 | self._test_read_any_eager_A('read_eager')
|
---|
260 | def test_read_eager_B(self):
|
---|
261 | self._test_read_any_eager_B('read_eager')
|
---|
262 | # NB -- we need to test the IAC block which is mentioned in the docstring
|
---|
263 | # but not in the module docs
|
---|
264 |
|
---|
265 | def _test_read_any_lazy_B(self, func_name):
|
---|
266 | self.dataq.put([EOF_sigil])
|
---|
267 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
268 | self.dataq.join()
|
---|
269 | func = getattr(telnet, func_name)
|
---|
270 | telnet.fill_rawq()
|
---|
271 | self.assertRaises(EOFError, func)
|
---|
272 |
|
---|
273 | def test_read_lazy_A(self):
|
---|
274 | want = ['x' * 100, EOF_sigil]
|
---|
275 | self.dataq.put(want)
|
---|
276 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
277 | self.dataq.join()
|
---|
278 | time.sleep(self.block_short)
|
---|
279 | self.assertEqual('', telnet.read_lazy())
|
---|
280 | data = ''
|
---|
281 | while True:
|
---|
282 | try:
|
---|
283 | read_data = telnet.read_lazy()
|
---|
284 | data += read_data
|
---|
285 | if not read_data:
|
---|
286 | telnet.fill_rawq()
|
---|
287 | except EOFError:
|
---|
288 | break
|
---|
289 | self.assertTrue(want[0].startswith(data))
|
---|
290 | self.assertEqual(data, want[0])
|
---|
291 |
|
---|
292 | def test_read_lazy_B(self):
|
---|
293 | self._test_read_any_lazy_B('read_lazy')
|
---|
294 |
|
---|
295 | def test_read_very_lazy_A(self):
|
---|
296 | want = ['x' * 100, EOF_sigil]
|
---|
297 | self.dataq.put(want)
|
---|
298 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
299 | self.dataq.join()
|
---|
300 | time.sleep(self.block_short)
|
---|
301 | self.assertEqual('', telnet.read_very_lazy())
|
---|
302 | data = ''
|
---|
303 | while True:
|
---|
304 | try:
|
---|
305 | read_data = telnet.read_very_lazy()
|
---|
306 | except EOFError:
|
---|
307 | break
|
---|
308 | data += read_data
|
---|
309 | if not read_data:
|
---|
310 | telnet.fill_rawq()
|
---|
311 | self.assertEqual('', telnet.cookedq)
|
---|
312 | telnet.process_rawq()
|
---|
313 | self.assertTrue(want[0].startswith(data))
|
---|
314 | self.assertEqual(data, want[0])
|
---|
315 |
|
---|
316 | def test_read_very_lazy_B(self):
|
---|
317 | self._test_read_any_lazy_B('read_very_lazy')
|
---|
318 |
|
---|
319 | class nego_collector(object):
|
---|
320 | def __init__(self, sb_getter=None):
|
---|
321 | self.seen = ''
|
---|
322 | self.sb_getter = sb_getter
|
---|
323 | self.sb_seen = ''
|
---|
324 |
|
---|
325 | def do_nego(self, sock, cmd, opt):
|
---|
326 | self.seen += cmd + opt
|
---|
327 | if cmd == tl.SE and self.sb_getter:
|
---|
328 | sb_data = self.sb_getter()
|
---|
329 | self.sb_seen += sb_data
|
---|
330 |
|
---|
331 | tl = telnetlib
|
---|
332 | class OptionTests(TestCase):
|
---|
333 | setUp = _read_setUp
|
---|
334 | tearDown = _read_tearDown
|
---|
335 | # RFC 854 commands
|
---|
336 | cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
|
---|
337 |
|
---|
338 | def _test_command(self, data):
|
---|
339 | """ helper for testing IAC + cmd """
|
---|
340 | self.setUp()
|
---|
341 | self.dataq.put(data)
|
---|
342 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
343 | self.dataq.join()
|
---|
344 | nego = nego_collector()
|
---|
345 | telnet.set_option_negotiation_callback(nego.do_nego)
|
---|
346 | txt = telnet.read_all()
|
---|
347 | cmd = nego.seen
|
---|
348 | self.assertTrue(len(cmd) > 0) # we expect at least one command
|
---|
349 | self.assertIn(cmd[0], self.cmds)
|
---|
350 | self.assertEqual(cmd[1], tl.NOOPT)
|
---|
351 | self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
|
---|
352 | nego.sb_getter = None # break the nego => telnet cycle
|
---|
353 | self.tearDown()
|
---|
354 |
|
---|
355 | def test_IAC_commands(self):
|
---|
356 | # reset our setup
|
---|
357 | self.dataq.put([EOF_sigil])
|
---|
358 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
359 | self.dataq.join()
|
---|
360 | self.tearDown()
|
---|
361 |
|
---|
362 | for cmd in self.cmds:
|
---|
363 | self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
|
---|
364 | self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
|
---|
365 | self._test_command([tl.IAC + cmd, EOF_sigil])
|
---|
366 | # all at once
|
---|
367 | self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
|
---|
368 | self.assertEqual('', telnet.read_sb_data())
|
---|
369 |
|
---|
370 | def test_SB_commands(self):
|
---|
371 | # RFC 855, subnegotiations portion
|
---|
372 | send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
|
---|
373 | tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
|
---|
374 | tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
|
---|
375 | tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
|
---|
376 | tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
|
---|
377 | EOF_sigil,
|
---|
378 | ]
|
---|
379 | self.dataq.put(send)
|
---|
380 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
381 | self.dataq.join()
|
---|
382 | nego = nego_collector(telnet.read_sb_data)
|
---|
383 | telnet.set_option_negotiation_callback(nego.do_nego)
|
---|
384 | txt = telnet.read_all()
|
---|
385 | self.assertEqual(txt, '')
|
---|
386 | want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
|
---|
387 | self.assertEqual(nego.sb_seen, want_sb_data)
|
---|
388 | self.assertEqual('', telnet.read_sb_data())
|
---|
389 | nego.sb_getter = None # break the nego => telnet cycle
|
---|
390 |
|
---|
391 |
|
---|
392 | class ExpectTests(TestCase):
|
---|
393 | def setUp(self):
|
---|
394 | self.evt = threading.Event()
|
---|
395 | self.dataq = Queue.Queue()
|
---|
396 | self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
---|
397 | self.sock.settimeout(10)
|
---|
398 | self.port = test_support.bind_port(self.sock)
|
---|
399 | self.thread = threading.Thread(target=server, args=(self.evt,self.sock,
|
---|
400 | self.dataq))
|
---|
401 | self.thread.start()
|
---|
402 | self.evt.wait()
|
---|
403 |
|
---|
404 | def tearDown(self):
|
---|
405 | self.thread.join()
|
---|
406 |
|
---|
407 | # use a similar approach to testing timeouts as test_timeout.py
|
---|
408 | # these will never pass 100% but make the fuzz big enough that it is rare
|
---|
409 | block_long = 0.6
|
---|
410 | block_short = 0.3
|
---|
411 | def test_expect_A(self):
|
---|
412 | """
|
---|
413 | expect(expected, [timeout])
|
---|
414 | Read until the expected string has been seen, or a timeout is
|
---|
415 | hit (default is no timeout); may block.
|
---|
416 | """
|
---|
417 | want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
|
---|
418 | self.dataq.put(want)
|
---|
419 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
420 | self.dataq.join()
|
---|
421 | (_,_,data) = telnet.expect(['match'])
|
---|
422 | self.assertEqual(data, ''.join(want[:-2]))
|
---|
423 |
|
---|
424 | def test_expect_B(self):
|
---|
425 | # test the timeout - it does NOT raise socket.timeout
|
---|
426 | want = ['hello', self.block_long, 'not seen', EOF_sigil]
|
---|
427 | self.dataq.put(want)
|
---|
428 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
429 | self.dataq.join()
|
---|
430 | (_,_,data) = telnet.expect(['not seen'], self.block_short)
|
---|
431 | self.assertEqual(data, want[0])
|
---|
432 | self.assertEqual(telnet.read_all(), 'not seen')
|
---|
433 |
|
---|
434 | def test_expect_with_poll(self):
|
---|
435 | """Use select.poll() to implement telnet.expect()."""
|
---|
436 | want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
|
---|
437 | self.dataq.put(want)
|
---|
438 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
439 | if not telnet._has_poll:
|
---|
440 | raise unittest.SkipTest('select.poll() is required')
|
---|
441 | telnet._has_poll = True
|
---|
442 | self.dataq.join()
|
---|
443 | (_,_,data) = telnet.expect(['match'])
|
---|
444 | self.assertEqual(data, ''.join(want[:-2]))
|
---|
445 |
|
---|
446 | def test_expect_with_select(self):
|
---|
447 | """Use select.select() to implement telnet.expect()."""
|
---|
448 | want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
|
---|
449 | self.dataq.put(want)
|
---|
450 | telnet = telnetlib.Telnet(HOST, self.port)
|
---|
451 | telnet._has_poll = False
|
---|
452 | self.dataq.join()
|
---|
453 | (_,_,data) = telnet.expect(['match'])
|
---|
454 | self.assertEqual(data, ''.join(want[:-2]))
|
---|
455 |
|
---|
456 |
|
---|
457 | def test_main(verbose=None):
|
---|
458 | test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
|
---|
459 | ExpectTests)
|
---|
460 |
|
---|
461 | if __name__ == '__main__':
|
---|
462 | test_main()
|
---|