1 | import asyncore
|
---|
2 | import email.utils
|
---|
3 | import socket
|
---|
4 | import smtpd
|
---|
5 | import smtplib
|
---|
6 | import StringIO
|
---|
7 | import sys
|
---|
8 | import time
|
---|
9 | import select
|
---|
10 |
|
---|
11 | import unittest
|
---|
12 | from test import test_support
|
---|
13 |
|
---|
14 | try:
|
---|
15 | import threading
|
---|
16 | except ImportError:
|
---|
17 | threading = None
|
---|
18 |
|
---|
19 | HOST = test_support.HOST
|
---|
20 |
|
---|
def server(evt, buf, serv):
    """Run a one-shot TCP server that sends ``buf`` to the first client.

    Arguments:
    evt  -- event that is set once ``serv`` is listening and set again when
            the function is about to return.
    buf  -- data to push to the accepted connection.
    serv -- socket to listen on; it is always closed before returning.

    If ``serv.accept()`` raises ``socket.timeout`` nothing is sent.  At most
    500 select/send rounds are attempted; whatever is left of ``buf`` after
    that is dropped.
    """
    serv.listen(5)
    evt.set()
    try:
        conn, addr = serv.accept()
    except socket.timeout:
        pass
    else:
        try:
            n = 500
            while buf and n > 0:
                r, w, e = select.select([], [conn], [])
                if w:
                    sent = conn.send(buf)
                    # send() may be partial; keep only the unsent tail
                    buf = buf[sent:]

                n -= 1
        finally:
            # Release the client connection even when select()/send() raised.
            conn.close()
    finally:
        serv.close()
        evt.set()
42 |
|
---|
@unittest.skipUnless(threading, 'Threading required for this test.')
class GeneralTests(unittest.TestCase):
    """Connection and timeout checks against a greeting-only server thread."""

    def setUp(self):
        # Start a helper thread running server(); it sends a single
        # "220" greeting to the first client that connects.
        self._threads = test_support.threading_setup()
        self.evt = threading.Event()
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock = listener
        listener.settimeout(15)
        self.port = test_support.bind_port(listener)
        self.thread = threading.Thread(
            target=server,
            args=(self.evt, "220 Hola mundo\n", listener))
        self.thread.start()
        # Wait until the server is listening, then reset the event so that
        # tearDown can wait on it for the server's completion.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # server() sets the event again right before it returns
        self.evt.wait()
        self.thread.join()
        test_support.threading_cleanup(*self._threads)

    def testBasic1(self):
        # connects
        client = smtplib.SMTP(HOST, self.port)
        client.close()

    def testBasic2(self):
        # connects, include port in host name
        host_and_port = "%s:%s" % (HOST, self.port)
        client = smtplib.SMTP(host_and_port)
        client.close()

    def testLocalHostName(self):
        # check that supplied local_hostname is used
        client = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
        self.assertEqual(client.local_hostname, "testhost")
        client.close()

    def testTimeoutDefault(self):
        # without an explicit timeout the socket picks up the global default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = smtplib.SMTP(HOST, self.port)
        finally:
            # never leak the modified default into other tests
            socket.setdefaulttimeout(None)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def testTimeoutNone(self):
        # an explicit timeout=None overrides the global default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = smtplib.SMTP(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        client.close()

    def testTimeoutValue(self):
        # an explicit numeric timeout is applied to the socket
        client = smtplib.SMTP(HOST, self.port, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()
103 |
|
---|
104 |
|
---|
# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Poll the asyncore socket map on behalf of ``serv``.

    Arguments:
    serv       -- server object; only its ``close()`` method is used here.
    serv_evt   -- event set on entry and set again just before returning.
    client_evt -- event the client side sets once its conversation is over.

    Polling stops when the asyncore socket map is empty, after 1000 polls,
    or as soon as ``client_evt`` is seen set after a poll.
    """
    serv_evt.set()

    try:
        # prefer the poll()-based loop where the platform offers it
        poll_fun = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        polls_left = 1000
        while polls_left > 0 and asyncore.socket_map:
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

            polls_left -= 1

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
136 |
|
---|
# Banner lines expected around the message body in the captured
# DebuggingServer output (compared against in testSend).
MSG_BEGIN = '-' * 10 + ' MESSAGE FOLLOWS ' + '-' * 10 + '\n'
MSG_END = '-' * 12 + ' END MESSAGE ' + '-' * 12 + '\n'
139 |
|
---|
140 | # NOTE: Some SMTP objects in the tests below are created with a non-default
|
---|
141 | # local_hostname argument to the constructor, since (on some systems) the FQDN
|
---|
142 | # lookup caused by the default local_hostname sometimes takes so long that the
|
---|
143 | # test server times out, causing the test to fail.
|
---|
144 |
|
---|
# Test behavior of smtpd.DebuggingServer
@unittest.skipUnless(threading, 'Threading required for this test.')
class DebuggingServerTests(unittest.TestCase):
    """Run smtplib commands against an in-process smtpd.DebuggingServer."""

    def setUp(self):
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = StringIO.StringIO()
        sys.stdout = self.output

        self._threads = test_support.threading_setup()
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()
        test_support.threading_cleanup(*self._threads)
        # restore sys.stdout
        sys.stdout = self.old_stdout

    def _connect(self):
        # Every test opens its connection the same way: explicit
        # local_hostname (see the NOTE above this class) and a 3 second
        # timeout.
        return smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=3)

    def testBasic(self):
        # connect
        client = self._connect()
        client.quit()

    def testNOOP(self):
        client = self._connect()
        self.assertEqual(client.noop(), (250, 'Ok'))
        client.quit()

    def testRSET(self):
        client = self._connect()
        self.assertEqual(client.rset(), (250, 'Ok'))
        client.quit()

    def testNotImplemented(self):
        # EHLO isn't implemented in DebuggingServer
        client = self._connect()
        reply = (502, 'Error: command "EHLO" not implemented')
        self.assertEqual(client.ehlo(), reply)
        client.quit()

    def testVRFY(self):
        # VRFY isn't implemented in DebuggingServer
        client = self._connect()
        reply = (502, 'Error: command "VRFY" not implemented')
        # vrfy() and verify() are checked with the same expectation
        self.assertEqual(client.vrfy('nobody@nowhere.com'), reply)
        self.assertEqual(client.verify('nobody@nowhere.com'), reply)
        client.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        client = self._connect()
        client.helo()
        reply = (503, 'Duplicate HELO/EHLO')
        self.assertEqual(client.helo(), reply)
        client.quit()

    def testHELP(self):
        client = self._connect()
        self.assertEqual(client.help(),
                         'Error: command "HELP" not implemented')
        client.quit()

    def testSend(self):
        # connect and send mail
        body = 'A test message'
        client = self._connect()
        client.sendmail('John', 'Sally', body)
        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
        # in asyncore.  This sleep might help, but should really be fixed
        # properly by using an Event variable.
        time.sleep(0.01)
        client.quit()

        # Let the server thread finish before inspecting what it printed.
        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        expected_output = '%s%s\n%s' % (MSG_BEGIN, body, MSG_END)
        self.assertEqual(self.output.getvalue(), expected_output)
242 |
|
---|
243 |
|
---|
class NonConnectingTests(unittest.TestCase):
    """Checks that never establish a connection to a server."""

    def testNotConnected(self):
        # Operations on an SMTP object that was never connected must be
        # reported as SMTPServerDisconnected.
        unconnected = smtplib.SMTP()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            unconnected.ehlo()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            unconnected.send('test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises socket.error, whether the port
        # is passed separately or embedded in the host string
        for ctor_args in (("localhost", "bogus"), ("localhost:bogus",)):
            self.assertRaises(socket.error, smtplib.SMTP, *ctor_args)
262 |
|
---|
263 |
|
---|
# test response of client to a non-successful HELO message
@unittest.skipUnless(threading, 'Threading required for this test.')
class BadHELOServerTests(unittest.TestCase):
    """Connect to a server whose greeting carries the reply code 199."""

    def setUp(self):
        # capture anything written to stdout while the test runs
        self.old_stdout = sys.stdout
        self.output = StringIO.StringIO()
        sys.stdout = self.output

        self._threads = test_support.threading_setup()
        self.evt = threading.Event()
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock = listener
        listener.settimeout(15)
        self.port = test_support.bind_port(listener)
        self.thread = threading.Thread(
            target=server,
            args=(self.evt, "199 no hello for you!\n", listener))
        self.thread.start()
        # wait for the server to listen, then re-arm the event for tearDown
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        self.evt.wait()
        self.thread.join()
        test_support.threading_cleanup(*self._threads)
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        # constructing the client against the bad greeting must fail;
        # positional arguments are host, port, local_hostname, timeout
        with self.assertRaises(smtplib.SMTPConnectError):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
293 |
|
---|
294 |
|
---|
# Mailboxes known to the simulated server, mapped to their display names.
sim_users = {
    'Mr.A@somewhere.com': 'John A',
    'Ms.B@somewhere.com': 'Sally B',
    'Mrs.C@somewhereesle.com': 'Ruth C',
}

# (user, password) pair passed to smtp.login() by the AUTH tests.
sim_auth = ('Mr.A@somewhere.com', 'somepassword')

# Challenge string the simulated server sends for AUTH CRAM-MD5.
sim_cram_md5_challenge = (
    'PENCeUxFREJoU0NnbmhNWitOMjNGNn'
    'dAZWx3b29kLmlubm9zb2Z0LmNvbT4='
)

# Encoded credentials per AUTH mechanism.  'login' and 'plain' are compared
# by the simulated server; 'cram-md5' is looked for in the client's error
# message by the CRAM-MD5 test.
sim_auth_credentials = {
    'login': 'TXIuQUBzb21ld2hlcmUuY29t',
    'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
    'cram-md5': (
        'TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
        'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'
    ),
}

# Encoded password looked for in the error message by the LOGIN test.
sim_auth_login_password = 'C29TZXBHC3N3B3JK'

# Mailing lists the simulated server can expand, mapped to member mailboxes.
sim_lists = {
    'list-1': ['Mr.A@somewhere.com', 'Mrs.C@somewhereesle.com'],
    'list-2': ['Ms.B@somewhere.com'],
}
314 |
|
---|
# Simulated SMTP channel & server
class SimSMTPChannel(smtpd.SMTPChannel):
    """smtpd channel answering EHLO, VRFY, EXPN and AUTH from canned data."""

    def __init__(self, extra_features, *args, **kw):
        # Render one "250-<feature>" continuation line per extra feature;
        # smtp_EHLO splices them into its reply.
        feature_lines = ["250-{0}\r\n".format(feat) for feat in extra_features]
        self._extrafeatures = ''.join(feature_lines)
        smtpd.SMTPChannel.__init__(self, *args, **kw)

    def smtp_EHLO(self, arg):
        # fixed capabilities, then the configurable ones, then the final line
        fixed_part = ('250-testhost\r\n'
                      '250-EXPN\r\n'
                      '250-SIZE 20000000\r\n'
                      '250-STARTTLS\r\n'
                      '250-DELIVERBY\r\n')
        self.push(fixed_part + self._extrafeatures + '250 HELP')

    def smtp_VRFY(self, arg):
        # For max compatibility smtplib should be sending the raw address.
        if arg not in sim_users:
            self.push('550 No such user: %s' % arg)
            return
        self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))

    def smtp_EXPN(self, arg):
        list_name = arg.lower()
        if list_name not in sim_lists:
            self.push('550 No access for you!')
            return
        members = sim_lists[list_name]
        last_index = len(members) - 1
        for position, member in enumerate(members):
            quoted_addr = smtplib.quoteaddr(member)
            # every line but the last is a "250-" continuation line
            if position < last_index:
                template = '250-%s %s'
            else:
                template = '250 %s %s'
            self.push(template % (sim_users[member], quoted_addr))

    def smtp_AUTH(self, arg):
        # CRAM-MD5 arrives without an initial response: just send the
        # challenge.
        if arg.strip().lower() == 'cram-md5':
            self.push('334 {0}'.format(sim_cram_md5_challenge))
            return
        # every other request is "<mechanism> <credential>"
        mech, auth = arg.split()
        mech = mech.lower()
        if mech not in sim_auth_credentials:
            reply = '504 auth type unimplemented'
        elif mech == 'plain' and auth == sim_auth_credentials['plain']:
            reply = '235 plain auth ok'
        elif mech == 'login' and auth == sim_auth_credentials['login']:
            reply = '334 Password:'
        else:
            reply = '550 No access for you!'
        self.push(reply)

    def handle_error(self):
        # re-raise the exception currently being handled
        # NOTE(review): presumably so channel errors surface in the test
        # run -- confirm
        raise
370 |
|
---|
371 |
|
---|
class SimSMTPServer(smtpd.SMTPServer):
    """smtpd server that hands each accepted connection to SimSMTPChannel."""

    def __init__(self, *args, **kw):
        # features registered through add_feature(); passed to every
        # channel created in handle_accept()
        self._extra_features = []
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accept(self):
        client_sock, client_addr = self.accept()
        self._SMTPchannel = SimSMTPChannel(
            self._extra_features, self, client_sock, client_addr)

    def process_message(self, peer, mailfrom, rcpttos, data):
        """Discard the message."""

    def add_feature(self, feature):
        """Append ``feature`` to the list handed to new channels."""
        self._extra_features.append(feature)

    def handle_error(self):
        # re-raise the exception currently being handled
        raise
391 |
|
---|
392 |
|
---|
# Test various SMTP & ESMTP commands/behaviors that require a simulated server
# (i.e., something with more features than DebuggingServer)
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPSimTests(unittest.TestCase):
    """Run smtplib's ESMTP commands against an in-process SimSMTPServer."""

    def setUp(self):
        self._threads = test_support.threading_setup()
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()
        test_support.threading_cleanup(*self._threads)

    def testBasic(self):
        # smoke test
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        smtp.quit()

    def testEHLO(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn': '',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        for email, name in sim_users.items():
            expected_known = (250, '%s %s' % (name, smtplib.quoteaddr(email)))
            self.assertEqual(smtp.vrfy(email), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, 'No such user: %s' % u)
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)

        for listname, members in sim_lists.items():
            # one "<name> <quoted address>" line per list member
            users = ['%s %s' % (sim_users[m], smtplib.quoteaddr(m))
                     for m in members]
            expected_known = (250, '\n'.join(users))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, 'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        # close the client socket even if the assertion below fails
        self.addCleanup(smtp.close)

        expected_auth_ok = (235, b'plain auth ok')
        self.assertEqual(smtp.login(sim_auth[0], sim_auth[1]), expected_auth_ok)

    # SimSMTPChannel doesn't fully support LOGIN or CRAM-MD5 auth because they
    # require a synchronous read to obtain the credentials...so instead smtpd
    # sees the credential sent by smtplib's login method as an unknown command,
    # which results in smtplib raising an auth error.  Fortunately the error
    # message contains the encoded credential, so we can partially check that it
    # was generated correctly (partially, because the 'word' is uppercased in
    # the error message).

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)
        try:
            smtp.login(sim_auth[0], sim_auth[1])
        except smtplib.SMTPAuthenticationError as err:
            # Report a mismatch as a test failure.  Raising a plain string
            # is itself a TypeError and would hide the real problem.
            self.assertIn(sim_auth_login_password, str(err))

    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
        self.addCleanup(smtp.close)

        try:
            smtp.login(sim_auth[0], sim_auth[1])
        except smtplib.SMTPAuthenticationError as err:
            self.assertIn(sim_auth_credentials['cram-md5'], str(err))

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.
509 |
|
---|
510 |
|
---|
def test_main(verbose=None):
    """Run all test case classes of this module via test_support.

    ``verbose`` is accepted but not used.
    """
    test_cases = (
        GeneralTests,
        DebuggingServerTests,
        NonConnectingTests,
        BadHELOServerTests,
        SMTPSimTests,
    )
    test_support.run_unittest(*test_cases)


if __name__ == '__main__':
    test_main()