Ignore:
Timestamp:
Mar 19, 2014, 11:31:01 PM (11 years ago)
Author:
dmik
Message:

python: Merge vendor 2.7.6 to trunk.

Location:
python/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • python/trunk

  • python/trunk/Lib/test/test_poplib.py

    r2 r391  
     1"""Test script for poplib module."""
     2
     3# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
     4# a real test suite
     5
     6import poplib
     7import asyncore
     8import asynchat
    19import socket
    2 import threading
    3 import poplib
     10import os
    411import time
     12import errno
    513
    614from unittest import TestCase
    715from test import test_support
    8 
    9 HOST = test_support.HOST
    10 
    11 def server(evt, serv):
    12     serv.listen(5)
    13     try:
    14         conn, addr = serv.accept()
    15     except socket.timeout:
    16         pass
    17     else:
    18         conn.send("+ Hola mundo\n")
    19         conn.close()
    20     finally:
    21         serv.close()
    22         evt.set()
    23 
    24 class GeneralTests(TestCase):
     16from test.test_support import HOST
     17threading = test_support.import_module('threading')
     18
     19
     20# the dummy data returned by server when LIST and RETR commands are issued
     21LIST_RESP = '1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
     22RETR_RESP = """From: postmaster@python.org\
     23\r\nContent-Type: text/plain\r\n\
     24MIME-Version: 1.0\r\n\
     25Subject: Dummy\r\n\
     26\r\n\
     27line1\r\n\
     28line2\r\n\
     29line3\r\n\
     30.\r\n"""
     31
     32
     33class DummyPOP3Handler(asynchat.async_chat):
     34
     35    def __init__(self, conn):
     36        asynchat.async_chat.__init__(self, conn)
     37        self.set_terminator("\r\n")
     38        self.in_buffer = []
     39        self.push('+OK dummy pop3 server ready.')
     40
     41    def collect_incoming_data(self, data):
     42        self.in_buffer.append(data)
     43
     44    def found_terminator(self):
     45        line = ''.join(self.in_buffer)
     46        self.in_buffer = []
     47        cmd = line.split(' ')[0].lower()
     48        space = line.find(' ')
     49        if space != -1:
     50            arg = line[space + 1:]
     51        else:
     52            arg = ""
     53        if hasattr(self, 'cmd_' + cmd):
     54            method = getattr(self, 'cmd_' + cmd)
     55            method(arg)
     56        else:
     57            self.push('-ERR unrecognized POP3 command "%s".' %cmd)
     58
     59    def handle_error(self):
     60        raise
     61
     62    def push(self, data):
     63        asynchat.async_chat.push(self, data + '\r\n')
     64
     65    def cmd_echo(self, arg):
     66        # sends back the received string (used by the test suite)
     67        self.push(arg)
     68
     69    def cmd_user(self, arg):
     70        if arg != "guido":
     71            self.push("-ERR no such user")
     72        self.push('+OK password required')
     73
     74    def cmd_pass(self, arg):
     75        if arg != "python":
     76            self.push("-ERR wrong password")
     77        self.push('+OK 10 messages')
     78
     79    def cmd_stat(self, arg):
     80        self.push('+OK 10 100')
     81
     82    def cmd_list(self, arg):
     83        if arg:
     84            self.push('+OK %s %s' %(arg, arg))
     85        else:
     86            self.push('+OK')
     87            asynchat.async_chat.push(self, LIST_RESP)
     88
     89    cmd_uidl = cmd_list
     90
     91    def cmd_retr(self, arg):
     92        self.push('+OK %s bytes' %len(RETR_RESP))
     93        asynchat.async_chat.push(self, RETR_RESP)
     94
     95    cmd_top = cmd_retr
     96
     97    def cmd_dele(self, arg):
     98        self.push('+OK message marked for deletion.')
     99
     100    def cmd_noop(self, arg):
     101        self.push('+OK done nothing.')
     102
     103    def cmd_rpop(self, arg):
     104        self.push('+OK done nothing.')
     105
     106
     107class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
     108
     109    handler = DummyPOP3Handler
     110
     111    def __init__(self, address, af=socket.AF_INET):
     112        threading.Thread.__init__(self)
     113        asyncore.dispatcher.__init__(self)
     114        self.create_socket(af, socket.SOCK_STREAM)
     115        self.bind(address)
     116        self.listen(5)
     117        self.active = False
     118        self.active_lock = threading.Lock()
     119        self.host, self.port = self.socket.getsockname()[:2]
     120
     121    def start(self):
     122        assert not self.active
     123        self.__flag = threading.Event()
     124        threading.Thread.start(self)
     125        self.__flag.wait()
     126
     127    def run(self):
     128        self.active = True
     129        self.__flag.set()
     130        while self.active and asyncore.socket_map:
     131            self.active_lock.acquire()
     132            asyncore.loop(timeout=0.1, count=1)
     133            self.active_lock.release()
     134        asyncore.close_all(ignore_all=True)
     135
     136    def stop(self):
     137        assert self.active
     138        self.active = False
     139        self.join()
     140
     141    def handle_accept(self):
     142        conn, addr = self.accept()
     143        self.handler = self.handler(conn)
     144        self.close()
     145
     146    def handle_connect(self):
     147        self.close()
     148    handle_read = handle_connect
     149
     150    def writable(self):
     151        return 0
     152
     153    def handle_error(self):
     154        raise
     155
     156
     157class TestPOP3Class(TestCase):
     158
     159    def assertOK(self, resp):
     160        self.assertTrue(resp.startswith("+OK"))
     161
     162    def setUp(self):
     163        self.server = DummyPOP3Server((HOST, 0))
     164        self.server.start()
     165        self.client = poplib.POP3(self.server.host, self.server.port)
     166
     167    def tearDown(self):
     168        self.client.quit()
     169        self.server.stop()
     170
     171    def test_getwelcome(self):
     172        self.assertEqual(self.client.getwelcome(), '+OK dummy pop3 server ready.')
     173
     174    def test_exceptions(self):
     175        self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')
     176
     177    def test_user(self):
     178        self.assertOK(self.client.user('guido'))
     179        self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
     180
     181    def test_pass_(self):
     182        self.assertOK(self.client.pass_('python'))
     183        self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
     184
     185    def test_stat(self):
     186        self.assertEqual(self.client.stat(), (10, 100))
     187
     188    def test_list(self):
     189        self.assertEqual(self.client.list()[1:],
     190                         (['1 1', '2 2', '3 3', '4 4', '5 5'], 25))
     191        self.assertTrue(self.client.list('1').endswith("OK 1 1"))
     192
     193    def test_retr(self):
     194        expected = ('+OK 116 bytes',
     195                    ['From: postmaster@python.org', 'Content-Type: text/plain',
     196                     'MIME-Version: 1.0', 'Subject: Dummy',
     197                     '', 'line1', 'line2', 'line3'],
     198                    113)
     199        self.assertEqual(self.client.retr('foo'), expected)
     200
     201    def test_dele(self):
     202        self.assertOK(self.client.dele('foo'))
     203
     204    def test_noop(self):
     205        self.assertOK(self.client.noop())
     206
     207    def test_rpop(self):
     208        self.assertOK(self.client.rpop('foo'))
     209
     210    def test_top(self):
     211        expected =  ('+OK 116 bytes',
     212                     ['From: postmaster@python.org', 'Content-Type: text/plain',
     213                      'MIME-Version: 1.0', 'Subject: Dummy', '',
     214                      'line1', 'line2', 'line3'],
     215                     113)
     216        self.assertEqual(self.client.top(1, 1), expected)
     217
     218    def test_uidl(self):
     219        self.client.uidl()
     220        self.client.uidl('foo')
     221
     222
     223SUPPORTS_SSL = False
     224if hasattr(poplib, 'POP3_SSL'):
     225    import ssl
     226
     227    SUPPORTS_SSL = True
     228    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem")
     229
     230    class DummyPOP3_SSLHandler(DummyPOP3Handler):
     231
     232        def __init__(self, conn):
     233            asynchat.async_chat.__init__(self, conn)
     234            self.socket = ssl.wrap_socket(self.socket, certfile=CERTFILE,
     235                                          server_side=True,
     236                                          do_handshake_on_connect=False)
     237            # Must try handshake before calling push()
     238            self._ssl_accepting = True
     239            self._do_ssl_handshake()
     240            self.set_terminator("\r\n")
     241            self.in_buffer = []
     242            self.push('+OK dummy pop3 server ready.')
     243
     244        def _do_ssl_handshake(self):
     245            try:
     246                self.socket.do_handshake()
     247            except ssl.SSLError, err:
     248                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
     249                                   ssl.SSL_ERROR_WANT_WRITE):
     250                    return
     251                elif err.args[0] == ssl.SSL_ERROR_EOF:
     252                    return self.handle_close()
     253                raise
     254            except socket.error, err:
     255                if err.args[0] == errno.ECONNABORTED:
     256                    return self.handle_close()
     257            else:
     258                self._ssl_accepting = False
     259
     260        def handle_read(self):
     261            if self._ssl_accepting:
     262                self._do_ssl_handshake()
     263            else:
     264                DummyPOP3Handler.handle_read(self)
     265
     266    class TestPOP3_SSLClass(TestPOP3Class):
     267        # repeat previous tests by using poplib.POP3_SSL
     268
     269        def setUp(self):
     270            self.server = DummyPOP3Server((HOST, 0))
     271            self.server.handler = DummyPOP3_SSLHandler
     272            self.server.start()
     273            self.client = poplib.POP3_SSL(self.server.host, self.server.port)
     274
     275        def test__all__(self):
     276            self.assertIn('POP3_SSL', poplib.__all__)
     277
     278
     279class TestTimeouts(TestCase):
    25280
    26281    def setUp(self):
    27282        self.evt = threading.Event()
    28283        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    29         self.sock.settimeout(3)
     284        self.sock.settimeout(60)  # Safety net. Look issue 11812
    30285        self.port = test_support.bind_port(self.sock)
    31         threading.Thread(target=server, args=(self.evt,self.sock)).start()
    32         time.sleep(.1)
     286        self.thread = threading.Thread(target=self.server, args=(self.evt,self.sock))
     287        self.thread.setDaemon(True)
     288        self.thread.start()
     289        self.evt.wait()
    33290
    34291    def tearDown(self):
    35         self.evt.wait()
    36 
    37     def testBasic(self):
    38         # connects
    39         pop = poplib.POP3(HOST, self.port)
    40         pop.sock.close()
     292        self.thread.join()
     293        del self.thread  # Clear out any dangling Thread objects.
     294
     295    def server(self, evt, serv):
     296        serv.listen(5)
     297        evt.set()
     298        try:
     299            conn, addr = serv.accept()
     300            conn.send("+ Hola mundo\n")
     301            conn.close()
     302        except socket.timeout:
     303            pass
     304        finally:
     305            serv.close()
    41306
    42307    def testTimeoutDefault(self):
     
    44309        socket.setdefaulttimeout(30)
    45310        try:
    46             pop = poplib.POP3("localhost", self.port)
     311            pop = poplib.POP3(HOST, self.port)
    47312        finally:
    48313            socket.setdefaulttimeout(None)
     
    61326
    62327    def testTimeoutValue(self):
    63         pop = poplib.POP3("localhost", self.port, timeout=30)
     328        pop = poplib.POP3(HOST, self.port, timeout=30)
    64329        self.assertEqual(pop.sock.gettimeout(), 30)
    65330        pop.sock.close()
    66331
    67332
    68 def test_main(verbose=None):
    69     test_support.run_unittest(GeneralTests)
     333def test_main():
     334    tests = [TestPOP3Class, TestTimeouts]
     335    if SUPPORTS_SSL:
     336        tests.append(TestPOP3_SSLClass)
     337    thread_info = test_support.threading_setup()
     338    try:
     339        test_support.run_unittest(*tests)
     340    finally:
     341        test_support.threading_cleanup(*thread_info)
     342
    70343
    71344if __name__ == '__main__':
Note: See TracChangeset for help on using the changeset viewer.