# -*- Mode: Python -*-
#   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
#   Author: Sam Rushing <rushing@nightmare.com>

# ======================================================================
# Copyright 1996 by Sam Rushing
#
#                         All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

"""Basic infrastructure for asynchronous socket service clients and servers.

There are only two ways to have a program on a single processor do "more
than one thing at a time".  Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads.  It's really only practical if your program
is largely I/O bound.  If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need.  Network servers are
rarely CPU-bound, however.

If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background."  Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming.  The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""

try:
    # Only exists on Python 2; the import is kept so the module namespace
    # is unchanged there.  ExitNow derives from the builtin Exception,
    # which is the same object as exceptions.Exception.
    import exceptions
except ImportError:
    pass
import select
import socket
import sys
import time

import os
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
     ENOTCONN, ESHUTDOWN, EINTR, EISCONN, errorcode

# Default channel map (fd -> dispatcher).  The try/except keeps an existing
# map alive if this code is executed again in a namespace that already has one.
try:
    socket_map
except NameError:
    socket_map = {}


class ExitNow(Exception):
    """Exception the event wrappers re-raise instead of routing it to
    ``handle_error()``."""
    pass


# Exceptions that must always propagate out of the event wrappers.  Without
# this, Ctrl-C or sys.exit() raised inside a handler would be reported as an
# "uncaptured python exception" and would merely close the channel.
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)


def read(obj):
    """Call ``obj.handle_read_event()``; any other failure goes to
    ``obj.handle_error()``."""
    try:
        obj.handle_read_event()
    except _reraised_exceptions:
        raise
    except:
        # deliberate catch-all: one broken channel must not stop the loop
        obj.handle_error()


def write(obj):
    """Call ``obj.handle_write_event()``; any other failure goes to
    ``obj.handle_error()``."""
    try:
        obj.handle_write_event()
    except _reraised_exceptions:
        raise
    except:
        obj.handle_error()


def _exception(obj):
    """Call ``obj.handle_expt_event()``; any other failure goes to
    ``obj.handle_error()``."""
    try:
        obj.handle_expt_event()
    except _reraised_exceptions:
        raise
    except:
        obj.handle_error()


def readwrite(obj, flags):
    """Dispatch the poll() event mask *flags* to the matching handlers of
    *obj* (read, then write, then exceptional condition)."""
    try:
        if flags & (select.POLLIN | select.POLLPRI):
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
        if flags & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
            obj.handle_expt_event()
    except _reraised_exceptions:
        raise
    except:
        obj.handle_error()


def poll(timeout=0.0, map=None):
    """Run one select()-based polling pass over *map*.

    *map* defaults to the module-level ``socket_map``.  Each channel is asked
    whether it is readable()/writable(); the resulting fds are handed to
    select.select() with *timeout*, and the ready ones are dispatched through
    read(), write() and _exception().  A select() interrupted by a signal
    (EINTR) ends the pass quietly.
    """
    if map is None:
        map = socket_map
    if map:
        r = []; w = []; e = []
        # Iterate over a snapshot so the loop does not depend on the map
        # staying unchanged while readable()/writable() run.
        for fd, obj in list(map.items()):
            is_r = obj.readable()
            is_w = obj.writable()
            if is_r:
                r.append(fd)
            if is_w:
                w.append(fd)
            if is_r or is_w:
                e.append(fd)
        if [] == r == w == e:
            # nothing to wait on: just sleep for the timeout instead of
            # calling select() with three empty lists
            time.sleep(timeout)
        else:
            try:
                r, w, e = select.select(r, w, e, timeout)
            except select.error:
                err = sys.exc_info()[1]
                if err.args[0] != EINTR:
                    raise
                else:
                    return

        # A handler may close other channels, so look every fd up again
        # and skip the ones that are gone.
        for fd in r:
            obj = map.get(fd)
            if obj is None:
                continue
            read(obj)

        for fd in w:
            obj = map.get(fd)
            if obj is None:
                continue
            write(obj)

        for fd in e:
            obj = map.get(fd)
            if obj is None:
                continue
            _exception(obj)


def poll2(timeout=0.0, map=None):
    """Run one polling pass over *map* using select.poll().

    *timeout* is given in seconds (or None) and converted to the integer
    milliseconds that poll objects expect.  Ready channels are dispatched
    through readwrite().
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout*1000)
    pollster = select.poll()
    if map:
        for fd, obj in list(map.items()):
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            if obj.writable():
                flags |= select.POLLOUT
            if flags:
                # Only check for exceptions if object was either readable
                # or writable.
                flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
                pollster.register(fd, flags)
        try:
            r = pollster.poll(timeout)
        except select.error:
            err = sys.exc_info()[1]
            if err.args[0] != EINTR:
                raise
            r = []
        for fd, flags in r:
            obj = map.get(fd)
            if obj is None:
                continue
            readwrite(obj, flags)

poll3 = poll2                           # Alias for backward compatibility


def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Poll repeatedly until *map* is empty.

    With *count* set, stop after at most that many passes.  *use_poll*
    selects poll2() when the platform's select module provides poll();
    otherwise poll() is used.
    """
    if map is None:
        map = socket_map

    if use_poll and hasattr(select, 'poll'):
        poll_fun = poll2
    else:
        poll_fun = poll

    if count is None:
        while map:
            poll_fun(timeout, map)

    else:
        while map and count > 0:
            poll_fun(timeout, map)
            count = count - 1


class dispatcher:
    """Wrapper around a socket-like object that registers itself in a
    channel map and turns poll events into handle_*() callbacks.

    Subclasses override the handle_*() methods (and readable()/writable())
    to implement a protocol.  Attributes not found on the dispatcher are
    looked up on the wrapped socket.
    """

    debug = False
    connected = False
    accepting = False
    closing = False
    addr = None

    def __init__(self, sock=None, map=None):
        """Optionally adopt the socket *sock* and register in *map*
        (default: the module-level ``socket_map``)."""
        if map is None:
            self._map = socket_map
        else:
            self._map = map

        if sock:
            self.set_socket(sock, map)
            # I think it should inherit this anyway
            self.socket.setblocking(0)
            self.connected = True
            # XXX Does the constructor require that the socket passed
            # be connected?
            try:
                self.addr = sock.getpeername()
            except socket.error:
                # The addr isn't crucial
                pass
        else:
            self.socket = None

    def __repr__(self):
        status = [self.__class__.__module__+"."+self.__class__.__name__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    def add_channel(self, map=None):
        """Register this channel under its file descriptor in *map*."""
        #self.log_info('adding channel %s' % self)
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* (if present) and forget its fd."""
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            #self.log_info('closing channel %d:%s' % (fd, self))
            del map[fd]
        self._fileno = None

    def create_socket(self, family, type):
        """Create a new non-blocking socket and register the channel."""
        self.family_and_type = family, type
        self.socket = socket.socket(family, type)
        self.socket.setblocking(0)
        self._fileno = self.socket.fileno()
        self.add_channel()

    def set_socket(self, sock, map=None):
        """Adopt the existing socket *sock* and register the channel."""
        self.socket = sock
##        self.__dict__['socket'] = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Set SO_REUSEADDR on the socket; socket errors are ignored."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except socket.error:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        return True

    def writable(self):
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and listen with backlog *num*.

        On Windows ('nt') a backlog above 5 is clamped to 5.
        """
        self.accepting = True
        if os.name == 'nt' and num > 5:
            # clamp to the limit rather than collapsing the queue to a
            # single pending connection
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Remember *addr* and bind the socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to *address*.

        Returns silently while the connect is still in progress; calls
        handle_connect() if it completed immediately; raises socket.error
        for any other connect_ex() result.
        """
        self.connected = False
        err = self.socket.connect_ex(address)
        # XXX Should interpret Winsock return values
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
            return
        if err in (0, EISCONN):
            self.addr = address
            self.connected = True
            self.handle_connect()
        else:
            raise socket.error(err, errorcode[err])

    def accept(self):
        """Return ``(conn, addr)`` for a pending connection, or None when
        accept() would block."""
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
            return conn, addr
        except socket.error:
            why = sys.exc_info()[1]
            if why.args[0] == EWOULDBLOCK:
                pass
            else:
                raise

    def send(self, data):
        """Send *data*; return the number of bytes sent (0 if the call
        would block)."""
        try:
            result = self.socket.send(data)
            return result
        except socket.error:
            why = sys.exc_info()[1]
            if why.args[0] == EWOULDBLOCK:
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        An empty read, or ECONNRESET/ENOTCONN/ESHUTDOWN, triggers
        handle_close() and returns ''.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return ''
            else:
                return data
        except socket.error:
            why = sys.exc_info()[1]
            # winsock sometimes throws ENOTCONN
            if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
                self.handle_close()
                return ''
            else:
                raise

    def close(self):
        """Unregister the channel and close the underlying socket."""
        self.del_channel()
        self.socket.close()

    # cheap inheritance, used to pass all other attribute
    # references to the underlying socket object.
    def __getattr__(self, attr):
        # 'socket' itself must never be delegated: when it is missing
        # (e.g. __init__ has not run yet) the lookup below would call
        # __getattr__('socket') again and recurse without bound.
        if attr == 'socket':
            raise AttributeError(attr)
        return getattr(self.socket, attr)

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        if __debug__ or type != 'info':
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Translate a read event into handle_accept(), handle_connect()
        and/or handle_read()."""
        if self.accepting:
            # for an accepting socket, getting a read implies
            # that we are connected
            if not self.connected:
                self.connected = True
            self.handle_accept()
        elif not self.connected:
            self.handle_connect()
            self.connected = True
            self.handle_read()
        else:
            self.handle_read()

    def handle_write_event(self):
        """Translate a write event into handle_connect() (first time only)
        and handle_write()."""
        # getting a write implies that we are connected
        if not self.connected:
            self.handle_connect()
            self.connected = True
        self.handle_write()

    def handle_expt_event(self):
        self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled and close the channel."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.close()

    def handle_expt(self):
        self.log_info('unhandled exception', 'warning')

    def handle_read(self):
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        self.log_info('unhandled accept event', 'warning')

    def handle_close(self):
        self.log_info('unhandled close event', 'warning')
        self.close()

# ---------------------------------------------------------------------------
# adds simple buffered output capability, useful for simple clients.
# [for more sophisticated usage use asynchat.async_chat]
# ---------------------------------------------------------------------------

class dispatcher_with_send(dispatcher):
    """dispatcher whose send() appends to ``out_buffer`` and flushes it in
    chunks of at most 512 as the channel becomes writable."""

    def __init__(self, sock=None, map=None):
        dispatcher.__init__(self, sock, map)
        self.out_buffer = ''

    def initiate_send(self):
        """Try to send the next chunk and drop what was sent from the
        buffer."""
        num_sent = dispatcher.send(self, self.out_buffer[:512])
        self.out_buffer = self.out_buffer[num_sent:]

    def handle_write(self):
        self.initiate_send()

    def writable(self):
        # true while still connecting, otherwise the pending buffer length
        return (not self.connected) or len(self.out_buffer)

    def send(self, data):
        """Queue *data* and attempt to send immediately."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()

# ---------------------------------------------------------------------------
# used for debugging.
# ---------------------------------------------------------------------------

def compact_traceback():
    """Summarize the exception currently being handled.

    Returns ``((file, function, line), type, value, info)`` where the first
    element describes the innermost frame and *info* is a one-line string of
    ``[file|function|line]`` entries for every frame of the traceback.

    Raises AssertionError when there is no traceback to summarize.
    """
    t, v, tb = sys.exc_info()
    tbinfo = []
    if not tb:  # Must have a traceback
        # explicit raise: a plain assert disappears under python -O
        raise AssertionError("traceback does not exist")
    while tb:
        tbinfo.append((
            tb.tb_frame.f_code.co_filename,
            tb.tb_frame.f_code.co_name,
            str(tb.tb_lineno)
            ))
        tb = tb.tb_next

    # just to be safe
    del tb

    file, function, line = tbinfo[-1]
    info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
    return (file, function, line), t, v, info


def close_all(map=None):
    """Close the socket of every channel in *map* and empty the map."""
    if map is None:
        map = socket_map
    for x in list(map.values()):
        x.socket.close()
    map.clear()

# Asynchronous File I/O:
#
# After a little research (reading man pages on various unixen, and
# digging through the linux kernel), I've determined that select()
# isn't meant for doing asynchronous file i/o.
# Heartening, though - reading linux/mm/filemap.c shows that linux
# supports asynchronous read-ahead.  So _MOST_ of the time, the data
# will be sitting in memory for us already when we go to read it.
#
# What other OS's (besides NT) support async file i/o?  [VMS?]
#
# Regardless, this is useful for pipes, and stdin/stdout...

if os.name == 'posix':
    import fcntl

    class file_wrapper:
        # here we override just enough to make a file
        # look like a socket for the purposes of asyncore.

        def __init__(self, fd):
            self.fd = fd

        def recv(self, *args):
            return os.read(self.fd, *args)

        def send(self, *args):
            return os.write(self.fd, *args)

        read = recv
        write = send

        def close(self):
            os.close(self.fd)

        def fileno(self):
            return self.fd

    class file_dispatcher(dispatcher):
        """dispatcher for a raw file descriptor (pipes, stdin/stdout)."""

        def __init__(self, fd, map=None):
            dispatcher.__init__(self, None, map)
            self.connected = True
            self.set_file(fd)
            # set it to non-blocking mode
            flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
            flags = flags | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)

        def set_file(self, fd):
            """Wrap *fd* in a file_wrapper and register the channel."""
            self._fileno = fd
            self.socket = file_wrapper(fd)
            self.add_channel()

# Listing originally generated by PyXR 0.9.4