1 | # test asynchat
|
---|
2 |
|
---|
3 | import asyncore, asynchat, socket, time
|
---|
4 | import unittest
|
---|
5 | import sys
|
---|
6 | from test import test_support
|
---|
7 | try:
|
---|
8 | import threading
|
---|
9 | except ImportError:
|
---|
10 | threading = None
|
---|
11 |
|
---|
# Address the echo client connects to; taken from the test support module.
HOST = test_support.HOST

# Marker the client sends last; the echo server stops reading once the
# collected data contains it.
SERVER_QUIT = 'QUIT\n'
14 |
|
---|
if threading:
    class echo_server(threading.Thread):
        """Single-connection echo server that runs on its own thread.

        The server accepts one client, reads from it one byte at a time
        until the collected data contains SERVER_QUIT (or the client
        disconnects), strips SERVER_QUIT out of what it collected and then
        sends the remainder back in slices of ``chunk_size`` bytes.

        Whatever has not been sent back is left in ``self.buffer`` so the
        tests can inspect it after joining the thread.
        """

        # parameter to determine the number of bytes passed back to the
        # client each send
        chunk_size = 1

        def __init__(self, event):
            """Create the listening socket and bind it to a port.

            event -- threading.Event that run() sets once the socket is
                     listening.
            """
            threading.Thread.__init__(self)
            self.event = event
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = test_support.bind_port(self.sock)
            # This will be set if the client wants us to wait before echoing
            # data back.
            self.start_resend_event = None

        def run(self):
            """Serve exactly one client, then close both sockets."""
            try:
                self.sock.listen(1)
                self.event.set()
                conn, _ = self.sock.accept()
                try:
                    self.buffer = ""
                    # collect data until quit message is seen
                    while SERVER_QUIT not in self.buffer:
                        data = conn.recv(1)
                        if not data:
                            break
                        self.buffer = self.buffer + data

                    # remove the SERVER_QUIT message
                    self.buffer = self.buffer.replace(SERVER_QUIT, '')

                    if self.start_resend_event:
                        self.start_resend_event.wait()

                    # re-send entire set of collected data
                    try:
                        # this may fail on some tests, such as
                        # test_close_when_done, since the client closes the
                        # channel when it's done sending
                        while self.buffer:
                            n = conn.send(self.buffer[:self.chunk_size])
                            time.sleep(0.001)
                            self.buffer = self.buffer[n:]
                    except socket.error:
                        # Only a failed send is expected here; anything else
                        # (including KeyboardInterrupt/SystemExit) propagates.
                        pass
                finally:
                    conn.close()
            finally:
                # Release the listening socket even if serving failed.
                self.sock.close()

    class echo_client(asynchat.async_chat):
        """async_chat client that records every terminator-delimited message.

        Incoming data accumulates in ``self.buffer``; each time the
        terminator is found the accumulated text is appended to
        ``self.contents`` and the buffer is reset.
        """

        def __init__(self, terminator, server_port):
            """Connect to (HOST, server_port) and install *terminator*."""
            asynchat.async_chat.__init__(self)
            self.contents = []
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.connect((HOST, server_port))
            self.set_terminator(terminator)
            self.buffer = ''

        def handle_connect(self):
            pass

        if sys.platform == 'darwin':
            # select.poll returns a select.POLLHUP at the end of the tests
            # on darwin, so just ignore it
            def handle_expt(self):
                pass

        def collect_incoming_data(self, data):
            self.buffer += data

        def found_terminator(self):
            self.contents.append(self.buffer)
            self.buffer = ""

    def start_echo_server():
        """Start an echo_server thread and wait until it is listening.

        Returns a ``(server, event)`` tuple; the event has already been
        cleared again when this function returns.
        """
        event = threading.Event()
        s = echo_server(event)
        s.start()
        event.wait()
        event.clear()
        time.sleep(0.01)  # Give server time to start accepting.
        return s, event
97 |
|
---|
98 |
|
---|
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestAsynchat(unittest.TestCase):
    """Drive echo_client against a threaded echo_server via asyncore.loop.

    ``usepoll`` is forwarded to asyncore.loop as ``use_poll``; subclasses
    may override it.
    """

    usepoll = False

    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        test_support.threading_cleanup(*self._threads)

    def _run_loop(self):
        """Run the asyncore loop with the settings shared by every test."""
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

    def line_terminator_check(self, term, server_chunk):
        """Send two messages ending in *term* and check both come back.

        term         -- string terminator installed on the client.
        server_chunk -- number of bytes the server passes back per send.
        """
        s, _ = start_echo_server()
        # chunk_size is only read when the server echoes data back, which
        # cannot happen before the client created below has sent
        # SERVER_QUIT, so setting it after the thread started is safe.
        s.chunk_size = server_chunk
        c = echo_client(term, s.port)
        c.push("hello ")
        c.push("world%s" % term)
        c.push("I'm not dead yet!%s" % term)
        c.push(SERVER_QUIT)
        self._run_loop()
        s.join()

        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for l in (1, 2, 3):
            self.line_terminator_check('\n', l)

    def test_line_terminator2(self):
        # test two-character terminator
        for l in (1, 2, 3):
            self.line_terminator_check('\r\n', l)

    def test_line_terminator3(self):
        # test three-character terminator
        for l in (1, 2, 3):
            self.line_terminator_check('qqq', l)

    def numeric_terminator_check(self, termlen):
        """Check that a numeric terminator yields the first *termlen* bytes."""
        # Try reading a fixed number of bytes
        s, _ = start_echo_server()
        c = echo_client(termlen, s.port)
        data = "hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        s.join()

        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # check that ints & longs both work (since type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)
        self.numeric_terminator_check(long(1))

    def test_numeric_terminator2(self):
        self.numeric_terminator_check(long(6))

    def test_none_terminator(self):
        # With a None terminator nothing is ever split off: contents stays
        # empty and everything received remains in the client's buffer.
        s, _ = start_echo_server()
        c = echo_client(None, s.port)
        data = "hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        s.join()

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        s, _ = start_echo_server()
        c = echo_client('\n', s.port)
        data = "hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        self._run_loop()
        s.join()

        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])

    def test_string_producer(self):
        s, _ = start_echo_server()
        c = echo_client('\n', s.port)
        data = "hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        self._run_loop()
        s.join()

        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, _ = start_echo_server()
        c = echo_client('\n', s.port)
        c.push("hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        self._run_loop()
        s.join()

        self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])

    def test_close_when_done(self):
        s, _ = start_echo_server()
        s.start_resend_event = threading.Event()
        c = echo_client('\n', s.port)
        c.push("hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        self._run_loop()

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        s.start_resend_event.set()
        s.join()

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(s.buffer), 0)
233 |
|
---|
234 |
|
---|
class TestAsynchat_WithPoll(TestAsynchat):
    """Repeat every TestAsynchat case with ``usepoll`` switched on."""

    usepoll = True
237 |
|
---|
class TestHelperFunctions(unittest.TestCase):
    """Checks for the module-level helpers exported by asynchat."""

    def test_find_prefix_at_end(self):
        # Each entry is (haystack, needle, expected return value).
        cases = (
            ("qwerty\r", "\r\n", 1),
            ("qwertydkjf", "\r\n", 0),
        )
        for haystack, needle, expected in cases:
            found = asynchat.find_prefix_at_end(haystack, needle)
            self.assertEqual(found, expected)
242 |
|
---|
class TestFifo(unittest.TestCase):
    """Checks for asynchat.fifo: push, first, pop, len and is_empty."""

    def test_basic(self):
        queue = asynchat.fifo()
        for item in (7, 'a'):
            queue.push(item)
        self.assertEqual(len(queue), 2)

        # Remove the first element; one element is left behind.
        self.assertEqual(queue.first(), 7)
        self.assertEqual(queue.pop(), (1, 7))
        self.assertEqual(len(queue), 1)

        # Remove the second element; the fifo is now empty.
        self.assertEqual(queue.first(), 'a')
        self.assertEqual(queue.is_empty(), False)
        self.assertEqual(queue.pop(), (1, 'a'))
        self.assertEqual(len(queue), 0)
        self.assertEqual(queue.is_empty(), True)

        # Popping from an empty fifo yields (0, None).
        self.assertEqual(queue.pop(), (0, None))

    def test_given_list(self):
        expected = ('x', 17, 3)
        # Hand the fifo its own list so the expectations are never shared
        # with the object under test.
        queue = asynchat.fifo(list(expected))
        self.assertEqual(len(queue), 3)
        for item in expected:
            self.assertEqual(queue.pop(), (1, item))
        self.assertEqual(queue.pop(), (0, None))
266 |
|
---|
267 |
|
---|
def test_main(verbose=None):
    """Run all test case classes of this module through test_support.

    The ``verbose`` argument is accepted but not used.
    """
    cases = (
        TestAsynchat,
        TestAsynchat_WithPoll,
        TestHelperFunctions,
        TestFifo,
    )
    test_support.run_unittest(*cases)
271 |
|
---|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main(verbose=True)