1 | from test import test_support as support
|
---|
2 | # If we end up with a significant number of tests that don't require
|
---|
3 | # threading, this test module should be split. Right now we skip
|
---|
4 | # them all if we don't have threading.
|
---|
5 | threading = support.import_module('threading')
|
---|
6 |
|
---|
7 | from contextlib import contextmanager
|
---|
8 | import imaplib
|
---|
9 | import os.path
|
---|
10 | import SocketServer
|
---|
11 | import time
|
---|
12 |
|
---|
13 | from test_support import reap_threads, verbose, transient_internet
|
---|
14 | import unittest
|
---|
15 |
|
---|
16 | try:
|
---|
17 | import ssl
|
---|
18 | except ImportError:
|
---|
19 | ssl = None
|
---|
20 |
|
---|
# Path of the certificate file handed to ssl.wrap_socket() by
# SecureTCPServer.  Stays None until test_main() fills it in.
CERTFILE = None
22 |
|
---|
23 |
|
---|
class TestImaplib(unittest.TestCase):
    """Tests for imaplib helpers that need neither a server nor threads."""

    def test_that_Time2Internaldate_returns_a_result(self):
        """Feed Time2Internaldate each kind of time value it is given here."""
        # Only successful completion can be verified: the formatted value
        # depends on the timezone of the machine running the test.
        epoch_seconds = 2000000000
        samples = (
            epoch_seconds,
            float(epoch_seconds),
            time.localtime(epoch_seconds),
            '"18-May-2033 05:33:20 +0200"',
        )
        for sample in samples:
            imaplib.Time2Internaldate(sample)
35 |
|
---|
36 |
|
---|
if not ssl:

    class SecureTCPServer:
        """Placeholder so the name exists when the ssl module is missing."""

    IMAP4_SSL = None

else:

    class SecureTCPServer(SocketServer.TCPServer):
        """TCP server that wraps every accepted connection with SSL."""

        def get_request(self):
            """Accept a connection and return ``(ssl_socket, client_address)``."""
            raw_socket, client_address = self.socket.accept()
            secure_socket = ssl.wrap_socket(raw_socket,
                                            certfile=CERTFILE,
                                            server_side=True)
            return secure_socket, client_address

    IMAP4_SSL = imaplib.IMAP4_SSL
56 |
|
---|
57 |
|
---|
58 | class SimpleIMAPHandler(SocketServer.StreamRequestHandler):
|
---|
59 |
|
---|
60 | timeout = 1
|
---|
61 |
|
---|
62 | def _send(self, message):
|
---|
63 | if verbose: print "SENT:", message.strip()
|
---|
64 | self.wfile.write(message)
|
---|
65 |
|
---|
66 | def handle(self):
|
---|
67 | # Send a welcome message.
|
---|
68 | self._send('* OK IMAP4rev1\r\n')
|
---|
69 | while 1:
|
---|
70 | # Gather up input until we receive a line terminator or we timeout.
|
---|
71 | # Accumulate read(1) because it's simpler to handle the differences
|
---|
72 | # between naked sockets and SSL sockets.
|
---|
73 | line = ''
|
---|
74 | while 1:
|
---|
75 | try:
|
---|
76 | part = self.rfile.read(1)
|
---|
77 | if part == '':
|
---|
78 | # Naked sockets return empty strings..
|
---|
79 | return
|
---|
80 | line += part
|
---|
81 | except IOError:
|
---|
82 | # ..but SSLSockets raise exceptions.
|
---|
83 | return
|
---|
84 | if line.endswith('\r\n'):
|
---|
85 | break
|
---|
86 |
|
---|
87 | if verbose: print 'GOT:', line.strip()
|
---|
88 | splitline = line.split()
|
---|
89 | tag = splitline[0]
|
---|
90 | cmd = splitline[1]
|
---|
91 | args = splitline[2:]
|
---|
92 |
|
---|
93 | if hasattr(self, 'cmd_%s' % (cmd,)):
|
---|
94 | getattr(self, 'cmd_%s' % (cmd,))(tag, args)
|
---|
95 | else:
|
---|
96 | self._send('%s BAD %s unknown\r\n' % (tag, cmd))
|
---|
97 |
|
---|
98 | def cmd_CAPABILITY(self, tag, args):
|
---|
99 | self._send('* CAPABILITY IMAP4rev1\r\n')
|
---|
100 | self._send('%s OK CAPABILITY completed\r\n' % (tag,))
|
---|
101 |
|
---|
102 |
|
---|
103 | class BaseThreadedNetworkedTests(unittest.TestCase):
|
---|
104 |
|
---|
105 | def make_server(self, addr, hdlr):
|
---|
106 |
|
---|
107 | class MyServer(self.server_class):
|
---|
108 | def handle_error(self, request, client_address):
|
---|
109 | self.close_request(request)
|
---|
110 | self.server_close()
|
---|
111 | raise
|
---|
112 |
|
---|
113 | if verbose: print "creating server"
|
---|
114 | server = MyServer(addr, hdlr)
|
---|
115 | self.assertEqual(server.server_address, server.socket.getsockname())
|
---|
116 |
|
---|
117 | if verbose:
|
---|
118 | print "server created"
|
---|
119 | print "ADDR =", addr
|
---|
120 | print "CLASS =", self.server_class
|
---|
121 | print "HDLR =", server.RequestHandlerClass
|
---|
122 |
|
---|
123 | t = threading.Thread(
|
---|
124 | name='%s serving' % self.server_class,
|
---|
125 | target=server.serve_forever,
|
---|
126 | # Short poll interval to make the test finish quickly.
|
---|
127 | # Time between requests is short enough that we won't wake
|
---|
128 | # up spuriously too many times.
|
---|
129 | kwargs={'poll_interval':0.01})
|
---|
130 | t.daemon = True # In case this function raises.
|
---|
131 | t.start()
|
---|
132 | if verbose: print "server running"
|
---|
133 | return server, t
|
---|
134 |
|
---|
135 | def reap_server(self, server, thread):
|
---|
136 | if verbose: print "waiting for server"
|
---|
137 | server.shutdown()
|
---|
138 | thread.join()
|
---|
139 | if verbose: print "done"
|
---|
140 |
|
---|
141 | @contextmanager
|
---|
142 | def reaped_server(self, hdlr):
|
---|
143 | server, thread = self.make_server((support.HOST, 0), hdlr)
|
---|
144 | try:
|
---|
145 | yield server
|
---|
146 | finally:
|
---|
147 | self.reap_server(server, thread)
|
---|
148 |
|
---|
149 | @reap_threads
|
---|
150 | def test_connect(self):
|
---|
151 | with self.reaped_server(SimpleIMAPHandler) as server:
|
---|
152 | client = self.imap_class(*server.server_address)
|
---|
153 | client.shutdown()
|
---|
154 |
|
---|
155 | @reap_threads
|
---|
156 | def test_issue5949(self):
|
---|
157 |
|
---|
158 | class EOFHandler(SocketServer.StreamRequestHandler):
|
---|
159 | def handle(self):
|
---|
160 | # EOF without sending a complete welcome message.
|
---|
161 | self.wfile.write('* OK')
|
---|
162 |
|
---|
163 | with self.reaped_server(EOFHandler) as server:
|
---|
164 | self.assertRaises(imaplib.IMAP4.abort,
|
---|
165 | self.imap_class, *server.server_address)
|
---|
166 |
|
---|
167 |
|
---|
class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
    """Threaded server tests over a plain TCP connection."""

    # Client class under test, and the server class it talks to.
    imap_class = imaplib.IMAP4
    server_class = SocketServer.TCPServer
172 |
|
---|
173 |
|
---|
@unittest.skipUnless(ssl, "SSL not available")
class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
    """Threaded server tests over an SSL-wrapped connection."""

    # Client class under test, and the server class it talks to.
    imap_class = IMAP4_SSL
    server_class = SecureTCPServer
179 |
|
---|
180 |
|
---|
181 | class RemoteIMAPTest(unittest.TestCase):
|
---|
182 | host = 'cyrus.andrew.cmu.edu'
|
---|
183 | port = 143
|
---|
184 | username = 'anonymous'
|
---|
185 | password = 'pass'
|
---|
186 | imap_class = imaplib.IMAP4
|
---|
187 |
|
---|
188 | def setUp(self):
|
---|
189 | with transient_internet(self.host):
|
---|
190 | self.server = self.imap_class(self.host, self.port)
|
---|
191 |
|
---|
192 | def tearDown(self):
|
---|
193 | if self.server is not None:
|
---|
194 | self.server.logout()
|
---|
195 |
|
---|
196 | def test_logincapa(self):
|
---|
197 | self.assertTrue('LOGINDISABLED' in self.server.capabilities)
|
---|
198 |
|
---|
199 | def test_anonlogin(self):
|
---|
200 | self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities)
|
---|
201 | rs = self.server.login(self.username, self.password)
|
---|
202 | self.assertEqual(rs[0], 'OK')
|
---|
203 |
|
---|
204 | def test_logout(self):
|
---|
205 | rs = self.server.logout()
|
---|
206 | self.server = None
|
---|
207 | self.assertEqual(rs[0], 'BYE')
|
---|
208 |
|
---|
209 |
|
---|
@unittest.skipUnless(ssl, "SSL not available")
class RemoteIMAP_SSLTest(RemoteIMAPTest):
    """The remote server tests repeated over IMAP4_SSL on port 993."""

    port = 993
    imap_class = IMAP4_SSL

    def test_logincapa(self):
        """Over SSL, LOGINDISABLED is absent and AUTH=PLAIN is advertised."""
        # assertNotIn/assertIn report both operands on failure, unlike
        # assertFalse(a in b) / assertTrue(a in b).
        self.assertNotIn('LOGINDISABLED', self.server.capabilities)
        self.assertIn('AUTH=PLAIN', self.server.capabilities)
218 |
|
---|
219 |
|
---|
def test_main():
    """Run TestImaplib, plus the networked tests if 'network' is enabled.

    When ssl is available and the network resource is enabled, CERTFILE is
    pointed at ``keycert.pem`` next to this file; TestFailed is raised if
    that file does not exist.
    """
    global CERTFILE

    selected = [TestImaplib]

    if support.is_resource_enabled('network'):
        if ssl:
            here = os.path.dirname(__file__) or os.curdir
            CERTFILE = os.path.join(here, "keycert.pem")
            if not os.path.exists(CERTFILE):
                raise support.TestFailed("Can't read certificate files!")
        selected += [
            ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
            RemoteIMAPTest, RemoteIMAP_SSLTest,
        ]

    support.run_unittest(*selected)
236 |
|
---|
237 |
|
---|
if __name__ == "__main__":
    # Running the file directly: list 'network' as an enabled resource.
    # test_main() checks that resource before adding the networked test
    # classes (presumably via this same use_resources list).
    support.use_resources = ['network']
    test_main()