| 1 | # -*- Mode: Python; tab-width: 4 -*-
|
|---|
| 2 | # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
|
|---|
| 3 | # Author: Sam Rushing <rushing@nightmare.com>
|
|---|
| 4 |
|
|---|
| 5 | # ======================================================================
|
|---|
| 6 | # Copyright 1996 by Sam Rushing
|
|---|
| 7 | #
|
|---|
| 8 | # All Rights Reserved
|
|---|
| 9 | #
|
|---|
| 10 | # Permission to use, copy, modify, and distribute this software and
|
|---|
| 11 | # its documentation for any purpose and without fee is hereby
|
|---|
| 12 | # granted, provided that the above copyright notice appear in all
|
|---|
| 13 | # copies and that both that copyright notice and this permission
|
|---|
| 14 | # notice appear in supporting documentation, and that the name of Sam
|
|---|
| 15 | # Rushing not be used in advertising or publicity pertaining to
|
|---|
| 16 | # distribution of the software without specific, written prior
|
|---|
| 17 | # permission.
|
|---|
| 18 | #
|
|---|
| 19 | # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
|
|---|
| 20 | # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
|
|---|
| 21 | # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
|
|---|
| 22 | # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
|
|---|
| 23 | # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
|
|---|
| 24 | # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
|---|
| 25 | # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|---|
| 26 | # ======================================================================
|
|---|
| 27 |
|
|---|
| 28 | r"""A class supporting chat-style (command/response) protocols.
|
|---|
| 29 |
|
|---|
| 30 | This class adds support for 'chat' style protocols - where one side
|
|---|
| 31 | sends a 'command', and the other sends a response (examples would be
|
|---|
| 32 | the common internet protocols - smtp, nntp, ftp, etc..).
|
|---|
| 33 |
|
|---|
| 34 | The handle_read() method looks at the input stream for the current
|
|---|
| 35 | 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
|
|---|
| 36 | for multi-line output), calling self.found_terminator() on its
|
|---|
| 37 | receipt.
|
|---|
| 38 |
|
|---|
| 39 | for example:
|
|---|
| 40 | Say you build an async nntp client using this class. At the start
|
|---|
| 41 | of the connection, you'll have self.terminator set to '\r\n', in
|
|---|
| 42 | order to process the single-line greeting. Just before issuing a
|
|---|
| 43 | 'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
|
|---|
| 44 | command will be accumulated (using your own 'collect_incoming_data'
|
|---|
| 45 | method) up to the terminator, and then control will be returned to
|
|---|
| 46 | you - by calling your self.found_terminator() method.
|
|---|
| 47 | """
|
|---|
| 48 |
|
|---|
| 49 | import socket
|
|---|
| 50 | import asyncore
|
|---|
| 51 | from collections import deque
|
|---|
| 52 | from sys import py3kwarning
|
|---|
| 53 | from warnings import filterwarnings, catch_warnings
|
|---|
| 54 |
|
|---|
| 55 | class async_chat (asyncore.dispatcher):
|
|---|
| 56 | """This is an abstract class. You must derive from this class, and add
|
|---|
| 57 | the two methods collect_incoming_data() and found_terminator()"""
|
|---|
| 58 |
|
|---|
| 59 | # these are overridable defaults
|
|---|
| 60 |
|
|---|
| 61 | ac_in_buffer_size = 4096
|
|---|
| 62 | ac_out_buffer_size = 4096
|
|---|
| 63 |
|
|---|
| 64 | def __init__ (self, sock=None, map=None):
|
|---|
| 65 | # for string terminator matching
|
|---|
| 66 | self.ac_in_buffer = ''
|
|---|
| 67 |
|
|---|
| 68 | # we use a list here rather than cStringIO for a few reasons...
|
|---|
| 69 | # del lst[:] is faster than sio.truncate(0)
|
|---|
| 70 | # lst = [] is faster than sio.truncate(0)
|
|---|
| 71 | # cStringIO will be gaining unicode support in py3k, which
|
|---|
| 72 | # will negatively affect the performance of bytes compared to
|
|---|
| 73 | # a ''.join() equivalent
|
|---|
| 74 | self.incoming = []
|
|---|
| 75 |
|
|---|
| 76 | # we toss the use of the "simple producer" and replace it with
|
|---|
| 77 | # a pure deque, which the original fifo was a wrapping of
|
|---|
| 78 | self.producer_fifo = deque()
|
|---|
| 79 | asyncore.dispatcher.__init__ (self, sock, map)
|
|---|
| 80 |
|
|---|
| 81 | def collect_incoming_data(self, data):
|
|---|
| 82 | raise NotImplementedError("must be implemented in subclass")
|
|---|
| 83 |
|
|---|
| 84 | def _collect_incoming_data(self, data):
|
|---|
| 85 | self.incoming.append(data)
|
|---|
| 86 |
|
|---|
| 87 | def _get_data(self):
|
|---|
| 88 | d = ''.join(self.incoming)
|
|---|
| 89 | del self.incoming[:]
|
|---|
| 90 | return d
|
|---|
| 91 |
|
|---|
| 92 | def found_terminator(self):
|
|---|
| 93 | raise NotImplementedError("must be implemented in subclass")
|
|---|
| 94 |
|
|---|
| 95 | def set_terminator (self, term):
|
|---|
| 96 | "Set the input delimiter. Can be a fixed string of any length, an integer, or None"
|
|---|
| 97 | self.terminator = term
|
|---|
| 98 |
|
|---|
| 99 | def get_terminator (self):
|
|---|
| 100 | return self.terminator
|
|---|
| 101 |
|
|---|
| 102 | # grab some more data from the socket,
|
|---|
| 103 | # throw it to the collector method,
|
|---|
| 104 | # check for the terminator,
|
|---|
| 105 | # if found, transition to the next state.
|
|---|
| 106 |
|
|---|
| 107 | def handle_read (self):
|
|---|
| 108 |
|
|---|
| 109 | try:
|
|---|
| 110 | data = self.recv (self.ac_in_buffer_size)
|
|---|
| 111 | except socket.error, why:
|
|---|
| 112 | self.handle_error()
|
|---|
| 113 | return
|
|---|
| 114 |
|
|---|
| 115 | self.ac_in_buffer = self.ac_in_buffer + data
|
|---|
| 116 |
|
|---|
| 117 | # Continue to search for self.terminator in self.ac_in_buffer,
|
|---|
| 118 | # while calling self.collect_incoming_data. The while loop
|
|---|
| 119 | # is necessary because we might read several data+terminator
|
|---|
| 120 | # combos with a single recv(4096).
|
|---|
| 121 |
|
|---|
| 122 | while self.ac_in_buffer:
|
|---|
| 123 | lb = len(self.ac_in_buffer)
|
|---|
| 124 | terminator = self.get_terminator()
|
|---|
| 125 | if not terminator:
|
|---|
| 126 | # no terminator, collect it all
|
|---|
| 127 | self.collect_incoming_data (self.ac_in_buffer)
|
|---|
| 128 | self.ac_in_buffer = ''
|
|---|
| 129 | elif isinstance(terminator, int) or isinstance(terminator, long):
|
|---|
| 130 | # numeric terminator
|
|---|
| 131 | n = terminator
|
|---|
| 132 | if lb < n:
|
|---|
| 133 | self.collect_incoming_data (self.ac_in_buffer)
|
|---|
| 134 | self.ac_in_buffer = ''
|
|---|
| 135 | self.terminator = self.terminator - lb
|
|---|
| 136 | else:
|
|---|
| 137 | self.collect_incoming_data (self.ac_in_buffer[:n])
|
|---|
| 138 | self.ac_in_buffer = self.ac_in_buffer[n:]
|
|---|
| 139 | self.terminator = 0
|
|---|
| 140 | self.found_terminator()
|
|---|
| 141 | else:
|
|---|
| 142 | # 3 cases:
|
|---|
| 143 | # 1) end of buffer matches terminator exactly:
|
|---|
| 144 | # collect data, transition
|
|---|
| 145 | # 2) end of buffer matches some prefix:
|
|---|
| 146 | # collect data to the prefix
|
|---|
| 147 | # 3) end of buffer does not match any prefix:
|
|---|
| 148 | # collect data
|
|---|
| 149 | terminator_len = len(terminator)
|
|---|
| 150 | index = self.ac_in_buffer.find(terminator)
|
|---|
| 151 | if index != -1:
|
|---|
| 152 | # we found the terminator
|
|---|
| 153 | if index > 0:
|
|---|
| 154 | # don't bother reporting the empty string (source of subtle bugs)
|
|---|
| 155 | self.collect_incoming_data (self.ac_in_buffer[:index])
|
|---|
| 156 | self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
|
|---|
| 157 | # This does the Right Thing if the terminator is changed here.
|
|---|
| 158 | self.found_terminator()
|
|---|
| 159 | else:
|
|---|
| 160 | # check for a prefix of the terminator
|
|---|
| 161 | index = find_prefix_at_end (self.ac_in_buffer, terminator)
|
|---|
| 162 | if index:
|
|---|
| 163 | if index != lb:
|
|---|
| 164 | # we found a prefix, collect up to the prefix
|
|---|
| 165 | self.collect_incoming_data (self.ac_in_buffer[:-index])
|
|---|
| 166 | self.ac_in_buffer = self.ac_in_buffer[-index:]
|
|---|
| 167 | break
|
|---|
| 168 | else:
|
|---|
| 169 | # no prefix, collect it all
|
|---|
| 170 | self.collect_incoming_data (self.ac_in_buffer)
|
|---|
| 171 | self.ac_in_buffer = ''
|
|---|
| 172 |
|
|---|
| 173 | def handle_write (self):
|
|---|
| 174 | self.initiate_send()
|
|---|
| 175 |
|
|---|
| 176 | def handle_close (self):
|
|---|
| 177 | self.close()
|
|---|
| 178 |
|
|---|
| 179 | def push (self, data):
|
|---|
| 180 | sabs = self.ac_out_buffer_size
|
|---|
| 181 | if len(data) > sabs:
|
|---|
| 182 | for i in xrange(0, len(data), sabs):
|
|---|
| 183 | self.producer_fifo.append(data[i:i+sabs])
|
|---|
| 184 | else:
|
|---|
| 185 | self.producer_fifo.append(data)
|
|---|
| 186 | self.initiate_send()
|
|---|
| 187 |
|
|---|
| 188 | def push_with_producer (self, producer):
|
|---|
| 189 | self.producer_fifo.append(producer)
|
|---|
| 190 | self.initiate_send()
|
|---|
| 191 |
|
|---|
| 192 | def readable (self):
|
|---|
| 193 | "predicate for inclusion in the readable for select()"
|
|---|
| 194 | # cannot use the old predicate, it violates the claim of the
|
|---|
| 195 | # set_terminator method.
|
|---|
| 196 |
|
|---|
| 197 | # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
|
|---|
| 198 | return 1
|
|---|
| 199 |
|
|---|
| 200 | def writable (self):
|
|---|
| 201 | "predicate for inclusion in the writable for select()"
|
|---|
| 202 | return self.producer_fifo or (not self.connected)
|
|---|
| 203 |
|
|---|
| 204 | def close_when_done (self):
|
|---|
| 205 | "automatically close this channel once the outgoing queue is empty"
|
|---|
| 206 | self.producer_fifo.append(None)
|
|---|
| 207 |
|
|---|
| 208 | def initiate_send(self):
|
|---|
| 209 | while self.producer_fifo and self.connected:
|
|---|
| 210 | first = self.producer_fifo[0]
|
|---|
| 211 | # handle empty string/buffer or None entry
|
|---|
| 212 | if not first:
|
|---|
| 213 | del self.producer_fifo[0]
|
|---|
| 214 | if first is None:
|
|---|
| 215 | self.handle_close()
|
|---|
| 216 | return
|
|---|
| 217 |
|
|---|
| 218 | # handle classic producer behavior
|
|---|
| 219 | obs = self.ac_out_buffer_size
|
|---|
| 220 | try:
|
|---|
| 221 | with catch_warnings():
|
|---|
| 222 | if py3kwarning:
|
|---|
| 223 | filterwarnings("ignore", ".*buffer", DeprecationWarning)
|
|---|
| 224 | data = buffer(first, 0, obs)
|
|---|
| 225 | except TypeError:
|
|---|
| 226 | data = first.more()
|
|---|
| 227 | if data:
|
|---|
| 228 | self.producer_fifo.appendleft(data)
|
|---|
| 229 | else:
|
|---|
| 230 | del self.producer_fifo[0]
|
|---|
| 231 | continue
|
|---|
| 232 |
|
|---|
| 233 | # send the data
|
|---|
| 234 | try:
|
|---|
| 235 | num_sent = self.send(data)
|
|---|
| 236 | except socket.error:
|
|---|
| 237 | self.handle_error()
|
|---|
| 238 | return
|
|---|
| 239 |
|
|---|
| 240 | if num_sent:
|
|---|
| 241 | if num_sent < len(data) or obs < len(first):
|
|---|
| 242 | self.producer_fifo[0] = first[num_sent:]
|
|---|
| 243 | else:
|
|---|
| 244 | del self.producer_fifo[0]
|
|---|
| 245 | # we tried to send some actual data
|
|---|
| 246 | return
|
|---|
| 247 |
|
|---|
| 248 | def discard_buffers (self):
|
|---|
| 249 | # Emergencies only!
|
|---|
| 250 | self.ac_in_buffer = ''
|
|---|
| 251 | del self.incoming[:]
|
|---|
| 252 | self.producer_fifo.clear()
|
|---|
| 253 |
|
|---|
| 254 | class simple_producer:
|
|---|
| 255 |
|
|---|
| 256 | def __init__ (self, data, buffer_size=512):
|
|---|
| 257 | self.data = data
|
|---|
| 258 | self.buffer_size = buffer_size
|
|---|
| 259 |
|
|---|
| 260 | def more (self):
|
|---|
| 261 | if len (self.data) > self.buffer_size:
|
|---|
| 262 | result = self.data[:self.buffer_size]
|
|---|
| 263 | self.data = self.data[self.buffer_size:]
|
|---|
| 264 | return result
|
|---|
| 265 | else:
|
|---|
| 266 | result = self.data
|
|---|
| 267 | self.data = ''
|
|---|
| 268 | return result
|
|---|
| 269 |
|
|---|
| 270 | class fifo:
|
|---|
| 271 | def __init__ (self, list=None):
|
|---|
| 272 | if not list:
|
|---|
| 273 | self.list = deque()
|
|---|
| 274 | else:
|
|---|
| 275 | self.list = deque(list)
|
|---|
| 276 |
|
|---|
| 277 | def __len__ (self):
|
|---|
| 278 | return len(self.list)
|
|---|
| 279 |
|
|---|
| 280 | def is_empty (self):
|
|---|
| 281 | return not self.list
|
|---|
| 282 |
|
|---|
| 283 | def first (self):
|
|---|
| 284 | return self.list[0]
|
|---|
| 285 |
|
|---|
| 286 | def push (self, data):
|
|---|
| 287 | self.list.append(data)
|
|---|
| 288 |
|
|---|
| 289 | def pop (self):
|
|---|
| 290 | if self.list:
|
|---|
| 291 | return (1, self.list.popleft())
|
|---|
| 292 | else:
|
|---|
| 293 | return (0, None)
|
|---|
| 294 |
|
|---|
| 295 | # Given 'haystack', see if any prefix of 'needle' is at its end. This
|
|---|
| 296 | # assumes an exact match has already been checked. Return the number of
|
|---|
| 297 | # characters matched.
|
|---|
| 298 | # for example:
|
|---|
| 299 | # f_p_a_e ("qwerty\r", "\r\n") => 1
|
|---|
| 300 | # f_p_a_e ("qwertydkjf", "\r\n") => 0
|
|---|
| 301 | # f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>
|
|---|
| 302 |
|
|---|
| 303 | # this could maybe be made faster with a computed regex?
|
|---|
| 304 | # [answer: no; circa Python-2.0, Jan 2001]
|
|---|
| 305 | # new python: 28961/s
|
|---|
| 306 | # old python: 18307/s
|
|---|
| 307 | # re: 12820/s
|
|---|
| 308 | # regex: 14035/s
|
|---|
| 309 |
|
|---|
| 310 | def find_prefix_at_end (haystack, needle):
|
|---|
| 311 | l = len(needle) - 1
|
|---|
| 312 | while l and not haystack.endswith(needle[:l]):
|
|---|
| 313 | l -= 1
|
|---|
| 314 | return l
|
|---|