Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_telnetlib.py

    r2 r391  
    11import socket
    2 import threading
    32import telnetlib
    43import time
    5 
     4import Queue
     5
     6import unittest
    67from unittest import TestCase
    78from test import test_support
     9threading = test_support.import_module('threading')
    810
    911HOST = test_support.HOST
    10 
    11 def server(evt, serv):
     12EOF_sigil = object()
     13
     14def server(evt, serv, dataq=None):
     15    """ Open a tcp server in three steps
     16        1) set evt to true to let the parent know we are ready
     17        2) [optional] if is not False, write the list of data from dataq.get()
     18           to the socket.
     19    """
    1220    serv.listen(5)
    1321    evt.set()
    1422    try:
    1523        conn, addr = serv.accept()
     24        if dataq:
     25            data = ''
     26            new_data = dataq.get(True, 0.5)
     27            dataq.task_done()
     28            for item in new_data:
     29                if item == EOF_sigil:
     30                    break
     31                if type(item) in [int, float]:
     32                    time.sleep(item)
     33                else:
     34                    data += item
     35                written = conn.send(data)
     36                data = data[written:]
     37        conn.close()
    1638    except socket.timeout:
    1739        pass
    1840    finally:
    1941        serv.close()
    20         evt.set()
    2142
    2243class GeneralTests(TestCase):
     
    2546        self.evt = threading.Event()
    2647        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    27         self.sock.settimeout(3)
     48        self.sock.settimeout(60)  # Safety net. Look issue 11812
    2849        self.port = test_support.bind_port(self.sock)
    29         threading.Thread(target=server, args=(self.evt,self.sock)).start()
     50        self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
     51        self.thread.setDaemon(True)
     52        self.thread.start()
    3053        self.evt.wait()
    31         self.evt.clear()
    32         time.sleep(.1)
    3354
    3455    def tearDown(self):
    35         self.evt.wait()
     56        self.thread.join()
    3657
    3758    def testBasic(self):
     
    4465        socket.setdefaulttimeout(30)
    4566        try:
    46             telnet = telnetlib.Telnet("localhost", self.port)
     67            telnet = telnetlib.Telnet(HOST, self.port)
    4768        finally:
    4869            socket.setdefaulttimeout(None)
     
    6283
    6384    def testTimeoutValue(self):
    64         telnet = telnetlib.Telnet("localhost", self.port, timeout=30)
     85        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
    6586        self.assertEqual(telnet.sock.gettimeout(), 30)
    6687        telnet.sock.close()
     
    6889    def testTimeoutOpen(self):
    6990        telnet = telnetlib.Telnet()
    70         telnet.open("localhost", self.port, timeout=30)
     91        telnet.open(HOST, self.port, timeout=30)
    7192        self.assertEqual(telnet.sock.gettimeout(), 30)
    7293        telnet.sock.close()
    7394
     95    def testGetters(self):
     96        # Test telnet getter methods
     97        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
     98        t_sock = telnet.sock
     99        self.assertEqual(telnet.get_socket(), t_sock)
     100        self.assertEqual(telnet.fileno(), t_sock.fileno())
     101        telnet.sock.close()
     102
     103def _read_setUp(self):
     104    self.evt = threading.Event()
     105    self.dataq = Queue.Queue()
     106    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     107    self.sock.settimeout(10)
     108    self.port = test_support.bind_port(self.sock)
     109    self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
     110    self.thread.start()
     111    self.evt.wait()
     112
     113def _read_tearDown(self):
     114    self.thread.join()
     115
     116class ReadTests(TestCase):
     117    setUp = _read_setUp
     118    tearDown = _read_tearDown
     119
     120    # use a similar approach to testing timeouts as test_timeout.py
     121    # these will never pass 100% but make the fuzz big enough that it is rare
     122    block_long = 0.6
     123    block_short = 0.3
     124    def test_read_until_A(self):
     125        """
     126        read_until(expected, [timeout])
     127          Read until the expected string has been seen, or a timeout is
     128          hit (default is no timeout); may block.
     129        """
     130        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
     131        self.dataq.put(want)
     132        telnet = telnetlib.Telnet(HOST, self.port)
     133        self.dataq.join()
     134        data = telnet.read_until('match')
     135        self.assertEqual(data, ''.join(want[:-2]))
     136
     137    def test_read_until_B(self):
     138        # test the timeout - it does NOT raise socket.timeout
     139        want = ['hello', self.block_long, 'not seen', EOF_sigil]
     140        self.dataq.put(want)
     141        telnet = telnetlib.Telnet(HOST, self.port)
     142        self.dataq.join()
     143        data = telnet.read_until('not seen', self.block_short)
     144        self.assertEqual(data, want[0])
     145        self.assertEqual(telnet.read_all(), 'not seen')
     146
     147    def test_read_until_with_poll(self):
     148        """Use select.poll() to implement telnet.read_until()."""
     149        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
     150        self.dataq.put(want)
     151        telnet = telnetlib.Telnet(HOST, self.port)
     152        if not telnet._has_poll:
     153            raise unittest.SkipTest('select.poll() is required')
     154        telnet._has_poll = True
     155        self.dataq.join()
     156        data = telnet.read_until('match')
     157        self.assertEqual(data, ''.join(want[:-2]))
     158
     159    def test_read_until_with_select(self):
     160        """Use select.select() to implement telnet.read_until()."""
     161        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
     162        self.dataq.put(want)
     163        telnet = telnetlib.Telnet(HOST, self.port)
     164        telnet._has_poll = False
     165        self.dataq.join()
     166        data = telnet.read_until('match')
     167        self.assertEqual(data, ''.join(want[:-2]))
     168
     169    def test_read_all_A(self):
     170        """
     171        read_all()
     172          Read all data until EOF; may block.
     173        """
     174        want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
     175        self.dataq.put(want)
     176        telnet = telnetlib.Telnet(HOST, self.port)
     177        self.dataq.join()
     178        data = telnet.read_all()
     179        self.assertEqual(data, ''.join(want[:-1]))
     180        return
     181
     182    def _test_blocking(self, func):
     183        self.dataq.put([self.block_long, EOF_sigil])
     184        self.dataq.join()
     185        start = time.time()
     186        data = func()
     187        self.assertTrue(self.block_short <= time.time() - start)
     188
     189    def test_read_all_B(self):
     190        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
     191
     192    def test_read_all_C(self):
     193        self.dataq.put([EOF_sigil])
     194        telnet = telnetlib.Telnet(HOST, self.port)
     195        self.dataq.join()
     196        telnet.read_all()
     197        telnet.read_all() # shouldn't raise
     198
     199    def test_read_some_A(self):
     200        """
     201        read_some()
     202          Read at least one byte or EOF; may block.
     203        """
     204        # test 'at least one byte'
     205        want = ['x' * 500, EOF_sigil]
     206        self.dataq.put(want)
     207        telnet = telnetlib.Telnet(HOST, self.port)
     208        self.dataq.join()
     209        data = telnet.read_all()
     210        self.assertTrue(len(data) >= 1)
     211
     212    def test_read_some_B(self):
     213        # test EOF
     214        self.dataq.put([EOF_sigil])
     215        telnet = telnetlib.Telnet(HOST, self.port)
     216        self.dataq.join()
     217        self.assertEqual('', telnet.read_some())
     218
     219    def test_read_some_C(self):
     220        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
     221
     222    def _test_read_any_eager_A(self, func_name):
     223        """
     224        read_very_eager()
     225          Read all data available already queued or on the socket,
     226          without blocking.
     227        """
     228        want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
     229        expects = want[1] + want[2]
     230        self.dataq.put(want)
     231        telnet = telnetlib.Telnet(HOST, self.port)
     232        self.dataq.join()
     233        func = getattr(telnet, func_name)
     234        data = ''
     235        while True:
     236            try:
     237                data += func()
     238                self.assertTrue(expects.startswith(data))
     239            except EOFError:
     240                break
     241        self.assertEqual(expects, data)
     242
     243    def _test_read_any_eager_B(self, func_name):
     244        # test EOF
     245        self.dataq.put([EOF_sigil])
     246        telnet = telnetlib.Telnet(HOST, self.port)
     247        self.dataq.join()
     248        time.sleep(self.block_short)
     249        func = getattr(telnet, func_name)
     250        self.assertRaises(EOFError, func)
     251
     252    # read_eager and read_very_eager make the same gaurantees
     253    # (they behave differently but we only test the gaurantees)
     254    def test_read_very_eager_A(self):
     255        self._test_read_any_eager_A('read_very_eager')
     256    def test_read_very_eager_B(self):
     257        self._test_read_any_eager_B('read_very_eager')
     258    def test_read_eager_A(self):
     259        self._test_read_any_eager_A('read_eager')
     260    def test_read_eager_B(self):
     261        self._test_read_any_eager_B('read_eager')
     262    # NB -- we need to test the IAC block which is mentioned in the docstring
     263    # but not in the module docs
     264
     265    def _test_read_any_lazy_B(self, func_name):
     266        self.dataq.put([EOF_sigil])
     267        telnet = telnetlib.Telnet(HOST, self.port)
     268        self.dataq.join()
     269        func = getattr(telnet, func_name)
     270        telnet.fill_rawq()
     271        self.assertRaises(EOFError, func)
     272
     273    def test_read_lazy_A(self):
     274        want = ['x' * 100, EOF_sigil]
     275        self.dataq.put(want)
     276        telnet = telnetlib.Telnet(HOST, self.port)
     277        self.dataq.join()
     278        time.sleep(self.block_short)
     279        self.assertEqual('', telnet.read_lazy())
     280        data = ''
     281        while True:
     282            try:
     283                read_data = telnet.read_lazy()
     284                data += read_data
     285                if not read_data:
     286                    telnet.fill_rawq()
     287            except EOFError:
     288                break
     289            self.assertTrue(want[0].startswith(data))
     290        self.assertEqual(data, want[0])
     291
     292    def test_read_lazy_B(self):
     293        self._test_read_any_lazy_B('read_lazy')
     294
     295    def test_read_very_lazy_A(self):
     296        want = ['x' * 100, EOF_sigil]
     297        self.dataq.put(want)
     298        telnet = telnetlib.Telnet(HOST, self.port)
     299        self.dataq.join()
     300        time.sleep(self.block_short)
     301        self.assertEqual('', telnet.read_very_lazy())
     302        data = ''
     303        while True:
     304            try:
     305                read_data = telnet.read_very_lazy()
     306            except EOFError:
     307                break
     308            data += read_data
     309            if not read_data:
     310                telnet.fill_rawq()
     311                self.assertEqual('', telnet.cookedq)
     312                telnet.process_rawq()
     313            self.assertTrue(want[0].startswith(data))
     314        self.assertEqual(data, want[0])
     315
     316    def test_read_very_lazy_B(self):
     317        self._test_read_any_lazy_B('read_very_lazy')
     318
     319class nego_collector(object):
     320    def __init__(self, sb_getter=None):
     321        self.seen = ''
     322        self.sb_getter = sb_getter
     323        self.sb_seen = ''
     324
     325    def do_nego(self, sock, cmd, opt):
     326        self.seen += cmd + opt
     327        if cmd == tl.SE and self.sb_getter:
     328            sb_data = self.sb_getter()
     329            self.sb_seen += sb_data
     330
     331tl = telnetlib
     332class OptionTests(TestCase):
     333    setUp = _read_setUp
     334    tearDown = _read_tearDown
     335    # RFC 854 commands
     336    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
     337
     338    def _test_command(self, data):
     339        """ helper for testing IAC + cmd """
     340        self.setUp()
     341        self.dataq.put(data)
     342        telnet = telnetlib.Telnet(HOST, self.port)
     343        self.dataq.join()
     344        nego = nego_collector()
     345        telnet.set_option_negotiation_callback(nego.do_nego)
     346        txt = telnet.read_all()
     347        cmd = nego.seen
     348        self.assertTrue(len(cmd) > 0) # we expect at least one command
     349        self.assertIn(cmd[0], self.cmds)
     350        self.assertEqual(cmd[1], tl.NOOPT)
     351        self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
     352        nego.sb_getter = None # break the nego => telnet cycle
     353        self.tearDown()
     354
     355    def test_IAC_commands(self):
     356        # reset our setup
     357        self.dataq.put([EOF_sigil])
     358        telnet = telnetlib.Telnet(HOST, self.port)
     359        self.dataq.join()
     360        self.tearDown()
     361
     362        for cmd in self.cmds:
     363            self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
     364            self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
     365            self._test_command([tl.IAC + cmd, EOF_sigil])
     366        # all at once
     367        self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
     368        self.assertEqual('', telnet.read_sb_data())
     369
     370    def test_SB_commands(self):
     371        # RFC 855, subnegotiations portion
     372        send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
     373                tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
     374                tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
     375                tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
     376                tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
     377                EOF_sigil,
     378               ]
     379        self.dataq.put(send)
     380        telnet = telnetlib.Telnet(HOST, self.port)
     381        self.dataq.join()
     382        nego = nego_collector(telnet.read_sb_data)
     383        telnet.set_option_negotiation_callback(nego.do_nego)
     384        txt = telnet.read_all()
     385        self.assertEqual(txt, '')
     386        want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
     387        self.assertEqual(nego.sb_seen, want_sb_data)
     388        self.assertEqual('', telnet.read_sb_data())
     389        nego.sb_getter = None # break the nego => telnet cycle
     390
     391
     392class ExpectTests(TestCase):
     393    def setUp(self):
     394        self.evt = threading.Event()
     395        self.dataq = Queue.Queue()
     396        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     397        self.sock.settimeout(10)
     398        self.port = test_support.bind_port(self.sock)
     399        self.thread = threading.Thread(target=server, args=(self.evt,self.sock,
     400                                                            self.dataq))
     401        self.thread.start()
     402        self.evt.wait()
     403
     404    def tearDown(self):
     405        self.thread.join()
     406
     407    # use a similar approach to testing timeouts as test_timeout.py
     408    # these will never pass 100% but make the fuzz big enough that it is rare
     409    block_long = 0.6
     410    block_short = 0.3
     411    def test_expect_A(self):
     412        """
     413        expect(expected, [timeout])
     414          Read until the expected string has been seen, or a timeout is
     415          hit (default is no timeout); may block.
     416        """
     417        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
     418        self.dataq.put(want)
     419        telnet = telnetlib.Telnet(HOST, self.port)
     420        self.dataq.join()
     421        (_,_,data) = telnet.expect(['match'])
     422        self.assertEqual(data, ''.join(want[:-2]))
     423
     424    def test_expect_B(self):
     425        # test the timeout - it does NOT raise socket.timeout
     426        want = ['hello', self.block_long, 'not seen', EOF_sigil]
     427        self.dataq.put(want)
     428        telnet = telnetlib.Telnet(HOST, self.port)
     429        self.dataq.join()
     430        (_,_,data) = telnet.expect(['not seen'], self.block_short)
     431        self.assertEqual(data, want[0])
     432        self.assertEqual(telnet.read_all(), 'not seen')
     433
     434    def test_expect_with_poll(self):
     435        """Use select.poll() to implement telnet.expect()."""
     436        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
     437        self.dataq.put(want)
     438        telnet = telnetlib.Telnet(HOST, self.port)
     439        if not telnet._has_poll:
     440            raise unittest.SkipTest('select.poll() is required')
     441        telnet._has_poll = True
     442        self.dataq.join()
     443        (_,_,data) = telnet.expect(['match'])
     444        self.assertEqual(data, ''.join(want[:-2]))
     445
     446    def test_expect_with_select(self):
     447        """Use select.select() to implement telnet.expect()."""
     448        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
     449        self.dataq.put(want)
     450        telnet = telnetlib.Telnet(HOST, self.port)
     451        telnet._has_poll = False
     452        self.dataq.join()
     453        (_,_,data) = telnet.expect(['match'])
     454        self.assertEqual(data, ''.join(want[:-2]))
     455
    74456
    75457def test_main(verbose=None):
    76     test_support.run_unittest(GeneralTests)
     458    test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
     459                              ExpectTests)
    77460
    78461if __name__ == '__main__':
Note: See TracChangeset for help on using the changeset viewer.