PyXR

c:\python24\lib \ asyncore.py



0001 # -*- Mode: Python -*-
0002 #   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
0003 #   Author: Sam Rushing <rushing@nightmare.com>
0004 
0005 # ======================================================================
0006 # Copyright 1996 by Sam Rushing
0007 #
0008 #                         All Rights Reserved
0009 #
0010 # Permission to use, copy, modify, and distribute this software and
0011 # its documentation for any purpose and without fee is hereby
0012 # granted, provided that the above copyright notice appear in all
0013 # copies and that both that copyright notice and this permission
0014 # notice appear in supporting documentation, and that the name of Sam
0015 # Rushing not be used in advertising or publicity pertaining to
0016 # distribution of the software without specific, written prior
0017 # permission.
0018 #
0019 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
0020 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
0021 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
0022 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
0023 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
0024 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
0025 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
0026 # ======================================================================
0027 
0028 """Basic infrastructure for asynchronous socket service clients and servers.
0029 
0030 There are only two ways to have a program on a single processor do "more
0031 than one thing at a time".  Multi-threaded programming is the simplest and
0032 most popular way to do it, but there is another very different technique,
0033 that lets you have nearly all the advantages of multi-threading, without
0034 actually using multiple threads. It's really only practical if your program
0035 is largely I/O bound. If your program is CPU bound, then pre-emptive
0036 scheduled threads are probably what you really need. Network servers are
0037 rarely CPU-bound, however.
0038 
0039 If your operating system supports the select() system call in its I/O
0040 library (and nearly all do), then you can use it to juggle multiple
0041 communication channels at once; doing other work while your I/O is taking
0042 place in the "background."  Although this strategy can seem strange and
0043 complex, especially at first, it is in many ways easier to understand and
0044 control than multi-threaded programming. The module documented here solves
0045 many of the difficult problems for you, making the task of building
0046 sophisticated high-performance network servers and clients a snap.
0047 """
0048 
0049 import exceptions
0050 import select
0051 import socket
0052 import sys
0053 import time
0054 
0055 import os
0056 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
0057      ENOTCONN, ESHUTDOWN, EINTR, EISCONN, errorcode
0058 
# Module-wide default channel registry.  Channels store themselves in it
# keyed by file descriptor (see dispatcher.add_channel).  The dictionary is
# only created when the name is not bound yet, so an already existing map is
# reused rather than replaced.
# NOTE(review): presumably this keeps live channels across reload() -- confirm.
try:
    socket_map
except NameError:
    socket_map = {}
0063 
class ExitNow(Exception):
    """Exception that the event dispatch helpers always let propagate.

    The module-level helpers (read, write, _exception, readwrite) re-raise
    it instead of handing it to the channel's handle_error().
    """
0066 
def read(obj):
    """Run obj's read handler, reporting unexpected failures to the channel.

    Calls ``obj.handle_read_event()``.  KeyboardInterrupt, SystemExit and
    ExitNow propagate to the caller; every other exception is caught and
    reported through ``obj.handle_error()``.
    """
    try:
        obj.handle_read_event()
    except (KeyboardInterrupt, SystemExit):
        # Interpreter-level exits must not be swallowed by handle_error(),
        # otherwise Ctrl-C or sys.exit() inside a handler is lost.
        raise
    except ExitNow:
        raise
    except:
        obj.handle_error()
0074 
def write(obj):
    """Run obj's write handler, reporting unexpected failures to the channel.

    Calls ``obj.handle_write_event()``.  KeyboardInterrupt, SystemExit and
    ExitNow propagate to the caller; every other exception is caught and
    reported through ``obj.handle_error()``.
    """
    try:
        obj.handle_write_event()
    except (KeyboardInterrupt, SystemExit):
        # Interpreter-level exits must not be swallowed by handle_error(),
        # otherwise Ctrl-C or sys.exit() inside a handler is lost.
        raise
    except ExitNow:
        raise
    except:
        obj.handle_error()
0082 
0083 def _exception (obj):
0084     try:
0085         obj.handle_expt_event()
0086     except ExitNow:
0087         raise
0088     except:
0089         obj.handle_error()
0090 
def readwrite(obj, flags):
    """Dispatch one poll() result to the matching handlers of obj.

    flags is the event bitmask reported for obj's descriptor:
      * POLLIN or POLLPRI            -> obj.handle_read_event()
      * POLLOUT                      -> obj.handle_write_event()
      * POLLERR, POLLHUP or POLLNVAL -> obj.handle_expt_event()
    The handlers run in that order inside a single try block, so the first
    one that raises prevents the later ones from running.

    KeyboardInterrupt, SystemExit and ExitNow propagate to the caller; every
    other exception is caught and reported through ``obj.handle_error()``.
    """
    try:
        if flags & (select.POLLIN | select.POLLPRI):
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
        if flags & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
            obj.handle_expt_event()
    except (KeyboardInterrupt, SystemExit):
        # Interpreter-level exits must not be swallowed by handle_error(),
        # otherwise Ctrl-C or sys.exit() inside a handler is lost.
        raise
    except ExitNow:
        raise
    except:
        obj.handle_error()
0103 
def poll(timeout=0.0, map=None):
    """Run one select()-based pass over the channels in map.

    map maps file descriptors to channel objects and defaults to the
    module-level socket_map.  An empty map makes the call a no-op.

    Every channel is asked readable() and writable(); its descriptor is put
    in the matching select() lists and, if either answer is true, also in
    the exceptional-condition list.  When nothing is of interest the call
    just sleeps for timeout seconds.  A select() interrupted by a signal
    (EINTR) ends the pass silently; any other select.error propagates.

    Ready descriptors are dispatched through read(), write() and
    _exception(), in that order.  Descriptors that are no longer present in
    map by the time they are reached are skipped.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    readers = []
    writers = []
    watched = []
    for fd, obj in map.items():
        wants_read = obj.readable()
        wants_write = obj.writable()
        if wants_read:
            readers.append(fd)
        if wants_write:
            writers.append(fd)
        if wants_read or wants_write:
            watched.append(fd)

    if not (readers or writers or watched):
        # Nothing to wait on: honour the timeout without calling select().
        time.sleep(timeout)
    else:
        try:
            readers, writers, watched = select.select(
                readers, writers, watched, timeout)
        except select.error:
            err = sys.exc_info()[1]
            if err.args[0] != EINTR:
                raise
            return

    # A handler may close other channels, so look each fd up again.
    for fd in readers:
        obj = map.get(fd)
        if obj is not None:
            read(obj)

    for fd in writers:
        obj = map.get(fd)
        if obj is not None:
            write(obj)

    for fd in watched:
        obj = map.get(fd)
        if obj is not None:
            _exception(obj)
0146 
def poll2(timeout=0.0, map=None):
    """Run one pass over the channels in map using select.poll().

    map maps file descriptors to channel objects and defaults to the
    module-level socket_map.  timeout is given in seconds and converted to
    the integer milliseconds expected by the poll object; None is passed
    through unchanged.

    A descriptor is registered only if its channel reports readable() or
    writable(); registered descriptors are also watched for POLLERR,
    POLLHUP and POLLNVAL.  A poll interrupted by a signal (EINTR) is treated
    as "nothing ready"; any other select.error propagates.  Each result is
    dispatched through readwrite(), skipping descriptors that are no longer
    present in map.
    """
    if map is None:
        map = socket_map
    if timeout is not None:
        # poll() wants milliseconds
        timeout = int(timeout * 1000)
    pollster = select.poll()
    if not map:
        return

    error_mask = select.POLLERR | select.POLLHUP | select.POLLNVAL
    for fd, obj in map.items():
        flags = 0
        if obj.readable():
            flags |= select.POLLIN | select.POLLPRI
        if obj.writable():
            flags |= select.POLLOUT
        if flags:
            # Exceptional conditions are only of interest for channels
            # that want to read or write.
            pollster.register(fd, flags | error_mask)

    try:
        ready = pollster.poll(timeout)
    except select.error:
        err = sys.exc_info()[1]
        if err.args[0] != EINTR:
            raise
        ready = []

    for fd, flags in ready:
        obj = map.get(fd)
        if obj is not None:
            readwrite(obj, flags)

poll3 = poll2                           # Alias for backward compatibility
0180 
def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Poll repeatedly until map is empty or count passes have been made.

    timeout  -- forwarded to the polling function on every pass.
    use_poll -- when true and the select module provides poll(), use
                poll2(); otherwise use the select()-based poll().
    map      -- fd -> channel mapping; defaults to the module-level
                socket_map.
    count    -- maximum number of passes; None means keep going for as long
                as map is non-empty.
    """
    if map is None:
        map = socket_map

    poll_fun = poll
    if use_poll and hasattr(select, 'poll'):
        poll_fun = poll2

    if count is None:
        while map:
            poll_fun(timeout, map)
        return

    remaining = count
    while map and remaining > 0:
        poll_fun(timeout, map)
        remaining -= 1
0198 
class dispatcher:
    """Thin wrapper around one non-blocking socket, driven by poll()/loop().

    An instance registers itself in a channel map (fd -> dispatcher) and
    receives handle_*_event() calls from the polling functions.  Subclasses
    override the handle_read/handle_write/handle_connect/handle_accept/
    handle_close/handle_expt hooks; the defaults only log a warning.
    Attributes that are not found on the dispatcher are looked up on the
    wrapped socket (see __getattr__).
    """

    debug = False
    connected = False
    accepting = False
    closing = False
    addr = None

    def __init__(self, sock=None, map=None):
        """Create a channel, optionally adopting an existing socket.

        sock -- socket to wrap; it is switched to non-blocking mode, the
                channel is marked connected and its peer address is
                recorded when getpeername() succeeds.  Without a socket,
                self.socket is None until create_socket()/set_socket().
        map  -- channel map to register in; defaults to socket_map.
        """
        if map is None:
            self._map = socket_map
        else:
            self._map = map

        if sock:
            self.set_socket(sock, map)
            # I think it should inherit this anyway
            self.socket.setblocking(0)
            self.connected = True
            # XXX Does the constructor require that the socket passed
            # be connected?
            try:
                self.addr = sock.getpeername()
            except socket.error:
                # The addr isn't crucial
                pass
        else:
            self.socket = None

    def __repr__(self):
        """Return '<module.class [listening|connected] [addr] at 0x...>'."""
        status = [self.__class__.__module__+"."+self.__class__.__name__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    def add_channel(self, map=None):
        """Register this channel under its fd in map (default: own map)."""
        #self.log_info('adding channel %s' % self)
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from map (default: own map), forget its fd."""
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            #self.log_info('closing channel %d:%s' % (fd, self))
            del map[fd]
        self._fileno = None

    def create_socket(self, family, type):
        """Create a new non-blocking socket and register the channel."""
        self.family_and_type = family, type
        self.socket = socket.socket(family, type)
        self.socket.setblocking(0)
        self._fileno = self.socket.fileno()
        self.add_channel()

    def set_socket(self, sock, map=None):
        """Adopt sock as the wrapped socket and register the channel."""
        self.socket = sock
##        self.__dict__['socket'] = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Turn on SO_REUSEADDR; socket errors are ignored (best effort)."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except socket.error:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return true if the channel wants read events (default: always)."""
        return True

    def writable(self):
        """Return true if the channel wants write events (default: always)."""
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and listen with backlog num.

        On Windows ('nt') a backlog above 5 is clamped to 5.
        """
        self.accepting = True
        if os.name == 'nt' and num > 5:
            # Clamp to the limit that was tested for; the previous code
            # dropped the backlog to 1, which needlessly refuses bursts.
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Remember addr and bind the wrapped socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to address.

        Returns silently while the connect is still in progress.  When the
        socket is connected right away, records the address, marks the
        channel connected and calls handle_connect().  Any other error code
        from connect_ex() is raised as socket.error(errno, name).
        """
        self.connected = False
        err = self.socket.connect_ex(address)
        # XXX Should interpret Winsock return values
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
            return
        if err in (0, EISCONN):
            self.addr = address
            self.connected = True
            self.handle_connect()
        else:
            raise socket.error(err, errorcode[err])

    def accept(self):
        """Accept a connection; return (conn, addr), or None if none is ready.

        EWOULDBLOCK is turned into a None result; other socket errors
        propagate.
        """
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
            return conn, addr
        except socket.error:
            why = sys.exc_info()[1]
            if why.args[0] != EWOULDBLOCK:
                raise
            return None

    def send(self, data):
        """Send data; return the number of bytes sent (0 on EWOULDBLOCK).

        Other socket errors propagate.
        """
        try:
            return self.socket.send(data)
        except socket.error:
            why = sys.exc_info()[1]
            if why.args[0] == EWOULDBLOCK:
                return 0
            raise

    def recv(self, buffer_size):
        """Receive up to buffer_size bytes.

        An empty read, or one of ECONNRESET/ENOTCONN/ESHUTDOWN, is treated
        as a closed connection: handle_close() is called and '' returned.
        Other socket errors propagate.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return ''
            else:
                return data
        except socket.error:
            why = sys.exc_info()[1]
            # winsock sometimes throws ENOTCONN
            if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
                self.handle_close()
                return ''
            else:
                raise

    def close(self):
        """Unregister the channel and close the wrapped socket."""
        self.del_channel()
        self.socket.close()

    # cheap inheritance, used to pass all other attribute
    # references to the underlying socket object.
    def __getattr__(self, attr):
        """Delegate unknown attributes to the wrapped socket.

        Raises AttributeError when the name is 'socket' itself, i.e. the
        instance has no wrapped socket attribute yet.
        """
        if attr == 'socket':
            # Looking up self.socket below would re-enter __getattr__
            # forever when the attribute has not been set on the instance.
            raise AttributeError(attr)
        return getattr(self.socket, attr)

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Write 'log: <message>' to stderr."""
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print '<type>: <message>'; 'info' lines only when __debug__."""
        if __debug__ or type != 'info':
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Translate a low-level read event into accept/connect/read hooks."""
        if self.accepting:
            # for an accepting socket, getting a read implies
            # that we are connected
            if not self.connected:
                self.connected = True
            self.handle_accept()
        elif not self.connected:
            self.handle_connect()
            self.connected = True
            self.handle_read()
        else:
            self.handle_read()

    def handle_write_event(self):
        """Translate a low-level write event into connect/write hooks."""
        # getting a write implies that we are connected
        if not self.connected:
            self.handle_connect()
            self.connected = True
        self.handle_write()

    def handle_expt_event(self):
        """Forward an exceptional-condition event to handle_expt()."""
        self.handle_expt()

    def handle_error(self):
        """Log the exception being handled and close the channel."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.close()

    def handle_expt(self):
        """Hook for exceptional conditions; default logs a warning."""
        self.log_info('unhandled exception', 'warning')

    def handle_read(self):
        """Hook for readable data; default logs a warning."""
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        """Hook for a writable socket; default logs a warning."""
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        """Hook for a completed connect; default logs a warning."""
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        """Hook for an incoming connection; default logs a warning."""
        self.log_info('unhandled accept event', 'warning')

    def handle_close(self):
        """Hook for a closed connection; default logs and closes."""
        self.log_info('unhandled close event', 'warning')
        self.close()
0441 
0442 # ---------------------------------------------------------------------------
0443 # adds simple buffered output capability, useful for simple clients.
0444 # [for more sophisticated usage use asynchat.async_chat]
0445 # ---------------------------------------------------------------------------
0446 
class dispatcher_with_send(dispatcher):
    """dispatcher with a simple outgoing buffer.

    send() appends to out_buffer and tries to flush at once; whatever is
    left over is flushed from handle_write() in chunks of up to 512 bytes.
    """

    def __init__(self, sock=None, map=None):
        """Initialise the base channel and start with an empty buffer."""
        dispatcher.__init__(self, sock, map)
        self.out_buffer = ''

    def initiate_send(self):
        """Push the first 512 bytes of the buffer, dropping what was sent."""
        chunk = self.out_buffer[:512]
        sent = dispatcher.send(self, chunk)
        self.out_buffer = self.out_buffer[sent:]

    def handle_write(self):
        """Flush more buffered data when the socket becomes writable."""
        self.initiate_send()

    def writable(self):
        """True while unconnected; otherwise the number of buffered bytes."""
        if not self.connected:
            return True
        return len(self.out_buffer)

    def send(self, data):
        """Queue data for sending and attempt to send immediately."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer += data
        self.initiate_send()
0469 
0470 # ---------------------------------------------------------------------------
0471 # used for debugging.
0472 # ---------------------------------------------------------------------------
0473 
def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ``((filename, function, line), type, value, info)`` where the
    first tuple describes the innermost traceback entry (line is a string)
    and info is one '[filename|function|line]' item per traceback entry,
    outermost first, joined by spaces.

    Raises AssertionError when there is no traceback to summarise.  The
    check is an explicit raise rather than an assert statement so that it
    also runs under ``python -O``.
    """
    exc_type, exc_value, tb = sys.exc_info()
    if not tb:
        # Must have a traceback
        raise AssertionError("traceback does not exist")

    entries = []
    while tb:
        code = tb.tb_frame.f_code
        entries.append((code.co_filename, code.co_name, str(tb.tb_lineno)))
        tb = tb.tb_next

    # just to be safe
    del tb

    filename, function, line = entries[-1]
    info = ' '.join(['[%s|%s|%s]' % entry for entry in entries])
    return (filename, function, line), exc_type, exc_value, info
0492 
def close_all(map=None):
    """Close the socket of every channel in map, then empty the map.

    map defaults to the module-level socket_map.  Every channel is
    attempted even if closing an earlier one fails with socket.error or
    OSError; the map is cleared in any case and the first such error is
    re-raised afterwards.  Previously the first failure aborted the loop,
    leaving the remaining sockets open and the map populated.
    """
    if map is None:
        map = socket_map
    first_error = None
    for channel in map.values():
        try:
            channel.socket.close()
        except (socket.error, OSError):
            if first_error is None:
                first_error = sys.exc_info()[1]
    map.clear()
    if first_error is not None:
        raise first_error
0499 
0500 # Asynchronous File I/O:
0501 #
0502 # After a little research (reading man pages on various unixen, and
0503 # digging through the linux kernel), I've determined that select()
0504 # isn't meant for doing asynchronous file i/o.
0505 # Heartening, though - reading linux/mm/filemap.c shows that linux
0506 # supports asynchronous read-ahead.  So _MOST_ of the time, the data
0507 # will be sitting in memory for us already when we go to read it.
0508 #
0509 # What other OS's (besides NT) support async file i/o?  [VMS?]
0510 #
0511 # Regardless, this is useful for pipes, and stdin/stdout...
0512 
0513 if os.name == 'posix':
0514     import fcntl
0515 
0516     class file_wrapper:
0517         # here we override just enough to make a file
0518         # look like a socket for the purposes of asyncore.
0519 
0520         def __init__(self, fd):
0521             self.fd = fd
0522 
0523         def recv(self, *args):
0524             return os.read(self.fd, *args)
0525 
0526         def send(self, *args):
0527             return os.write(self.fd, *args)
0528 
0529         read = recv
0530         write = send
0531 
0532         def close(self):
0533             os.close(self.fd)
0534 
0535         def fileno(self):
0536             return self.fd
0537 
0538     class file_dispatcher(dispatcher):
0539 
0540         def __init__(self, fd, map=None):
0541             dispatcher.__init__(self, None, map)
0542             self.connected = True
0543             self.set_file(fd)
0544             # set it to non-blocking mode
0545             flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
0546             flags = flags | os.O_NONBLOCK
0547             fcntl.fcntl(fd, fcntl.F_SETFL, flags)
0548 
0549         def set_file(self, fd):
0550             self._fileno = fd
0551             self.socket = file_wrapper(fd)
0552             self.add_channel()
0553 

Generated by PyXR 0.9.4
SourceForge.net Logo