1 | # -*- Mode: Python; tab-width: 4 -*-
|
---|
2 | # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
|
---|
3 | # Author: Sam Rushing <rushing@nightmare.com>
|
---|
4 |
|
---|
5 | # ======================================================================
|
---|
6 | # Copyright 1996 by Sam Rushing
|
---|
7 | #
|
---|
8 | # All Rights Reserved
|
---|
9 | #
|
---|
10 | # Permission to use, copy, modify, and distribute this software and
|
---|
11 | # its documentation for any purpose and without fee is hereby
|
---|
12 | # granted, provided that the above copyright notice appear in all
|
---|
13 | # copies and that both that copyright notice and this permission
|
---|
14 | # notice appear in supporting documentation, and that the name of Sam
|
---|
15 | # Rushing not be used in advertising or publicity pertaining to
|
---|
16 | # distribution of the software without specific, written prior
|
---|
17 | # permission.
|
---|
18 | #
|
---|
19 | # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
|
---|
20 | # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
|
---|
21 | # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
|
---|
22 | # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
|
---|
23 | # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
|
---|
24 | # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
---|
25 | # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
---|
26 | # ======================================================================
|
---|
27 |
|
---|
28 | r"""A class supporting chat-style (command/response) protocols.
|
---|
29 |
|
---|
30 | This class adds support for 'chat' style protocols - where one side
|
---|
31 | sends a 'command', and the other sends a response (examples would be
|
---|
32 | the common internet protocols - smtp, nntp, ftp, etc..).
|
---|
33 |
|
---|
34 | The handle_read() method looks at the input stream for the current
|
---|
35 | 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
|
---|
36 | for multi-line output), calling self.found_terminator() on its
|
---|
37 | receipt.
|
---|
38 |
|
---|
39 | for example:
|
---|
40 | Say you build an async nntp client using this class. At the start
|
---|
41 | of the connection, you'll have self.terminator set to '\r\n', in
|
---|
42 | order to process the single-line greeting. Just before issuing a
|
---|
43 | 'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
|
---|
44 | command will be accumulated (using your own 'collect_incoming_data'
|
---|
45 | method) up to the terminator, and then control will be returned to
|
---|
46 | you - by calling your self.found_terminator() method.
|
---|
47 | """
|
---|
48 |
|
---|
49 | import socket
|
---|
50 | import asyncore
|
---|
51 | from collections import deque
|
---|
52 | from sys import py3kwarning
|
---|
53 | from warnings import filterwarnings, catch_warnings
|
---|
54 |
|
---|
55 | class async_chat (asyncore.dispatcher):
|
---|
56 | """This is an abstract class. You must derive from this class, and add
|
---|
57 | the two methods collect_incoming_data() and found_terminator()"""
|
---|
58 |
|
---|
59 | # these are overridable defaults
|
---|
60 |
|
---|
61 | ac_in_buffer_size = 4096
|
---|
62 | ac_out_buffer_size = 4096
|
---|
63 |
|
---|
64 | def __init__ (self, sock=None, map=None):
|
---|
65 | # for string terminator matching
|
---|
66 | self.ac_in_buffer = ''
|
---|
67 |
|
---|
68 | # we use a list here rather than cStringIO for a few reasons...
|
---|
69 | # del lst[:] is faster than sio.truncate(0)
|
---|
70 | # lst = [] is faster than sio.truncate(0)
|
---|
71 | # cStringIO will be gaining unicode support in py3k, which
|
---|
72 | # will negatively affect the performance of bytes compared to
|
---|
73 | # a ''.join() equivalent
|
---|
74 | self.incoming = []
|
---|
75 |
|
---|
76 | # we toss the use of the "simple producer" and replace it with
|
---|
77 | # a pure deque, which the original fifo was a wrapping of
|
---|
78 | self.producer_fifo = deque()
|
---|
79 | asyncore.dispatcher.__init__ (self, sock, map)
|
---|
80 |
|
---|
81 | def collect_incoming_data(self, data):
|
---|
82 | raise NotImplementedError("must be implemented in subclass")
|
---|
83 |
|
---|
84 | def _collect_incoming_data(self, data):
|
---|
85 | self.incoming.append(data)
|
---|
86 |
|
---|
87 | def _get_data(self):
|
---|
88 | d = ''.join(self.incoming)
|
---|
89 | del self.incoming[:]
|
---|
90 | return d
|
---|
91 |
|
---|
92 | def found_terminator(self):
|
---|
93 | raise NotImplementedError("must be implemented in subclass")
|
---|
94 |
|
---|
95 | def set_terminator (self, term):
|
---|
96 | "Set the input delimiter. Can be a fixed string of any length, an integer, or None"
|
---|
97 | self.terminator = term
|
---|
98 |
|
---|
99 | def get_terminator (self):
|
---|
100 | return self.terminator
|
---|
101 |
|
---|
102 | # grab some more data from the socket,
|
---|
103 | # throw it to the collector method,
|
---|
104 | # check for the terminator,
|
---|
105 | # if found, transition to the next state.
|
---|
106 |
|
---|
107 | def handle_read (self):
|
---|
108 |
|
---|
109 | try:
|
---|
110 | data = self.recv (self.ac_in_buffer_size)
|
---|
111 | except socket.error, why:
|
---|
112 | self.handle_error()
|
---|
113 | return
|
---|
114 |
|
---|
115 | self.ac_in_buffer = self.ac_in_buffer + data
|
---|
116 |
|
---|
117 | # Continue to search for self.terminator in self.ac_in_buffer,
|
---|
118 | # while calling self.collect_incoming_data. The while loop
|
---|
119 | # is necessary because we might read several data+terminator
|
---|
120 | # combos with a single recv(4096).
|
---|
121 |
|
---|
122 | while self.ac_in_buffer:
|
---|
123 | lb = len(self.ac_in_buffer)
|
---|
124 | terminator = self.get_terminator()
|
---|
125 | if not terminator:
|
---|
126 | # no terminator, collect it all
|
---|
127 | self.collect_incoming_data (self.ac_in_buffer)
|
---|
128 | self.ac_in_buffer = ''
|
---|
129 | elif isinstance(terminator, int) or isinstance(terminator, long):
|
---|
130 | # numeric terminator
|
---|
131 | n = terminator
|
---|
132 | if lb < n:
|
---|
133 | self.collect_incoming_data (self.ac_in_buffer)
|
---|
134 | self.ac_in_buffer = ''
|
---|
135 | self.terminator = self.terminator - lb
|
---|
136 | else:
|
---|
137 | self.collect_incoming_data (self.ac_in_buffer[:n])
|
---|
138 | self.ac_in_buffer = self.ac_in_buffer[n:]
|
---|
139 | self.terminator = 0
|
---|
140 | self.found_terminator()
|
---|
141 | else:
|
---|
142 | # 3 cases:
|
---|
143 | # 1) end of buffer matches terminator exactly:
|
---|
144 | # collect data, transition
|
---|
145 | # 2) end of buffer matches some prefix:
|
---|
146 | # collect data to the prefix
|
---|
147 | # 3) end of buffer does not match any prefix:
|
---|
148 | # collect data
|
---|
149 | terminator_len = len(terminator)
|
---|
150 | index = self.ac_in_buffer.find(terminator)
|
---|
151 | if index != -1:
|
---|
152 | # we found the terminator
|
---|
153 | if index > 0:
|
---|
154 | # don't bother reporting the empty string (source of subtle bugs)
|
---|
155 | self.collect_incoming_data (self.ac_in_buffer[:index])
|
---|
156 | self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
|
---|
157 | # This does the Right Thing if the terminator is changed here.
|
---|
158 | self.found_terminator()
|
---|
159 | else:
|
---|
160 | # check for a prefix of the terminator
|
---|
161 | index = find_prefix_at_end (self.ac_in_buffer, terminator)
|
---|
162 | if index:
|
---|
163 | if index != lb:
|
---|
164 | # we found a prefix, collect up to the prefix
|
---|
165 | self.collect_incoming_data (self.ac_in_buffer[:-index])
|
---|
166 | self.ac_in_buffer = self.ac_in_buffer[-index:]
|
---|
167 | break
|
---|
168 | else:
|
---|
169 | # no prefix, collect it all
|
---|
170 | self.collect_incoming_data (self.ac_in_buffer)
|
---|
171 | self.ac_in_buffer = ''
|
---|
172 |
|
---|
173 | def handle_write (self):
|
---|
174 | self.initiate_send()
|
---|
175 |
|
---|
176 | def handle_close (self):
|
---|
177 | self.close()
|
---|
178 |
|
---|
179 | def push (self, data):
|
---|
180 | sabs = self.ac_out_buffer_size
|
---|
181 | if len(data) > sabs:
|
---|
182 | for i in xrange(0, len(data), sabs):
|
---|
183 | self.producer_fifo.append(data[i:i+sabs])
|
---|
184 | else:
|
---|
185 | self.producer_fifo.append(data)
|
---|
186 | self.initiate_send()
|
---|
187 |
|
---|
188 | def push_with_producer (self, producer):
|
---|
189 | self.producer_fifo.append(producer)
|
---|
190 | self.initiate_send()
|
---|
191 |
|
---|
192 | def readable (self):
|
---|
193 | "predicate for inclusion in the readable for select()"
|
---|
194 | # cannot use the old predicate, it violates the claim of the
|
---|
195 | # set_terminator method.
|
---|
196 |
|
---|
197 | # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
|
---|
198 | return 1
|
---|
199 |
|
---|
200 | def writable (self):
|
---|
201 | "predicate for inclusion in the writable for select()"
|
---|
202 | return self.producer_fifo or (not self.connected)
|
---|
203 |
|
---|
204 | def close_when_done (self):
|
---|
205 | "automatically close this channel once the outgoing queue is empty"
|
---|
206 | self.producer_fifo.append(None)
|
---|
207 |
|
---|
208 | def initiate_send(self):
|
---|
209 | while self.producer_fifo and self.connected:
|
---|
210 | first = self.producer_fifo[0]
|
---|
211 | # handle empty string/buffer or None entry
|
---|
212 | if not first:
|
---|
213 | del self.producer_fifo[0]
|
---|
214 | if first is None:
|
---|
215 | self.handle_close()
|
---|
216 | return
|
---|
217 |
|
---|
218 | # handle classic producer behavior
|
---|
219 | obs = self.ac_out_buffer_size
|
---|
220 | try:
|
---|
221 | with catch_warnings():
|
---|
222 | if py3kwarning:
|
---|
223 | filterwarnings("ignore", ".*buffer", DeprecationWarning)
|
---|
224 | data = buffer(first, 0, obs)
|
---|
225 | except TypeError:
|
---|
226 | data = first.more()
|
---|
227 | if data:
|
---|
228 | self.producer_fifo.appendleft(data)
|
---|
229 | else:
|
---|
230 | del self.producer_fifo[0]
|
---|
231 | continue
|
---|
232 |
|
---|
233 | # send the data
|
---|
234 | try:
|
---|
235 | num_sent = self.send(data)
|
---|
236 | except socket.error:
|
---|
237 | self.handle_error()
|
---|
238 | return
|
---|
239 |
|
---|
240 | if num_sent:
|
---|
241 | if num_sent < len(data) or obs < len(first):
|
---|
242 | self.producer_fifo[0] = first[num_sent:]
|
---|
243 | else:
|
---|
244 | del self.producer_fifo[0]
|
---|
245 | # we tried to send some actual data
|
---|
246 | return
|
---|
247 |
|
---|
248 | def discard_buffers (self):
|
---|
249 | # Emergencies only!
|
---|
250 | self.ac_in_buffer = ''
|
---|
251 | del self.incoming[:]
|
---|
252 | self.producer_fifo.clear()
|
---|
253 |
|
---|
254 | class simple_producer:
|
---|
255 |
|
---|
256 | def __init__ (self, data, buffer_size=512):
|
---|
257 | self.data = data
|
---|
258 | self.buffer_size = buffer_size
|
---|
259 |
|
---|
260 | def more (self):
|
---|
261 | if len (self.data) > self.buffer_size:
|
---|
262 | result = self.data[:self.buffer_size]
|
---|
263 | self.data = self.data[self.buffer_size:]
|
---|
264 | return result
|
---|
265 | else:
|
---|
266 | result = self.data
|
---|
267 | self.data = ''
|
---|
268 | return result
|
---|
269 |
|
---|
270 | class fifo:
|
---|
271 | def __init__ (self, list=None):
|
---|
272 | if not list:
|
---|
273 | self.list = deque()
|
---|
274 | else:
|
---|
275 | self.list = deque(list)
|
---|
276 |
|
---|
277 | def __len__ (self):
|
---|
278 | return len(self.list)
|
---|
279 |
|
---|
280 | def is_empty (self):
|
---|
281 | return not self.list
|
---|
282 |
|
---|
283 | def first (self):
|
---|
284 | return self.list[0]
|
---|
285 |
|
---|
286 | def push (self, data):
|
---|
287 | self.list.append(data)
|
---|
288 |
|
---|
289 | def pop (self):
|
---|
290 | if self.list:
|
---|
291 | return (1, self.list.popleft())
|
---|
292 | else:
|
---|
293 | return (0, None)
|
---|
294 |
|
---|
295 | # Given 'haystack', see if any prefix of 'needle' is at its end. This
|
---|
296 | # assumes an exact match has already been checked. Return the number of
|
---|
297 | # characters matched.
|
---|
298 | # for example:
|
---|
299 | # f_p_a_e ("qwerty\r", "\r\n") => 1
|
---|
300 | # f_p_a_e ("qwertydkjf", "\r\n") => 0
|
---|
301 | # f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>
|
---|
302 |
|
---|
303 | # this could maybe be made faster with a computed regex?
|
---|
304 | # [answer: no; circa Python-2.0, Jan 2001]
|
---|
305 | # new python: 28961/s
|
---|
306 | # old python: 18307/s
|
---|
307 | # re: 12820/s
|
---|
308 | # regex: 14035/s
|
---|
309 |
|
---|
310 | def find_prefix_at_end (haystack, needle):
|
---|
311 | l = len(needle) - 1
|
---|
312 | while l and not haystack.endswith(needle[:l]):
|
---|
313 | l -= 1
|
---|
314 | return l
|
---|