1 | # -*- Mode: Python; tab-width: 4 -*-
|
---|
2 | # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
|
---|
3 | # Author: Sam Rushing <rushing@nightmare.com>
|
---|
4 |
|
---|
5 | # ======================================================================
|
---|
6 | # Copyright 1996 by Sam Rushing
|
---|
7 | #
|
---|
8 | # All Rights Reserved
|
---|
9 | #
|
---|
10 | # Permission to use, copy, modify, and distribute this software and
|
---|
11 | # its documentation for any purpose and without fee is hereby
|
---|
12 | # granted, provided that the above copyright notice appear in all
|
---|
13 | # copies and that both that copyright notice and this permission
|
---|
14 | # notice appear in supporting documentation, and that the name of Sam
|
---|
15 | # Rushing not be used in advertising or publicity pertaining to
|
---|
16 | # distribution of the software without specific, written prior
|
---|
17 | # permission.
|
---|
18 | #
|
---|
19 | # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
|
---|
20 | # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
|
---|
21 | # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
|
---|
22 | # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
|
---|
23 | # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
|
---|
24 | # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
---|
25 | # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
---|
26 | # ======================================================================
|
---|
27 |
|
---|
28 | r"""A class supporting chat-style (command/response) protocols.
|
---|
29 |
|
---|
30 | This class adds support for 'chat' style protocols - where one side
|
---|
31 | sends a 'command', and the other sends a response (examples would be
|
---|
32 | the common internet protocols - smtp, nntp, ftp, etc..).
|
---|
33 |
|
---|
34 | The handle_read() method looks at the input stream for the current
|
---|
35 | 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
|
---|
36 | for multi-line output), calling self.found_terminator() on its
|
---|
37 | receipt.
|
---|
38 |
|
---|
39 | for example:
|
---|
40 | Say you build an async nntp client using this class. At the start
|
---|
41 | of the connection, you'll have self.terminator set to '\r\n', in
|
---|
42 | order to process the single-line greeting. Just before issuing a
|
---|
43 | 'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
|
---|
44 | command will be accumulated (using your own 'collect_incoming_data'
|
---|
45 | method) up to the terminator, and then control will be returned to
|
---|
46 | you - by calling your self.found_terminator() method.
|
---|
47 | """
|
---|
48 |
|
---|
49 | import socket
|
---|
50 | import asyncore
|
---|
51 | from collections import deque
|
---|
52 |
|
---|
53 | class async_chat (asyncore.dispatcher):
|
---|
54 | """This is an abstract class. You must derive from this class, and add
|
---|
55 | the two methods collect_incoming_data() and found_terminator()"""
|
---|
56 |
|
---|
57 | # these are overridable defaults
|
---|
58 |
|
---|
59 | ac_in_buffer_size = 4096
|
---|
60 | ac_out_buffer_size = 4096
|
---|
61 |
|
---|
62 | def __init__ (self, conn=None):
|
---|
63 | self.ac_in_buffer = ''
|
---|
64 | self.ac_out_buffer = ''
|
---|
65 | self.producer_fifo = fifo()
|
---|
66 | asyncore.dispatcher.__init__ (self, conn)
|
---|
67 |
|
---|
68 | def collect_incoming_data(self, data):
|
---|
69 | raise NotImplementedError, "must be implemented in subclass"
|
---|
70 |
|
---|
71 | def found_terminator(self):
|
---|
72 | raise NotImplementedError, "must be implemented in subclass"
|
---|
73 |
|
---|
74 | def set_terminator (self, term):
|
---|
75 | "Set the input delimiter. Can be a fixed string of any length, an integer, or None"
|
---|
76 | self.terminator = term
|
---|
77 |
|
---|
78 | def get_terminator (self):
|
---|
79 | return self.terminator
|
---|
80 |
|
---|
81 | # grab some more data from the socket,
|
---|
82 | # throw it to the collector method,
|
---|
83 | # check for the terminator,
|
---|
84 | # if found, transition to the next state.
|
---|
85 |
|
---|
86 | def handle_read (self):
|
---|
87 |
|
---|
88 | try:
|
---|
89 | data = self.recv (self.ac_in_buffer_size)
|
---|
90 | except socket.error, why:
|
---|
91 | self.handle_error()
|
---|
92 | return
|
---|
93 |
|
---|
94 | self.ac_in_buffer = self.ac_in_buffer + data
|
---|
95 |
|
---|
96 | # Continue to search for self.terminator in self.ac_in_buffer,
|
---|
97 | # while calling self.collect_incoming_data. The while loop
|
---|
98 | # is necessary because we might read several data+terminator
|
---|
99 | # combos with a single recv(1024).
|
---|
100 |
|
---|
101 | while self.ac_in_buffer:
|
---|
102 | lb = len(self.ac_in_buffer)
|
---|
103 | terminator = self.get_terminator()
|
---|
104 | if not terminator:
|
---|
105 | # no terminator, collect it all
|
---|
106 | self.collect_incoming_data (self.ac_in_buffer)
|
---|
107 | self.ac_in_buffer = ''
|
---|
108 | elif isinstance(terminator, int) or isinstance(terminator, long):
|
---|
109 | # numeric terminator
|
---|
110 | n = terminator
|
---|
111 | if lb < n:
|
---|
112 | self.collect_incoming_data (self.ac_in_buffer)
|
---|
113 | self.ac_in_buffer = ''
|
---|
114 | self.terminator = self.terminator - lb
|
---|
115 | else:
|
---|
116 | self.collect_incoming_data (self.ac_in_buffer[:n])
|
---|
117 | self.ac_in_buffer = self.ac_in_buffer[n:]
|
---|
118 | self.terminator = 0
|
---|
119 | self.found_terminator()
|
---|
120 | else:
|
---|
121 | # 3 cases:
|
---|
122 | # 1) end of buffer matches terminator exactly:
|
---|
123 | # collect data, transition
|
---|
124 | # 2) end of buffer matches some prefix:
|
---|
125 | # collect data to the prefix
|
---|
126 | # 3) end of buffer does not match any prefix:
|
---|
127 | # collect data
|
---|
128 | terminator_len = len(terminator)
|
---|
129 | index = self.ac_in_buffer.find(terminator)
|
---|
130 | if index != -1:
|
---|
131 | # we found the terminator
|
---|
132 | if index > 0:
|
---|
133 | # don't bother reporting the empty string (source of subtle bugs)
|
---|
134 | self.collect_incoming_data (self.ac_in_buffer[:index])
|
---|
135 | self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
|
---|
136 | # This does the Right Thing if the terminator is changed here.
|
---|
137 | self.found_terminator()
|
---|
138 | else:
|
---|
139 | # check for a prefix of the terminator
|
---|
140 | index = find_prefix_at_end (self.ac_in_buffer, terminator)
|
---|
141 | if index:
|
---|
142 | if index != lb:
|
---|
143 | # we found a prefix, collect up to the prefix
|
---|
144 | self.collect_incoming_data (self.ac_in_buffer[:-index])
|
---|
145 | self.ac_in_buffer = self.ac_in_buffer[-index:]
|
---|
146 | break
|
---|
147 | else:
|
---|
148 | # no prefix, collect it all
|
---|
149 | self.collect_incoming_data (self.ac_in_buffer)
|
---|
150 | self.ac_in_buffer = ''
|
---|
151 |
|
---|
152 | def handle_write (self):
|
---|
153 | self.initiate_send ()
|
---|
154 |
|
---|
155 | def handle_close (self):
|
---|
156 | self.close()
|
---|
157 |
|
---|
158 | def push (self, data):
|
---|
159 | self.producer_fifo.push (simple_producer (data))
|
---|
160 | self.initiate_send()
|
---|
161 |
|
---|
162 | def push_with_producer (self, producer):
|
---|
163 | self.producer_fifo.push (producer)
|
---|
164 | self.initiate_send()
|
---|
165 |
|
---|
166 | def readable (self):
|
---|
167 | "predicate for inclusion in the readable for select()"
|
---|
168 | return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
|
---|
169 |
|
---|
170 | def writable (self):
|
---|
171 | "predicate for inclusion in the writable for select()"
|
---|
172 | # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
|
---|
173 | # this is about twice as fast, though not as clear.
|
---|
174 | return not (
|
---|
175 | (self.ac_out_buffer == '') and
|
---|
176 | self.producer_fifo.is_empty() and
|
---|
177 | self.connected
|
---|
178 | )
|
---|
179 |
|
---|
180 | def close_when_done (self):
|
---|
181 | "automatically close this channel once the outgoing queue is empty"
|
---|
182 | self.producer_fifo.push (None)
|
---|
183 |
|
---|
184 | # refill the outgoing buffer by calling the more() method
|
---|
185 | # of the first producer in the queue
|
---|
186 | def refill_buffer (self):
|
---|
187 | while 1:
|
---|
188 | if len(self.producer_fifo):
|
---|
189 | p = self.producer_fifo.first()
|
---|
190 | # a 'None' in the producer fifo is a sentinel,
|
---|
191 | # telling us to close the channel.
|
---|
192 | if p is None:
|
---|
193 | if not self.ac_out_buffer:
|
---|
194 | self.producer_fifo.pop()
|
---|
195 | self.close()
|
---|
196 | return
|
---|
197 | elif isinstance(p, str):
|
---|
198 | self.producer_fifo.pop()
|
---|
199 | self.ac_out_buffer = self.ac_out_buffer + p
|
---|
200 | return
|
---|
201 | data = p.more()
|
---|
202 | if data:
|
---|
203 | self.ac_out_buffer = self.ac_out_buffer + data
|
---|
204 | return
|
---|
205 | else:
|
---|
206 | self.producer_fifo.pop()
|
---|
207 | else:
|
---|
208 | return
|
---|
209 |
|
---|
210 | def initiate_send (self):
|
---|
211 | obs = self.ac_out_buffer_size
|
---|
212 | # try to refill the buffer
|
---|
213 | if (len (self.ac_out_buffer) < obs):
|
---|
214 | self.refill_buffer()
|
---|
215 |
|
---|
216 | if self.ac_out_buffer and self.connected:
|
---|
217 | # try to send the buffer
|
---|
218 | try:
|
---|
219 | num_sent = self.send (self.ac_out_buffer[:obs])
|
---|
220 | if num_sent:
|
---|
221 | self.ac_out_buffer = self.ac_out_buffer[num_sent:]
|
---|
222 |
|
---|
223 | except socket.error, why:
|
---|
224 | self.handle_error()
|
---|
225 | return
|
---|
226 |
|
---|
227 | def discard_buffers (self):
|
---|
228 | # Emergencies only!
|
---|
229 | self.ac_in_buffer = ''
|
---|
230 | self.ac_out_buffer = ''
|
---|
231 | while self.producer_fifo:
|
---|
232 | self.producer_fifo.pop()
|
---|
233 |
|
---|
234 |
|
---|
235 | class simple_producer:
|
---|
236 |
|
---|
237 | def __init__ (self, data, buffer_size=512):
|
---|
238 | self.data = data
|
---|
239 | self.buffer_size = buffer_size
|
---|
240 |
|
---|
241 | def more (self):
|
---|
242 | if len (self.data) > self.buffer_size:
|
---|
243 | result = self.data[:self.buffer_size]
|
---|
244 | self.data = self.data[self.buffer_size:]
|
---|
245 | return result
|
---|
246 | else:
|
---|
247 | result = self.data
|
---|
248 | self.data = ''
|
---|
249 | return result
|
---|
250 |
|
---|
251 | class fifo:
|
---|
252 | def __init__ (self, list=None):
|
---|
253 | if not list:
|
---|
254 | self.list = deque()
|
---|
255 | else:
|
---|
256 | self.list = deque(list)
|
---|
257 |
|
---|
258 | def __len__ (self):
|
---|
259 | return len(self.list)
|
---|
260 |
|
---|
261 | def is_empty (self):
|
---|
262 | return not self.list
|
---|
263 |
|
---|
264 | def first (self):
|
---|
265 | return self.list[0]
|
---|
266 |
|
---|
267 | def push (self, data):
|
---|
268 | self.list.append(data)
|
---|
269 |
|
---|
270 | def pop (self):
|
---|
271 | if self.list:
|
---|
272 | return (1, self.list.popleft())
|
---|
273 | else:
|
---|
274 | return (0, None)
|
---|
275 |
|
---|
276 | # Given 'haystack', see if any prefix of 'needle' is at its end. This
|
---|
277 | # assumes an exact match has already been checked. Return the number of
|
---|
278 | # characters matched.
|
---|
279 | # for example:
|
---|
280 | # f_p_a_e ("qwerty\r", "\r\n") => 1
|
---|
281 | # f_p_a_e ("qwertydkjf", "\r\n") => 0
|
---|
282 | # f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>
|
---|
283 |
|
---|
284 | # this could maybe be made faster with a computed regex?
|
---|
285 | # [answer: no; circa Python-2.0, Jan 2001]
|
---|
286 | # new python: 28961/s
|
---|
287 | # old python: 18307/s
|
---|
288 | # re: 12820/s
|
---|
289 | # regex: 14035/s
|
---|
290 |
|
---|
291 | def find_prefix_at_end (haystack, needle):
|
---|
292 | l = len(needle) - 1
|
---|
293 | while l and not haystack.endswith(needle[:l]):
|
---|
294 | l -= 1
|
---|
295 | return l
|
---|