# -*- Mode: Python; tab-width: 4 -*-
#       Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
#       Author: Sam Rushing <rushing@nightmare.com>

# ======================================================================
# Copyright 1996 by Sam Rushing
#
#                         All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

r"""A class supporting chat-style (command/response) protocols.

This class adds support for 'chat' style protocols - where one side
sends a 'command', and the other sends a response (examples would be
the common internet protocols - smtp, nntp, ftp, etc..).

The handle_read() method looks at the input stream for the current
'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
for multi-line output), calling self.found_terminator() on its
receipt.

for example:
Say you build an async nntp client using this class. At the start
of the connection, you'll have self.terminator set to '\r\n', in
order to process the single-line greeting. Just before issuing a
'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
command will be accumulated (using your own 'collect_incoming_data'
method) up to the terminator, and then control will be returned to
you - by calling your self.found_terminator() method.
"""

import socket
import asyncore
from collections import deque

# Types treated as a numeric ("collect exactly N characters") terminator.
# Python 2 has a separate 'long' type; on Python 3 that name does not exist
# and plain 'int' covers every integer.
try:
    _INTEGER_TYPES = (int, long)
except NameError:
    _INTEGER_TYPES = (int,)


class async_chat(asyncore.dispatcher):
    """This is an abstract class. You must derive from this class, and add
    the two methods collect_incoming_data() and found_terminator()"""

    # these are overridable defaults

    ac_in_buffer_size = 4096
    ac_out_buffer_size = 4096

    def __init__(self, conn=None):
        """Start with empty input/output buffers and an empty producer queue.

        'conn' is handed unchanged to asyncore.dispatcher.__init__().
        """
        self.ac_in_buffer = ''
        self.ac_out_buffer = ''
        self.producer_fifo = fifo()
        asyncore.dispatcher.__init__(self, conn)

    def collect_incoming_data(self, data):
        """Receive a piece of input that precedes the terminator (abstract)."""
        raise NotImplementedError("must be implemented in subclass")

    def found_terminator(self):
        """Called by handle_read() when the terminator is reached (abstract)."""
        raise NotImplementedError("must be implemented in subclass")

    def set_terminator(self, term):
        "Set the input delimiter. Can be a fixed string of any length, an integer, or None"
        self.terminator = term

    def get_terminator(self):
        """Return the terminator last stored by set_terminator()."""
        return self.terminator

    # grab some more data from the socket,
    # throw it to the collector method,
    # check for the terminator,
    # if found, transition to the next state.

    def handle_read(self):
        """Read from the channel and dispatch the buffered input.

        Data returned by self.recv() is appended to self.ac_in_buffer, which
        is then consumed according to the current terminator:

        * None or '':  everything is passed to collect_incoming_data().
        * an integer:  that many characters are collected (possibly over
          several reads) and then found_terminator() is called.
        * a string:    data up to the terminator is collected, the
          terminator itself is dropped and found_terminator() is called.
          A trailing partial terminator stays in the buffer until more
          data arrives.

        A socket.error raised by recv() is routed to self.handle_error().
        """
        try:
            data = self.recv(self.ac_in_buffer_size)
        except socket.error:
            self.handle_error()
            return

        self.ac_in_buffer = self.ac_in_buffer + data

        # Continue to search for self.terminator in self.ac_in_buffer,
        # while calling self.collect_incoming_data.  The while loop
        # is necessary because we might read several data+terminator
        # combos with a single recv().

        while self.ac_in_buffer:
            lb = len(self.ac_in_buffer)
            # Re-read on every pass: found_terminator() may have changed it.
            terminator = self.get_terminator()
            if terminator is None or terminator == '':
                # no terminator, collect it all
                self.collect_incoming_data(self.ac_in_buffer)
                self.ac_in_buffer = ''
            elif isinstance(terminator, _INTEGER_TYPES):
                # numeric terminator
                n = terminator
                if lb < n:
                    # not enough yet: hand over what we have and remember
                    # how many characters are still owed
                    self.collect_incoming_data(self.ac_in_buffer)
                    self.ac_in_buffer = ''
                    self.terminator = self.terminator - lb
                else:
                    self.collect_incoming_data(self.ac_in_buffer[:n])
                    self.ac_in_buffer = self.ac_in_buffer[n:]
                    # NOTE(review): the terminator is left at 0 here, so
                    # found_terminator() presumably has to install a new one
                    # before any remaining input is processed - confirm
                    # against subclasses.
                    self.terminator = 0
                    self.found_terminator()
            else:
                # 3 cases:
                # 1) end of buffer matches terminator exactly:
                #    collect data, transition
                # 2) end of buffer matches some prefix:
                #    collect data to the prefix
                # 3) end of buffer does not match any prefix:
                #    collect data
                terminator_len = len(terminator)
                index = self.ac_in_buffer.find(terminator)
                if index != -1:
                    # we found the terminator
                    if index > 0:
                        # don't bother reporting the empty string (source of subtle bugs)
                        self.collect_incoming_data(self.ac_in_buffer[:index])
                    self.ac_in_buffer = self.ac_in_buffer[index + terminator_len:]
                    # This does the Right Thing if the terminator is changed here.
                    self.found_terminator()
                else:
                    # check for a prefix of the terminator
                    index = find_prefix_at_end(self.ac_in_buffer, terminator)
                    if index:
                        if index != lb:
                            # we found a prefix, collect up to the prefix
                            self.collect_incoming_data(self.ac_in_buffer[:-index])
                            self.ac_in_buffer = self.ac_in_buffer[-index:]
                        # wait for more data to complete (or break) the prefix
                        break
                    else:
                        # no prefix, collect it all
                        self.collect_incoming_data(self.ac_in_buffer)
                        self.ac_in_buffer = ''

    def handle_write(self):
        """Write event: try to send more of the outgoing data."""
        self.initiate_send()

    def handle_close(self):
        """Close event: close the channel."""
        self.close()

    def push(self, data):
        """Queue 'data' (wrapped in a simple_producer) and try to send."""
        self.producer_fifo.push(simple_producer(data))
        self.initiate_send()

    def push_with_producer(self, producer):
        """Queue 'producer' as-is and try to send.

        refill_buffer() later calls producer.more() to obtain its data.
        """
        self.producer_fifo.push(producer)
        self.initiate_send()

    def readable(self):
        "predicate for inclusion in the readable for select()"
        return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)

    def writable(self):
        "predicate for inclusion in the writable for select()"
        # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
        # this is about twice as fast, though not as clear.
        return not (
            (self.ac_out_buffer == '') and
            self.producer_fifo.is_empty() and
            self.connected
            )

    def close_when_done(self):
        "automatically close this channel once the outgoing queue is empty"
        self.producer_fifo.push(None)

    # refill the outgoing buffer by calling the more() method
    # of the first producer in the queue
    def refill_buffer(self):
        """Append the next piece of queued output to self.ac_out_buffer.

        Exhausted producers (those whose more() returns something false)
        are dropped and the next queue entry is tried.  Returns as soon as
        data has been added, the queue is empty, or the None sentinel is
        reached.
        """
        while True:
            if len(self.producer_fifo):
                p = self.producer_fifo.first()
                # a 'None' in the producer fifo is a sentinel,
                # telling us to close the channel.
                if p is None:
                    # only close once everything already buffered is gone
                    if not self.ac_out_buffer:
                        self.producer_fifo.pop()
                        self.close()
                    return
                elif isinstance(p, str):
                    # a bare string in the queue is appended directly
                    self.producer_fifo.pop()
                    self.ac_out_buffer = self.ac_out_buffer + p
                    return
                data = p.more()
                if data:
                    self.ac_out_buffer = self.ac_out_buffer + data
                    return
                else:
                    self.producer_fifo.pop()
            else:
                return

    def initiate_send(self):
        """Top up the outgoing buffer and send at most ac_out_buffer_size of it.

        Nothing is sent while the channel is not connected.  Whatever
        self.send() reports as sent is removed from the front of the buffer;
        a socket.error from send() is routed to self.handle_error().
        """
        obs = self.ac_out_buffer_size
        # try to refill the buffer
        if len(self.ac_out_buffer) < obs:
            self.refill_buffer()

        if self.ac_out_buffer and self.connected:
            # try to send the buffer
            try:
                num_sent = self.send(self.ac_out_buffer[:obs])
            except socket.error:
                self.handle_error()
                return
            else:
                if num_sent:
                    self.ac_out_buffer = self.ac_out_buffer[num_sent:]

    def discard_buffers(self):
        """Drop all buffered input, buffered output and queued producers."""
        # Emergencies only!
        self.ac_in_buffer = ''
        self.ac_out_buffer = ''
        while self.producer_fifo:
            self.producer_fifo.pop()


class simple_producer:
    """Producer that hands out 'data' in pieces of at most 'buffer_size'."""

    def __init__(self, data, buffer_size=512):
        self.data = data
        self.buffer_size = buffer_size

    def more(self):
        """Return the next piece of data; '' once everything was returned."""
        if len(self.data) > self.buffer_size:
            result = self.data[:self.buffer_size]
            self.data = self.data[self.buffer_size:]
            return result
        else:
            result = self.data
            self.data = ''
            return result


class fifo:
    """First-in, first-out queue backed by a collections.deque."""

    def __init__(self, list=None):
        # 'list' shadows the builtin, but the name is part of the signature.
        if not list:
            self.list = deque()
        else:
            self.list = deque(list)

    def __len__(self):
        return len(self.list)

    def is_empty(self):
        """Return true when nothing is queued."""
        return not self.list

    def first(self):
        """Return the oldest entry without removing it."""
        return self.list[0]

    def push(self, data):
        """Add 'data' at the back of the queue."""
        self.list.append(data)

    def pop(self):
        """Remove the oldest entry.

        Returns (1, entry) on success and (0, None) when the queue is empty.
        """
        if self.list:
            return (1, self.list.popleft())
        else:
            return (0, None)

# Given 'haystack', see if any prefix of 'needle' is at its
# end.  This assumes an exact match has already been checked.  Return the
# number of characters matched.
# for example:
# f_p_a_e ("qwerty\r", "\r\n") => 1
# f_p_a_e ("qwertydkjf", "\r\n") => 0
# f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>

# this could maybe be made faster with a computed regex?
# [answer: no; circa Python-2.0, Jan 2001]
# new python:   28961/s
# old python:   18307/s
# re:           12820/s
# regex:        14035/s

def find_prefix_at_end(haystack, needle):
    r"""Return how many leading characters of 'needle' end 'haystack'.

    Only proper prefixes of 'needle' are tried, longest first, so the
    result lies between 0 and len(needle) - 1.  A complete occurrence of
    'needle' is expected to have been ruled out by the caller.  An empty
    'needle' yields 0.

    find_prefix_at_end("qwerty\r", "\r\n") => 1
    find_prefix_at_end("qwertydkjf", "\r\n") => 0
    """
    # Clamp at 0 so an empty needle reports "nothing matched" rather than -1,
    # which is truthy and not a character count.
    matched = max(len(needle) - 1, 0)
    while matched and not haystack.endswith(needle[:matched]):
        matched -= 1
    return matched
# Generated by PyXR 0.9.4